#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
NNLS algorithm (:mod:`hydrac.inversion.multifrequence.nnls`)
============================================================
.. autoclass:: NNLS
:members:
:private-members:
"""
import numpy as np
from copy import deepcopy
from hydrac.util.parameters import Parameters
class WrongFileTypeError(Exception):
pass
[docs]class NNLS(object):
""" NNLS class
Non-negative least square algorithm, as described in
Lawson & Hanson (1972).
"""
def __init__(self, C, d, **kwargs):
__acceptable_keys_list = ['display', 'TolX']
if type(C) == list:
C = np.array(C)
if type(d) == list:
d = np.array(d)
self.C = C
self.d = d
for k in kwargs.keys():
if k in __acceptable_keys_list:
self.__setattr__(k, kwargs[k])
self.solution, self.w, self.resid, self.resnorm, self.out =\
self.run_alg_NNLS()
del self.C, self.d
def run_alg_NNLS(self):
defaultopt = {'display': 'notify',
'TolX':
'10*np.spacing(1) * np.linalg.norm(C, 1) * len(C)'}
if not hasattr(self, 'display'):
self.display = defaultopt['display']
if not hasattr(self, 'TolX'):
self.TolX = defaultopt['TolX']
C = self.C
d = self.d
if C.dtype == 'complex' or d.dtype == 'complex':
if len(np.where(np.imag(C) > 0)) > 0 or\
len(np.where(np.imag(d) > 0)) > 0:
raise TypeError('Complex matrix or observation vector ' +
'not tolerated...')
# Check for non-double inputs
if C.dtype != 'float' or d.dtype != 'float':
C = C.astype(float)
d = d.astype(float)
printtype = self.display
tol = self.TolX
if type(tol) == str:
tol = eval(tol)
if printtype == 'notify' or printtype == 'notify-detailed':
verbosity = 1
elif printtype == 'none' or printtype == 'off':
verbosity = 0
elif printtype == 'iter' or printtype == 'iter-detailed':
verbosity = 3
elif printtype == 'final' or printtype == 'final-detailed':
verbosity = 2
else:
verbosity = None
n = np.size(C, 1)
# Initialize vector of n zeros and Infs (to be used later)
nZeros = np.zeros((n, 1))
wz = np.zeros((n, 1))
# Initialize set of non-active columns to null
P = np.zeros((n, 1), dtype=bool)
# Initialize set of active columns to all and
# the initial point to zeros
Z = np.ones((n, 1), dtype=bool)
x = deepcopy(nZeros)
resid = d - np.dot(C, x)
w = np.dot(C.T, resid)
output = {}
# Set up iteration criterion
outeriter = 0
_iter = 0
itmax = 3*n
exitflag = 1
# Outer loop to put variables into set to hold positive coefficients
while any(Z) and any(w[Z] > tol):
outeriter = outeriter + 1
# Reset intermediate solution z
z = deepcopy(nZeros)
# Create wz, a Lagrange multiplier vector of variables in the
# zero set. wz must have the same size as w to preserve the
# correct indices, so set multipliers to -Inf for variables
# outside of the zero set.
wz[P] = -np.Inf
wz[Z] = w[Z]
# Find variable with largest Lagrange multiplier
t, tt = np.where(wz == wz.max())
t = t[0]
# Move variable t from zero set to positive set
P[t] = True
Z[t] = False
# Compute intermediate solution using
# only variables in positive set
z[P.T[0] == True] = np.linalg.lstsq(C[:, P.T[0] == True], d)[0]
# inner loop to remove elements from
# the positive set which no longer belong
while any(z[P] <= 0):
_iter = _iter + 1
if _iter > itmax:
msg = 'Iteration Count Exceeded'
if verbosity:
print(msg)
exitflag = 0
output['iterations'] = outeriter
output['message'] = msg
output['algorithm'] = 'active-set'
resnorm = sum(resid*resid)
x = deepcopy(z)
lambd = deepcopy(w)
return x, lambd, resid, resnorm, Parameters(output)
# Find indices where intermediate solution
# z is approximately negative
Q = (z <= 0) == P
# Choose new x subject to keeping new x nonnegative
alpha = min(x[Q] / (x[Q] - z[Q]))
x = x + alpha * (z - x)
# Reset Z and P given intermediate values of x
Z = np.logical_or(((abs(x) < tol) == P), Z)
P = np.logical_not(Z)
# Reset z
z = deepcopy(nZeros)
# Re-solve for z
z[P.T[0] == True] = np.linalg.lstsq(C[:, P.T[0] == True], d)[0]
x = deepcopy(z)
resid = d - np.dot(C, x)
w = np.dot(C.T, resid)
lambd = deepcopy(w)
resnorm = np.dot(resid.T, resid)
output['iterations'] = outeriter
output['algorithm'] = 'active-set'
msg = 'Optimization Terminated'
if verbosity > 1:
print(msg)
output['message'] = msg
return x, w, resid, resnorm, Parameters(output)
def update_solution(self, new_sol):
for k in self.__dict__.keys():
if type(self.__getattribute__(k)) == np.float64 \
or type(self.__getattribute__(k)) == np.ndarray:
self.__setattr__(k, np.concatenate((
np.atleast_2d(self.__getattribute__(k)),
np.atleast_2d(new_sol.__getattribute__(k))), axis=1))