Source code for ttml.ttml

"""
Implements TTML, a tensor train based machine learning estimator.

Uses existing machine learning estimators to initialize a tensor train
decomposition on a particular feature space discretization.

Then this tensor train is further optimized with Riemannian conjugate gradient
descent. This library also implements much functionality related to tensor
trains. And their Riemannian optimization in general.

One can use :class:`TTMLRegressor` and :class:`TTMLClassifier` just like any
`scikit-learn` estimator. As parameter it just needs another `scikit-learn`-like
estimator. For example, here we fit a :class:`TTMLRegressor` using
:class:`RandomForestRegressor` for initialization.

>>> from ttml import ttml
... import numpy as np
... from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
...
... # Try to learn the summation function in 10 dimensions
... X = np.random.normal(size=(1000, 10))
... y = np.sum(X, axis=1)
...
... forest = RandomForestRegressor()
... ttml = ttml.TTMLRegressor(forest)
... ttml.fit(X, y)
TTMLRegressor(estimator=RandomForestRegressor())

Note that here there is no need to fit random forest to data separately; this is
done automatically when calling :meth:`ttml.fit()` (but only if the random forest
has not been fitted to data yet). After fitting, we can use :meth:`.predict()`
for predicting values:

>>> ttml.predict([[0.3] * 10]) # ten times the same value
array([2.89993562])

For classification problems only data with 0/1 labels is supported, but 
otherwise the procedure is very similar to regression. For example below
we try to learn the function which returns 1 only if the sum of 10 numbers
is positive.

>>> X = np.random.normal(size=(1000, 10))
... y = (np.sum(X, axis=1) > 0).astype(int)
... 
... forest = RandomForestClassifier()
... ttml = ttml.TTMLClassifier(forest)
... ttml.fit(X, y)
TTMLClassifier(estimator=RandomForestClassifier())

For prediction, we can use :meth:`.predict()` which gives 0/1 labels as output,
:meth:`.predict_proba()` which gives a probability, and :meth:`predict_logit()` 
which outputs a logit.

>>> ttml.predict([[0.3] * 10])
array([1.])

>>> ttml.predict_proba([[0.3] * 10])
array([0.9999665])

>>> ttml.predict_logit([0.3] * 10)
array([10.30384462])

Other than the base estimator, the most important hyperparameters for the
ttml are the `tensor train rank` and `number of thresholds per feature`,
controlled by the keywords ``max_rank`` and ``num_thresholds`` respectively. The
respective default values are ``5`` and ``50``. Changing the ``max_rank``
parameter can cause fitting and inference (prediction) to take significantly
longer. The ``num_thresholds`` parameter also affects fittings speed, but does
not affect inference speed. Both parameters affect the accuracy of the model,
and there is no general rule of thumb for the optimal value of these parameters.

>>> ttml = ttml.TTMLClassifier(forest, max_rank=2, num_thresholds=10)
... ttml.fit(X, y)
TTMLClassifier(estimator=RandomForestClassifier())

Another feature that can greatly improve performance of ttml, is early
stopping. During the Riemannian optimization phase, we can monitor the
performance on a validation dataset. This greatly reduces the tendency to
overfit. To do this, we simply need to specify a validation dataset to the
:meth:`fit` method:

>>> from sklearn.model_selection import train_test_split
... X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.15)
... ttml.fit(X_train, y_train, X_val=X_val, y_val=y_val)
TTMLClassifier(estimator=RandomForestClassifier())
"""

import numbers
from datetime import datetime
import warnings
from time import perf_counter_ns

import autoray as ar
import numpy as np
from numpy.linalg import LinAlgError
from scipy.special import expit, logit
from sklearn.base import BaseEstimator, RegressorMixin, ClassifierMixin
from sklearn.utils.validation import check_array, check_is_fitted
from sklearn.tree._classes import BaseDecisionTree
from sklearn.metrics import log_loss, mean_squared_error
from sklearn.utils.validation import check_is_fitted
from sklearn.exceptions import NotFittedError


from ttml.tensor_train import TensorTrain
from ttml.tt_rlinesearch import TTLS, TensorTrainLineSearch
from ttml.forest_compression import compress_forest_thresholds
from ttml.tt_cross import estimator_to_tt_cross
from ttml.utils import convert_backend, predict_logit

