Source code for pygsp.utils

r"""
The :mod:`pygsp.utils` module implements some utility functions used throughout
the package.
"""

import functools
import io
import logging
import pkgutil

import numpy as np
import scipy.io
from scipy import sparse


def build_logger(name):
    r"""
    Return the logger called `name`, configuring it the first time it is seen.

    Parameters
    ----------
    name : string
        Name handed to :func:`logging.getLogger`.

    Returns
    -------
    logger : logging.Logger
        The logger registered under `name`. If it had no handler yet, a
        stream handler with the package log format is attached and both the
        logger and the handler are set to the DEBUG level.

    """
    log = logging.getLogger(name)

    # A logger that already owns a handler is returned untouched, so repeated
    # calls never stack handlers (which would duplicate every message).
    if log.handlers:
        return log

    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(
        logging.Formatter(
            "%(asctime)s:[%(levelname)s](%(name)s.%(funcName)s): %(message)s"
        )
    )

    log.setLevel(logging.DEBUG)
    log.addHandler(console)
    return log
# Module-level logger, created once at import time through build_logger.
logger = build_logger(__name__)
def filterbank_handler(func):
    r"""
    Decorate `func` so that it is called once per index of a filter bank.

    The wrapped callable has the signature ``func(f, *args, **kwargs)`` where
    `f` exposes an integer attribute ``Nf``.

    * If the caller passes the keyword ``i``, or if ``f.Nf <= 1``, `func` is
      called once and its result is returned as is.
    * Otherwise `func` is called for every ``i`` in ``range(f.Nf)`` and the
      results are returned as a list, in index order.

    Parameters
    ----------
    func : callable
        Function to wrap.

    Returns
    -------
    inner : callable
        The wrapper, carrying the name and documentation of `func`.

    """

    @functools.wraps(func)  # Keep the name and docstring of func.
    def inner(f, *args, **kwargs):
        # The `or` short-circuits: f.Nf is only read when no explicit index
        # was requested by the caller.
        if "i" in kwargs or f.Nf <= 1:
            return func(f, *args, **kwargs)
        return [func(f, *args, i=index, **kwargs) for index in range(f.Nf)]

    return inner
def loadmat(path):
    r"""
    Load a matlab data file shipped with the package.

    Parameters
    ----------
    path : string
        Path to the mat file from the data folder, without the .mat extension.

    Returns
    -------
    data : dict
        Dictionary with variable names as keys, and loaded matrices as values.

    Examples
    --------
    >>> from pygsp import utils
    >>> data = utils.loadmat('pointclouds/bunny')
    >>> data['bunny'].shape
    (2503, 3)

    """
    resource = "".join(("data/", path, ".mat"))
    # The file is read as a package resource (raw bytes) and handed to scipy
    # through an in-memory binary stream.
    raw = pkgutil.get_data("pygsp", resource)
    return scipy.io.loadmat(io.BytesIO(raw))
def distanz(x, y=None):
    r"""
    Calculate the pairwise Euclidean distances between column vectors.

    Parameters
    ----------
    x : ndarray
        First set of column vectors. A one-dimensional array of length n is
        reshaped to ``(1, n)``, i.e. treated as n one-dimensional points.
    y : ndarray, optional
        Second set of column vectors, handled like `x`. Defaults to `x`.

    Returns
    -------
    d : ndarray
        Matrix whose entry ``(i, j)`` is the distance between column i of `x`
        and column j of `y`.

    Raises
    ------
    ValueError
        If `x` and `y` do not have the same number of rows.

    Examples
    --------
    >>> from pygsp import utils
    >>> x = np.arange(3)
    >>> utils.distanz(x, x)
    array([[0., 1., 2.],
           [1., 0., 1.],
           [2., 1., 0.]])

    """
    if len(x.shape) < 2:
        x = x.reshape(1, x.shape[0])

    if y is None:
        y = x
    elif len(y.shape) < 2:
        y = y.reshape(1, y.shape[0])

    dim_x, num_x = x.shape
    dim_y, num_y = y.shape

    # Size verification
    if dim_x != dim_y:
        raise ValueError("The sizes of x and y do not fit")

    sq_norm_x = (x * x).sum(axis=0)
    sq_norm_y = (y * y).sum(axis=0)
    cross = np.dot(x.T, y)

    # ||a - b||^2 = ||a||^2 + ||b||^2 - 2 <a, b>, laid out on a
    # (num_x, num_y) grid.
    grid_x = np.kron(np.ones((num_y, 1)), sq_norm_x).T
    grid_y = np.kron(np.ones((num_x, 1)), sq_norm_y)

    # abs() keeps tiny negative round-off values out of the square root.
    return np.sqrt(abs(grid_x + grid_y - 2 * cross))
