# Source code for hydrac.instruments.korexo

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
KorExo instruments (:mod:`hydrac.instruments.korexo`)
=====================================================

.. autoclass:: KorExo
   :members:
   :private-members:

"""
from .physicalparam import PhysicalParam
import numpy as np
import os
import io
import datetime
from tkinter import filedialog
from tkinter import Tk
from hydrac.util.parameters import Parameters


class KorExo(PhysicalParam):
    """
    KorExo YSI multiparameter probe instrument class.

    Base class : :mod:`hydrac.instruments.physicalparam.PhysicalParam`

    The KorExo class reads the korexo raw text files and stores the valuable
    information into the paramcontainers param with the common shape handled
    by hydrac (see :mod:`hydrac.instruments.physicalparam`).

    A general description of the different Kor-Exo profiling instruments is
    given on their website
    (https://www.ysi.com/products/multiparameter-sondes)

    The way the data are to be considered is similar to the example shown in
    :mod:`hydrac.instruments.sbe`.

    Parameters
    ----------
    name : str
        Instrument name, stored as ``self.name``. The instrument type,
        ``self.instr_name``, is always ``'korexo'``.
    """

    def __init__(self, name):
        PhysicalParam.__init__(self)
        self.instr_name = 'korexo'
        self.name = name
        self.filepath, self.tempdir = self.file_select()
        self.load_data()
        # Move every entry that is not a plain ndarray from the parameter
        # container of each file to the matching metadata container.
        for i in range(len(self.param)):
            key = 'P' + str(i)
            container = self.param[key]
            for k in list(container.keys()):
                # Exact type test on purpose: np.chararray is an ndarray
                # subclass, so isinstance() would keep the text columns in
                # ``param`` instead of moving them to ``meta``.
                if type(container[k]) is not np.ndarray:
                    container.move_to_other_dict_and_delete_item(
                        container, self.meta[key], k)
        self.preproc_phy_shape()

    def file_select(self):
        """
        User input selection of the target files to read

        Returns
        -------
        filepath : tuple of str
            Selected file paths, sorted.
        tempdir : str
            Absolute directory of the first file returned by the dialog.

        Raises
        ------
        IndexError
            If the dialog is closed without selecting any file.
        """
        root = Tk()
        root.withdraw()  # hide the empty Tk window while the dialog is open
        try:
            filez = filedialog.askopenfilenames(parent=root,
                                                title='Choose a file')
        finally:
            # Release the Tk interpreter; it is not needed once the dialog
            # has returned.
            root.destroy()
        if not filez:
            # Same exception type as the historical ``filez[0]`` failure,
            # but with an explicit message.
            raise IndexError('No file selected')
        filepath = tuple(sorted(filez))
        tempdir = os.path.dirname(os.path.abspath(filez[0]))
        return filepath, tempdir

    def file_len(self, fname, e):
        """
        Calculates file length

        Parameters
        ----------
        fname : str
            Path of the file to read.
        e : str
            Encoding used to open the file.

        Returns
        -------
        int
            Number of lines in the file (0 for an empty file).
        """
        with open(fname, encoding=e) as f:
            return sum(1 for _ in f)

    # encodings = ['utf-8','ISO-8859-1', 'windows-1250', 'windows-1251',
    #              'windows-1252','ascii']
    # encod=encodings.aliases.aliases.keys()

    def find_encoding(self, name,
                      encodings=('utf-8', 'ISO-8859-1', 'windows-1250',
                                 'windows-1251', 'windows-1252', 'ascii')):
        """
        Select right encoding and opens file using a good one

        Each candidate is tried in order by decoding the whole file; the
        first one that decodes without error is returned.

        Parameters
        ----------
        name : str
            Path of the file to probe.
        encodings : iterable of str, optional
            Candidate encoding names, tried in order.

        Returns
        -------
        str
            The first encoding that decodes the file. If none succeeds the
            last candidate tried is returned (historical behaviour).

        Raises
        ------
        ValueError
            If ``encodings`` is empty.
        """
        e = None
        for e in encodings:
            try:
                # The context manager closes the probe handle whatever
                # happens; iterating decodes the full file lazily.
                with io.open(name, 'r', encoding=e) as fh:
                    for _ in fh:
                        pass
            except (UnicodeError, LookupError):
                print('got unicode error with %s , trying different encoding'
                      % e)
            else:
                print('opening the file with encoding: %s ' % e)
                break
        if e is None:
            raise ValueError('No encoding to try')
        return e

    def load_data(self):
        """
        Launching file reading

        Reads every file listed in ``self.filepath`` with :meth:`read_KOR`,
        renames the depth, temperature, salinity and turbidity columns to
        ``Depth``, ``Temperature``, ``Salinity`` and ``Turbidity`` and stores
        the result under ``self.meta['P<k>']`` and ``self.param['P<k>']``.
        When the file has no turbidity column, ``Turbidity`` is filled with
        zeros shaped like ``Temperature``.

        Raises
        ------
        AttributeError
            If ``self.filepath`` has not been set.
        """
        if not hasattr(self, 'filepath'):
            raise AttributeError('No file selected')
        for kk, path in enumerate(self.filepath):
            # read_KOR() takes its input from these temporary attributes.
            self.Ncurrfilepath = kk
            self.currfilepath = path
            try:
                tmp1, tmp2 = self.read_KOR()
            finally:
                # Do not leave the temporaries behind, even on failure.
                del self.Ncurrfilepath, self.currfilepath
            tmp2.__rename__('Depth_m', 'Depth')
            tmp2.__rename__('Temp_C', 'Temperature')
            tmp2.__rename__('Sal_psu', 'Salinity')
            try:
                tmp2.__rename__('Turbidity_NTU', 'Turbidity')
            except KeyError:
                tmp2.Turbidity = np.zeros(np.shape(tmp2.Temperature))
            self.meta.update({"P" + str(kk): tmp1})
            self.param.update({"P" + str(kk): tmp2})

    def replace_all(self, text, dic):
        """
        Apply every ``old -> new`` replacement of ``dic`` to ``text``

        Replacements are applied one after the other, in the iteration
        order of ``dic``.

        Parameters
        ----------
        text : str
            Input string.
        dic : dict
            Mapping of substrings to their replacement.

        Returns
        -------
        str
            The string after all replacements.
        """
        for old, new in dic.items():
            text = text.replace(old, new)
        return text

    def read_KOR(self):
        """
        Reads a unique KorExo .txt file

        The file at ``self.currfilepath`` is read in two passes over the
        same handle: header lines up to the row of consecutive column
        numbers (``1<TAB>2<TAB>3...``), then the data table whose first row
        holds the column names.

        Returns
        -------
        meta : Parameters
            ``header_info`` (list of header lines) and, when the header has
            a "Devices List" section, ``DeviceList`` (one dict per device).
        data : Parameters
            One array per column (float, or text when the first data value
            is not numeric) plus ``time``, the POSIX timestamps built from
            the date and time columns. The datetimes are naive, so they are
            interpreted in the local time zone.
        """
        ctr = -1
        header = ['header']
        data_raw = []
        data = {}
        meta = {'header_info': []}
        # Characters that are replaced to build keys from column and
        # device names.
        rep = {'(': '', ')': '', '/': '_', '-': '_', ' ': '_', '.': '_',
               '°': '', ':': '_'}

        def is_index_row(line):
            # True for the "1<TAB>2<TAB>3..." row that ends the header.
            fields = line.strip().split('\t')
            return ('1\t2\t3\t4\t5' in line
                    and fields == [str(n)
                                   for n in range(1, len(fields) + 1)])

        self.encod = self.find_encoding(self.currfilepath)
        row_num_tot = self.file_len(self.currfilepath, self.encod)
        with open(self.currfilepath, 'r', encoding=self.encod) as f:
            # NOTE(review): only every other character of each line is
            # kept. Presumably the export is UTF-16 decoded through a
            # single-byte codec, which interleaves padding characters
            # (and a BOM on the first line) -- TODO confirm.
            while not is_index_row(header[-1]):
                ctr += 1
                toto = f.readlines(1)[0]
                if toto[1::2] != '' and toto[1::2] != '\n':
                    if ctr == 0:
                        header.append(toto[0::2][1:])
                    else:
                        header.append(toto[1::2])
            while ctr + 1 < row_num_tot:
                ctr += 1
                toto = f.readlines(1)[0]
                if toto[1::2] != '' and toto[1::2] != '\n':
                    data_raw.append(toto[1::2])

        # Decimal commas -> decimal points.
        data_raw = [x.replace(',', '.') for x in data_raw]
        header.append(data_raw[0])
        row_num = len(data_raw)
        data_raw_split = [line.strip().split('\t') for line in data_raw]

        # Allocate one array per column; the type is chosen from the first
        # data row.
        for col, raw_name in enumerate(data_raw_split[0]):
            key = self.replace_all(raw_name, rep)
            try:
                float(data_raw_split[1][col])
            except ValueError:
                data[key] = np.chararray(row_num - 1, itemsize=12,
                                         unicode=True)
            else:
                data[key] = np.zeros(row_num - 1, dtype=float)

        # Fill the arrays; the dict keeps the column order.
        for row in range(1, row_num):
            for col, key in enumerate(data):
                data[key][row - 1] = data_raw_split[row][col]

        # Header lines before "Devices List" are kept verbatim; the line
        # that follows it gives the device table column names.
        for idx in range(len(header)):
            if 'Devices List' in header[idx]:
                meta['header_info'][-1] = \
                    meta['header_info'][-1].replace(' ', '_')
                meta['DeviceList'] = {}
                idx += 1
                tmp = header[idx].strip().split('\t')
                tmp.remove('Name')
                break
            meta['header_info'].append(header[idx])

        if 'DeviceList' in meta:
            idx += 1
            # Device rows have fewer fields than the data table has columns.
            while len(header[idx].strip().split('\t')) < len(data):
                fields = header[idx].strip().split('\t')
                device = self.replace_all(fields[0], rep)
                meta['DeviceList'][device] = {}
                for x in range(len(tmp)):
                    meta['DeviceList'][device][
                        self.replace_all(tmp[x], rep)] = fields[x + 1]
                idx += 1

        T1 = data['Date_MM_DD_YYYY']
        T2 = data['Time_HH_MM_SS']
        L = [datetime.datetime.strptime(T1[i] + ' ' + T2[i],
                                        "%m/%d/%Y %H:%M:%S").timestamp()
             for i in range(len(T1))]
        data['time'] = np.array(L)
        return Parameters(meta), Parameters(data)