_fit_parameters_string = """

Parameters
----------
X : np.ndarray
    Training X data values
y : np.ndarray
    Training truth labels. Should be 0/1 for classification.
estimator :
    An `sklearn`-like estimator for fitting. If the estimator is not yet
    fitted, this calls the estimator's .fit function. For classification
    tasks, take note of the `estimator_output` keyword. If your
    estimator does not have an appropriate .predict method, then pass
    _predict_fun as a last resort. For classification we assume this
    outputs logits.
X_val : np.ndarray (optional)
    Validation set to monitor during training
y_val: np.ndarray (optional)
task : str (default: "regression")
    Whether to use "regression" or "classification" as task. In the case
    of "classification" this estimator will output logits.
num_thresholds : int (default: 50)
    The number of thresholds per feature to pick from the forest.
    Ignored if the `_thresholds` kwarg is specified.
tt_cross_its : int (default: 5)
    Number of iterations for the TT-cross algorithm
max_rank : int (default: 10)
    Maximum rank for the tensor train
opt_steps : int (default: 100)
    Number of steps of Riemannian conjugate gradient descent to take
opt_tol : float (default:  1e-5)
    After 3 steps of no relative improvement of at least `opt_tol`, the
    Riemannian conjugate gradient descent is stopped. If `X_val` is
    supplied then error is monitored on validation set, otherwise on
    training set.
estimator_output : str (default: "logit")
    For classification tasks, the output of the estimators' .predict
    function. Supported arguments are "logit" and "proba". If the
    estimator has a .predict_proba method, then this is ignored and
    .predict_proba is used instead.
verbose : bool (default: False)
    If True, print convergence and debug information
_thresholds : list[np.ndarray] (optional)
    Use this list of thresholds instead of inferring from the forest.
    Should be a list of arrays, one array per feature. The last element
    of each array is expected to be `np.inf`. TODO: update
_ttls_kwargs : dict (optional)
    Keyword arguments to pass to the Riemannian conjugate gradient
    descent optimizer. See `TensorTrainLineSearch` for details.
_predict_fun : method (optional)
"""


