#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Seabird instruments (:mod:`hydrac.instruments.sbe`)
===================================================
.. autoclass:: SBE
:members:
:private-members:
"""
from .physicalparam import PhysicalParam
import numpy as np
# import struct as st
import os
import io
import calendar
import math
# import glob
import datetime
from tkinter import filedialog
from tkinter import Tk
from hydrac.util.parameters import Parameters
import hydrac.calcul.seawater as sw
# %gui inline
# %gui tk
# ex :
# a=hydrac.Instruments.aquascat.aquascat('nnn','lll')
# a.param=hydrac.aquascat.aquascat.batch_read(a.filepath,'',a.param)
# test=a.param
# test[a.filepath[0]]['AbsRxFrequency']
class SBE(PhysicalParam):
    """Seabird multiparameter probe instrument class.

    Base class : :mod:`hydrac.instruments.physicalparam.PhysicalParam`

    The SBE class reads Seabird converted text files (a header terminated
    by an ``END`` line followed by whitespace-separated numeric columns)
    and stores the data into the modified dictionary ``param`` with the
    common shape handled by hydrac
    (see :mod:`hydrac.instruments.physicalparam`).

    A general description of the different Seabird profiling instruments
    is given on their website
    (https://www.seabird.com/profiling/family?productCategoryId=54627473767).

    A typical example of how the data should be loaded is as follows ::

        >>> K = SBE('Campaign_1')

    The above line prompts the user to select one or multiple files in a
    directory, then reads and stores each file into ``param`` as separate
    modified dictionaries ``PX``, X being the file number (the matching
    header information goes to ``meta`` under the same key).

    Ex: for the first file loaded in ``param``, one can look at the
    different variables stored in ``param.P0`` ::

        >>> K.param.P0.keys()
        dict_keys(['time', 'Depth', 'Temperature', 'Salinity', ...])

    :func:`hydrac.instruments.sbe.SBE.__init__` automatically calls the
    :func:`hydrac.instruments.physicalparam.PhysicalParam.preproc_phy_shape`
    method of the base class once the files are loaded. According to the
    original documentation of this class, that step prompts the user for
    extra information such as the deployment strategy and an optional
    temporal averaging ::

        >>> K = SBE('Campaign_1')
        # The user is prompted for a deployment strategy
        Deployment Mod :
        Mooring
        # The user is prompted for potential time averaging of the data
        Select a temporal bin size foraveraging of moored physical parameter
        instrument (0 for no averaging):
        2

    Parameters
    ----------
    name : str
        Label stored on the instance as ``name`` (``'Campaign_1'`` in the
        example above).

    Attributes
    ----------
    instr_name : str
        Always ``'sbe'``.
    filepath : tuple of str
        Sorted paths of the selected files.
    tempdir : str
        Directory of the first selected file.
    """

    def __init__(self, name):
        PhysicalParam.__init__(self)
        self.instr_name = 'sbe'
        self.name = name
        self.filepath, self.tempdir = self.file_select()
        self.load_data()
        self.preproc_phy_shape()

    def file_select(self):
        """Ask the user which files to read through a Tk file dialog.

        Returns
        -------
        filepath : tuple of str
            Selected paths, sorted.
        tempdir : str
            Absolute directory of the first path returned by the dialog.

        Raises
        ------
        AttributeError
            If the dialog is cancelled (same error and message as
            :meth:`load_data` uses for a missing selection).
        """
        root = Tk()
        # Hide the empty root window *before* the dialog shows up.
        root.withdraw()
        try:
            filez = filedialog.askopenfilenames(parent=root,
                                                title='Choose a file')
        finally:
            # Do not leave a hidden Tk root alive after the selection.
            root.destroy()
        if not filez:
            raise AttributeError('No file selected')
        filepath = tuple(sorted(filez))
        tempdir = os.path.dirname(os.path.abspath(filez[0]))
        return filepath, tempdir

    def file_len(self, fname, e):
        """Return the number of lines of ``fname`` decoded with ``e``.

        An empty file yields 0.
        """
        with open(fname, encoding=e) as f:
            return sum(1 for _ in f)

    def find_encoding(self, name, encodings=('utf-8', 'ISO-8859-1', 'ascii',
                                             'windows-1250', 'windows-1251',
                                             'windows-1252')):
        """Return the first encoding able to decode the whole file.

        Parameters
        ----------
        name : str
            Path of the file to probe.
        encodings : iterable of str
            Candidate encodings, tried in order.

        Raises
        ------
        UnicodeError
            If none of the candidates can decode the file.
        """
        for e in encodings:
            try:
                with io.open(name, 'r', encoding=e) as fh:
                    # Decode every line; the content itself is not needed.
                    for _ in fh:
                        pass
            except (UnicodeDecodeError, UnicodeError, LookupError):
                print('got unicode error with %s , trying different encoding'
                      % e)
            else:
                print('opening the file with encoding: %s ' % e)
                return e
        raise UnicodeError('None of the candidate encodings can decode %s'
                           % name)

    def load_data(self):
        """Read every selected file into ``self.meta`` / ``self.param``.

        File number ``kk`` is stored under the key ``"P<kk>"``. A channel
        named ``OBS`` is renamed ``Turbidity``; when neither exists a zero
        ``Turbidity`` array shaped like ``Temperature`` is created.

        Raises
        ------
        AttributeError
            If the instance has no ``filepath`` attribute.
        """
        if not hasattr(self, 'filepath'):
            raise AttributeError('No file selected')
        for kk, path in enumerate(self.filepath):
            # read_SBE() takes its input from these temporary attributes.
            self.Ncurrfilepath = kk
            self.currfilepath = path
            meta, data = self.read_SBE()
            try:
                # Presumably raises KeyError when there is no 'OBS' entry;
                # __rename__ is defined by the project Parameters type.
                data.__rename__('OBS', 'Turbidity')
            except KeyError:
                if not hasattr(data, 'Turbidity'):
                    data.Turbidity = np.zeros(np.shape(data.Temperature))
            self.meta.update({"P" + str(kk): meta})
            self.param.update({"P" + str(kk): data})
        if self.filepath:
            # Only set when at least one file was processed.
            del self.Ncurrfilepath, self.currfilepath

    def replace_all(self, text, dic):
        """Apply every ``old -> new`` replacement of ``dic`` to ``text``.

        Replacements are applied one after the other in the iteration
        order of ``dic``, each on the result of the previous one.
        """
        for old, new in dic.items():
            text = text.replace(old, new)
        return text

    @staticmethod
    def _first_field(token):
        """Return the first non-empty ``:``-separated part of ``token``.

        Returns an empty string when there is none.
        """
        for part in token.strip().split(':'):
            if part:
                return part
        return ''

    def assign_var_from_header(self, x):
        """Map the tokenised ``name`` header lines to hydrac variable names.

        Parameters
        ----------
        x : list of list of str
            One token list per channel. Token 1 is read as the short
            channel code (e.g. ``'timeJ:'``) and token 2 as the first word
            of its description (e.g. ``'Temperature'``).

        Returns
        -------
        list of str
            One name per channel, in channel order. A name found several
            times is suffixed ``_1``, ``_2``, ... in order of appearance.

        Raises
        ------
        AttributeError
            If a channel cannot be identified.
        """
        known = ('Temperature', 'Salinity', 'Conductivity', 'Depth',
                 'Pressure', 'OBS', 'Turbidity')
        names = []
        for entry in x:
            code = self._first_field(entry[1]) if len(entry) > 1 else ''
            label = self._first_field(entry[2]) if len(entry) > 2 else ''
            if label in known:
                names.append(label)
            elif 'time' in code:
                if 'J' in code:
                    names.append('timeJ')
                elif 'S' in code:
                    names.append('timeS')
                else:
                    # Skipping the channel would shift every following
                    # name by one column, so fail explicitly instead.
                    raise AttributeError('Unsupported time channel: '
                                         + code)
            elif 'scan' in code:
                names.append('scan')
            elif 'flag' in code:
                names.append('flag')
            else:
                raise AttributeError('Error whiule reading variable names')

        # Number the repeated names so that each column keeps its own key.
        totals = {}
        for name in names:
            totals[name] = totals.get(name, 0) + 1
        seen = {}
        for idx, name in enumerate(names):
            if totals[name] > 1:
                seen[name] = seen.get(name, 0) + 1
                names[idx] = name + '_' + str(seen[name])
        return names

    def days_to_hmsm(self, days):
        """Convert a fraction of a day to hour, minute, second, microsecond.

        Parameters
        ----------
        days : float
            Fraction of a day, expected between 0 and 1.

        Returns
        -------
        tuple of int
            ``(hour, minute, second, microsecond)``.
        """
        day_micro = 86400 * 1000000
        # Round once on the total so no field can overflow its range
        # (a cascade of modf() calls may end on 1000000 microseconds).
        total_micro = int(round(days * day_micro))
        if 0 <= days < 1:
            # Keep a value just below one day inside that day.
            total_micro = min(total_micro, day_micro - 1)
        total_sec, micro = divmod(total_micro, 1000000)
        total_min, sec = divmod(total_sec, 60)
        hour, min_ = divmod(total_min, 60)
        return int(hour), int(min_), int(sec), int(micro)

    def JulianDate_to_MMDDYYY(self, y, jd):
        """Convert a day of year to month, day of month and year.

        Parameters
        ----------
        y : int
            Year in which ``jd`` is counted.
        jd : float
            Day of year, 1.0 being 1 January at 00:00; the fractional part
            is kept in the returned day.

        Returns
        -------
        month : int
        day : float
            Day of month including the fraction of day.
        year : int
            ``y``, or a neighbouring year when ``jd`` falls outside ``y``.
        """
        whole = int(math.floor(jd))
        fraction = jd - whole
        # Date arithmetic handles month lengths, leap years and a day
        # count running past 31 December.
        date = datetime.date(int(y), 1, 1) + datetime.timedelta(days=whole - 1)
        return date.month, date.day + fraction, date.year

    def julian_to_datenum(self, timeJ, start_date_):
        """Convert days of year to POSIX timestamps.

        Parameters
        ----------
        timeJ : iterable of float
            Days of year (1.0 is 1 January 00:00).
        start_date_ : sequence of str
            Start date tokens; only the last one (the year) is used.

        Returns
        -------
        numpy.ndarray
            Timestamps rounded to the millisecond. The dates are naive and
            therefore interpreted in the local time zone by
            ``datetime.timestamp``.
        """
        year = int(start_date_[-1])
        stamps = []
        for jd in timeJ:
            month, day, y = self.JulianDate_to_MMDDYYY(year, jd)
            frac_day, whole_day = math.modf(day)
            midnight = datetime.datetime(y, month, int(whole_day))
            millis = int(round(frac_day * 86400e3))
            moment = midnight + datetime.timedelta(milliseconds=millis)
            stamps.append(moment.timestamp())
        return np.array(stamps)

    @staticmethod
    def _clean_header_line(base_header, key, drop=(), required=True):
        """Find the first header line holding ``key`` and strip ``drop``.

        The cleaned token list replaces the line in ``base_header`` and is
        returned. ``None`` is returned for a missing optional line.
        """
        for idx, tokens in enumerate(base_header):
            if key in tokens:
                base_header[idx] = [t for t in tokens if t not in drop]
                return base_header[idx]
        if required:
            raise ValueError("Header field '%s' not found" % key)
        return None

    def read_SBE(self):
        """Read the Seabird file designated by ``self.currfilepath``.

        The header is read up to the first line containing ``END``; the
        remaining non-blank lines are parsed as numeric columns. The user
        is prompted for a latitude when a ``Pressure`` channel has to be
        converted to ``Depth``, and for the channel to keep when a
        variable is recorded several times.

        Returns
        -------
        meta : Parameters
            ``header_info``, ``nquan``, ``nval``, ``start_date`` and
            ``start_time``.
        data : Parameters
            One array per channel plus the derived ``time`` (and ``Depth``
            when computed from ``Pressure``).

        Raises
        ------
        ValueError
            If the header is incomplete or does not match the data.
        AttributeError
            If no usable time channel is found.
        """
        import re

        data = {}
        meta = {'header_info': []}
        self.encod = self.find_encoding(self.currfilepath)

        # The leading placeholder is kept so that the line indices stored
        # in meta['header_info'] stay what they have always been.
        header = ['header']
        rows = []
        with open(self.currfilepath, 'r', encoding=self.encod) as f:
            for line in f:
                header.append(line)
                if 'END' in line:
                    break
            else:
                raise ValueError('No END marker found in the header of %s'
                                 % self.currfilepath)
            for line in f:
                fields = line.split()
                if fields:  # ignore blank lines
                    rows.append(fields)

        splitter = re.compile(r'[ ,*#]')
        base_header = [[tok for tok in splitter.split(line)
                        if tok not in ('', '\n')]
                       for line in header]

        channels = [[tok for tok in tokens if tok not in ('name', '=')]
                    for tokens in base_header if 'name' in tokens]
        names = self.assign_var_from_header(channels)

        nquan_line = self._clean_header_line(base_header, 'nquan')
        nval_line = self._clean_header_line(base_header, 'nvalues')
        # The interval is only tidied up for meta; it is not needed here.
        self._clean_header_line(base_header, 'interval', drop=('=',),
                                required=False)
        start_line = self._clean_header_line(base_header, 'start_time',
                                             drop=('=',))
        nquan = int(nquan_line[-1])
        nval = int(nval_line[-1])

        # NOTE(review): calendar.month_abbr follows the active locale while
        # the header month looks like an English abbreviation - confirm.
        month_number = dict((abbr, num)
                            for num, abbr in enumerate(calendar.month_abbr))
        start_date_ = start_line[1: 4]
        start_date = str(month_number[start_date_[0]]) +\
            '/' + start_date_[1] + '/' + start_date_[2]
        start_time = start_line[4].strip()

        if len(names) < nquan:
            raise ValueError('Header announces %d channels but only %d are '
                             'described' % (nquan, len(names)))
        if rows:
            data_pre = np.array(rows, dtype=float)
        else:
            data_pre = np.empty((0, nquan))
        for i in range(nquan):
            data[names[i]] = data_pre[:, i]
        print('Keys = {}'.format(data.keys()))

        if 'Pressure' in data:
            lat = float(input('Enter a ' +
                              'latitude for ' +
                              'Pressure to ' +
                              'depth ' +
                              'conversion :'))
            data['Depth'] = sw.dpth(data['Pressure'], lat=lat)

        meta['header_info'].append(base_header)
        meta.update({'nquan': nquan,
                     'nval': nval,
                     'start_date': start_date,
                     'start_time': start_time})

        # A variable recorded several times was renamed X_1, X_2, ...:
        # let the user pick the one stored under the plain name X.
        for base in [n[:-2] for n in names if n.endswith('_1')]:
            candidates = [n for n in names if n.startswith(base + '_')]
            h = input('Which ' + base +
                      ' channel do you wish to keep ? ' +
                      str(candidates))
            choice = int(h)
            if not 1 <= choice <= len(candidates):
                raise IndexError('Channel number must be between 1 and %d'
                                 % len(candidates))
            data[base] = data[candidates[choice - 1]]
            for candidate in candidates:
                del data[candidate]

        # Time is derived last so that a duplicated time channel, once
        # resolved above, can be used as well.
        if 'timeJ' in data:
            data['time'] = self.julian_to_datenum(data['timeJ'],
                                                  start_date_)
        elif 'timeS' in data:
            origin = datetime.datetime.strptime(start_date + ' ' +
                                                start_time,
                                                "%m/%d/%Y %H:%M:%S")
            data['time'] = data['timeS'] + origin.timestamp()
        else:
            raise AttributeError('No Time stamp found in the data')

        return Parameters(meta), Parameters(data)