Source code for hydrac.instruments.acoustique

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Acoustic instruments (:mod:`hydrac.instruments.acoustique`)
===========================================================


.. autoclass:: Acoustic
   :members:
   :private-members:

"""

from .instruments import Instrument
from hydrac.util.parameters import Parameters
from hydrac.model.water import Water
from hydrac.instruments.mod_deployment import ModeDeployment
import numpy as np


[docs]class Acoustic(Instrument): """ Acoustic class. Base class : :class:`hydrac.instruments.instruments.Instrument` This class is meant to be subclassed, not instantiated directly. The Acoustic class handles hydroacoustic devices data contained in ``param`` and contains methods dedicated to their processing such as water absorption correction and spherical spreading correction along an acoustic profile (:func:`spreading_and_att_correction`) given the carrier frequency of the device, the physical parameters of the water (Temperature, Salinity...). The ``param`` structure is common to all hydroacoustic instruments. Each ``param`` can contain several sets of profiles (ex. ``param.P0``, ``param.P1``...), the structure of which is described in the table below. .. list-table:: Description of the ``param`` container :widths: 25 15 15 50 :header-rows: 1 * - Parameter Name - Type - Size - Description * - BurstTime - float - 1 - Time of first ping * - PingRate - int - 1 - Ping rate in Hz * - NumPingsTot - int - 1 - Total number of pings * - NumChannels - int - 1 - Number of active frequency channels = F * - NumProfilesSamples - float - F - Effective number of profiles = Ns * - NumAverage - float - F - Number of pings averaged to create a profile * - SampleRate - float - F - Profile rate in Hz * - BinLengthMM - float - F - Cellsize in mm * - StartBin - float - F - Range of the first cell * - NumBins - float - F - Number of cells along a profile * - TransducerRadius - float - F - Radius of each active transducer = N * - TransducerBeamWidth - float - F - BeamWidth of each active transducer * - TransducerKt - float - F - Calibration constant of each transducer * - Frequency - float - F - Carrier frequency of each transducer in Hz * - StartBin - float - F - Range of the first cell * - PulseLength - float - F - Pulse length in s * - RxGain/TxGain - float - F - Gain at Emission/Reception in dB * - RawIntensity - float - NxNsxF - Raw HAC data * - BinRange - float - NxF - Ranges of each cell per channel * - time - float - Ns - Time of each sample in s * - TransducerAngle - float - F - Gain at Emission/Reception in dB * - depth - float - Ns - Depth of the transducers * - FileName - str - (-) - Filename used to generate the data """ def __init__(self): super().__init__() self.instr_type = 'acoustic' self.param = Parameters({}) self.meta = Parameters({}) def fast_pre_proc(self): if self.param.__len__(): ModeDeployment(self)
[docs] def preproc_acoustic_data(self, w_obj=None, phy_instr=None): """The most useful method for user is :func:`preproc_acoustic_data`. From this method, several kinds of processing are possible, the goal of which is to calculate the calibrated hydroacoustic intensity parameter ``Intensity`` : 1 - If the HAC data were collected along with environmental data, it is possible to input these data, stored as an object K from :class:`hydrac.instruments.physicalparam.PhysicalParam`:: # Intanciate HAC object >>> A=hydrac.instruments.aquascat.Aquascat('Campaign_1') # Intanciate environnemental data object >>> K=hydrac.instruments.sbe.SBE('Campaign_1') >>> A.preproc_acoustic_data(phy_instr=K) 2 - If no environemental data were acquired along with the HAS data, the user is prompted to at least refer to the water temperature from which waterabsorption can be computed from :class:`hydrac.model.water.Water`:: # Intanciate HAC object >>> A=hydrac.instruments.aquascat.Aquascat('Campaign_1') >>> A.preproc_acoustic_data() # The user is prompted for water temperature No water parameters data... Please enter a mean temerature value to estimate sound attenuation : 14 The calibrated intensity is calculated using the raw intensity measured by the instrument (stored in the variable ``RawIntensity``), corrected from water absorption, spherical spreading, nearfield effects. A calibration coefficient taking account of the transducers sensitivity and directivity patterns is also applied. Finally, the calibrated (or absolute) intensity is returned normalized by the sample volume and stored in the variable ``Intensity``, as a volume backscattering coefficient, or s\ :sub:`v`\ in m\ :sup:`2`\/m\ :sup:`3`\ (see McLennan et al. 2002, https://doi.org/10.1006/jmsc.2001.1158). .. math:: s_v = \\frac{3}{\pi} \\frac{V_{rms}^2 \\Psi^2r^2}{16K_t^2}e^{4r(\\alpha_{w})} where : - :math:`V_{rms}` is the root mean square voltage measured by the instrument - :math:`\\Psi` is the nearfield correction factor - :math:`K_t` is the calibration coefficient - :math:`\\alpha_{w}` is the attenuation due to water This method also attaches a deployment strategy (prompted from the user) to assert the measurement geometry (from the surface, from the bottom, profiles in the water column...) from the class :class:`hydrac.instruments.mod_deployment.ModeDeployment`:: >>> A.preproc_acoustic_data() # The user is prompted for water temperature No water parameters data... Please enter a mean temerature value to estimate sound attenuation : 14 # The user is prompted for a deployment strategy Deployment Mod : Mooring # The user is prompted for potential temporal averaging of the data Select a temporal window for averaging of moored acoustic parameter instrument (0 for no averaging): 1 Select a vertical bin size for averaging of casted physical parameter instrument (0 for no averaging): 0.5 >>> A.mode_depl 'Mooring' Parameters ---------- w_obj=None : object Water object from :class:`hydrac.model.water.Water` phy_instr=None : object Physical Instrument object from :mod:`hydrac.instruments.physicalparam` """ # w_obj = water class, computed from the physical # param data (or not), phy_instr = physical param dict self.sort_profiles() if phy_instr is None or (w_obj is None and phy_instr is None): print('honolulu') if w_obj is None: w_obj = Water() if w_obj.paramw.T is None: import re temp_in = input('No water parameters data... Please enter ' + 'a mean temerature value to estimate sound ' + 'attenuation : ') if not temp_in: print('No temperature Data entered...please enter ' + 'temperature') return self.preproc_acoustic_data() temp_in = [float(x) for x in re.split('[,/; .]', temp_in)] ctr = 0 if len(temp_in) == 1: self.single_model = True else: self.single_model = False while len(temp_in) < len(self.param): temp_in.append(temp_in[-1]) ctr += 1 if ctr > 0: print('{} last entries set to default equal to the ' + 'last Temp assigned T = {}'.format(ctr, temp_in[-1])) for i in range(0, len(self.param)): self.param['P' + str(i)].w_obj = Water(T=temp_in[i]) self.param[ 'P' + str(i) ]['alphaw'] = w_obj.water_absorption_coefficient2( self.param['P' + str(i)][ 'FrequencyRx'], temp_in[i]) self.param['P' + str(i)]['alphaw'] = np.squeeze( self.param['P' + str(i)]['alphaw']) else: self.single_model = False for i in range(0, len(self.param)): self.param['P' + str(i)]['alphaw'] = np.squeeze( w_obj.water_absorption_coefficient( self.param['P' + str(i)]['FrequencyRx'])) cut = sum([1 for i in range(len(self.param)) if hasattr(self.param['P'+str(i)], 'Intensity')]) if cut == 0: for i1 in range(0, len(self.param)): self.param['P' + str(i1)].Intensity = \ self.spreading_and_att_correction(i1) else: pass ModeDeployment(self) if self.mode_depl == 'Mooring': for i1 in range(0, len(self.param)): self.param['P' + str(i1)].asv = self.param[ 'P' + str(i1)].Intensity_T self.param['P' + str(i1)].at = self.param[ 'P' + str(i1)].time_T if hasattr(self.param['P' + str(i1)], 'adepth'): self.param['P' + str(i1)].__rename__('BinRange', 'BinRange_raw') self.param['P' + str(i1)].BinRange =\ np.zeros((len(self.param['P' + str(i1)].adepth), len(self.param['P' + str(i1)].Frequency)) ) for frq in range(len(self.param['P' + str(i1)].Frequency)): self.param['P' + str(i1)].BinRange[:, frq] =\ self.param['P' + str(i1)].adepth try: self.param['P' + str(i1)].__delattr__('adepth') except AttributeError(): pass else: # pour l'instant, que les paramètres moyens # sur la colonne d'eau en cas de profils... iT = np.atleast_2d(np.array([[], []])).T for i3 in range(0, len(phy_instr.param)): iT = np.vstack((iT, np.hstack((np.atleast_2d( phy_instr.param['P' + str(i3)]['at'].T).T, i3 * np.ones(np.shape(np.atleast_2d( phy_instr.param['P' + str(i3) ]['at'].T).T)))))) for i1 in range(0, len(self.param)): # temporary storage for water absorption coefficients w_tmp = [] # vector containing indexes of the physical instrument profile # Pin which the time corresponds to the acoustical time idx = [] jx = [] [jx.append(np.where(np.abs(iT[:, 0] - self.param['P' + str(i1)].time[i2]) == min( np.abs(iT[:, 0] - self.param[ 'P' + str(i1)].time[i2])))) for i2 in range(0, len(self.param['P' + str(i1)].time))] # calculation of idx based on iT [idx.append(np.unique(iT[jx[u][0], 1])) for u in range(0, len(jx))] if len(np.unique(idx)) > 1: tmp_idx = np.unique(idx) ko = [] [ko.append(np.min(np.where(idx[:] == u)[0])) for u in tmp_idx] ko.append(len(self.param['P' + str(i1)].time)) if len(ko) >= 3 and ko[-1] - ko[-2] > 1 / 10 * ko[-1]: lol = Parameters({}) ind = len(self.param) # fonctionne pas encore tmp = self.split_profiles( self.param['P' + str(i1)], ko[0:-1], [], ko) idx_e = ind + len(tmp) [lol.update({"P"+str(u1): tmp[u1-ind]}) for u1 in range(ind, idx_e)] [self.param.update({'P' + str(yu): lol['P' + str(yu)]}) for yu in range(ind, idx_e)] self.param.__delattr__('P' + str(i1)) del lol self.preproc_acoustic_data(phy_instr=phy_instr) return # calculation of the water abs. coef. averaged over the entire # physical instr profile => needs to be changed in case of # mooring deployment since you musn't average over the entire # time series for i2 in range(0, len(self.param['P' + str(i1)].time)): id_time=np.min(jx[i2][0]) - len(np.where(iT[:, 1] < int(iT[np.min(jx[i2][0]), 1]))[0]) w_tmp.append(np.nanmean(phy_instr.param['P' + str(int(float( idx[i2])))].w_obj.water_absorption_coefficient( self.param['P' + str(i1)].FrequencyRx, id_time=id_time), 0)) # [w_tmp.append(np.nanmean(phy_instr.param['P' + str(int(float( # idx[i2])))].w_obj.water_absorption_coefficient( # self.param['P' + str(i1)]. # FrequencyRx, id_time=np.min(jx[i2][0]) - # len(np.where(iT[:, 1] < int(iT[np.min(jx[i2][0]), 1]))[0] # )), 0)) for i2 in range(0, len(self.param['P' + str(i1) # ].time))] # saving water abs. coefs. self.param['P' + str(i1)].w_tmp = np.array(w_tmp) cut = sum([1 for i in range(len(self.param)) if hasattr(self.param['P' + str(i)], 'Intensity')]) # computing corrected intensities based on water abs coef. self.param['P' + str(i1)].Intensity = \ self.spreading_and_att_correction(i1, aux=1) ioio = input('Same deployment strategy as CTD ? y/n :') if ioio == 'y' or ioio == 'Y': ModeDeployment(self, phy_instr.mode_depl) elif ioio == 'n' or ioio == 'N': ModeDeployment(self) if self.mode_depl == 'Cast': for i1 in range(0, len(self.param)): idx = np.unique(iT[np.where(np.abs(iT[:, 0] - np.unique(self.param['P' + str(i1)].at)) == min(np.abs(iT[:, 0] - np.unique(self.param ['P'+str(i1)].at)))), 1]) self.param['P' + str(i1)].update({ 'w_obj': phy_instr.param['P' + str(int(float(idx))) ].w_obj}) elif self.mode_depl == 'Mooring': for i1 in range(0, len(self.param)): self.param['P' + str(i1)].asv = \ self.param['P' + str(i1)].pop('Intensity_T') self.param['P' + str(i1)].at = \ self.param['P' + str(i1)].pop('time_T') idx = np.unique(iT[np.where(np.abs(iT[:, 0] - np.nanmean(self.param['P' + str(i1)].at )) == min(np.abs(iT[:, 0] - np.nanmean(self.param ['P'+str(i1)].at)))), 1]) self.param['P' + str(i1)].update({ 'w_obj': phy_instr.param['P' + str(int(float(idx))) ].w_obj}) if hasattr(self.param['P' + str(i1)], 'adepth'): self.param['P' + str(i1)].__rename__('BinRange', 'BinRange_raw') self.param['P' + str(i1)].BinRange =\ np.zeros((len(self.param['P' + str(i1)].adepth), len(self.param['P' + str(i1)].Frequency)) ) for frq in range(len(self.param['P' + str(i1)].Frequency)): self.param['P' + str(i1)].BinRange[:, frq] =\ self.param['P' + str(i1)].adepth try: self.param['P' + str(i1)].__delattr__('adepth') except AttributeError(): pass
# ICI : Cas mooring, ajouter un "reshape profile according # to cw and rho" dans le cas où observations longues durées qui # nécessite recalcule de ces quantitiés...
[docs] def spreading_and_att_correction(self, i1, aux=None): """ Corrects RawIntensity Samples from water absorption and spherical spreading. 1 - **Attenuation due to water** : several models exist to predict the amount of energy lost by the acoustic wave as it travels into water; A most famous example is the model of François & Garrison (1982), (https://doi.org/10.1121/1.388170). 2 - **Spherical spreading** : the latter is a function of the range and follows a decrease proportionnal to :math:`\\frac{1}{r^2}` for spherical waves. .. image:: ../_image/tvg.pdf :width: 100% :align: center Parameters ---------- i1=None : int Profile set number in ``param`` aux=None : bool if physical parameters were as input of :func:`hydrac.instruments.acoustique.Acoustic.preproc_acoustic_data`, water absorption has been computed for each cell in the HAC profile for each frequency channel hence a different implementation. Returns ------- Intensity : numpy.array The corrected Intensity profiles for each sets of profiles """ if aux is not None: tag = "self.param['P' + str(i1)].w_tmp[:, freq]" else: tag = "np.tile(self.param['P' + str(i1)].alphaw[freq]," +\ "(np.shape(self.param['P' + str(i1)].RawIntensity)[1]))" print('toto = {}'.format(i1)) any_in = lambda a, b: bool(set(a).intersection(b)) if any_in(self.param['P' + str(i1)], ['ub_mes', 'avcp']): if any_in(self.param['P' + str(i1)], ['ub_mes']): instr = 'ub_mes' else: instr = 'acvp' r_tmp1 = np.sqrt(self.param['P' + str(i1)].BinRange[:, 0] * self.param['P' + str(i1)][instr].BinRange_bj[:, 0] ) r_tmp2 = .5*(self.param['P' + str(i1)].BinRange[:, 0] + self.param['P' + str(i1)][instr].BinRange_bj[:, 0]) else: r_tmp1 = self.param['P' + str(i1)].BinRange[:, 0] r_tmp2 = self.param['P' + str(i1)].BinRange[:, 0] r_tmp = self.param['P' + str(i1)].BinRange[:, 0] Intensity = np.zeros(np.shape( self.param['P' + str(i1)].RawIntensity)) for freq in range(0, self.param['P' + str(i1)].NumChannels): Intensity[:, :, freq] = np.power( 10, np.array([20 * np.log10(np.multiply( np.sqrt(x), np.sqrt(3/(16*np.array( self.param[ 'P' + str(i1)].TransducerKt )[freq, 1]**2*np.pi) )*r_tmp1)) + 2 * r_tmp2 * y for x, y in zip( self.param['P' + str(i1)].RawIntensity[ :, :, freq].T, eval(tag))]).T / 10) return Intensity
[docs] def hac_data_decimation(self, autodetect='off', st=None, ed=None): """This method corrects for potential corrupt data, for example when attenuation is such that no more useful signal can be extracted from the data, or when data in the nearfield have not been corrrected. This noise detection is done using :func:`autodetect_noise` Also, given the application, instruments can be deployed horizontally in the water column. Supposing weak attenuation and homogeneous medium, one can average the HAC profiles at each depth sampled by the instrument to increase the Signal to Noise Ratio (SNR). Corresponding modifications are directly applied to the ``Intensity`` variable of the profile set P contained in ``param``. Parameters ---------- autodetect : str, {'on', 'off'} automatic detection of noise floor reached by the instrument and removal of noise data. st, ed : int, int start and end cell number to be considered for either averaging or noise/nearfield removal. """ if self.instr_name != 'aquascat': return for i1 in range(0, len(self.param)): if 'mode_depl' not in self.__dict__.keys(): return print( 'Data need to be assigned a deployment strategy first.' ) else: if self.mode_depl != 'Cast': return print("Data can't be decimated with" + "{} deployment strategy.".format( self.param['P' + str(i1)].mode_depl)) else: bx = np.zeros(np.shape(self.param[ 'P' + str(i1)].asv[0, :, :])) i_maxfreq = int(np.where(self.param[ 'P' + str(i1)].FrequencyRx == np.max(self.param['P' + str(i1)]. FrequencyRx))[0]) if autodetect == 'on': st, ed = self.autodetect_noise(self.param ['P' + str(i1)] ['asv' + str(i_maxfreq)] ) else: if st is None or ed is None: st = np.shape(self.param[ 'P' + str(i1)].asv1)[1] / 5 ed = 3 * np.shape(self.param[ 'P' + str(i1)].asv1)[1] / 5 for i in range(1, 5): exec('self.param.P' + str(i1) + '.asv' + str(i) + '=np.nanmean(self.param.P' + str(i1) + '.asv' + str(i) + '[:,np.int(st):np.int(ed)],1)') self.param['P' + str(i1)].asv = np.transpose( np.atleast_3d(np.nanmean(self.param[ 'P' + str(i1)].asv[ np.int(st): np.int(ed), :, :], 0) ), (2, 0, 1)) self.depr = 1
[docs] def autodetect_noise(self, data): """Noise detection... """ return 0, 1
[docs] def red(self, C, x1, x2): """Boundary detection for profile spliting """ tl = np.shape(C) if len(tl) == 1: return '[' + str(x1) + ':' + str(x2) + ']' elif len(tl) == 2: if x2 > tl[0]: return '[:,' + str(x1) + ':' + str(x2) + ']' else: return '[' + str(x1) + ':' + str(x2) + ',:]' elif len(tl) == 3: if x2 > tl[0] and x2 > tl[1]: return '[:,:,' + str(x1) + ':' + str(x2) + ']' elif x2 > tl[0] and x2 > tl[2]: return '[:,' + str(x1) + ':' + str(x2) + ',:]' else: return '[' + str(x1) + ':' + str(x2) + ',:,:]'
[docs] def split_profiles(self, B, b1, b2, bx): """If profiles are too big or if several physical instrument profile sets overlap to the current HAC profile, the latter is split into several profiles of shorter duration each. The original profile is removed. """ tmpL = [] print('b1 = {}, b2= {}, bx ={}'.format(b1, b2, bx)) target = B.keys() split_target = [x for x in B.keys() if [y for y in np.shape(B[x]) if y == len(B.time)]] npsplit_target = list(target-split_target) for tke in range(0, len(b1), 1): tmp = Parameters({}) [tmp.update({x: eval('B["' + x + '"]' + self.red(B[x], bx[tke], bx[tke + 1]))}) for x in split_target] [tmp.update({x: B[x]}) for x in npsplit_target] tmpL.append(tmp) return tmpL
[docs] def sort_profiles(self): """Renames the profile sets according to the current number of profiles in the set. For exemple, if the original profile set contains the profile P0, after spliting profiles P0 into P1, P2 and P3 of shorter durations, those are renames P0, P1 and P2 respectively. """ tt1 = list(self.param.keys()) tt1_l = [] [tt1_l.append(int(tt1[idx].split('P')[1])) for idx in range(0, len(tt1))] tt2_l = list(range(0, len(tt1_l))) for u in range(0, len(self.param)): self.param.__rename__('P' + str(tt1_l[u]), 'P' + str(tt2_l[u]))
def hdf5_save(self): pass def netcdf_save(self): pass def pickle_save(self): pass