#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Acoustic instruments (:mod:`hydrac.instruments.acoustique`)
===========================================================
.. autoclass:: Acoustic
:members:
:private-members:
"""
from .instruments import Instrument
from hydrac.util.parameters import Parameters
from hydrac.model.water import Water
from hydrac.instruments.mod_deployment import ModeDeployment
import numpy as np
[docs]class Acoustic(Instrument):
"""
Acoustic class.
Base class : :class:`hydrac.instruments.instruments.Instrument`
This class is meant to be subclassed, not instantiated directly.
The Acoustic class handles hydroacoustic devices data contained in
``param`` and contains methods dedicated to their processing such as water
absorption correction and spherical spreading correction along an acoustic
profile (:func:`spreading_and_att_correction`) given the carrier frequency
of the device, the physical parameters of the water (Temperature,
Salinity...).
The ``param`` structure is common to all hydroacoustic instruments. Each
``param`` can contain several sets of profiles (ex. ``param.P0``,
``param.P1``...), the structure of which is described in the table below.
.. list-table:: Description of the ``param`` container
:widths: 25 15 15 50
:header-rows: 1
* - Parameter Name
- Type
- Size
- Description
* - BurstTime
- float
- 1
- Time of first ping
* - PingRate
- int
- 1
- Ping rate in Hz
* - NumPingsTot
- int
- 1
- Total number of pings
* - NumChannels
- int
- 1
- Number of active frequency channels = F
* - NumProfilesSamples
- float
- F
- Effective number of profiles = Ns
* - NumAverage
- float
- F
- Number of pings averaged to create a profile
* - SampleRate
- float
- F
- Profile rate in Hz
* - BinLengthMM
- float
- F
- Cellsize in mm
* - StartBin
- float
- F
- Range of the first cell
* - NumBins
- float
- F
- Number of cells along a profile
* - TransducerRadius
- float
- F
- Radius of each active transducer = N
* - TransducerBeamWidth
- float
- F
- BeamWidth of each active transducer
* - TransducerKt
- float
- F
- Calibration constant of each transducer
* - Frequency
- float
- F
- Carrier frequency of each transducer in Hz
* - StartBin
- float
- F
- Range of the first cell
* - PulseLength
- float
- F
- Pulse length in s
* - RxGain/TxGain
- float
- F
- Gain at Emission/Reception in dB
* - RawIntensity
- float
- NxNsxF
- Raw HAC data
* - BinRange
- float
- NxF
- Ranges of each cell per channel
* - time
- float
- Ns
- Time of each sample in s
* - TransducerAngle
- float
- F
- Gain at Emission/Reception in dB
* - depth
- float
- Ns
- Depth of the transducers
* - FileName
- str
- (-)
- Filename used to generate the data
"""
def __init__(self):
super().__init__()
self.instr_type = 'acoustic'
self.param = Parameters({})
self.meta = Parameters({})
def fast_pre_proc(self):
if self.param.__len__():
ModeDeployment(self)
[docs] def preproc_acoustic_data(self, w_obj=None, phy_instr=None):
"""The most useful method for user is :func:`preproc_acoustic_data`.
From this method, several kinds of processing are possible, the goal of
which is to calculate the calibrated hydroacoustic intensity parameter
``Intensity`` :
1 - If the HAC data were collected along with environmental data, it is
possible to input these data, stored as an object K from
:class:`hydrac.instruments.physicalparam.PhysicalParam`::
# Intanciate HAC object
>>> A=hydrac.instruments.aquascat.Aquascat('Campaign_1')
# Intanciate environnemental data object
>>> K=hydrac.instruments.sbe.SBE('Campaign_1')
>>> A.preproc_acoustic_data(phy_instr=K)
2 - If no environemental data were acquired along with the HAS data,
the user is prompted to at least refer to the water temperature from
which waterabsorption can be computed from
:class:`hydrac.model.water.Water`::
# Intanciate HAC object
>>> A=hydrac.instruments.aquascat.Aquascat('Campaign_1')
>>> A.preproc_acoustic_data()
# The user is prompted for water temperature
No water parameters data... Please enter a mean temerature value to
estimate sound attenuation :
14
The calibrated intensity is calculated using the raw intensity measured
by the instrument (stored in the variable ``RawIntensity``), corrected
from water absorption, spherical spreading, nearfield effects. A
calibration coefficient taking account of the transducers sensitivity
and directivity patterns is also applied. Finally, the calibrated (or
absolute) intensity is returned normalized by the sample volume and
stored in the variable ``Intensity``, as a volume backscattering
coefficient, or s\ :sub:`v`\
in m\ :sup:`2`\/m\ :sup:`3`\
(see McLennan et al. 2002, https://doi.org/10.1006/jmsc.2001.1158).
.. math:: s_v = \\frac{3}{\pi} \\frac{V_{rms}^2 \\Psi^2r^2}{16K_t^2}e^{4r(\\alpha_{w})}
where :
- :math:`V_{rms}` is the root mean square voltage measured by the instrument
- :math:`\\Psi` is the nearfield correction factor
- :math:`K_t` is the calibration coefficient
- :math:`\\alpha_{w}` is the attenuation due to water
This method also attaches a deployment strategy (prompted from the
user) to assert the measurement geometry (from the surface, from the
bottom, profiles in the water column...) from the class
:class:`hydrac.instruments.mod_deployment.ModeDeployment`::
>>> A.preproc_acoustic_data()
# The user is prompted for water temperature
No water parameters data... Please enter a mean temerature value to
estimate sound attenuation :
14
# The user is prompted for a deployment strategy
Deployment Mod :
Mooring
# The user is prompted for potential temporal averaging of the data
Select a temporal window for averaging of moored acoustic parameter
instrument (0 for no averaging):
1
Select a vertical bin size for averaging of casted physical parameter
instrument (0 for no averaging):
0.5
>>> A.mode_depl
'Mooring'
Parameters
----------
w_obj=None : object
Water object from :class:`hydrac.model.water.Water`
phy_instr=None : object
Physical Instrument object from
:mod:`hydrac.instruments.physicalparam`
"""
# w_obj = water class, computed from the physical
# param data (or not), phy_instr = physical param dict
self.sort_profiles()
if phy_instr is None or (w_obj is None and phy_instr is None):
print('honolulu')
if w_obj is None:
w_obj = Water()
if w_obj.paramw.T is None:
import re
temp_in = input('No water parameters data... Please enter ' +
'a mean temerature value to estimate sound ' +
'attenuation : ')
if not temp_in:
print('No temperature Data entered...please enter ' +
'temperature')
return self.preproc_acoustic_data()
temp_in = [float(x) for x in re.split('[,/; .]', temp_in)]
ctr = 0
if len(temp_in) == 1:
self.single_model = True
else:
self.single_model = False
while len(temp_in) < len(self.param):
temp_in.append(temp_in[-1])
ctr += 1
if ctr > 0:
print('{} last entries set to default equal to the ' +
'last Temp assigned T = {}'.format(ctr, temp_in[-1]))
for i in range(0, len(self.param)):
self.param['P' + str(i)].w_obj = Water(T=temp_in[i])
self.param[
'P' + str(i)
]['alphaw'] = w_obj.water_absorption_coefficient2(
self.param['P' + str(i)][
'FrequencyRx'], temp_in[i])
self.param['P' + str(i)]['alphaw'] = np.squeeze(
self.param['P' + str(i)]['alphaw'])
else:
self.single_model = False
for i in range(0, len(self.param)):
self.param['P' + str(i)]['alphaw'] = np.squeeze(
w_obj.water_absorption_coefficient(
self.param['P' + str(i)]['FrequencyRx']))
cut = sum([1 for i in range(len(self.param))
if hasattr(self.param['P'+str(i)], 'Intensity')])
if cut == 0:
for i1 in range(0, len(self.param)):
self.param['P' + str(i1)].Intensity = \
self.spreading_and_att_correction(i1)
else:
pass
ModeDeployment(self)
if self.mode_depl == 'Mooring':
for i1 in range(0, len(self.param)):
self.param['P' + str(i1)].asv = self.param[
'P' + str(i1)].Intensity_T
self.param['P' + str(i1)].at = self.param[
'P' + str(i1)].time_T
if hasattr(self.param['P' + str(i1)], 'adepth'):
self.param['P' + str(i1)].__rename__('BinRange',
'BinRange_raw')
self.param['P' + str(i1)].BinRange =\
np.zeros((len(self.param['P' + str(i1)].adepth),
len(self.param['P' + str(i1)].Frequency))
)
for frq in range(len(self.param['P' +
str(i1)].Frequency)):
self.param['P' + str(i1)].BinRange[:, frq] =\
self.param['P' + str(i1)].adepth
try:
self.param['P' + str(i1)].__delattr__('adepth')
except AttributeError():
pass
else:
# pour l'instant, que les paramètres moyens
# sur la colonne d'eau en cas de profils...
iT = np.atleast_2d(np.array([[], []])).T
for i3 in range(0, len(phy_instr.param)):
iT = np.vstack((iT, np.hstack((np.atleast_2d(
phy_instr.param['P' + str(i3)]['at'].T).T,
i3 * np.ones(np.shape(np.atleast_2d(
phy_instr.param['P' + str(i3)
]['at'].T).T))))))
for i1 in range(0, len(self.param)):
# temporary storage for water absorption coefficients
w_tmp = []
# vector containing indexes of the physical instrument profile
# Pin which the time corresponds to the acoustical time
idx = []
jx = []
[jx.append(np.where(np.abs(iT[:, 0] -
self.param['P' + str(i1)].time[i2]) == min(
np.abs(iT[:, 0] - self.param[
'P' + str(i1)].time[i2]))))
for i2 in range(0, len(self.param['P' + str(i1)].time))]
# calculation of idx based on iT
[idx.append(np.unique(iT[jx[u][0], 1]))
for u in range(0, len(jx))]
if len(np.unique(idx)) > 1:
tmp_idx = np.unique(idx)
ko = []
[ko.append(np.min(np.where(idx[:] == u)[0]))
for u in tmp_idx]
ko.append(len(self.param['P' + str(i1)].time))
if len(ko) >= 3 and ko[-1] - ko[-2] > 1 / 10 * ko[-1]:
lol = Parameters({})
ind = len(self.param)
# fonctionne pas encore
tmp = self.split_profiles(
self.param['P' + str(i1)], ko[0:-1], [], ko)
idx_e = ind + len(tmp)
[lol.update({"P"+str(u1): tmp[u1-ind]})
for u1 in range(ind, idx_e)]
[self.param.update({'P' + str(yu): lol['P' + str(yu)]})
for yu in range(ind, idx_e)]
self.param.__delattr__('P' + str(i1))
del lol
self.preproc_acoustic_data(phy_instr=phy_instr)
return
# calculation of the water abs. coef. averaged over the entire
# physical instr profile => needs to be changed in case of
# mooring deployment since you musn't average over the entire
# time series
for i2 in range(0, len(self.param['P' + str(i1)].time)):
id_time=np.min(jx[i2][0]) - len(np.where(iT[:, 1] < int(iT[np.min(jx[i2][0]), 1]))[0])
w_tmp.append(np.nanmean(phy_instr.param['P' + str(int(float(
idx[i2])))].w_obj.water_absorption_coefficient(
self.param['P' + str(i1)].FrequencyRx, id_time=id_time), 0))
# [w_tmp.append(np.nanmean(phy_instr.param['P' + str(int(float(
# idx[i2])))].w_obj.water_absorption_coefficient(
# self.param['P' + str(i1)].
# FrequencyRx, id_time=np.min(jx[i2][0]) -
# len(np.where(iT[:, 1] < int(iT[np.min(jx[i2][0]), 1]))[0]
# )), 0)) for i2 in range(0, len(self.param['P' + str(i1)
# ].time))]
# saving water abs. coefs.
self.param['P' + str(i1)].w_tmp = np.array(w_tmp)
cut = sum([1 for i in range(len(self.param))
if hasattr(self.param['P' + str(i)], 'Intensity')])
# computing corrected intensities based on water abs coef.
self.param['P' + str(i1)].Intensity = \
self.spreading_and_att_correction(i1, aux=1)
ioio = input('Same deployment strategy as CTD ? y/n :')
if ioio == 'y' or ioio == 'Y':
ModeDeployment(self, phy_instr.mode_depl)
elif ioio == 'n' or ioio == 'N':
ModeDeployment(self)
if self.mode_depl == 'Cast':
for i1 in range(0, len(self.param)):
idx = np.unique(iT[np.where(np.abs(iT[:, 0] -
np.unique(self.param['P' + str(i1)].at)) ==
min(np.abs(iT[:, 0] - np.unique(self.param
['P'+str(i1)].at)))), 1])
self.param['P' + str(i1)].update({
'w_obj': phy_instr.param['P' + str(int(float(idx)))
].w_obj})
elif self.mode_depl == 'Mooring':
for i1 in range(0, len(self.param)):
self.param['P' + str(i1)].asv = \
self.param['P' + str(i1)].pop('Intensity_T')
self.param['P' + str(i1)].at = \
self.param['P' + str(i1)].pop('time_T')
idx = np.unique(iT[np.where(np.abs(iT[:, 0] -
np.nanmean(self.param['P' + str(i1)].at
)) ==
min(np.abs(iT[:, 0] - np.nanmean(self.param
['P'+str(i1)].at)))), 1])
self.param['P' + str(i1)].update({
'w_obj': phy_instr.param['P' + str(int(float(idx)))
].w_obj})
if hasattr(self.param['P' + str(i1)], 'adepth'):
self.param['P' + str(i1)].__rename__('BinRange',
'BinRange_raw')
self.param['P' + str(i1)].BinRange =\
np.zeros((len(self.param['P' + str(i1)].adepth),
len(self.param['P' + str(i1)].Frequency))
)
for frq in range(len(self.param['P' + str(i1)].Frequency)):
self.param['P' + str(i1)].BinRange[:, frq] =\
self.param['P' + str(i1)].adepth
try:
self.param['P' + str(i1)].__delattr__('adepth')
except AttributeError():
pass
# ICI : Cas mooring, ajouter un "reshape profile according
# to cw and rho" dans le cas où observations longues durées qui
# nécessite recalcule de ces quantitiés...
[docs] def spreading_and_att_correction(self, i1, aux=None):
"""
Corrects RawIntensity Samples from water absorption and spherical
spreading.
1 - **Attenuation due to water** : several models exist to predict the
amount of energy lost by the acoustic wave as it travels into water; A
most famous example is the model of François & Garrison (1982),
(https://doi.org/10.1121/1.388170).
2 - **Spherical spreading** : the latter is a function of the range and
follows a decrease proportionnal to
:math:`\\frac{1}{r^2}`
for spherical waves.
.. image:: ../_image/tvg.pdf
:width: 100%
:align: center
Parameters
----------
i1=None : int
Profile set number in ``param``
aux=None : bool
if physical parameters were as input of
:func:`hydrac.instruments.acoustique.Acoustic.preproc_acoustic_data`,
water absorption has been computed for
each cell in the HAC profile for each frequency channel hence a
different implementation.
Returns
-------
Intensity : numpy.array
The corrected Intensity profiles for each sets of profiles
"""
if aux is not None:
tag = "self.param['P' + str(i1)].w_tmp[:, freq]"
else:
tag = "np.tile(self.param['P' + str(i1)].alphaw[freq]," +\
"(np.shape(self.param['P' + str(i1)].RawIntensity)[1]))"
print('toto = {}'.format(i1))
any_in = lambda a, b: bool(set(a).intersection(b))
if any_in(self.param['P' + str(i1)], ['ub_mes', 'avcp']):
if any_in(self.param['P' + str(i1)], ['ub_mes']):
instr = 'ub_mes'
else:
instr = 'acvp'
r_tmp1 = np.sqrt(self.param['P' + str(i1)].BinRange[:, 0] *
self.param['P' + str(i1)][instr].BinRange_bj[:, 0]
)
r_tmp2 = .5*(self.param['P' + str(i1)].BinRange[:, 0] +
self.param['P' + str(i1)][instr].BinRange_bj[:, 0])
else:
r_tmp1 = self.param['P' + str(i1)].BinRange[:, 0]
r_tmp2 = self.param['P' + str(i1)].BinRange[:, 0]
r_tmp = self.param['P' + str(i1)].BinRange[:, 0]
Intensity = np.zeros(np.shape(
self.param['P' + str(i1)].RawIntensity))
for freq in range(0, self.param['P' + str(i1)].NumChannels):
Intensity[:, :, freq] = np.power(
10, np.array([20 * np.log10(np.multiply(
np.sqrt(x), np.sqrt(3/(16*np.array(
self.param[
'P' + str(i1)].TransducerKt
)[freq, 1]**2*np.pi)
)*r_tmp1)) + 2 * r_tmp2 * y
for x, y in zip(
self.param['P' + str(i1)].RawIntensity[
:, :, freq].T, eval(tag))]).T / 10)
return Intensity
[docs] def hac_data_decimation(self, autodetect='off', st=None, ed=None):
"""This method corrects for potential corrupt data, for example when
attenuation is such that no more useful signal can be extracted from
the data, or when data in the nearfield have not been corrrected. This
noise detection is done using :func:`autodetect_noise`
Also, given the application, instruments can be deployed
horizontally in the water column. Supposing weak attenuation and
homogeneous medium, one can average the HAC profiles at each depth
sampled by the instrument to increase the Signal to Noise Ratio (SNR).
Corresponding modifications are directly applied to the ``Intensity``
variable of the profile set P contained in ``param``.
Parameters
----------
autodetect : str, {'on', 'off'}
automatic detection of noise floor reached by the instrument and
removal of noise data.
st, ed : int, int
start and end cell number to be considered for either averaging or
noise/nearfield removal.
"""
if self.instr_name != 'aquascat':
return
for i1 in range(0, len(self.param)):
if 'mode_depl' not in self.__dict__.keys():
return print(
'Data need to be assigned a deployment strategy first.'
)
else:
if self.mode_depl != 'Cast':
return print("Data can't be decimated with" +
"{} deployment strategy.".format(
self.param['P' + str(i1)].mode_depl))
else:
bx = np.zeros(np.shape(self.param[
'P' + str(i1)].asv[0, :, :]))
i_maxfreq = int(np.where(self.param[
'P' + str(i1)].FrequencyRx ==
np.max(self.param['P' + str(i1)].
FrequencyRx))[0])
if autodetect == 'on':
st, ed = self.autodetect_noise(self.param
['P' + str(i1)]
['asv' + str(i_maxfreq)]
)
else:
if st is None or ed is None:
st = np.shape(self.param[
'P' + str(i1)].asv1)[1] / 5
ed = 3 * np.shape(self.param[
'P' + str(i1)].asv1)[1] / 5
for i in range(1, 5):
exec('self.param.P' + str(i1) + '.asv' + str(i) +
'=np.nanmean(self.param.P' + str(i1) + '.asv' +
str(i) + '[:,np.int(st):np.int(ed)],1)')
self.param['P' + str(i1)].asv = np.transpose(
np.atleast_3d(np.nanmean(self.param[
'P' + str(i1)].asv[
np.int(st): np.int(ed), :, :], 0)
), (2, 0, 1))
self.depr = 1
[docs] def autodetect_noise(self, data):
"""Noise detection...
"""
return 0, 1
[docs] def red(self, C, x1, x2):
"""Boundary detection for profile spliting
"""
tl = np.shape(C)
if len(tl) == 1:
return '[' + str(x1) + ':' + str(x2) + ']'
elif len(tl) == 2:
if x2 > tl[0]:
return '[:,' + str(x1) + ':' + str(x2) + ']'
else:
return '[' + str(x1) + ':' + str(x2) + ',:]'
elif len(tl) == 3:
if x2 > tl[0] and x2 > tl[1]:
return '[:,:,' + str(x1) + ':' + str(x2) + ']'
elif x2 > tl[0] and x2 > tl[2]:
return '[:,' + str(x1) + ':' + str(x2) + ',:]'
else:
return '[' + str(x1) + ':' + str(x2) + ',:,:]'
[docs] def split_profiles(self, B, b1, b2, bx):
"""If profiles are too big or if several physical instrument profile
sets overlap to the current HAC profile, the latter is split into
several profiles of shorter duration each. The original profile is
removed.
"""
tmpL = []
print('b1 = {}, b2= {}, bx ={}'.format(b1, b2, bx))
target = B.keys()
split_target = [x for x in B.keys() if
[y for y in np.shape(B[x]) if y == len(B.time)]]
npsplit_target = list(target-split_target)
for tke in range(0, len(b1), 1):
tmp = Parameters({})
[tmp.update({x: eval('B["' + x + '"]' + self.red(B[x], bx[tke],
bx[tke + 1]))}) for x in split_target]
[tmp.update({x: B[x]}) for x in npsplit_target]
tmpL.append(tmp)
return tmpL
[docs] def sort_profiles(self):
"""Renames the profile sets according to the current number of profiles
in the set. For exemple, if the original profile set contains the
profile P0, after spliting profiles P0 into P1, P2 and P3 of shorter
durations, those are renames P0, P1 and P2 respectively.
"""
tt1 = list(self.param.keys())
tt1_l = []
[tt1_l.append(int(tt1[idx].split('P')[1]))
for idx in range(0, len(tt1))]
tt2_l = list(range(0, len(tt1_l)))
for u in range(0, len(self.param)):
self.param.__rename__('P' + str(tt1_l[u]), 'P' + str(tt2_l[u]))
def hdf5_save(self):
pass
def netcdf_save(self):
pass
def pickle_save(self):
pass