Source code for hydrac.inversion.multifrequence.nnls

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
NNLS algorithm (:mod:`hydrac.inversion.multifrequence.nnls`)
============================================================

.. autoclass:: NNLS
   :members:
   :private-members:
"""


import numpy as np
from copy import deepcopy
from hydrac.util.parameters import Parameters


class WrongFileTypeError(Exception):
    pass


[docs]class NNLS(object): """ NNLS class Non-negative least square algorithm, as described in Lawson & Hanson (1972). """ def __init__(self, C, d, **kwargs): __acceptable_keys_list = ['display', 'TolX'] if type(C) == list: C = np.array(C) if type(d) == list: d = np.array(d) self.C = C self.d = d for k in kwargs.keys(): if k in __acceptable_keys_list: self.__setattr__(k, kwargs[k]) self.solution, self.w, self.resid, self.resnorm, self.out =\ self.run_alg_NNLS() del self.C, self.d def run_alg_NNLS(self): defaultopt = {'display': 'notify', 'TolX': '10*np.spacing(1) * np.linalg.norm(C, 1) * len(C)'} if not hasattr(self, 'display'): self.display = defaultopt['display'] if not hasattr(self, 'TolX'): self.TolX = defaultopt['TolX'] C = self.C d = self.d if C.dtype == 'complex' or d.dtype == 'complex': if len(np.where(np.imag(C) > 0)) > 0 or\ len(np.where(np.imag(d) > 0)) > 0: raise TypeError('Complex matrix or observation vector ' + 'not tolerated...') # Check for non-double inputs if C.dtype != 'float' or d.dtype != 'float': C = C.astype(float) d = d.astype(float) printtype = self.display tol = self.TolX if type(tol) == str: tol = eval(tol) if printtype == 'notify' or printtype == 'notify-detailed': verbosity = 1 elif printtype == 'none' or printtype == 'off': verbosity = 0 elif printtype == 'iter' or printtype == 'iter-detailed': verbosity = 3 elif printtype == 'final' or printtype == 'final-detailed': verbosity = 2 else: verbosity = None n = np.size(C, 1) # Initialize vector of n zeros and Infs (to be used later) nZeros = np.zeros((n, 1)) wz = np.zeros((n, 1)) # Initialize set of non-active columns to null P = np.zeros((n, 1), dtype=bool) # Initialize set of active columns to all and # the initial point to zeros Z = np.ones((n, 1), dtype=bool) x = deepcopy(nZeros) resid = d - np.dot(C, x) w = np.dot(C.T, resid) output = {} # Set up iteration criterion outeriter = 0 _iter = 0 itmax = 3*n exitflag = 1 # Outer loop to put variables into set to hold positive coefficients while any(Z) and any(w[Z] > tol): outeriter = outeriter + 1 # Reset intermediate solution z z = deepcopy(nZeros) # Create wz, a Lagrange multiplier vector of variables in the # zero set. wz must have the same size as w to preserve the # correct indices, so set multipliers to -Inf for variables # outside of the zero set. wz[P] = -np.Inf wz[Z] = w[Z] # Find variable with largest Lagrange multiplier t, tt = np.where(wz == wz.max()) t = t[0] # Move variable t from zero set to positive set P[t] = True Z[t] = False # Compute intermediate solution using # only variables in positive set z[P.T[0] == True] = np.linalg.lstsq(C[:, P.T[0] == True], d)[0] # inner loop to remove elements from # the positive set which no longer belong while any(z[P] <= 0): _iter = _iter + 1 if _iter > itmax: msg = 'Iteration Count Exceeded' if verbosity: print(msg) exitflag = 0 output['iterations'] = outeriter output['message'] = msg output['algorithm'] = 'active-set' resnorm = sum(resid*resid) x = deepcopy(z) lambd = deepcopy(w) return x, lambd, resid, resnorm, Parameters(output) # Find indices where intermediate solution # z is approximately negative Q = (z <= 0) == P # Choose new x subject to keeping new x nonnegative alpha = min(x[Q] / (x[Q] - z[Q])) x = x + alpha * (z - x) # Reset Z and P given intermediate values of x Z = np.logical_or(((abs(x) < tol) == P), Z) P = np.logical_not(Z) # Reset z z = deepcopy(nZeros) # Re-solve for z z[P.T[0] == True] = np.linalg.lstsq(C[:, P.T[0] == True], d)[0] x = deepcopy(z) resid = d - np.dot(C, x) w = np.dot(C.T, resid) lambd = deepcopy(w) resnorm = np.dot(resid.T, resid) output['iterations'] = outeriter output['algorithm'] = 'active-set' msg = 'Optimization Terminated' if verbosity > 1: print(msg) output['message'] = msg return x, w, resid, resnorm, Parameters(output) def update_solution(self, new_sol): for k in self.__dict__.keys(): if type(self.__getattribute__(k)) == np.float64 \ or type(self.__getattribute__(k)) == np.ndarray: self.__setattr__(k, np.concatenate(( np.atleast_2d(self.__getattribute__(k)), np.atleast_2d(new_sol.__getattribute__(k))), axis=1))