Source code for gemclus.tree.douglas

from functools import reduce
from numbers import Integral, Real

import numpy as np

from sklearn.utils import check_array
from sklearn.utils._param_validation import Interval
from sklearn.utils.extmath import softmax
from sklearn.utils.validation import check_is_fitted

from .._base_gemini import DiscriminativeModel
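
# Illustrative sketch (an assumption, not part of the original module): the
# hypothetical helper below mirrors the DNDT soft binning used in
# `Douglas._leaf_binning`. With n sorted cut points c_1 <= ... <= c_n, a scalar
# feature x is mapped to the n + 1 logits [x, 2x - c_1, 3x - c_1 - c_2, ...],
# and a low-temperature softmax turns them into near one-hot bin memberships.
def _soft_binning_demo():
    x = np.array([[0.0], [1.0], [3.0]])  # three samples of a single feature
    cuts = np.array([0.5, 2.0])  # two cut points define three bins
    n = len(cuts)
    W = np.expand_dims(np.linspace(1, n + 1, n + 1), axis=0)  # [[1, 2, 3]]
    b = np.cumsum(np.concatenate([np.zeros(1), -np.sort(cuts)])).reshape((1, -1))
    logits = x @ W + b  # rows: [x, 2x - 0.5, 3x - 2.5]
    # Each row is close to one-hot on the bin containing the sample:
    # x=0.0 -> bin 0, x=1.0 -> bin 1, x=3.0 -> bin 2
    return softmax(logits / 0.1)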


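# Illustrative sketch (an assumption, not part of the original module): the
# hypothetical helper below mirrors `Douglas._merge_leaf`. Combining the bin
# memberships of two features with a sample-wise Kronecker product yields one
# membership per *pair* of bins, i.e. per leaf of the implicit tree.
def _merge_leaf_demo():
    bins_f1 = np.array([[1.0, 0.0]])  # the sample falls in bin 0 of feature 1
    bins_f2 = np.array([[0.0, 1.0]])  # the sample falls in bin 1 of feature 2
    product = np.einsum("ij,ik->ijk", bins_f1, bins_f2)
    leaves = product.reshape((-1, np.prod(product.shape[1:])))
    return leaves  # [[0., 1., 0., 0.]] -> leaf (bin 0, bin 1) is active
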
class Douglas(DiscriminativeModel):
    """
    Implementation of the `DNDTs optimised using GEMINI leveraging apprised splits` tree algorithm. This model
    learns clusters by optimising learnable parameters to perform feature-wise soft binnings and recombine those
    bins into a single cluster prediction. The parameters are optimised to maximise a generalised mutual
    information score.

    Parameters
    ----------
    n_clusters : int, default=3
        The number of clusters to form as well as the number of output neurons in the neural network.

    gemini : str, GEMINI instance or None, default="wasserstein_ova"
        GEMINI objective used to train this discriminative model. Can be "mmd_ova", "mmd_ovo", "wasserstein_ova",
        "wasserstein_ovo", "mi" or any other GEMINI available in `gemclus.gemini.AVAILABLE_GEMINI`. Default
        GEMINIs involve the Euclidean metric or linear kernel. To incorporate custom metrics, a GEMINI can also
        be passed as an instance. If None, the GEMINI will be the MMD OvA.

    n_cuts : int, default=1
        The number of cuts to consider per feature in the soft-binning function of the DNDT.

    feature_mask : array of boolean of shape (d,), default=None
        A boolean vector indicating whether a feature should be considered among the splits. If None, all
        features are considered during training.

    temperature : float, default=0.1
        The temperature controls the relative importance of logits per leaf soft-binning. A high temperature
        smoothens the differences in probability whereas a low temperature produces distributions closer to
        Dirac delta distributions.

    max_iter : int, default=100
        The number of epochs for training the model parameters.

    batch_size : int, default=None
        The number of samples per batch during an epoch. If set to `None`, all samples will be considered in a
        single batch.

    solver : {'sgd', 'adam'}, default='adam'
        The solver for weight optimisation.

        - 'sgd' refers to stochastic gradient descent.
        - 'adam' refers to a stochastic gradient-based optimiser proposed by Kingma, Diederik and Jimmy Ba.

    learning_rate : float, default=1e-2
        The learning rate hyperparameter for the optimiser's update rule.

    verbose : bool, default=False
        Whether to print progress messages to stdout.

    random_state : int, RandomState instance, default=None
        Determines random number generation for feature exploration. Pass an int for reproducible results across
        multiple function calls.

    Attributes
    ----------
    optimiser_ : `AdamOptimizer` or `SGDOptimizer`
        The optimisation algorithm used for training, depending on the chosen solver parameter.

    labels_ : ndarray of shape (n_samples,)
        The labels that were assigned to the samples passed to the :meth:`fit` method.

    n_iter_ : int
        The number of iterations that the model took to converge.
    """
    _parameter_constraints: dict = {
        **DiscriminativeModel._parameter_constraints,
        "n_cuts": [Interval(Integral, 1, None, closed="left"), None],
        "feature_mask": [np.ndarray, None],
        "temperature": [Interval(Real, 0, None, closed="neither")],
    }

    def __init__(self, n_clusters=3, gemini="wasserstein_ova", n_cuts=1, feature_mask=None, temperature=0.1,
                 max_iter=100, batch_size=None, solver="adam", learning_rate=1e-2, verbose=False,
                 random_state=None):
        super().__init__(
            n_clusters=n_clusters,
            gemini=gemini,
            max_iter=max_iter,
            learning_rate=learning_rate,
            solver=solver,
            batch_size=batch_size,
            verbose=verbose,
            random_state=random_state
        )
        self.n_cuts = n_cuts
        self.feature_mask = feature_mask
        self.temperature = temperature

    def _leaf_binning(self, X, cut_points):
        n = len(cut_points)
        W = np.expand_dims(np.linspace(1, n + 1, n + 1, dtype=np.float64), axis=0)
        order = np.argsort(cut_points)
        sorted_cut_points = cut_points[order]
        b = np.cumsum(np.concatenate([np.zeros(1), -sorted_cut_points])).reshape((1, -1))
        logits = X @ W + b
        return softmax(logits / self.temperature), order

    def _merge_leaf(self, leaf_res1, leaf_res2):
        # Compute the feature-wise Kronecker product
        product = np.einsum("ij,ik->ijk", leaf_res1, leaf_res2)
        # Reshape to 2d
        return product.reshape((-1, np.prod(product.shape[1:])))

    def _infer(self, X, retain=True):
        leaf_binning = lambda z: self._leaf_binning(X[:, z[0]:z[0] + 1], z[1])
        cut_iterator = map(leaf_binning, self.cut_points_list_)
        all_binnings_results = list(cut_iterator)
        all_binnings = [x[0] for x in all_binnings_results]
        all_orders = [x[1] for x in all_binnings_results]

        leaf = reduce(self._merge_leaf, all_binnings)
        y_pred = leaf @ self.leaf_scores_

        if retain:
            self._leaf = leaf
            self._all_orders = all_orders
            self._all_binnings = all_binnings

        return softmax(y_pred)

    def _init_params(self, random_state, X=None):
        # Create the parameters
        if self.feature_mask is None:
            self.cut_points_list_ = [(i, random_state.normal(size=(self.n_cuts,))) for i in range(X.shape[1])]
            num_leaf = int((self.n_cuts + 1) ** X.shape[1])
        else:
            if len(self.feature_mask) != X.shape[1]:
                raise ValueError("The boolean feature mask must have as many entries as the number of features")
            self.cut_points_list_ = [(i, random_state.normal(size=(self.n_cuts,))) for i in range(X.shape[1])
                                     if self.feature_mask[i]]
            num_leaf = int((self.n_cuts + 1) ** len(self.cut_points_list_))
        if self.verbose:
            print(f"Total will be {num_leaf} values per sample")
        self.leaf_scores_ = random_state.normal(size=(num_leaf, self.n_clusters))

    def _compute_grads(self, X, y_pred, gradient):
        # Start with the backprop through the softmax
        y_pred_grad = y_pred * (gradient - (y_pred * gradient).sum(1, keepdims=True))

        # Then backprop through the final matrix multiplication
        binning_backprop = y_pred_grad @ self.leaf_scores_.T
        leaf_score_backprop = self._leaf.T @ y_pred_grad

        # Store the update corresponding to the leaf scores, negated for maximisation instead of minimisation
        updates = [-leaf_score_backprop]

        # Then, compute all feature-wise Kronecker derivatives
        axes_for_reshape = tuple([-1] + [len(x[1]) + 1 for x in self.cut_points_list_])
        binning_backprop = binning_backprop.reshape(axes_for_reshape)
        # We must multiply the binning gradient by all binnings
        binning_backprop *= self._leaf.reshape(axes_for_reshape)

        # Compute the backprop for the individual cut points
        for i, (_, cut_points) in enumerate(self.cut_points_list_):
            axes_for_sum = tuple([1 + j for j in range(len(self.cut_points_list_)) if i != j])
            softmax_grad = binning_backprop.sum(axes_for_sum) / self._all_binnings[i]
            bin_grad = self._all_binnings[i] * (
                    softmax_grad - (self._all_binnings[i] * softmax_grad).sum(1, keepdims=True))  # Shape Nx(d+1)
            bin_grad /= self.temperature

            # The gradient is directly on the bias, so we only need to do the cumsum backprop after summing on
            # all samples
            # We remove the gradient on the constant [0] bias
            bias_grad = bin_grad.sum(0)[1:]
            cumsum_grad = -np.cumsum(bias_grad[::-1])[::-1]
            # Restore the original order of the cut points
            cut_grad = cumsum_grad[np.argsort(self._all_orders[i])]

            # Apply the update rule, negated for maximisation instead of minimisation
            updates += [-cut_grad]

        return updates

    def _get_weights(self):
        return [self.leaf_scores_] + list(map(lambda x: x[1], self.cut_points_list_))
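
    # Gradient flow summary: `_compute_grads` backpropagates through the
    # prediction softmax, the leaf-score matrix product, the feature-wise
    # Kronecker product and each per-feature soft binning. The returned
    # gradients are negated because the solvers minimise, while GEMINI
    # objectives must be maximised.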

    def find_active_points(self, X):
        """
        Computes the list of cut points that are considered active for a Douglas tree and some data X. A cut
        point is active if its value falls within the bounds of its matching feature. Active points can be used
        for finding features that actively contributed to the clustering.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            Test samples.

        Returns
        -------
        active_points : list
            A list containing the integer indices of features for which the Douglas model has active cut points.
        """
        check_is_fitted(self)
        X = check_array(X)

        if X.shape[1] < len(self.cut_points_list_):
            raise ValueError("The passed data has fewer features than the number of cut points expected for the "
                             "Douglas model")

        active_points = []
        for (feature_index, cut_points) in self.cut_points_list_:
            feature = X[:, feature_index]
            min_threshold = cut_points.min()
            max_threshold = cut_points.max()
            # Check that the cut points have at least one threshold falling within the bounds of the feature
            if not (np.all(feature <= min_threshold) or np.all(feature >= max_threshold)):
                active_points += [feature_index]

        return active_points
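
# Minimal usage sketch (an assumption, not part of the original module): it
# relies on the scikit-learn-style `fit` API and the `labels_` attribute
# documented above, with synthetic Gaussian blobs as toy data.
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    # Two well-separated Gaussian blobs in two dimensions
    X_demo = np.vstack([
        rng.normal(loc=-2.0, scale=0.5, size=(50, 2)),
        rng.normal(loc=2.0, scale=0.5, size=(50, 2)),
    ])
    model = Douglas(n_clusters=2, n_cuts=1, random_state=0)
    model.fit(X_demo)
    print("Cluster sizes:", np.unique(model.labels_, return_counts=True)[1])
    print("Features with active cut points:", model.find_active_points(X_demo))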