Source code for gemclus.gemini._geomdistances

import warnings
from abc import ABC
from numbers import Real

import numpy as np
import ot
from sklearn.metrics import pairwise_kernels, pairwise_distances
from sklearn.metrics.pairwise import PAIRWISE_KERNEL_FUNCTIONS, PAIRED_DISTANCES
from sklearn.utils._param_validation import StrOptions, Interval

from .._constraints import constraint_params
from ._base_loss import _GEMINI


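# Reading aid for the estimators below: with soft assignments, write
# alpha_k(i) = p(y=k|x_i) / p(y=k), so that cluster-conditional expectations
# become importance-weighted sample means. The squared MMDs then reduce to
# quadratic forms in the N x N kernel Gram matrix K:
#
#   MMD^2(p(x|y=k) || p(x))      = (alpha_k - 1)^T K (alpha_k - 1) / N^2
#   MMD^2(p(x|y=a) || p(x|y=b))  = (alpha_a - alpha_b)^T K (alpha_a - alpha_b) / N^2
#
# which is what `MMDGEMINI.evaluate` computes in vectorised form.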
class MMDGEMINI(_GEMINI):
    r"""
    Implements the one-vs-all and one-vs-one MMD GEMINI.

    The one-vs-all version compares the maximum mean discrepancy between a cluster distribution and the data
    distribution:

    .. math::
        \mathcal{I} = \mathbb{E}_{y \sim p(y)}[\text{MMD}_\kappa(p(x|y)\|p(x))],

    where :math:`\kappa` is a kernel defined between the samples of the data space.

    The one-vs-one version compares the maximum mean discrepancy between two cluster distributions:

    .. math::
        \mathcal{I} = \mathbb{E}_{y_a,y_b \sim p(y)}[\text{MMD}_\kappa(p(x|y_a)\|p(x|y_b))].

    The one-vs-one objective is equivalent to a kernel KMeans objective.

    Parameters
    ----------
    ovo: bool, default=False
        Whether to use the one-vs-all objective (False) or the one-vs-one objective (True).

    kernel: {'additive_chi2', 'chi2', 'cosine', 'linear', 'poly', 'polynomial', 'rbf', 'laplacian', 'sigmoid',
        'precomputed'}, default='linear'
        The kernel to use in combination with the MMD objective. It corresponds to one value of
        `PAIRWISE_KERNEL_FUNCTIONS`. Kernel parameters can be passed through `kernel_params`. If the kernel
        is set to 'precomputed', then a custom kernel matrix must be passed to the argument `affinity` of
        the `evaluate` method.

    kernel_params: dict, default=None
        Additional keyword arguments for the kernel function. Ignored if the kernel is callable or precomputed.

    epsilon: float, default=1e-12
        The precision for clipping the prediction values in order to avoid numerical instabilities.
    """

    @constraint_params(
        {
            "ovo": [bool],
            "kernel": [StrOptions(set(list(PAIRWISE_KERNEL_FUNCTIONS) + ["precomputed"])), callable],
            "kernel_params": [dict, None],
            "epsilon": [Interval(Real, 0, 1, closed="neither")],
        }
    )
    def __init__(self, ovo=False, kernel="linear", kernel_params=None, epsilon=1e-12):
        super().__init__(epsilon)
        self.ovo = ovo
        self.kernel_params = kernel_params
        self.kernel = kernel

    def compute_affinity(self, X, y=None):
        """
        Compute the kernel between all samples of X.

        Parameters
        ----------
        X: ndarray of shape (n_samples, n_features)
            The samples between which all affinities must be computed.

        y: ndarray of shape (n_samples, n_samples), default=None
            Values of the affinity between samples in case of a "precomputed" affinity.
            Ignored if None and the affinity is not precomputed.

        Returns
        -------
        affinity: ndarray of shape (n_samples, n_samples)
            The kernel between all samples if it is needed for the GEMINI computations, None otherwise.
        """
        if callable(self.kernel):
            if self.kernel_params is not None:
                warnings.warn("Parameters passed through kernel_params are ignored when kernel is a callable.")
            return self.kernel(X)
        elif self.kernel == "precomputed":
            if y is None:
                raise ValueError("The kernel is set to 'precomputed', yet no kernel matrix was passed: y=None")
            return y
        _params = dict() if self.kernel_params is None else self.kernel_params
        return pairwise_kernels(X, metric=self.kernel, **_params)
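
    # A sketch of the "precomputed" path (illustrative values, not part of the
    # library source): the Gram matrix is built once outside the class and
    # forwarded through `y`, which `evaluate` later receives as `affinity`.
    #
    #     K = pairwise_kernels(X, metric="laplacian", gamma=0.5)
    #     gemini = MMDGEMINI(kernel="precomputed")
    #     affinity = gemini.compute_affinity(X, y=K)  # returns K unchanged
    #     value = gemini.evaluate(y_pred, affinity)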

    def evaluate(self, y_pred, affinity, return_grad=False):
        clip_mask = (y_pred > self.epsilon) & (y_pred < (1 - self.epsilon))
        y_pred = np.clip(y_pred, a_min=self.epsilon, a_max=1 - self.epsilon)

        N = y_pred.shape[0]
        normalised_kernel = affinity / N ** 2
        pi = y_pred.mean(0, keepdims=True)
        alpha = y_pred / pi
        gamma = normalised_kernel @ alpha
        if self.ovo:
            # omega[a, b] = alpha_a^T K alpha_b / N^2, hence the squared MMD between
            # clusters a and b is omega[a, a] + omega[b, b] - 2 * omega[a, b]
            omega = alpha.T @ gamma
            A = np.diag(omega).reshape((1, -1))
            delta = np.sqrt(np.maximum(-2 * omega + A + A.T, 0))  # For numerical stability, keep if positive
            mmd_ovo_value = (pi @ delta @ pi.T).squeeze()
            if return_grad:
                # Some distances may be equal to 0 (including self-distances),
                # so we need to remove them from the gradient
                Lambda = (pi.T @ pi) / (delta + np.eye(len(delta)))
                Lambda -= np.diag(np.diag(Lambda))
                Lambda[delta == 0] = 0
                gradient = gamma * Lambda.sum(0, keepdims=True)
                gradient -= gamma @ Lambda
                gradient -= A * Lambda.sum(0, keepdims=True) / N
                gradient += (alpha * (gamma @ Lambda)).mean(0)
                gradient /= pi
                gradient += pi @ delta / N
                gradient *= 2
                return mmd_ovo_value, gradient * clip_mask
            else:
                return mmd_ovo_value
        else:
            # delta[k]^2 = (alpha_k - 1)^T K (alpha_k - 1) / N^2, i.e. the squared
            # MMD between cluster k and the full data distribution
            omega = alpha * gamma
            a = omega.sum(0)
            b = gamma.sum(0)
            c = normalised_kernel.sum()
            delta = np.sqrt(np.maximum(a + c - 2 * b, 0))  # For numerical stability, keep if positive
            mmd_ova_value = np.dot(pi, delta).squeeze()
            if return_grad:
                tau_grad = (np.eye(N) - 1 / N) @ normalised_kernel @ (alpha - 1)
                delta_mask = (delta == 0)
                gradient = tau_grad / (delta + delta_mask).reshape((1, -1))
                gradient[:, delta_mask] = 0
                return mmd_ova_value, gradient * clip_mask
            else:
                return mmd_ova_value
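

# A minimal usage sketch for MMDGEMINI (illustrative values, not part of the
# library source; in practice `y_pred` is the soft clustering produced by the
# model being trained):
#
#     rng = np.random.default_rng(0)
#     X = rng.normal(size=(100, 2))
#     y_pred = rng.dirichlet(np.ones(3), size=100)  # soft assignments, rows sum to 1
#     gemini = MMDGEMINI(ovo=True, kernel="rbf")
#     affinity = gemini.compute_affinity(X)         # (100, 100) Gram matrix
#     value, grad = gemini.evaluate(y_pred, affinity, return_grad=True)
#     # `grad` has the shape of `y_pred` and points in the direction that
#     # increases the GEMINI.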


class WassersteinGEMINI(_GEMINI, ABC):
    r"""
    Implements the one-vs-all and one-vs-one Wasserstein GEMINI.

    The one-vs-all version compares the Wasserstein distance between a cluster distribution and the data
    distribution:

    .. math::
        \mathcal{I} = \mathbb{E}_{y \sim p(y)}[\mathcal{W}_\delta(p(x|y)\|p(x))],

    where :math:`\delta` is a metric defined between the samples of the data space.

    The one-vs-one version compares the Wasserstein distance between two cluster distributions:

    .. math::
        \mathcal{I} = \mathbb{E}_{y_a,y_b \sim p(y)}[\mathcal{W}_\delta(p(x|y_a)\|p(x|y_b))].

    Parameters
    ----------
    ovo: bool, default=False
        Whether to use the one-vs-all objective (False) or the one-vs-one objective (True).

    metric: {'cosine', 'euclidean', 'l2', 'l1', 'manhattan', 'cityblock', 'precomputed'}, default='euclidean'
        The metric to use in combination with the Wasserstein objective. It corresponds to one value of
        `PAIRED_DISTANCES`. Metric parameters can be passed through `metric_params`. If the metric is set
        to 'precomputed', then a custom distance matrix must be passed to the argument `affinity` of the
        `evaluate` method.

    metric_params: dict, default=None
        Additional keyword arguments for the metric function. Ignored if the metric is callable or precomputed.

    epsilon: float, default=1e-12
        The precision for clipping the prediction values in order to avoid numerical instabilities.
    """

    @constraint_params(
        {
            "ovo": [bool],
            "metric": [StrOptions(set(list(PAIRED_DISTANCES) + ["precomputed"]))],
            "metric_params": [dict, None],
            "epsilon": [Interval(Real, 0, 1, closed="neither")],
        }
    )
    def __init__(self, ovo=False, metric="euclidean", metric_params=None, epsilon=1e-12):
        super().__init__(epsilon)
        self.ovo = ovo
        self.metric = metric
        self.metric_params = metric_params

    def compute_affinity(self, X, y=None):
        """
        Compute the distance between all samples of X.

        Parameters
        ----------
        X: ndarray of shape (n_samples, n_features)
            The samples between which all affinities must be computed.

        y: ndarray of shape (n_samples, n_samples), default=None
            Values of the affinity between samples in case of a "precomputed" affinity.
            Ignored if None and the affinity is not precomputed.

        Returns
        -------
        affinity: ndarray of shape (n_samples, n_samples)
            The distance between all samples if it is needed for the GEMINI computations, None otherwise.
        """
        if callable(self.metric):
            if self.metric_params is not None:
                warnings.warn("Parameters passed through metric_params are ignored when the metric is a callable.")
            return self.metric(X)
        elif self.metric == "precomputed":
            if y is None:
                raise ValueError("The metric is set to 'precomputed', yet no distance matrix was passed: y=None")
            return y
        _params = dict() if self.metric_params is None else self.metric_params
        return pairwise_distances(X, metric=self.metric, **_params)

    def evaluate(self, y_pred, affinity, return_grad=False):
        N, K = y_pred.shape
        clip_mask = (y_pred > self.epsilon) & (y_pred < 1 - self.epsilon)
        y_pred = np.clip(y_pred, a_min=self.epsilon, a_max=1 - self.epsilon)

        pi = y_pred.mean(0)
        # wy[k, i] = p(y=k|x_i) / (N * p(y=k)): the weights of the discrete
        # distribution p(x|y=k), each row summing to one
        wy = np.ascontiguousarray((y_pred / (pi.reshape((1, -1)) * N)).T)

        if self.ovo:
            if return_grad:
                grads = np.zeros(y_pred.shape)
            wasserstein_distances = np.zeros((K, K))
            for k1 in range(K):
                for k2 in range(k1 + 1, K):
                    emd, log = ot.emd2(wy[k1], wy[k2], affinity, log=True)
                    wasserstein_distances[k1, k2] = emd
                    wasserstein_distances[k2, k1] = emd
                    if return_grad:
                        # The gradient of the EMD w.r.t. the input weights is given
                        # by the centred dual variables of the transport problem
                        u_bar = log["u"] - log["u"].mean()
                        v_bar = log["v"] - log["v"].mean()
                        grads[:, k1] += 2 * pi[k2] * (u_bar / N - (u_bar * y_pred[:, k1] / (N * N * pi[k1])).sum())
                        grads[:, k2] += 2 * pi[k1] * (v_bar / N - (v_bar * y_pred[:, k2] / (N * N * pi[k2])).sum())
            wasserstein_ovo_value = np.dot(pi, np.dot(wasserstein_distances, pi))
            if return_grad:
                grads += 2 * np.dot(wasserstein_distances, pi) / N
                return wasserstein_ovo_value, grads * clip_mask
            else:
                return wasserstein_ovo_value
        else:
            # Compare each cluster distribution against the uniform distribution
            # over all samples
            constant_weights = np.ones(N) / N
            wasserstein_distances = np.zeros(K)
            dual_variables = [None] * K
            for k in range(K):
                wasserstein_distances[k], dual_variables[k] = ot.emd2(wy[k], constant_weights, affinity, log=True)
            wasserstein_ova_value = np.dot(pi, wasserstein_distances)
            if return_grad:
                u_bar = np.vstack([x["u"] - x["u"].mean() for x in dual_variables]).T
                grads = u_bar / N + wasserstein_distances / N
                grads -= (y_pred * u_bar).sum(0) / (N * N * pi)
                return wasserstein_ova_value, grads * clip_mask
            else:
                return wasserstein_ova_value
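

if __name__ == "__main__":
    # Smoke-test sketch (illustrative, not part of the library source):
    # evaluate both GEMINIs on random data and check the gradient shapes.
    rng = np.random.default_rng(0)
    X = rng.normal(size=(50, 2))
    y_pred = rng.dirichlet(np.ones(3), size=50)  # random soft clustering

    mmd = MMDGEMINI(ovo=False, kernel="rbf")
    mmd_value, mmd_grad = mmd.evaluate(y_pred, mmd.compute_affinity(X), return_grad=True)

    wasserstein = WassersteinGEMINI(ovo=True, metric="euclidean")
    w_value, w_grad = wasserstein.evaluate(y_pred, wasserstein.compute_affinity(X), return_grad=True)

    assert mmd_grad.shape == y_pred.shape and w_grad.shape == y_pred.shape
    print(f"MMD-OvA GEMINI: {float(mmd_value):.4f} / Wasserstein-OvO GEMINI: {float(w_value):.4f}")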