#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
KorExo instruments (:mod:`hydrac.instruments.korexo`)
=====================================================
.. autoclass:: KorExo
   :members:
   :private-members:
"""
from .physicalparam import PhysicalParam
import numpy as np
import os
import io
import datetime
from tkinter import filedialog
from tkinter import Tk
from hydrac.util.parameters import Parameters
class KorExo(PhysicalParam):
    """KorExo YSI multiparameter probe instrument class.

    Base class : :mod:`hydrac.instruments.physicalparam.PhysicalParam`

    The KorExo class reads the KorExo raw text files and stores the
    valuable information into the ``param`` containers with the common
    shape handled by hydrac (see :mod:`hydrac.instruments.physicalparam`).

    A general description of the different Kor-Exo profiling instruments is
    given on their website
    (https://www.ysi.com/products/multiparameter-sondes).

    The way the data are to be considered is similar to the example shown
    in :mod:`hydrac.instruments.sbe`.

    Parameters
    ----------
    name : str
        Instrument name; stored unchanged in ``self.name``
        (``self.instr_name`` is always set to ``'korexo'``).
    """

    def __init__(self, name):
        """Select the files interactively, read them and reshape the data.

        Instantiating the class opens a Tk file dialog (see
        :meth:`file_select`), so it needs a graphical session.
        """
        PhysicalParam.__init__(self)
        self.instr_name = 'korexo'
        self.name = name
        self.filepath, self.tempdir = self.file_select()
        self.load_data()
        for i in range(len(self.param)):
            key = 'P' + str(i)
            profile = self.param[key]
            # Snapshot the keys: entries are deleted while we walk them.
            for k in list(profile.keys()):
                # The *exact* type test is intentional: the string columns
                # built by read_KOR are np.chararray instances (a subclass
                # of np.ndarray), so they fail this test and are moved to
                # ``meta`` together with any non-array entry.  Using
                # isinstance() here would keep them in ``param``.
                if type(profile[k]) != np.ndarray:
                    profile.move_to_other_dict_and_delete_item(
                        profile, self.meta[key], k)
        self.preproc_phy_shape()

    def file_select(self):
        """Ask the user, through a Tk dialog, which files to read.

        Returns
        -------
        filepath : tuple of str
            The selected paths, sorted.
        tempdir : str
            Absolute directory of the first path returned by the dialog.

        Raises
        ------
        AttributeError
            If the dialog is closed without selecting a file (same error
            as the one raised by :meth:`load_data`).
        """
        root = Tk()
        # Hide the empty root window before the dialog opens, otherwise it
        # stays visible for as long as the dialog is displayed.
        root.withdraw()
        try:
            filez = filedialog.askopenfilenames(parent=root,
                                                title='Choose a file')
        finally:
            # Do not leave one hidden Tk root behind per instance.
            root.destroy()
        if not filez:
            raise AttributeError('No file selected')
        filepath = tuple(sorted(filez))
        tempdir = os.path.dirname(os.path.abspath(filez[0]))
        return filepath, tempdir

    def file_len(self, fname, e):
        """Return the number of lines of a text file.

        Parameters
        ----------
        fname : str
            Path of the file.
        e : str
            Encoding used to open the file.

        Returns
        -------
        int
            Number of lines; ``0`` for an empty file.
        """
        with open(fname, encoding=e) as f:
            return sum(1 for _ in f)

    def find_encoding(self, name, encodings=('utf-8', 'ISO-8859-1',
                                             'windows-1250', 'windows-1251',
                                             'windows-1252', 'ascii')):
        """Return the first candidate encoding able to decode the file.

        Each candidate is tried in order by decoding the whole file; the
        file is closed again after every attempt.

        Parameters
        ----------
        name : str
            Path of the file to probe.
        encodings : iterable of str, optional
            Candidate encodings, tried in the given order.

        Returns
        -------
        str
            The first encoding that decodes the file without error.  If
            every candidate fails, the last candidate is returned.

        Raises
        ------
        ValueError
            If ``encodings`` is empty.
        """
        e = None
        for e in encodings:
            try:
                with io.open(name, 'r', encoding=e) as fh:
                    # Decode everything: an error may sit anywhere.
                    for _ in fh:
                        pass
            except (UnicodeDecodeError, UnicodeError, LookupError):
                print('got unicode error with %s , trying different encoding'
                      % e)
            else:
                print('opening the file with encoding: %s ' % e)
                break
        if e is None:
            raise ValueError('no candidate encoding supplied')
        return e

    def load_data(self):
        """Read every selected file into ``self.meta`` and ``self.param``.

        File number ``kk`` is stored under the key ``'P<kk>'`` in both
        containers.  The columns ``Depth_m``, ``Temp_C`` and ``Sal_psu``
        are renamed ``Depth``, ``Temperature`` and ``Salinity``;
        ``Turbidity_NTU`` is renamed ``Turbidity`` or, when the rename
        raises ``KeyError``, replaced by zeros shaped like ``Temperature``.

        Raises
        ------
        AttributeError
            If no file has been selected.
        """
        if not getattr(self, 'filepath', None):
            raise AttributeError('No file selected')
        for kk, path in enumerate(self.filepath):
            # read_KOR() picks the file to read from these attributes.
            self.Ncurrfilepath = kk
            self.currfilepath = path
            tmp1, tmp2 = self.read_KOR()
            print('rrr{}'.format(tmp2.keys()))
            tmp2.__rename__('Depth_m', 'Depth')
            tmp2.__rename__('Temp_C', 'Temperature')
            tmp2.__rename__('Sal_psu', 'Salinity')
            try:
                tmp2.__rename__('Turbidity_NTU', 'Turbidity')
            except KeyError:
                tmp2.Turbidity = np.zeros(np.shape(tmp2.Temperature))
            self.meta.update({"P" + str(kk): tmp1})
            self.param.update({"P" + str(kk): tmp2})
        del self.Ncurrfilepath, self.currfilepath

    def replace_all(self, text, dic):
        """Apply every ``old -> new`` replacement of ``dic`` to ``text``.

        Replacements are applied one after the other, in the iteration
        order of ``dic``.

        Parameters
        ----------
        text : str
            String to transform.
        dic : dict
            Mapping of substrings to their replacement.

        Returns
        -------
        str
            The transformed string.
        """
        for i, j in dic.items():
            text = text.replace(i, j)
        return text

    @staticmethod
    def _is_column_index_row(line):
        """Tell whether ``line`` is the tab-separated row ``1 2 3 ... n``."""
        fields = line.strip().split('\t')
        expected = [str(n) for n in range(1, len(fields) + 1)]
        return '1\t2\t3\t4\t5' in line and fields == expected

    def read_KOR(self):
        """Read a single KorExo .txt file (``self.currfilepath``).

        The encoding found by :meth:`find_encoding` is kept in
        ``self.encod``.  Everything up to and including the row that
        numbers the columns (``1 2 3 ...``) is treated as header; the
        first row after it holds the column titles and the following rows
        the records.  Decimal commas are turned into points.

        Returns
        -------
        meta : Parameters
            ``'header_info'`` (list of header lines) and, when the header
            has a ``Devices List`` section, ``'DeviceList'`` (one dict per
            device).
        data : Parameters
            One array per column, keyed by the cleaned column title, plus
            ``'time'`` built from the ``Date_MM_DD_YYYY`` and
            ``Time_HH_MM_SS`` columns.

        Raises
        ------
        ValueError
            If the column-numbering row is missing or if the file does not
            hold a title row followed by at least one record.
        """
        # Column titles are turned into identifier-like keys.
        rep = {'(': '', ')': '', '/': '_', '-': '_', ' ': '_', '.': '_',
               '°': '', ':': '_'}
        header = ['header']
        data_raw = []
        self.encod = self.find_encoding(self.currfilepath)
        with open(self.currfilepath, 'r', encoding=self.encod) as f:
            # Only every other character of a line is kept.
            # NOTE(review): this looks like a UTF-16 export read through a
            # single-byte codec - confirm before opening it as UTF-16.
            for line_no, raw in enumerate(f):
                text = raw[1::2]
                if text != '' and text != '\n':
                    # The very first line is offset by one character.
                    header.append(raw[0::2][1:] if line_no == 0 else text)
                if self._is_column_index_row(header[-1]):
                    break
            else:
                raise ValueError('column numbering row not found in %s'
                                 % self.currfilepath)
            # Same file object: iteration resumes right after the header.
            for raw in f:
                text = raw[1::2]
                if text != '' and text != '\n':
                    data_raw.append(text)
        if len(data_raw) < 2:
            raise ValueError('no data record found in %s'
                             % self.currfilepath)

        data_raw = [row.replace(',', '.') for row in data_raw]
        header.append(data_raw[0])
        rows = [row.strip().split('\t') for row in data_raw]
        titles, records = rows[0], rows[1:]
        n_rec = len(records)

        # The first record decides the type of each column.
        data = {}
        for col, title in enumerate(titles):
            key = self.replace_all(title, rep)
            try:
                float(records[0][col])
            except ValueError:
                # Must stay an np.chararray: __init__ relies on the exact
                # type to move text columns to ``meta``.  Values longer
                # than 12 characters are truncated.
                data[key] = np.chararray(n_rec, itemsize=12, unicode=True)
            else:
                data[key] = np.zeros(n_rec, dtype=float)
        for row_idx, fields in enumerate(records):
            # Dict order is the column order (assumes unique titles).
            for col, key in enumerate(data):
                data[key][row_idx] = fields[col]

        meta = {'header_info': []}
        device_columns = []
        idx = 0
        for idx, line in enumerate(header):
            if 'Devices List' in line:
                meta['header_info'][-1] = \
                    meta['header_info'][-1].replace(' ', '_')
                meta['DeviceList'] = {}
                # The next line names the device table columns; 'Name' is
                # dropped because the device name becomes the dict key.
                idx += 1
                device_columns = header[idx].strip().split('\t')
                device_columns.remove('Name')
                break
            meta['header_info'].append(line)
        if 'DeviceList' in meta:
            idx += 1
            fields = header[idx].strip().split('\t')
            # Device rows are the ones with fewer fields than data columns.
            while len(fields) < len(data):
                device = {}
                for x, column in enumerate(device_columns):
                    device[self.replace_all(column, rep)] = fields[x + 1]
                meta['DeviceList'][self.replace_all(fields[0], rep)] = device
                idx += 1
                fields = header[idx].strip().split('\t')

        dates = data['Date_MM_DD_YYYY']
        times = data['Time_HH_MM_SS']
        # The parsed datetimes are naive, so timestamp() interprets them in
        # the local time zone of the machine.
        stamps = [
            datetime.datetime.strptime(dates[i] + ' ' + times[i],
                                       "%m/%d/%Y %H:%M:%S").timestamp()
            for i in range(len(dates))
        ]
        data['time'] = np.array(stamps)
        return Parameters(meta), Parameters(data)