[docs]class TTML: """Implements a TTML. This stores a TensorTrain and a list of thresholds for each feature. Can be used as an sklearn-like estimator once trained. Not intended to be initialized directly for most use cases, use :class:`TTMLClassifier` and :class:`TTMLRegressor` instead. Parameters ---------- tt : TensorTrain thresholds : list<np.ndarray> categorical_features : tuple<int> or None """ def __init__(self, tt, thresholds, categorical_features=None): if categorical_features is None: self.categorical_features = tuple() else: self.categorical_features = categorical_features self.tt = tt self.thresholds = thresholds self.n_features = len(thresholds) self.backend = tt.backend @property def num_params(self): n_thresh_params = sum(np.size(t) for t in self.thresholds) n_tt_params = self.tt.num_params() return n_thresh_params + n_tt_params
[docs] @classmethod def from_tree(cls, decision_tree): """Initialize from sklearn decision tree. This creates a lossless encoding of the decision tree as TTML. The thresholds are precisely the decision boundaries of the tree. """ # Support both decision trees and sklearn.tree._tree.Tree objects. if isinstance(decision_tree, BaseDecisionTree): decision_tree = decision_tree.tree_ thresholds, leaf_values, leaf_filter_matrices = cls._tree_to_tensor( decision_tree ) tt = cls.tree_to_tt(thresholds, leaf_values, leaf_filter_matrices) return cls(tt, thresholds)
@staticmethod def _tree_to_tensor(tree, thresholds=None): """Return tensor encoding tree in CP format together with thresholds. Parameters ---------- tree : :meth:`sklearn.tree.tree_.Tree` currently only works with tree coming from DecisionTreeRegressor thresholds : list[np.ndarray] Don't infer thresholds from tree, but use a precomputed list instead. Returns ------- thresholds : list[np.ndarray] One array of thresholds for each feature. First value is always `-inf` leaf_values : np.ndarray Decision labels at leaves leaf_filter_matrices : list[np.ndarray[bool]] For each feature a matrix of shape (n_thresholds, n_leaves). Each row encodes the filter values for this particular feature and leaf. """ if isinstance(tree, BaseDecisionTree): tree = tree.tree_ # Compute thresholds for each feature feat_inds = [ np.where(tree.feature == feat) for feat in range(tree.n_features) ] if thresholds is None: thresholds = [] for inds in feat_inds: thresholds.append( np.unique( np.concatenate([[np.infty], tree.threshold[inds]]) ) ) # Decision labels of leaves leaves = np.where(tree.feature < 0) leaf_values = tree.value[leaves][:, 0, 0] # For each node, initialize filter matrix to be True for all thresholds filter_matrices = [ np.full((tree.node_count, len(thresholds[i])), True) for i in range(tree.n_features) ] for index in range(tree.node_count): l_child = tree.children_left[index] r_child = tree.children_right[index] if l_child < 0: # leaf node continue feat = tree.feature[index] threshold = tree.threshold[index] new_filter = thresholds[feat] <= threshold # Let children inherit filter matrices for f in range(tree.n_features): filter_matrices[f][l_child] = filter_matrices[f][index].copy() filter_matrices[f][r_child] = filter_matrices[f][index].copy() # Update filter matrices of children filter_matrices[feat][l_child] *= new_filter filter_matrices[feat][r_child] *= ~new_filter # Only store filters of leaves leaf_filter_matrices = [ filt[leaves].astype(np.float64) for filt in filter_matrices ] return thresholds, leaf_values, leaf_filter_matrices
[docs] @staticmethod def tree_to_tt(thresholds, leaf_values, leaf_filter_matrices): """Convert sklearn decision tree into a TT together with threshold arrays""" cores = [] d = len(thresholds) for i, X in enumerate(leaf_filter_matrices): if i == 0: # make shape (1,k[0],r) core = (leaf_values * X.T).reshape(1, X.shape[1], X.shape[0]) elif i == d - 1: # make shape (r,k[d-1],1) core = X.reshape(X.shape[0], X.shape[1], 1) else: # make shape (r,k[i],r), diagonal along axis 0 and 2. core = np.apply_along_axis(np.diag, 0, X).transpose(0, 2, 1) # cores in the long run. cores.append(core) # Orthogonalize, reducing rank tt = TensorTrain(cores, is_orth=True) tt.orthogonalize(force_rank=False) return tt
[docs] @staticmethod def thresholds_from_data(X, n_thresholds, categorical_features=None): """Compute thresholds from data by binning according to percentile. The last threshold is always `np.infty`. For categorical features all the unique values are used as thresholds, and the last value is replaced by `np.infty`. Parameters ---------- X : np.ndarray n_thresholds : int or iterable<int> The number of thresholds to use for each feature. Ignored for categorical features. categorical_features : tuple (optional) The indices of the categorical features. Returns ------- thresholds : list<np.ndarray> """ if isinstance(n_thresholds, int): n_thresholds = [n_thresholds] * X.shape[1] if categorical_features is None: categorical_features = tuple() thresholds = [] for i in range(X.shape[1]): if i in categorical_features: thresholds.append(np.unique(X[:, i])) thresholds[i][-1] = np.infty else: X_values = X[:, i][X[:, i] < np.max(X[:, i])] percents = np.arange(1, n_thresholds[i]) * 100 / n_thresholds[i] try: thresholds.append( np.percentile( X_values, percents, interpolation="nearest" ) ) except IndexError: # Handle index errors, apparently X_values is empty thresholds.append(np.array([np.min(X[:, i])])) thresholds[i] = np.concatenate([thresholds[i], [np.infty]]) thresholds[i] = np.unique(thresholds[i]) return thresholds
[docs] @classmethod def random_from_data( cls, X, rank, n_thresholds, backend="numpy", categorical_features=None ): """Make a TTML with random TT and with thresholds determined by :meth:`TTML.thresholds_from_data` Parameters ---------- X : np.ndarray rank : int or iterable<int> The tensor-train rank. If a list, it should be of length one shorter than number of features n_thresholds : int or interable<int> Number of thresholds to use for each feature. This determines the outer dimensions of the TT backend : str, optional (default: 'numpy') categorical_features : tuple<int> or None The indices of the categorical features if any """ thresholds = cls.thresholds_from_data( X, n_thresholds, categorical_features=categorical_features ) tt = TensorTrain.random( [len(t) for t in thresholds], rank, backend=backend ) return cls(tt, thresholds, categorical_features=categorical_features)
def observed_indices(self, X): inds = np.array( [ np.searchsorted(self.thresholds[i], X[:, i]) for i in range(self.n_features) ] ).T return convert_backend(inds, self.backend)
[docs] def predict(self, X, task="regression"): """Predict values for the TT `tt` using threshold arrays and input `X`""" # Look up location of X in threshold arrays (and subtract 1) inds = self.observed_indices(X) predictions = self.tt.gather(inds) predictions = ar.to_numpy(predictions) if task == "classification": predictions = predict_logit(predictions) return predictions
[docs] def expand_thresholds(self, thresholds): """Add new tresholds to TT (inplace), merging duplicate thresholds. The TT-cores for the new thresholds are copied from those already present. The predictions of expanded model are guaranteed to be the same. """ new_thresholds = [] new_cores = [] for i in range(len(self.thresholds)): # Do nothing with categorical features if i in self.categorical_features: new_thresholds.append(self.thresholds[i]) new_cores.append(self.tt.cores[i]) continue # Merge thresholds new_threshold = np.sort( np.unique(np.concatenate([self.thresholds[i], thresholds[i]])) ) # New core is obtained by repeating slices in axis 1 thresh_inds = np.searchsorted(self.thresholds[i], new_threshold) thresh_inds = np.clip(thresh_inds, 0, len(self.thresholds[i]) - 1) thresh_inds = convert_backend(thresh_inds, self.backend) core = self.tt.cores[i] new_core = ar.do("take", core, thresh_inds, axis=1) new_thresholds.append(new_threshold) new_cores.append(new_core) self.tt = TensorTrain(new_cores) self.tt.orthogonalize(force_rank=False) self.thresholds = new_thresholds
def __mul__(self, other): if not isinstance(other, numbers.Number): raise NotImplementedError( "only multiplication by scalars is supported for now" ) new_tt = self.tt * other return self.__class__( new_tt, self.thresholds, self.categorical_features ) def __truediv__(self, other): if not isinstance(other, numbers.Number): raise NotImplementedError( "only multiplication by scalars is supported for now" ) new_tt = self.tt / other return self.__class__( new_tt, self.thresholds, self.categorical_features ) def __add__(self, other): # Expand thresholds of both TTML so that they match self.expand_thresholds(other.thresholds) other.expand_thresholds(self.thresholds) # Add the underlying TT's new_tt = self.tt + other.tt new_tt.orthogonalize(force_rank=False) return self.__class__( new_tt, self.thresholds, self.categorical_features ) def _new_tresh_median(self, X, mu, i): """Calculate median of a bin and number of points on either side of median.""" right_bndry = self.thresholds[mu][i] if i == 0: left_bndry = -np.inf else: left_bndry = self.thresholds[mu][i - 1] X_filtered = X[:, mu][ (left_bndry < X[:, mu]) * (X[:, mu] <= right_bndry) ] median = np.median(X_filtered) # Calculate minimum number of points in either new bin left_points = np.sum(X_filtered <= median) right_points = np.sum(X_filtered > median) min_split_points = min(left_points, right_points) return median, min_split_points def _expand_tresholds_from_residuals( self, X, y, n=5, min_split=5, task="regression" ): """Add thresholds inplace according to worst performance. Find the n worst-performing threshold in terms of contribution to the squared sum error or cross entropy, and split those thresholds in two according to median. Parameters ---------- X : array<float, float> y : array<float> n : int min_split : int or float (default: 5) If an integer, only split a threshold if both new bins will contain at least `min_split` samples. If a float between 0 and 1, split based on fraction of all samples instead. """ if min_split <= 0: min_split = 0 elif 0 < min_split < 1: min_split = int(min_split * len(y)) else: min_split = int(min_split) idx = self.observed_indices(X) predictions = self.predict(X) if task == "classification": predictions = ar.do("sigmoid", predictions) # residuals = (self.predict(X) - y) ** 2 errors = [] mu_i = [] for mu in range(self.tt.order): # that way adding tasks just requires changing something in one place. if mu in self.categorical_features: continue # compute mean error for i in range(self.tt.dims[mu]): predictions_masked = predictions[idx[:, mu] == i] y_masked = y[idx[:, mu] == i] if task == "regression": residuals = (predictions_masked - y_masked) ** 2 errors.append(ar.do("sum", residuals)) if task == "classification": p = predictions_masked cross_entropy = -y_masked * ar.do("log", p) - ( 1 - y_masked ) * ar.do("log", 1 - p) errors.append(ar.do("sum", cross_entropy)) mu_i.append((mu, i)) worst_thresh = np.argsort(errors) split_points = np.array(mu_i)[worst_thresh] new_thresholds = [[] for _ in range(self.tt.order)] for mu, i in split_points: new_tresh, min_split_points = self._new_tresh_median(X, mu, i) if min_split_points > min_split: new_thresholds[mu].append(new_tresh) if len(new_thresholds) >= n: break self.expand_thresholds(new_thresholds)
[docs] @classmethod def fit( cls, X, y, estimator, X_val=None, y_val=None, task="regression", num_thresholds=50, tt_cross_its=5, max_rank=10, opt_steps=100, opt_tol=1e-5, tt_cross_method="dmrg", estimator_output="logit", verbose=False, _thresholds=None, _ttls_kwargs=None, _predict_fun=None, ): """ Fit a TTML, using `estimator` for initialization. The tensor train is initialized using the DMRG TT-cross algorithm from the function values of the estimator. It is then further optimized to lower training loss using Riemannian conjugate gradient descent. Parameters ---------- X : np.ndarray Training X data values y : np.ndarray Training truth labels. Should be 0/1 for classification. estimator : An `sklearn`-like estimator for fitting. If the estimator is not yet fitted, this calls the estimator's .fit function. For classification tasks, take note of the `estimator_output` keyword. If your estimator does not have an appropriate .predict method, then pass _predict_fun as a last resort. For classification we assume this outputs logits. X_val : np.ndarray (optional) Validation set to monitor during training y_val: np.ndarray (optional) task : str (default: "regression") Whether to use "regression" or "classification" as task. In the case of "classification" this estimator will output logits. num_thresholds : int (default: 50) The number of thresholds per feature to pick from the forest. Ignored if the `_thresholds` kwarg is specified. tt_cross_its : int (default: 5) Number of iterations for the TT-cross algorithm max_rank : int (default: 10) Maximum rank for the tensor train opt_steps : int (default: 100) Number of steps of Riemannian conjugate gradient descent to take opt_tol : float (default: 1e-5) After 3 steps of no relative improvement of at least `opt_tol`, the Riemannian conjugate gradient descent is stopped. If `X_val` is supplied then error is monitored on validation set, otherwise on training set. tt_cross_method : str (default: "dmrg") Whether to use "regular" or "dmrg" type TT-cross algorithm for initialization. estimator_output : str (default: "logit") For classification tasks, the output of the estimators' .predict function. Supported arguments are "logit" and "proba". If the estimator has a .predict_proba method, then this is ignored and .predict_proba is used instead. verbose : bool (default: False) If True, print convergence and debug information _thresholds : list[np.ndarray] (optional) Use this list of thresholds instead of inferring from the forest. Should be a list of arrays, one array per feature. The last element of each array is expected to be `np.inf`. TODO: update _ttls_kwargs : dict (optional) Keyword arguments to pass to the Riemannian conjugate gradient descent optimizer. See `TensorTrainLineSearch` for details. _predict_fun : method (optional) """ # Obtain thresholds if _thresholds is None: thresholds = cls.thresholds_from_data(X, num_thresholds) else: thresholds = _thresholds if _predict_fun is not None: predict_fun = _predict_fun else: # Check if estimator is fitted, if not try to fit it. try: check_is_fitted(estimator) except NotFittedError: try: estimator.fit(X, y) except AttributeError: pass # determine the right prediction function for training if task == "regression": predict_fun = estimator.predict elif task == "classification": def pred_logit(X): probs = proba_fun(X) # Deal with sklearn's weird probability output if len(probs.shape) > 1 and probs.shape[1] == 2: probs = probs[:, 1] probs = np.clip(probs, 1e-8, 1 - 1e-8) return logit(probs) if hasattr(estimator, "predict_proba"): proba_fun = estimator.predict_proba predict_fun = pred_logit elif estimator_output == "proba": proba_fun = estimator.predict predict_fun = pred_logit else: predict_fun = estimator.predict else: raise ValueError(f"Unknown task {task}") # Use tt-cross to initialize the TT tt = estimator_to_tt_cross( predict_fun, thresholds, verbose=verbose, max_rank=max_rank, max_its=tt_cross_its, method=tt_cross_method, ) ttml = cls(tt, thresholds) # Report loss after tt-cross if verbose: # use validation set for reporting if possible, otherwise training if X_val is not None: X_report = X_val y_report = y_val else: X_report = X y_report = y if task == "regression": ttml_error = mean_squared_error(y_report, ttml.predict(X_report)) tree_error = mean_squared_error(y_report, predict_fun(X_report)) else: ttml_error = log_loss(y_report, expit(ttml.predict(X_report))) tree_error = log_loss(y_report, expit(predict_fun(X_report))) print( f"validation loss pre-optimization: {ttml_error:.4e}, " f"vs. baseline {tree_error:.4e}" ) idx = ttml.observed_indices(X) if X_val is not None: idx_val = ttml.observed_indices(X_val) if _ttls_kwargs is None: _ttls_kwargs = {} if task == "classification": # Gradients for classification are tiny, this makes linesearch find # good step sizes. _ttls_kwargs["auto_scale"] = True ttls = TTLS(tt.copy(), y, idx, task=task, **_ttls_kwargs) history = { "train_loss": [], "val_loss": [], "step_size": [], "grad": [], "time": [], } start_time = perf_counter_ns() prev_loss = None num_steps_little_change = 0 for i in range(opt_steps): try: train_loss, grad, step_size = ttls.step() except (LinAlgError, ValueError): # Linalg error points to convergence # On MKL this raises ValueError instead break if step_size is None: # Line search didn't converge break history["train_loss"].append(train_loss) if X_val is not None: val_loss = ttls.loss(y=y_val, idx=idx_val) loss = val_loss history["val_loss"].append(val_loss) if verbose: print(f"{i=}, {val_loss=:.4e}, {train_loss=:.4e}") else: loss = train_loss if verbose: print(f"{i=}, {train_loss=:.4e}") grad = np.abs(grad) / len(y) history["grad"].append(grad) history["step_size"].append(step_size) time_delta = perf_counter_ns() - start_time history["time"].append(time_delta / 1e9) # Monitor relative improvement for early stopping if opt_tol is not None: if prev_loss is not None: rel_error = (prev_loss - loss) / np.abs(prev_loss) else: rel_error = np.inf prev_loss = loss if rel_error < opt_tol: num_steps_little_change += 1 else: num_steps_little_change = 0 if num_steps_little_change >= 3: break ttml.tt = ttls.tt ttml._history = history return ttml
def _fit_old( self, X, y, X_val=None, y_val=None, task="regression", sample_weight=None, n_steps_init=50, n_rounds=50, n_steps_round=20, n_thresh_round=10, n_round_rank_increase=5, min_improvement=1e-5, backend="numpy", optimizer_cls=TensorTrainLineSearch, opt_kwargs=None, verbose=False, ): """Fit the TTML to data using RGD, incrementing number of thresholds during training. Parameters ----------forest X : np.ndarray y : np.ndarray X_val : np.ndarray (optional) If supplied together with `y_val`, validation loss is monitored during training. y_val : np.ndarray or None (optional) task : str (default: 'regression') Whether to perform `regression` or `classification` sample_weight : np.ndarray Array of same shape as `y` giving weights to samples n_steps_init : int (default: 50) Number of steps to take before adding more thresholds n_rounds : int (default: 50) Number of times thresholds are added or rank is increased. n_steps_round : int (default: 20) Number of steps to take between adding thresholds n_thresh_round : int (default: 10) Number of thresholds to add each round n_round_rank_increase : int (default: 5) Increase rank every `n_round_rank_increase` rounds min_improvement : float (optional, default: 1e-5) If not `None`, stop optimization round if relative change in training loss is less than `min_improvement` for 3 consecutive steps backend : str (default: "numpy") optimizer : `TensorTrainOptimizer` (default: `TensorTrainLineSearch`) Which optimizer to use. By default use `TensorTrainLineSearch`, other supported option is `TensorTrainSGD`. opt_kwargs : dict or None (default: None) keyword arguments to pass to the optimizer. verbose : bool Returns ------- history : dict Dictionary containing variables monitored during training: train_loss : training loss at each step val_loss : validation loss at each step (if `X_val` is not None) step_size : step size taken by optimizer grad : Riemannian gradient norm time : time in ms since start """ backend = self.backend history = { "train_loss": [], "val_loss": [], "step_size": [], "grad": [], "time": [], } start_time = np.datetime64(datetime.now()) def opt_round(n): prev_phi0 = None num_steps_little_change = 0 idx = convert_backend(self.observed_indices(X), backend) if X_val is not None: idx_val = convert_backend(self.observed_indices(X_val), backend) # Make default step size equal to previous step size for faster start if len(history["step_size"]) > 0: last_step_size = history["step_size"][-1] else: last_step_size = 1.0 opt = optimizer_cls( self.tt, y, idx, task=task, sample_weight=sample_weight, last_step_size=last_step_size, **opt_kwargs, ) for _ in range(n): phi0, derphi0, step_size = opt.step() if step_size is None: break history["train_loss"].append(phi0) if X_val is not None: val_loss = opt.loss(y=y_val, idx=idx_val) history["val_loss"].append(val_loss) grad = np.abs(derphi0) / len(y) history["grad"].append(grad) history["step_size"].append(step_size) time_delta = np.datetime64(datetime.now()) - start_time history["time"].append(time_delta / np.timedelta64(1, "ms")) # Monitor relative improvement for early stopping if min_improvement is not None: if prev_phi0 is not None: rel_error = (prev_phi0 - phi0) / np.abs(prev_phi0) else: rel_error = np.inf prev_phi0 = phi0 if rel_error < min_improvement: num_steps_little_change += 1 else: num_steps_little_change = 0 if num_steps_little_change >= 3: break self.tt = opt.tt try: opt_round(n_steps_init) except LinAlgError: print("LinalgError in initial round, reduce number of steps!") if verbose: if X_val is not None: val_string = f" Validation error: {history['val_loss'][-1]:.5f}" else: val_string = "" print( f"Initial round complete. Train error: {history['train_loss'][-1]:.5f}.{val_string}" ) for i in range(n_rounds): self._expand_tresholds_from_residuals( X, y, n=n_thresh_round, task=task ) if i > 0 and (i % n_round_rank_increase == 0): self.tt.increase_rank(1) try: opt_round(n_steps_round) except LinAlgError: if verbose: print("Training aborted, probably convergence reached.") break if verbose: if X_val is not None: val_string = ( f" Validation error: {history['val_loss'][-1]:.5f}" ) else: val_string = "" print( f"Round {i+1}/{n_rounds}. Train error: {history['train_loss'][-1]:.5f}.\ {val_string}" ) return history
[docs]class TTMLEstimator(BaseEstimator): """Meta class to use TTML as sklearn estimator. Use derivative classes TTMLClassifier and TTMLRegressor instead. """ def __init__( self, estimator, task=None, max_rank=5, num_thresholds=50, tt_cross_its=5, opt_steps=100, opt_tol=1e-5, verbose=False, **kwargs, ): self.estimator = estimator self.task = task self.max_rank = max_rank self.is_fitted = False self.verbose = verbose self.num_thresholds = num_thresholds self.tt_cross_its = tt_cross_its self.opt_steps = opt_steps self.opt_tol = opt_tol self.kwargs = kwargs def fit(self, X, y, X_val=None, y_val=None, sample_weight=None): self.ttml_ = TTML.fit( X, y, self.estimator, X_val=X_val, y_val=y_val, task=self.task, num_thresholds=self.num_thresholds, tt_cross_its=self.tt_cross_its, max_rank=self.max_rank, opt_steps=self.opt_steps, opt_tol=self.opt_tol, verbose=self.verbose, **self.kwargs, ) self.history_ = self.ttml_._history self.is_fitted = True return self
[docs]class TTMLRegressor(TTMLEstimator, RegressorMixin): __doc__ = ( """Wrapper to turn TTML into an sklearn classifier.""" + _fit_parameters_string ) def __init__(self, estimator, **kwargs): super(TTMLRegressor, self).__init__( estimator, task="regression", **kwargs ) def predict(self, X): check_is_fitted(self) X = check_array(X) prediction = self.ttml_.predict(X) return prediction
[docs]class TTMLClassifier(TTMLEstimator, ClassifierMixin): __doc__ = ( """Wrapper to turn TTML into an sklearn classifier.""" + _fit_parameters_string ) def __init__(self, estimator, **kwargs): super(TTMLClassifier, self).__init__( estimator, task="classification", **kwargs ) def predict(self, X): check_is_fitted(self) X = check_array(X) prediction = self.ttml_.predict(X, task="classification") return prediction def predict_proba(self, X): check_is_fitted(self) X = check_array(X) prediction = self.ttml_.predict(X, task="regression") prediction = ar.do("sigmoid", prediction) return prediction def predict_logit(self, X): check_is_fitted(self) X = check_array(X) prediction = self.ttml_.predict(X, task="regression") return prediction