Source code for hydrac.inversion.run_inversion_NNLS

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Non-Negative Laast Square Algorithm method (:mod:`hydrac.inversion.run_inversion_NNLS`)
=======================================================================================

.. autoclass:: Run_inversion_NNLS
   :members:
   :private-members:

"""
import numpy as np
from copy import deepcopy
from hydrac.inversion.multifrequence.nnls import NNLS


[docs]class Run_inversion_NNLS(object): """NNLS based inversion scheme class This class uses an Non-Negative Least-Square scheme as descibed in Greenlaw & Johnson. 1983 ("Multiple-frequency acoustical estimation." Biological Oceanography 2.2-4: 227-252), with a special constrained brought by the Levenberg-Marquardt method. This inversion method can work for multifrequency observations only. This class calls out the :mod:`hydrac.inversion.multifrequence`, in which the NNLS algorithm is contained (:class:`hydrac.inversion.multifrequence.nnls.NNLS`). The system inverted by the NNLS algorithm is underdetermined by nature as the number of size classes contained in the sample volume is infinite in practice, while we only have a few frequency observation at our disposal. Hopwever the Levenberg-Marquardt method allows to solve for such ill-posed systems with an underestimation parameter of 4 (meaning there is 4 times more unknowns than observations). Parameters ---------- obj : Hydroacoustic object defined by :class:`hydrac.instruments.acoustique.Acoustic`. """ def __init__(self, obj, **kwargs): self.obj = obj self.kwargs = kwargs if 'depr' in self.kwargs: if kwargs['depr'] is not None: if hasattr(self.obj.o_hac, 'depr') is False: self.obj.o_hac.hac_data_decimation() if self.obj.o_hac.tag_M == 'multi': self.obj.calc_inv(self.elementary_inv, self.obj.o_hac.param) else: return print('Impossible to invert the data with NNLS method')
[docs] def elementary_inv(self, BX, model): """ Computes the inversion of a single sample profile set of N observations by calling the class :class:`hydrac.inversion.multifrequence.nnls.NNLS` . Parameters ---------- BX : Subset of the calibrated Intensity 3D - matrix for one sample, thus keeping the information relative to the range in order to account for the potential effects of sediment scattering attenuation. Intensity (N x Ns x F) --> BX (N x F) model : paramcontainer of model presets computed by the package :mod:`hydrac.model`. Contains all the information relative to the particle physical properties, as well as scattering properties. """ BX[np.where(np.logical_or(10 * np.log10(BX) < -140, 10 * np.log10(BX) > -10))] = np.NaN print(model.name_model()) sfactr = 1e-6 for ix in range(0, np.size(BX, 0)): print(ix) tmp = BX[ix, :, :] tmp2 = np.multiply(tmp, 1 / tmp) inv_tag = np.isfinite(tmp) # to be corrected when switch to models for each time stamp/depth a, b = np.shape(model.scattering.aij) Cinit = [] anorm = [] [Cinit.append(np.multiply(model.scattering.aij.T, np.tile( np.atleast_2d(np.array(1 / oo)), (np.size( model.scattering.aij, 1), 1))) ) for oo in list(tmp)] [anorm.append(np.sqrt(np.nansum(np.nansum(np.multiply( o, o))) / (a * b))) for o in Cinit] tmp2 = np.concatenate((tmp2, np.zeros((np.size(tmp2, 0), b)) ), axis=1) taij = [] [taij.append(np.concatenate((op1, op2 * sfactr * np.eye(b, b)), axis=1) ) for op1, op2 in zip(Cinit, anorm)] inv_nnls = [NNLS(mat.T, np.atleast_2d(obs).T, display='none') for mat, obs in zip(taij, tmp2)] norm = [] [norm.append(op1[0:b, 0:a].T@op2.solution[0:b]-np.atleast_2d( op3[0:a]).T) for op1, op2, op3 in zip(taij, inv_nnls, tmp2)] list(map(lambda x, y: x.__setattr__('norm', y), inv_nnls, norm)) fnj = lambda o, y, z, t: o.__setattr__(y, z(o.__getattribute__(t))) list(map(fnj, inv_nnls, ['xnorm'] * len(inv_nnls), [np.linalg.norm] * len(inv_nnls), ['solution'] * len(inv_nnls))) list(map(fnj, inv_nnls, ['rnorm'] * len(inv_nnls), [np.linalg.norm] * len(inv_nnls), ['norm'] * len(inv_nnls))) [o.__setattr__('bv', (4 * np.pi / 3) * np.multiply(np.power( model.scattering.a_UND, 3), o.solution.T).T) for o in inv_nnls] if ix == 0: INV = deepcopy(inv_nnls) else: [o.update_solution(v) for o, v in zip(INV, inv_nnls)] return INV