# -*- coding: utf-8 -*-
import numpy as np
from pygsp import utils
# prevent circular import in Python < 3.5
from . import approximations
_logger = utils.build_logger(__name__)
[docs]class Filter(object):
r"""
The base Filter class.
* Provide a common interface (and implementation) to filter objects.
* Can be instantiated to construct custom filters from functions.
* Initialize attributes for derived classes.
Parameters
----------
G : graph
The graph to which the filter bank is tailored.
kernels : function or list of functions
A (list of) function(s) defining the filter bank in the Fourier domain.
One function per filter.
Attributes
----------
G : Graph
The graph to which the filter bank was tailored. It is a reference to
the graph passed when instantiating the class.
kernels : function or list of functions
A (list of) function defining the filter bank. One function per filter.
Either passed by the user when instantiating the base class, either
constructed by the derived classes.
Nf : int
Number of filters in the filter bank.
Examples
--------
>>>
>>> G = graphs.Logo()
>>> my_filter = filters.Filter(G, lambda x: x / (1. + x))
>>>
>>> # Signal: Kronecker delta.
>>> signal = np.zeros(G.N)
>>> signal[42] = 1
>>>
>>> filtered_signal = my_filter.filter(signal)
"""
def __init__(self, G, kernels):
self.G = G
try:
iter(kernels)
except TypeError:
kernels = [kernels]
self._kernels = kernels
self.Nf = len(kernels)
[docs] def evaluate(self, x):
r"""Evaluate the kernels at given frequencies.
Parameters
----------
x : ndarray
Graph frequencies at which to evaluate the filter.
Returns
-------
y : ndarray
Frequency response of the filters. Shape ``(G.Nf, len(x))``.
Examples
--------
Frequency response of a low-pass filter:
>>> import matplotlib.pyplot as plt
>>> G = graphs.Logo()
>>> G.compute_fourier_basis()
>>> f = filters.Expwin(G)
>>> G.compute_fourier_basis()
>>> y = f.evaluate(G.e)
>>> plt.plot(G.e, y[0]) # doctest: +ELLIPSIS
[<matplotlib.lines.Line2D object at ...>]
"""
# Avoid to copy data as with np.array([g(x) for g in self._kernels]).
y = np.empty((self.Nf, len(x)))
for i, g in enumerate(self._kernels):
y[i] = g(x)
return y
[docs] def filter(self, s, method='chebyshev', order=30):
r"""Filter signals (analysis or synthesis).
A signal is defined as a rank-3 tensor of shape ``(N_NODES, N_SIGNALS,
N_FEATURES)``, where ``N_NODES`` is the number of nodes in the graph,
``N_SIGNALS`` is the number of independent signals, and ``N_FEATURES``
is the number of features which compose a graph signal, or the
dimensionality of a graph signal. For example if you filter a signal
with a filter bank of 8 filters, you're extracting 8 features and
decomposing your signal into 8 parts. That is called analysis. Your are
thus transforming your signal tensor from ``(G.N, 1, 1)`` to ``(G.N, 1,
8)``. Now you may want to combine back the features to form an unique
signal. For this you apply again 8 filters, one filter per feature, and
sum the result up. As such you're transforming your ``(G.N, 1, 8)``
tensor signal back to ``(G.N, 1, 1)``. That is known as synthesis. More
generally, you may want to map a set of features to another, though
that is not implemented yet.
The method computes the transform coefficients of a signal :math:`s`,
where the atoms of the transform dictionary are generalized
translations of each graph spectral filter to each vertex on the graph:
.. math:: c = D^* s,
where the columns of :math:`D` are :math:`g_{i,m} = T_i g_m` and
:math:`T_i` is a generalized translation operator applied to each
filter :math:`\hat{g}_m(\cdot)`. Each column of :math:`c` is the
response of the signal to one filter.
In other words, this function is applying the analysis operator
:math:`D^*`, respectively the synthesis operator :math:`D`, associated
with the frame defined by the filter bank to the signals.
Parameters
----------
s : ndarray
Graph signals, a tensor of shape ``(N_NODES, N_SIGNALS,
N_FEATURES)``, where ``N_NODES`` is the number of nodes in the
graph, ``N_SIGNALS`` the number of independent signals you want to
filter, and ``N_FEATURES`` is either 1 (analysis) or the number of
filters in the filter bank (synthesis).
method : {'exact', 'chebyshev'}
Whether to use the exact method (via the graph Fourier transform)
or the Chebyshev polynomial approximation. A Lanczos
approximation is coming.
order : int
Degree of the Chebyshev polynomials.
Returns
-------
s : ndarray
Graph signals, a tensor of shape ``(N_NODES, N_SIGNALS,
N_FEATURES)``, where ``N_NODES`` and ``N_SIGNALS`` are the number
of nodes and signals of the signal tensor that pas passed in, and
``N_FEATURES`` is either 1 (synthesis) or the number of filters in
the filter bank (analysis).
References
----------
See :cite:`hammond2011wavelets` for details on filtering graph signals.
Examples
--------
Create a bunch of smooth signals by low-pass filtering white noise:
>>> import matplotlib.pyplot as plt
>>> G = graphs.Ring(N=60)
>>> G.estimate_lmax()
>>> s = np.random.RandomState(42).uniform(size=(G.N, 10))
>>> taus = [1, 10, 100]
>>> s = filters.Heat(G, taus).filter(s)
>>> s.shape
(60, 10, 3)
Plot the 3 smoothed versions of the 10th signal:
>>> fig, ax = plt.subplots()
>>> G.set_coordinates('line1D') # To visualize multiple signals in 1D.
>>> G.plot_signal(s[:, 9, :], ax=ax)
>>> legend = [r'$\tau={}$'.format(t) for t in taus]
>>> ax.legend(legend) # doctest: +ELLIPSIS
<matplotlib.legend.Legend object at ...>
Low-pass filter a delta to create a localized smooth signal:
>>> G = graphs.Sensor(30, seed=42)
>>> G.compute_fourier_basis() # Reproducible computation of lmax.
>>> s1 = np.zeros(G.N)
>>> s1[13] = 1
>>> s1 = filters.Heat(G, 3).filter(s1)
>>> s1.shape
(30,)
Filter and reconstruct our signal:
>>> g = filters.MexicanHat(G, Nf=4)
>>> s2 = g.analyze(s1)
>>> s2.shape
(30, 4)
>>> s2 = g.synthesize(s2)
>>> s2.shape
(30,)
Look how well we were able to reconstruct:
>>> fig, axes = plt.subplots(1, 2)
>>> G.plot_signal(s1, ax=axes[0])
>>> G.plot_signal(s2, ax=axes[1])
>>> print('{:.5f}'.format(np.linalg.norm(s1 - s2)))
0.29620
Perfect reconstruction with Itersine, a tight frame:
>>> g = filters.Itersine(G)
>>> s2 = g.analyze(s1, method='exact')
>>> s2 = g.synthesize(s2, method='exact')
>>> np.linalg.norm(s1 - s2) < 1e-10
True
"""
if s.shape[0] != self.G.N:
raise ValueError('First dimension should be the number of nodes '
'G.N = {}, got {}.'.format(self.G.N, s.shape))
# TODO: not in self.Nin (Nf = Nin x Nout).
if s.ndim == 1 or s.shape[-1] not in [1, self.Nf]:
if s.ndim == 3:
raise ValueError('Third dimension (#features) should be '
'either 1 or the number of filters Nf = {}, '
'got {}.'.format(self.Nf, s.shape))
s = np.expand_dims(s, -1)
n_features_in = s.shape[-1]
if s.ndim < 3:
s = np.expand_dims(s, 1)
n_signals = s.shape[1]
if s.ndim > 3:
raise ValueError('At most 3 dimensions: '
'#nodes x #signals x #features.')
assert s.ndim == 3
# TODO: generalize to 2D (m --> n) filter banks.
# Only 1 --> Nf (analysis) and Nf --> 1 (synthesis) for now.
n_features_out = self.Nf if n_features_in == 1 else 1
if method == 'exact':
# TODO: will be handled by g.adjoint().
axis = 1 if n_features_in == 1 else 2
f = self.evaluate(self.G.e)
f = np.expand_dims(f.T, axis)
assert f.shape == (self.G.N, n_features_in, n_features_out)
s = self.G.gft(s)
s = np.matmul(s, f)
s = self.G.igft(s)
elif method == 'chebyshev':
# TODO: update Chebyshev implementation (after 2D filter banks).
c = approximations.compute_cheby_coeff(self, m=order)
if n_features_in == 1: # Analysis.
s = s.squeeze(axis=2)
s = approximations.cheby_op(self.G, c, s)
s = s.reshape((self.G.N, n_features_out, n_signals), order='F')
s = s.swapaxes(1, 2)
elif n_features_in == self.Nf: # Synthesis.
s = s.swapaxes(1, 2)
s_in = s.reshape(
(self.G.N * n_features_in, n_signals), order='F')
s = np.zeros((self.G.N, n_signals))
tmpN = np.arange(self.G.N, dtype=int)
for i in range(n_features_in):
s += approximations.cheby_op(self.G,
c[i],
s_in[i * self.G.N + tmpN])
s = np.expand_dims(s, 2)
else:
raise ValueError('Unknown method {}.'.format(method))
# Return a 1D signal if e.g. a 1D signal was filtered by one filter.
return s.squeeze()
[docs] def analyze(self, s, method='chebyshev', order=30):
r"""Convenience alias to :meth:`filter`."""
if s.ndim == 3 and s.shape[-1] != 1:
raise ValueError('Last dimension (#features) should be '
'1, got {}.'.format(s.shape))
return self.filter(s, method, order)
[docs] def synthesize(self, s, method='chebyshev', order=30):
r"""Convenience wrapper around :meth:`filter`.
Will be an alias to `adjoint().filter()` in the future.
"""
if s.shape[-1] != self.Nf:
raise ValueError('Last dimension (#features) should be the number '
'of filters Nf = {}, got {}.'.format(self.Nf,
s.shape))
return self.filter(s, method, order)
[docs] def localize(self, i, **kwargs):
r"""Localize the kernels at a node (to visualize them).
That is particularly useful to visualize a filter in the vertex domain.
A kernel is localized by filtering a Kronecker delta, i.e.
.. math:: g(L) s = g(L)_i, \text{ where } s_j = \delta_{ij} =
\begin{cases} 0 \text{ if } i \neq j \\
1 \text{ if } i = j \end{cases}
Parameters
----------
i : int
Index of the node where to localize the kernel.
kwargs: dict
Parameters to be passed to the :meth:`analyze` method.
Returns
-------
s : ndarray
Kernel localized at vertex i.
Examples
--------
Visualize heat diffusion on a grid by localizing the heat kernel.
>>> import matplotlib
>>> N = 20
>>> DELTA = N//2 * (N+1)
>>> G = graphs.Grid2d(N)
>>> G.estimate_lmax()
>>> g = filters.Heat(G, 100)
>>> s = g.localize(DELTA)
>>> G.plot_signal(s, highlight=DELTA)
"""
s = np.zeros(self.G.N)
s[i] = 1
return np.sqrt(self.G.N) * self.filter(s, **kwargs)
[docs] def estimate_frame_bounds(self, min=0, max=None, N=1000,
use_eigenvalues=False):
r"""Estimate lower and upper frame bounds.
The frame bounds are estimated using the vector :code:`np.linspace(min,
max, N)` with min=0 and max=G.lmax by default. The eigenvalues G.e can
be used instead if you set use_eigenvalues to True.
Parameters
----------
min : float
The lowest value the filter bank is evaluated at. By default
filtering is bounded by the eigenvalues of G, i.e. min = 0.
max : float
The largest value the filter bank is evaluated at. By default
filtering is bounded by the eigenvalues of G, i.e. max = G.lmax.
N : int
Number of points where the filter bank is evaluated.
Default is 1000.
use_eigenvalues : bool
Set to True to use the Laplacian eigenvalues instead.
Returns
-------
A : float
Lower frame bound of the filter bank.
B : float
Upper frame bound of the filter bank.
Examples
--------
>>> G = graphs.Logo()
>>> G.compute_fourier_basis()
>>> f = filters.MexicanHat(G)
Bad estimation:
>>> A, B = f.estimate_frame_bounds(min=1, max=20, N=100)
>>> print('A={:.3f}, B={:.3f}'.format(A, B))
A=0.125, B=0.270
Better estimation:
>>> A, B = f.estimate_frame_bounds()
>>> print('A={:.3f}, B={:.3f}'.format(A, B))
A=0.177, B=0.270
Best estimation:
>>> G.compute_fourier_basis()
>>> A, B = f.estimate_frame_bounds(use_eigenvalues=True)
>>> print('A={:.3f}, B={:.3f}'.format(A, B))
A=0.177, B=0.270
The Itersine filter bank defines a tight frame:
>>> f = filters.Itersine(G)
>>> A, B = f.estimate_frame_bounds(use_eigenvalues=True)
>>> print('A={:.3f}, B={:.3f}'.format(A, B))
A=1.000, B=1.000
"""
if max is None:
max = self.G.lmax
if use_eigenvalues:
x = self.G.e
else:
x = np.linspace(min, max, N)
sum_filters = np.sum(np.abs(self.evaluate(x)**2), axis=0)
return sum_filters.min(), sum_filters.max()
[docs] def compute_frame(self, **kwargs):
r"""Compute the associated frame.
The size of the returned matrix operator :math:`D` is N x MN, where M
is the number of filters and N the number of nodes. Multiplying this
matrix with a set of signals is equivalent to analyzing them with the
associated filterbank. Though computing this matrix is a rather
inefficient way of doing it.
The frame is defined as follows:
.. math:: g_i(L) = U g_i(\Lambda) U^*,
where :math:`g` is the filter kernel, :math:`L` is the graph Laplacian,
:math:`\Lambda` is a diagonal matrix of the Laplacian's eigenvalues,
and :math:`U` is the Fourier basis, i.e. its columns are the
eigenvectors of the Laplacian.
Parameters
----------
kwargs: dict
Parameters to be passed to the :meth:`analyze` method.
Returns
-------
frame : ndarray
Matrix of size N x MN.
See also
--------
filter: more efficient way to filter signals
Examples
--------
Filtering signals as a matrix multiplication.
>>> G = graphs.Sensor(N=1000, seed=42)
>>> G.estimate_lmax()
>>> f = filters.MexicanHat(G, Nf=6)
>>> s = np.random.uniform(size=G.N)
>>>
>>> frame = f.compute_frame()
>>> frame.shape
(1000, 1000, 6)
>>> frame = frame.reshape(G.N, -1).T
>>> s1 = np.dot(frame, s)
>>> s1 = s1.reshape(G.N, -1)
>>>
>>> s2 = f.filter(s)
>>> np.all((s1 - s2) < 1e-10)
True
"""
if self.G.N > 2000:
_logger.warning('Creating a big matrix, you can use other means.')
# Filter one delta per vertex.
s = np.identity(self.G.N)
return self.filter(s, **kwargs)
[docs] def can_dual(self):
r"""Creates a dual graph form a given graph"""
def can_dual_func(g, n, x):
# Nshape = np.shape(x)
x = np.ravel(x)
N = np.shape(x)[0]
M = g.Nf
gcoeff = g.evaluate(x).T
s = np.zeros((N, M))
for i in range(N):
s[i] = np.linalg.pinv(np.expand_dims(gcoeff[i], axis=1))
ret = s[:, n]
return ret
kernels = []
for i in range(self.Nf):
kernels.append(lambda x, i=i: can_dual_func(self, i, x))
return Filter(self.G, kernels)
[docs] def plot(self, **kwargs):
r"""Plot the filter bank's frequency response.
See :func:`pygsp.plotting.plot_filter`.
"""
from pygsp import plotting
plotting.plot_filter(self, **kwargs)