Source code for pydiffmap.kernel

"""
A class to implement diffusion kernels.
"""

import numbers
import numpy as np
import numexpr as ne
import scipy.sparse as sps
import warnings
from sklearn.neighbors import NearestNeighbors
from six import string_types
from . import utils
try:
    from scipy.special import logsumexp
except ModuleNotFoundError:
    from scipy.misc import logsumexp


[docs]class Kernel(object): """ Class abstracting the evaluation of kernel functions on the dataset. Parameters ---------- kernel_type : string or callable, optional Type of kernel to construct. Currently the only option is 'gaussian' (the default), but more will be implemented. epsilon : string, optional Method for choosing the epsilon. Currently, the only options are to provide a scalar (epsilon is set to the provided scalar) 'bgh' (Berry, Giannakis and Harlim), and 'bgh_generous' ('bgh' method, with answer multiplied by 2. k : int, optional Number of nearest neighbors over which to construct the kernel. neighbor_params : dict or None, optional Optional parameters for the nearest Neighbor search. See scikit-learn NearestNeighbors class for details. metric : string, optional Distance metric to use in constructing the kernel. This can be selected from any of the scipy.spatial.distance metrics, or a callable function returning the distance. metric_params : dict or None, optional Optional parameters required for the metric given. bandwidth_type: callable, number, string, or None, optional Type of bandwidth to use in the kernel. If None (default), a fixed bandwidth kernel is used. If a callable function, the data is passed to the function, and the bandwidth is output (note that the function must take in an entire dataset, not the points 1-by-1). If a number, e.g. -.25, a kernel density estimate is performed, and the bandwidth is taken to be q**(input_number). For a string input, the input is assumed to be an evaluatable expression in terms of the dimension d, e.g. "-1/(d+2)". The dimension is then estimated, and the bandwidth is set to q**(evaluated input string). """ def __init__(self, kernel_type='gaussian', epsilon='bgh', k=64, neighbor_params=None, metric='euclidean', metric_params=None, bandwidth_type=None): self.kernel_fxn = _parse_kernel_type(kernel_type) self.epsilon = epsilon self.k = k self.metric = metric self.metric_params = metric_params if neighbor_params is None: neighbor_params = {} self.neighbor_params = neighbor_params self.bandwidth_type = bandwidth_type self.d = None self.epsilon_fitted = None
[docs] def build_bandwidth_fxn(self, bandwidth_type): """ Parses an input string or function specifying the bandwidth. Parameters ---------- bandwidth_fxn : string or number or callable Bandwidth to use. If a number, taken to be the beta parameter in [1]_. If a string, taken to again be beta, but with an evaluatable expression as a function of the intrinsic dimension d, e.g. '1/(d+2)'. If a function, taken to be a function that outputs the bandwidth. References ---------- .. [1] T. Berry, and J. Harlim, Applied and Computational Harmonic Analysis 40, 68-96 (2016). """ if self.bandwidth_type is None: return None elif callable(self.bandwidth_type): return self.bandwidth_type else: is_string = isinstance(self.bandwidth_type, string_types) is_number = isinstance(self.bandwidth_type, numbers.Number) if (is_string or is_number): kde_function, d = self._build_nn_kde() if is_string: beta = ne.evaluate(self.bandwidth_type) elif is_number: beta = self.bandwidth_type else: raise Exception("Honestly, we shouldn't have gotten to this point in the code") bandwidth_fxn = lambda x: kde_function(x)**beta return bandwidth_fxn else: raise ValueError("Bandwidth Type was not a callable, string, or number. Don't know what to make of it.")
def _build_nn_kde(self, num_nearest_neighbors=8): my_nnkde = NNKDE(self.neigh, k=num_nearest_neighbors) my_nnkde.fit() bandwidth_fxn = lambda x: my_nnkde.compute(x) self.kde = my_nnkde return bandwidth_fxn, my_nnkde.d def _compute_bandwidths(self, X): if self.bandwidth_fxn is not None: return self.bandwidth_fxn(X) else: return None
[docs] def fit(self, X): """ Fits the kernel to the data X, constructing the nearest neighbor tree. Parameters ---------- X : array-like, shape (n_query, n_features) Data upon which to fit the nearest neighbor tree. Returns ------- self : the object itself """ k0 = min(self.k, np.shape(X)[0]) self.data = X # Construct Nearest Neighbor Tree with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="Parameter p is found in metric_params. The corresponding parameter from __init__ is ignored.") self.neigh = NearestNeighbors(n_neighbors=k0, metric=self.metric, metric_params=self.metric_params, **self.neighbor_params) self.neigh.fit(X) self.bandwidth_fxn = self.build_bandwidth_fxn(self.bandwidth_type) self.bandwidths = self._compute_bandwidths(X) self.scaled_dists = self._get_scaled_distance_mat(self.data, self.bandwidths) self.choose_optimal_epsilon() return self
[docs] def compute(self, Y=None, return_bandwidths=False): """ Computes the sparse kernel matrix. Parameters ---------- Y : array-like, shape (n_query, n_features), optional. Data against which to calculate the kernel values. If not provided, calculates against the data provided in the fit. return_bandwidths : boolean, optional If True, also returns the computed bandwidth for each y point. Returns ------- K : array-like, shape (n_query_X, n_query_Y) Values of the kernel matrix. y_bandwidths : array-like, shape (n_query_y) Bandwidth evaluated at each point Y. Only returned if return_bandwidths is True. """ if Y is None: Y = self.data # if np.array_equal(Y, self.data): # Avoid recomputing nearest neighbors unless needed. if _check_equal(Y, self.data): y_bandwidths = self.bandwidths K = self.scaled_dists else: # perform k nearest neighbour search on X and Y and construct sparse matrix # retrieve all nonzero elements and apply kernel function to it y_bandwidths = self._compute_bandwidths(Y) K = self._get_scaled_distance_mat(Y, y_bandwidths=y_bandwidths) K.data = self.kernel_fxn(K.data, self.epsilon_fitted) if return_bandwidths: return K, y_bandwidths else: return K
def _get_scaled_distance_mat(self, Y, y_bandwidths=None): # Scales distance matrix by (rho(x) rho(y))^1/2, where rho is the # bandwidth. dists = self.neigh.kneighbors_graph(Y, mode='distance') if y_bandwidths is not None: bw_x = np.power(self.bandwidths, 0.5) bw_y = np.power(y_bandwidths, 0.5) dists = _scale_by_bw(dists, bw_x, bw_y) return dists
[docs] def choose_optimal_epsilon(self, epsilon=None): """ Chooses the optimal value of epsilon and automatically detects the dimensionality of the data. Parameters ---------- epsilon : string or scalar, optional Method for choosing the epsilon. Currently, the only options are to provide a scalar (epsilon is set to the provided scalar) or 'bgh' (Berry, Giannakis and Harlim). Returns ------- self : the object itself """ if epsilon is None: epsilon = self.epsilon # Choose Epsilon according to method provided. if isinstance(epsilon, numbers.Number): # if user provided. self.epsilon_fitted = epsilon return self elif ((epsilon == 'bgh') or (epsilon == 'bgh_generous')): # Berry, Giannakis Harlim method. if (self.metric != 'euclidean'): # TODO : replace with call to scipy metrics. warnings.warn('The BGH method for choosing epsilon assumes a euclidean metric. However, the metric being used is %s. Proceed at your own risk...' % self.metric) if self.scaled_dists is None: self.scaled_dists = self._get_scaled_distance_mat(self.data, self.bandwidths) self.epsilon_fitted, self.d = choose_optimal_epsilon_BGH(self.scaled_dists.data**2) if epsilon == 'bgh_generous': self.epsilon_fitted *= 2. else: raise ValueError("Method for automatically choosing epsilon was given as %s, but this was not recognized" % epsilon) return self
[docs]class NNKDE(object): """ Class building a kernel density estimate with a variable bandwidth built from the k nearest neighbors. Parameters ---------- neighbors : scikit-learn NearestNeighbors object NearestNeighbors object to use in constructing the KDE. k : int, optional Number of nearest neighbors to use in the construction of the bandwidth. This must be less or equal to the number of nearest neighbors used by the nearest neighbor object. """ def __init__(self, neighbors, k=8): self.neigh = neighbors self.kernel_fxn = _parse_kernel_type('gaussian') self.k = k def _reduce_nn(self, nn_graph, k): # gets the k nearest neighbors of an m nearest nearest graph, # where m >n sub_neighbors = [] for row in nn_graph: dense_row = np.array(row[row.nonzero()]).ravel() sorted_ndxs = np.argpartition(dense_row, k-1) sorted_row = dense_row[sorted_ndxs[:k]] sub_neighbors.append(sorted_row) return np.array(sub_neighbors) def _build_bandwidth(self): dist_graph_vals = self._reduce_nn(self.dist_graph_sq, k=self.k-1) avg_sq_dist = np.array(dist_graph_vals.sum(axis=1)).ravel() self.bandwidths = np.sqrt(avg_sq_dist/(self.k-1)).ravel() def _choose_epsilon(self): # dist_graph_sq = self.neigh.kneighbors_graph(n_neighbors=self.neigh.n_neighbors-1, mode='distance') dist_graph_sq = self.dist_graph_sq.copy() n = dist_graph_sq.shape[0] dist_graph_sq = _scale_by_bw(dist_graph_sq, self.bandwidths, self.bandwidths) sq_dists = np.hstack([dist_graph_sq.data, np.zeros(n)]) self.epsilon_fitted, self.d = choose_optimal_epsilon_BGH(sq_dists)
[docs] def fit(self): """ Fits the kde object to the data provided in the nearest neighbor object. """ self.dist_graph_sq = self.neigh.kneighbors_graph(n_neighbors=self.neigh.n_neighbors-1, mode='distance') self.dist_graph_sq.data = self.dist_graph_sq.data**2 self._build_bandwidth() self._choose_epsilon()
[docs] def compute(self, Y): """ Computes the density at each query point in Y. Parameters ---------- Y : array-like, shape (n_query, n_features) Data against which to calculate the kernel values. If not provided, calculates against the data provided in the fit. Returns ------- q : array-like, shape (n_query) Density evaluated at each point Y. """ dist_bw = self.neigh.kneighbors_graph(Y, mode='distance', n_neighbors=self.k) dist_bw.data = dist_bw.data**2 avg_sq_dist = np.array(dist_bw.sum(axis=1)).ravel() y_bandwidths = np.sqrt(avg_sq_dist/(self.k-1)).ravel() K = self.neigh.kneighbors_graph(Y, mode='distance') K.data = K.data**2 K = _scale_by_bw(K, self.bandwidths, y_bandwidths) K.data /= 4. * self.epsilon_fitted K.data = np.exp(-K.data) density = np.array(K.mean(axis=1)).ravel() density /= y_bandwidths**self.d density /= (4 * np.pi * self.epsilon_fitted)**(self.d / 2.) return density
[docs]def choose_optimal_epsilon_BGH(scaled_distsq, epsilons=None): """ Calculates the optimal epsilon for kernel density estimation according to the criteria in Berry, Giannakis, and Harlim. Parameters ---------- scaled_distsq : numpy array Values for scaled distance squared values, in no particular order or shape. (This is the exponent in the Gaussian Kernel, aka the thing that gets divided by epsilon). epsilons : array-like, optional Values of epsilon from which to choose the optimum. If not provided, uses all powers of 2. from 2^-40 to 2^40 Returns ------- epsilon : float Estimated value of the optimal length-scale parameter. d : int Estimated dimensionality of the system. Notes ----- This code explicitly assumes the kernel is gaussian, for now. References ---------- The algorithm given is based on [1]_. If you use this code, please cite them. .. [1] T. Berry, D. Giannakis, and J. Harlim, Physical Review E 91, 032915 (2015). """ if epsilons is None: epsilons = 2**np.arange(-40., 41., 1.) epsilons = np.sort(epsilons).astype('float') log_T = [logsumexp(-scaled_distsq/(4. * eps)) for eps in epsilons] log_eps = np.log(epsilons) log_deriv = np.diff(log_T)/np.diff(log_eps) max_loc = np.argmax(log_deriv) # epsilon = np.max([np.exp(log_eps[max_loc]), np.exp(log_eps[max_loc+1])]) epsilon = np.exp(log_eps[max_loc]) d = np.round(2.*log_deriv[max_loc]) return epsilon, d
def _parse_kernel_type(kernel_type): """ Parses an input string or function specifying the kernel. Parameters ---------- kernel_type : string or callable Type of kernel to construct. Currently the only option is 'gaussian' or a user provided function. If set to a user defined function, it should take in two arguments: in order, a vector of distances between two samples, and a length-scale parameter epsilon. The units on epsilon should be distance squared. Returns ------- kernel_fxn : callable Function that takes in the distance and length-scale parameter, and outputs the value of the kernel. """ if kernel_type.lower() == 'gaussian': def gaussian_kfxn(d, epsilon): return np.exp(-d**2 / (4. * epsilon)) return gaussian_kfxn elif callable(kernel_type): return kernel_type else: raise("Error: Kernel type not understood.") def _scale_by_bw(d_yx, bw_x, bw_y): """ Scale a distance matrix with the bandwidth functions while retaining explicit zeros. Note that this reorders the indices in d_yx. Parameters ---------- d_yx : scipy sparse matrix Sparse matrix whose i,j'th element corresponds to f(y_i, x_j) dw_x : numpy array Array of bandwidth values evaluated at each x_i dw_y : numpy array Array of bandwidth values evaluated at each y_i Returns ------ scaled_d_yx : scipy sparse matrix Sparse matrix whose i,j'th element corresponds to f(y_i, x_j)/ bw[y_i] bw[x_j] """ m, n = d_yx.shape x_bw_diag = sps.spdiags(np.power(bw_x, -1), 0, n, n) y_bw_diag = sps.spdiags(np.power(bw_y, -1), 0, m, m) row, col = utils._get_sparse_row_col(d_yx) inv_bw = sps.csr_matrix((np.ones(d_yx.data.shape), (row, col)), shape=d_yx.shape) inv_bw = y_bw_diag * inv_bw * x_bw_diag d_yx.sort_indices() inv_bw.sort_indices() d_yx.data = d_yx.data * inv_bw.data return d_yx def _check_equal(X, Y): """ Check if two datasets are equal. Parameters ---------- X : array-like, shape (n_query, n_features), optional. Data against which to calculate the kernel values. If not provided, calculates against the data provided in the fit. Y : array-like, shape (n_query, n_features), optional. Data against which to calculate the kernel values. If not provided, calculates against the data provided in the fit. Returns ------- is_equal : bool True if the datasets are equal, False if not. """ X_is_sparse = isinstance(X, sps.spmatrix) Y_is_sparse = isinstance(Y, sps.spmatrix) if (X_is_sparse and Y_is_sparse): if X.shape != Y.shape: return False else: nonzero_rows, nonzero_cols = (X - Y).nonzero() return (len(nonzero_rows) == 0) else: return np.array_equal(X, Y)