Source code for hydrac.instruments.sbe

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Seabird instruments (:mod:`hydrac.instruments.sbe`)
===================================================


.. autoclass:: SBE
   :members:
   :private-members:

"""
from .physicalparam import PhysicalParam
import numpy as np
# import struct as st
import os
import io
import calendar
import math
# import glob
import datetime
from tkinter import filedialog
from tkinter import Tk
from hydrac.util.parameters import Parameters
import hydrac.calcul.seawater as sw

# %gui inline
# %gui tk
# ex :
#    a=hydrac.Instruments.aquascat.aquascat('nnn','lll')
#    a.param=hydrac.aquascat.aquascat.batch_read(a.filepath,'',a.param)
#    test=a.param
#    test[a.filepath[0]]['AbsRxFrequency']


[docs]class SBE(PhysicalParam): """ Seabird multiparameter probe instrument class. Base class : :mod:`hydrac.instruments.physicalparam.PhysicalParam` The SBE class reads the seabird raw csv files (UTF-8 encoding) and stores the valuable information into the modified dictionnary ``param`` with the common shape handled by hydrac (see :mod:`hydrac.instruments.physicalparam`) A general description of the different Seabird profiling instruments is given on their website (https://www.seabird.com/profiling/family?productCategoryId=54627473767). A typical example of how should the data considered is as follows :: >>> K = SBE('Campaign_1') The above line will prompt the user to select one or multiple files in a directory, read and store each file data into ``param`` instanciated in :class: `hydrac.instruments.physicalparam.PhysicalParam`, as separate modified dictionnaries ``PX``, X being the file number. Ex: for the first file loaded in ``param``, one can look at the differents variables stored in ``param.P0`` :: >>> K.param.P0.keys() dict_keys(['time', 'Depth', 'Temperature', 'Salinity', 'Turbidity'...]) One also gets the base Acoustic class instance ``instr_type``:: >>> K.instr_type 'param_phy' Or methods:: >>> K.preproc_acoustic_data() The :func:`hydrac.instruments.sbe.SBE.__init__` automatically calls the :func:`hydrac.instruments.physicalparam.PhysicalParam.preproc_phy_shape` method from the PhysicalParam base class, dedicated to affecting a deployment strategy to the data. So while loading the data, the user will be prompted for extra information like the deployment strategy, the wish or not to resample the data... :: >>> K = SBE('Campaign_1') # The user is prompted for a deployment strategy Deployment Mod : Mooring # The user is prompted for potential time averaging of the data (note) # the user is not prompted for spatial averaging in this case as these # measurements are point-wise. Select a temporal bin size foraveraging of moored physical parameter instrument (0 for no averaging): 2 Parameters ---------- instr_type : str {'param_phy'} Type of instrument name : str, {'sbe'} Instrument name""" def __init__(self, name): PhysicalParam.__init__(self) self.instr_name = 'sbe' self.name = name self.filepath, self.tempdir = self.file_select() self.load_data() self.preproc_phy_shape()
[docs] def file_select(self): """ User input selection of the target files to read """ root = Tk() filez = filedialog.askopenfilenames(parent=root, title='Choose a file') root.withdraw() # use to hide tkinter window filepath = list(filez) filepath.sort() filepath = tuple(filepath) tempdir = os.path.dirname(os.path.abspath(filez[0])) return filepath, tempdir
[docs] def file_len(self, fname, e): """ Calculates file length """ with open(fname, encoding=e) as f: for i, l in enumerate(f): pass return i + 1
[docs] def find_encoding(self, name, encodings=['utf-8', 'ISO-8859-1', 'ascii', 'windows-1250', 'windows-1251', 'windows-1252']): """ Select right encoding and opens file using a good one """ for e in encodings: try: fh = io.open(name, 'r', encoding=e) fh.readlines() fh.seek(0) except (UnicodeDecodeError, UnicodeError, LookupError): pass print('got unicode error with %s , trying different encoding' % e) else: print('opening the file with encoding: %s ' % e) break return e
[docs] def load_data(self): """ Launching file reading """ if hasattr(self, 'filepath'): for kk in range(len(self.filepath)): self.Ncurrfilepath = kk self.currfilepath = self.filepath[kk] tmp1, tmp2 = self.read_SBE() try: tmp2.__rename__('OBS', 'Turbidity') except KeyError: if hasattr(tmp2, 'Turbidity') is False: tmp2.Turbidity = np.zeros(np.shape(tmp2.Temperature)) self.meta.update({"P"+str(kk): tmp1}) self.param.update({"P"+str(kk): tmp2}) del self.Ncurrfilepath, self.currfilepath else: raise AttributeError('No file selected')
def replace_all(self, text, dic): for i, j in dic.items(): text = text.replace(i, j) return text def assign_var_from_header(self, x): names = [] name_list = ['Temperature', 'Salinity', 'Conductivity', 'Depth', 'Pressure', 'OBS', 'Turbidity'] for i in range(len(x)): n_i1 = x[i][1].strip().split(':') n_i1 = [x for x in n_i1 if x != ''][0] n_i2 = x[i][2].strip().split(':') n_i2 = [x for x in n_i2 if x != ''][0] if n_i2 in name_list: names.append(n_i2) elif 'time' in n_i1: if 'J' in n_i1: names.append('timeJ') elif 'S' in n_i1: names.append('timeS') elif 'scan' in n_i1: names.append('scan') elif 'flag' in n_i1: names.append('flag') else: raise AttributeError('Error whiule reading variable names') if len(list(np.unique(names))) < len(names): import copy names2 = copy.deepcopy(names) [names2.remove(j) for j in set(names)] for u in range(len(names2)): ux = names.count(names2[u]) tag = names2[u] for uxx in range(ux): names[names.index(tag)] = tag + '_' + str(uxx+1) return names
[docs] def days_to_hmsm(self, days): """Converts fractional days (between 0 and 1) to hour,minutes,sec, millisec.""" hours = days * 24. hours, hour = math.modf(hours) mins = hours * 60. mins, min_ = math.modf(mins) secs = mins * 60. secs, sec = math.modf(secs) micro = round(secs * 1.e6) return int(hour), int(min_), int(sec), int(micro)
[docs] def JulianDate_to_MMDDYYY(self, y, jd): """Converts Julian days to month,day,year""" month = 1 day = 0 while np.floor(jd) - (calendar.monthrange(y,month)[1]) > 0 and month <= 12: jd = jd - calendar.monthrange(y, month)[1] month = month + 1 return month, jd, y
[docs] def julian_to_datenum(self, timeJ, start_date_): """Converts Julian days to datenum""" k = [] for u in range(len(timeJ)): m, jd, y = self.JulianDate_to_MMDDYYY(int(start_date_[-1]), timeJ[u]) frac_day, day = math.modf(jd) day = int(day) hour, min_, sec, micro = self.days_to_hmsm(frac_day) T1 = str(m) + '/' + str(day) + '/' + str(y) T2 = str(hour) + ':' + str(min_) + ':' + str(np.round(1e3 * (sec + micro / 1e6) ) / 1e3) k.append(datetime.datetime.timestamp(datetime.datetime. strptime(T1 + ' ' + T2, "%m/%d/%Y " + "%H:%M:%S.%f") )) return np.array(k)
[docs] def read_SBE(self): """ Reads a unique SBE .csv file """ import re ctr = -1 header = ['header'] data_raw = [] data = {} meta = {'header_info': []} self.encod = self.find_encoding(self.currfilepath) row_num_tot = self.file_len(self.currfilepath, self.encod) with open(self.currfilepath, 'r', encoding=self.encod) as f: # define desired replacements here rep = {'(': '', ')': '', '/': '_', '-': '_', ' ': '_', '.': '_', '°': '', ':': '_'} while ('END' not in header[-1]): rep_head = ' ' ctr += 1 head_ = f.readlines(1) toto = head_[0] header.append(toto) base_header = [re.split('[ , *, #]', x) for x in header] base_header = [[x for x in base_header[k] if x != '' and x != '\n'] for k in range(0, len(base_header))] channels_idx = [i for i, j in enumerate(base_header) if 'name' in j] channels = [[x for x in base_header[k] if x != '' and x != 'name' and x != '='] for k in channels_idx] names = self.assign_var_from_header(channels) nquan_idx = [i for i, j in enumerate(base_header) if 'nquan' in j] nval_idx = [i for i, j in enumerate(base_header) if 'nvalues' in j] interval_idx = [i for i, j in enumerate(base_header) if 'interval' in j] starttime_idx = [i for i, j in enumerate(base_header) if 'start_time' in j] base_header[nval_idx[0]] = [x for x in base_header[nval_idx[0]] if x != '' and x != '\n'] base_header[nquan_idx[0]] = [x for x in base_header[nquan_idx[0]] if x != '' and x != '\n'] base_header[interval_idx[0]] =\ [x for x in base_header[interval_idx[0]] if x != '' and x != '\n' and x != '='] base_header[starttime_idx[0]] =\ [x for x in base_header[starttime_idx[0]] if x != '' and x != '\n' and x != '='] nquan = int(base_header[nquan_idx[0]][-1]) nval = int(base_header[nval_idx[0]][-1]) date_dict = dict((v, k) for k, v in enumerate(calendar.month_abbr)) start_date_ = base_header[starttime_idx[0]][1: 4] start_date = str(date_dict[start_date_[0]]) +\ '/' + start_date_[1] + '/' + start_date_[2] start_time = base_header[starttime_idx[0]][4] while ctr + 1 < row_num_tot: ctr += 1 head_ = f.readlines(1) toto = head_[0] data_raw.append(toto.strip().split(' ')) data_pre = np.array([[x for x in data_raw[k] if x != '' and x != '\n'] for k in range(0, len(data_raw))], dtype=float) [data.update({names[i]: data_pre[:, i]}) for i in range(0, nquan)] print('Keys = {}'.format(data.keys())) if 'Pressure' in data.keys(): data.update({'Depth': sw.dpth(data['Pressure'], lat=float(input('Enter a ' + 'latitude for ' + 'Pressure to ' + 'depth ' + 'conversion :') ))}) if 'timeJ' in data.keys(): data.update({'time': self.julian_to_datenum(data['timeJ'], start_date_)}) elif ('timeJ' in data.keys() is False) and ('timeS' in data.keys()): data.update({'time': data['timeS'] + datetime.datetime.timestamp( datetime.datetime.strptime(start_date + ' ' + start_time, "%m/%d/%Y %H:%M:%S"))}) if ('timeJ' in data.keys() is False) and ('timeS' in data.keys() is False): raise AttributeError('No Time stamp found in the data') meta['header_info'].append(base_header) meta.update({'nquan': nquan, 'nval': nval, 'start_date': start_date, 'start_time': start_time}) duplicates = [i for i, j in enumerate(names) if '_1' in j] duplicates = [names[duplicates[ko]][0: -2] for ko in range(len(duplicates))] num_duplicates = len(duplicates) for ty in range(num_duplicates): ifx = [i for i, j in enumerate(names) if duplicates[ty] in j] h = input('Which ' + duplicates[ty] + ' channel do you wish to keep ? ' + str([names[r] for r in ifx])) data.update({duplicates[ty]: data[names[ifx[int(h)-1]]]}) [data.__delitem__(io) for io in [names[r] for r in ifx]] return Parameters(meta), Parameters(data)