# -*- coding: utf-8 -*-
from pygsp import utils
from pygsp.operators import fast_filtering, operator
import numpy as np
from math import log
from copy import deepcopy
[docs]class Filter(object):
r"""Parent class for all Filters or Filterbanks, contains the shared methods for those classes."""
def __init__(self, G, filters=None, **kwargs):
self.logger = utils.build_logger(__name__, **kwargs)
if not hasattr(G, 'lmax'):
self.logger.info('{} : has to compute lmax'.format(
self.__class__.__name__))
G.estimate_lmax()
self.G = G
if filters:
if isinstance(filters, list):
self.g = filters
else:
self.g = [filters]
else:
self.g = []
[docs] def analysis(self, s, method=None, cheb_order=30, lanczos_order=30, **kwargs):
r"""
Operator to analyse a filterbank
Parameters
----------
s : ndarray
graph signals to analyse
method : string
wether using an exact method, cheby approx or lanczos
cheb_order : int
Order for chebyshev
Returns
-------
c : ndarray
Transform coefficients
Examples
--------
>>> import numpy as np
>>> from pygsp import graphs, filters
>>> G = graphs.Logo()
>>> MH = filters.MexicanHat(G)
>>> x = np.arange(G.N**2).reshape(G.N, G.N)
>>> co = MH.analysis(x)
:cite:`hammond2011wavelets`
"""
if not method:
method = 'exact' if hasattr(self.G, 'U') else 'cheby'
self.logger.info('The analysis method is {}'.format(method))
Nf = len(self.g)
N = self.G.N
if method == 'exact':
if not hasattr(self.G, 'e') or not hasattr(self.G, 'U'):
self.logger.info('The Fourier matrix is not available. '
'The function will compute it for you.')
self.G.compute_fourier_basis()
try:
Nv = np.shape(s)[1]
c = np.zeros((N * Nf, Nv))
is2d = True
except IndexError:
c = np.zeros((N * Nf))
is2d = False
fie = self.evaluate(self.G.e)
if Nf == 1:
if is2d:
return operator.igft(self.G, np.tile(fie, (Nv, 1)).T *
operator.gft(self.G, s))
else:
return operator.igft(self.G, fie * operator.gft(self.G, s))
else:
tmpN = np.arange(N, dtype=int)
for i in range(Nf):
if is2d:
c[tmpN + N*i] = operator.igft(self.G, np.tile(fie[:][i], (Nv, 1)).T *
operator.gft(self.G, s))
else:
c[tmpN + N*i] = operator.igft(self.G, fie[:][i] *
operator.gft(self.G, s))
elif method == 'cheby': # Chebyshev approx
if not hasattr(self.G, 'lmax'):
self.logger.info('FILTER_ANALYSIS: The variable lmax is not '
'available. The function will compute '
'it for you.')
self.G.estimate_lmax()
cheb_coef = fast_filtering.compute_cheby_coeff(self, m=cheb_order)
c = fast_filtering.cheby_op(self.G, cheb_coef, s)
elif method == 'lanczos':
c = fast_filtering.lanczos_op(self, s, self.G, order=lanczos_order)
else:
raise ValueError('Unknown method: please select exact, '
'cheby or lanczos')
return c
@utils.filterbank_handler
def evaluate(self, x, i=0):
r"""
Evaluation of the Filterbank
Parameters
----------
x = ndarray
Data
i = int
Indice of the filter to evaluate
Returns
-------
fd = ndarray
Response of the filter
Examples
--------
>>> import numpy as np
>>> from pygsp import graphs, filters
>>> G = graphs.Logo()
>>> MH = filters.MexicanHat(G)
>>> x = np.arange(2)
>>> eva = MH.evaluate(x)
"""
fd = np.zeros(x.size)
fd = self.g[i](x)
return fd
[docs] def inverse(self, c, **kwargs):
raise NotImplementedError
[docs] def synthesis(self, c, order=30, method=None, **kwargs):
r"""
Synthesis operator of a filterbank
Parameters
----------
G : Graph structure.
c : Transform coefficients
method : Select the method ot be used for the computation.
- 'exact' : Exact method using the graph Fourier matrix
- 'cheby' : Chebyshev polynomial approximation
- 'lanczos' : Lanczos approximation
Default : if the Fourier matrix is present: 'exact' otherwise
'cheby'
order : Degree of the Chebyshev approximation
Default is 30
Returns
-------
signal : sythesis signal
Examples
--------
Reference
----------
See :cite:`hammond2011wavelets` for more details.
"""
Nf = len(self.g)
N = self.G.N
if not method:
if hasattr(self.G, 'U'):
method = 'exact'
else:
method = 'cheby'
if method == 'exact':
if not hasattr(self.G, 'e') or not hasattr(self.G, 'U'):
self.logger.info("The Fourier matrix is not available. "
"The function will compute it for you.")
self.G.compute_fourier_basis()
fie = self.evaluate(self.G.e)
Nv = np.shape(c)[1]
s = np.zeros((N, Nv))
tmpN = np.arange(N, dtype=int)
if Nf == 1:
s += operator.igft(np.conjugate(self.G.U),
np.tile(fie, (Nv, 1)).T *
operator.gft(self.G, c[N*i + tmpN]))
else:
for i in range(Nf):
s += operator.igft(np.conjugate(self.G.U),
np.tile(fie[:][i], (Nv, 1)).T *
operator.gft(self.G, c[N*i + tmpN]))
elif method == 'cheby':
if hasattr(self.G, 'lmax'):
self.logger.info('The variable lmax is not available. '
'The function will compute it for you.')
self.G.estimate_lmax()
cheb_coeffs = operator.compute_cheby_coeff(self, m=order, N=order + 1)
s = np.zeros((N, np.shape(c)[1]))
tmpN = np.arange(N, dtype=int)
for i in range(Nf):
s = s + operator.cheby_op(self.G, cheb_coeffs[i], c[i*N + tmpN])
elif method == 'lanczos':
s = np.zeros((N, np.shape(c)[1]))
tmpN = np.arange(N, dtype=int)
for i in range(Nf):
s += fast_filtering.lanczos_op(self.G, self.g[i], c[i*N + tmpN],
order=order)
else:
raise ValueError('Unknown method: please select exact,'
' cheby or lanczos')
return s
[docs] def approx(m, N, **kwargs):
raise NotImplementedError
[docs] def tighten():
raise NotImplementedError
[docs] def filterbank_bounds(self, N=999, bounds=None):
r"""
Compute approximate frame bounds for a filterbank.
Parameters
----------
bounds : interval to compute the bound.
Given in an ndarray: np.array([xmin, xnmax]).
By default, bounds is None and filtering is bounded
by the eigenvalues of G.
N : Number of point for the line search
Default is 999
Returns
-------
lower : Filterbank lower bound
upper : Filterbank upper bound
"""
if bounds:
xmin, xmax = bounds
rng = np.linspace(xmin, xmax, N)
else:
if not hasattr(self.G, 'lmax'):
self.logger.info('FILTERBANK_BOUNDS: Has to estimate lmax.')
self.G.estimate_lmax()
if not hasattr(self.G, 'e'):
self.logger.info('FILTERBANK_BOUNDS: Has to compute Fourier basis.')
self.G.compute_fourier_basis()
rng = self.G.e
sum_filters = np.sum(np.abs(np.power(self.evaluate(rng), 2)), axis=0)
return np.min(sum_filters), np.max(sum_filters)
[docs] def filterbank_matrix(self):
r"""
Create the matrix of the filterbank frame.
This function creates the matrix associated to the filterbank g.
The size of the matrix is MN x N, where M is the number of filters.
Returns
-------
F : Frame
"""
N = self.G.N
if N > 2000:
self.logger.warning('Create a big matrix, you can use other methods.')
Nf = len(self.g)
Ft = self.analysis(np.identity(N))
F = np.zeros(np.shape(Ft.T))
tmpN = np.arange(N, dtype=int)
for i in range(Nf):
F[:, N*i + tmpN] = Ft[N*i + tmpN]
return F
[docs] def wlog_scales(self, lmin, lmax, Nscales, t1=1, t2=2):
r"""
Compute logarithm scales for wavelets
Parameters
----------
lmin : int
Minimum non-zero eigenvalue
lmax : int
Maximum eigenvalue
Nscales : int
Number of scales
Returns
-------
s : ndarray
Scale
"""
smin = t1/lmax
smax = t2/lmin
s = np.exp(np.linspace(log(smax), log(smin), Nscales))
return s
[docs] def can_dual(self):
r"""
Creates a dual graph form a given graph
"""
def can_dual_func(g, n, x):
# Nshape = np.shape(x)
x = np.ravel(x)
N = np.shape(x)[0]
M = len(g.g)
gcoeff = g.evaluate(x).T
s = np.zeros((N, M))
for i in range(N):
s[i] = np.linalg.pinv(np.expand_dims(gcoeff[i], axis=1))
ret = s[:, n]
return ret
gdual = deepcopy(self)
Nf = len(self.g)
for i in range(Nf):
gdual.g[i] = lambda x, ind=i: can_dual_func(self, ind, deepcopy(x))
return gdual
[docs] def plot(self, **kwargs):
r"""
Plot the filter.
See plotting doc.
"""
from pygsp import plotting
plotting.plot_filter(self, **kwargs)