def resistance_distance(G):
    r"""
    Compute the resistance distances of a graph.

    Parameters
    ----------
    G : Graph or sparse matrix
        Graph structure or Laplacian matrix (L)

    Returns
    -------
    rd : sparse matrix
        distance matrix

    Raises
    ------
    ValueError
        If `G` is not a sparse matrix and its ``lap_type`` attribute is not
        ``"combinatorial"``.

    References
    ----------
    :cite:`klein1993resistance`

    """
    if sparse.issparse(G):
        laplacian = G.tocsc()
    elif G.lap_type == "combinatorial":
        laplacian = G.L.tocsc()
    else:
        raise ValueError("Need a combinatorial Laplacian.")

    try:
        pseudo_inv = sparse.linalg.inv(laplacian)
    except RuntimeError:
        # The sparse inversion failed (presumably because the matrix is
        # singular): fall back to a dense Moore-Penrose pseudo-inverse.
        pseudo_inv = sparse.lil_matrix(np.linalg.pinv(laplacian.toarray()))

    n_nodes = np.shape(laplacian)[0]
    diag_row = sparse.csc_matrix(pseudo_inv.diagonal())
    ones_col = sparse.csc_matrix(np.ones((n_nodes, 1)))

    # Every row of diag_grid is the diagonal of the (pseudo-)inverse, so
    # diag_grid.T + diag_grid holds p_ii + p_jj at position (i, j).
    diag_grid = sparse.kron(diag_row, ones_col)

    return diag_grid.T + diag_grid - pseudo_inv - pseudo_inv.T
def symmetrize(W, method="average"):
    r"""
    Symmetrize a square matrix.

    Parameters
    ----------
    W : array_like
        Square matrix to be symmetrized
    method : string
        * 'average' : symmetrize by averaging with the transpose. Most useful
          when transforming a directed graph to an undirected one.
        * 'maximum' : symmetrize by taking the maximum with the transpose.
          Similar to 'fill' except that ambiguous entries are resolved by
          taking the largest value.
        * 'fill' : symmetrize by filling in the zeros in both the upper and
          lower triangular parts. Ambiguous entries are resolved by averaging
          the values.
        * 'tril' : symmetrize by considering the lower triangular part only.
        * 'triu' : symmetrize by considering the upper triangular part only.

    Raises
    ------
    ValueError
        If `W` is not square or if `method` is not one of the names above.

    Notes
    -----
    You can have the sum by multiplying the average by two. It is however not a
    good candidate for this function as it modifies an already symmetric
    matrix.

    Examples
    --------
    >>> from pygsp import utils
    >>> W = np.array([[0, 3, 0], [3, 1, 6], [4, 2, 3]], dtype=float)
    >>> W
    array([[0., 3., 0.],
           [3., 1., 6.],
           [4., 2., 3.]])
    >>> utils.symmetrize(W, method='average')
    array([[0., 3., 2.],
           [3., 1., 4.],
           [2., 4., 3.]])
    >>> 2 * utils.symmetrize(W, method='average')
    array([[0., 6., 4.],
           [6., 2., 8.],
           [4., 8., 6.]])
    >>> utils.symmetrize(W, method='maximum')
    array([[0., 3., 4.],
           [3., 1., 6.],
           [4., 6., 3.]])
    >>> utils.symmetrize(W, method='fill')
    array([[0., 3., 4.],
           [3., 1., 4.],
           [4., 4., 3.]])
    >>> utils.symmetrize(W, method='tril')
    array([[0., 3., 4.],
           [3., 1., 2.],
           [4., 2., 3.]])
    >>> utils.symmetrize(W, method='triu')
    array([[0., 3., 0.],
           [3., 1., 6.],
           [0., 6., 3.]])

    """
    n_rows, n_cols = W.shape[0], W.shape[1]
    if n_rows != n_cols:
        raise ValueError("Matrix must be square.")

    if method == "average":
        return (W + W.T) / 2

    if method == "maximum":
        if not sparse.issparse(W):
            return np.maximum(W, W.T)
        # Sparse matrices: swap in the transposed value wherever it is larger.
        transpose_wins = W.T > W
        return W - W.multiply(transpose_wins) + W.T.multiply(transpose_wins)

    if method == "fill":
        positive = W > 0  # Boolean type.
        if sparse.issparse(W):
            missing = (positive + positive.T) - positive
            filled = W + missing.multiply(W.T)
        else:
            # Numpy boolean subtract is deprecated.
            missing = np.logical_xor(
                np.logical_or(positive, positive.T), positive
            )
            filled = W + missing * W.T
        # Resolve ambiguous entries.
        return symmetrize(filled, method="average")

    if method in ("tril", "triu"):
        # scipy.sparse and numpy both expose functions named tril and triu.
        backend = sparse if sparse.issparse(W) else np
        triangle = getattr(backend, method)(W)
        return symmetrize(triangle, method="maximum")

    raise ValueError(f"Unknown symmetrization method {method}.")
def rescale_center(x):
    r"""
    Rescale and center data, e.g. embedding coordinates.

    The mean of each row is subtracted from that row, then the whole array is
    divided by its largest centered value.

    Parameters
    ----------
    x : ndarray
        Data to be rescaled (two-dimensional).

    Returns
    -------
    r : ndarray
        Rescaled data.

    Examples
    --------
    >>> from pygsp import utils
    >>> x = np.array([[1, 6], [2, 5], [3, 4]])
    >>> utils.rescale_center(x)
    array([[-1. ,  1. ],
           [-0.6,  0.6],
           [-0.2,  0.2]])

    """
    n_cols = x.shape[1]

    # Column vector of row means, tiled horizontally to the shape of x.
    row_means = np.mean(x, axis=1)[:, np.newaxis]
    centered = x - np.kron(np.ones((1, n_cols)), row_means)

    return centered / np.amax(centered)
def compute_log_scales(lmin, lmax, Nscales, t1=1, t2=2):
    r"""
    Compute logarithm scales for wavelets.

    The scales are spaced evenly on a logarithmic axis, going from
    ``t2 / lmin`` down to ``t1 / lmax``.

    Parameters
    ----------
    lmin : float
        Smallest non-zero eigenvalue.
    lmax : float
        Largest eigenvalue, i.e. :py:attr:`pygsp.graphs.Graph.lmax`.
    Nscales : int
        Number of scales.
    t1 : float
        Numerator of the smallest scale, ``t1 / lmax``. Default is 1.
    t2 : float
        Numerator of the largest scale, ``t2 / lmin``. Default is 2.

    Returns
    -------
    scales : ndarray
        List of scales of length Nscales.

    Examples
    --------
    >>> from pygsp import utils
    >>> utils.compute_log_scales(1, 10, 3)
    array([2.       , 0.4472136, 0.1      ])

    """
    log_largest = np.log(t2 / lmin)
    log_smallest = np.log(t1 / lmax)

    # Interpolate linearly between the log end points, then map back.
    log_scales = np.linspace(log_largest, log_smallest, Nscales)
    return np.exp(log_scales)