Source code for ttml.tensor_train

from copy import copy, deepcopy

import autoray as ar
import opt_einsum

try:
    import tensorflow as tf
except ModuleNotFoundError:
    pass
from autoray import numpy as np
from numpy import sqrt

from ttml.utils import (
    convert_backend_cores,
    random_normal,
    random_isometry,
    trim_ranks,
)

# TODO: automatic right_core caching


[docs]class TensorTrain: """ Implements a tensor train with orthogonalized cores. Parameters ---------- cores : list<order 3 tensors> List of the TT cores. First core should have shape (1,d0,r0), last has shape (r(n-1),dn,1). Last dimension should match first dimension of consecutive tensors in the list. The tensors can be `numpy.ndarray`, `torch.Tensor` or `tensorflow.EagerTensor`, so long as all the tensors are the same type. mode : str or int (default: `'l'`) The orthogonalization mode. Can be an int, or the string `'l'` or `'r'`, which respectively get converted to the `self.order-1` and `0` is_orth : bool (default: `False`) Whether or not the cores are already orthogonalized. If `False`, the cores are orthogonalized during init. Attributes ---------- cores : list<order 3 tensors> List of all the tensor train cores. order : int The order of the tensor train (number of cores) dims : tuple<int> of length self.order The outer dimensions of the tensor train, i.e. the shape of the dense tensor represented by the tensor train, or `shape[1]` for each core. tt_rank : tuple<int> of length self.order-1 The tt-rank. This `shape[0]` or `shape[2]` for each core. This tuple starts with `cores[0].shape[2]`, and hence does always start/end with 1. backend : str String encoding the backend of the tensors. This is inferred from cores[0]. """ def __init__(self, cores, mode="l", is_orth=False): self.cores = cores self.order = len(cores) self.dims = tuple(c.shape[1] for c in cores) self.tt_rank = tuple(c.shape[0] for c in cores[1:]) self.backend = ar.infer_backend(cores[0]) if mode == "l": self.mode = self.order - 1 elif mode == "r": self.mode = 0 else: self.mode = mode if mode is not None: if not is_orth: # Orthogonalize. If inplace it modifies the argument cores self.cores = self._orth_cores(mode, inplace=False) self.is_orth = True else: self.is_orth = False
[docs] def orthogonalize(self, mode="l", inplace=True, force_rank=True): """Orthogonalize the cores with respect to `mode`. Parameters ---------- mode : int or str (default: "l") Orthogonalization mode. If "l", defaults to right-most core, if "r" to left-most core. inplace : bool (default: True) If `True` then cores are changed in place and return `None`. Otherwise return a TensorTrain object with orthogonalized cores. force_rank : bool (default: True) If True, check after each step that the rank of the TT hasn't lowered. If it has, artificially increase it back by mutliplying by random isometry. """ new_cores = self._orth_cores( mode=mode, inplace=inplace, force_rank=force_rank ) if not inplace: return TensorTrain(new_cores, mode=mode, is_orth=True)
def _orth_cores(self, mode="l", inplace=True, force_rank=True): """Orthogonalize with respect to mode `mode` and return list of new TT cores. See `orthogonalize` for parameters. """ if mode == "l": mu = self.order - 1 elif mode == "r": mu = 0 else: if not isinstance(mode, int): raise ValueError( "Orthogonalization mode should be 'l','r' or int" ) mu = mode new_cores = [None] * self.order if inplace: new_cores[mu] = self.cores[mu] else: new_cores[mu] = deepcopy(self.cores[mu]) # Orthogonalize to left of mu if mu > 0: for i in range(mu): if inplace: C = self.cores[i] else: C = deepcopy(self.cores[i]) shape = C.shape if i > 0: C = np.reshape(C, (shape[0], shape[1] * shape[2])) C = R @ C K = R.shape[0] else: K = 1 C = np.reshape(C, (K * shape[1], shape[2])) Q, R = np.linalg.qr(C) if ( force_rank and R.shape[0] < R.shape[1] ): # detect rank decrease isometry = random_isometry( (R.shape[1], R.shape[0]), self.backend ) R = isometry @ R Q = Q @ ar.transpose(isometry) new_cores[i] = np.reshape(Q, (K, shape[1], Q.shape[1])) if mu == self.order - 1: C = new_cores[mu] C = np.reshape(C, C.shape[:-1]) C = R @ C Q, R = np.linalg.qr(C) new_cores[mu - 1] = ar.do( "einsum", "ijk,kl->ijl", new_cores[mu - 1], Q ) new_cores[mu] = np.reshape(R, R.shape + (1,)) if mu < self.order - 1: C = new_cores[mu] shape = C.shape C = np.reshape(C, (shape[0], shape[1] * shape[2])) C = R @ C C = np.reshape(C, (R.shape[0], shape[1], shape[2])) new_cores[mu] = C # Orthogonalize to the right of mu if mu < self.order - 1: for i in range(self.order - 1, mu, -1): if inplace: C = self.cores[i] else: C = deepcopy(self.cores[i]) shape = C.shape if i < self.order - 1: C = np.reshape(C, (shape[0] * shape[1], shape[2])) C = C @ np.transpose(R) K = R.shape[0] else: K = 1 C = np.reshape(C, (shape[0], shape[1] * K)) Q, R = np.linalg.qr(np.transpose(C)) if ( force_rank and R.shape[0] < R.shape[1] ): # detect rank decrease isometry = random_isometry( (R.shape[1], R.shape[0]), self.backend ) R = isometry @ R Q = Q @ ar.transpose(isometry) if i == 0: new_cores[i + 1] = ar.do( "einsum", "ji,jkl->ikl", Q, new_cores[i + 1] ) new_cores[i] = np.reshape(np.transpose(R), (1,) + R.shape) else: new_cores[i] = np.reshape( np.transpose(Q), (Q.shape[1], shape[1], K) ) if mu == 0: C = new_cores[mu] C = np.reshape(C, C.shape[1:]) C = R @ np.transpose(C) Q, R = np.linalg.qr(C) new_cores[mu + 1] = ar.do( "einsum", "ji,jkl->ikl", Q, new_cores[mu + 1] ) new_cores[mu] = np.reshape( np.transpose(R), (1,) + R.shape[::-1] ) if mu > 0: C = new_cores[mu] shape = C.shape C = np.reshape(C, (shape[0] * shape[1], shape[2])) C = C @ np.transpose(R) C = np.reshape(C, (shape[0], shape[1], R.shape[0])) new_cores[mu] = C if inplace: self.cores = new_cores self.is_orth = True self.mode = mu self.tt_rank = tuple(c.shape[0] for c in new_cores[1:]) return new_cores
[docs] def convert_backend(self, backend): """Change the backend to target backend inplace.""" if self.backend == backend: pass self.cores = convert_backend_cores(self.cores, backend) self.backend = backend
@classmethod def from_dense(cls, X): raise NotImplementedError
[docs] def dense(self): """Contract to dense tensor in left-to-right sweep. Returns ------- np.ndarray Dense tensor with shape `self.dims` """ C = self.cores[0] shape = C.shape contracted = np.reshape(C, (shape[0] * shape[1], shape[2])) for C in self.cores[1:]: shape1 = contracted.shape shape2 = C.shape contracted = contracted @ np.reshape( C, (shape2[0], shape2[1] * shape2[2]) ) contracted = np.reshape( contracted, (shape1[0] * shape2[1], shape2[2]) ) contracted = np.reshape(contracted, self.dims) return contracted
[docs] @classmethod def random(cls, dims, tt_rank, mode="l", backend="numpy", auto_rank=True): """Create random TensorTrain of specified shape and rank. Always uses double precision for the tensors. Parameters ---------- dims : iterable of ints tt_rank : int or iterable of ints if int, all tt-ranks will be the same mode : "l", "r" or int backend : str (default: "numpy") autorank : bool (default: `True`) if True, automatically losslessly reduce the rank. This option should mainly be set to `False` for debugging and testing purposes. """ if isinstance(tt_rank, int): tt_rank = [tt_rank] * (len(dims) - 1) if auto_rank: tt_rank = trim_ranks(dims, tt_rank) ranks = [1] + list(tt_rank) + [1] cores = [] for i in range(len(dims)): C = random_normal( (ranks[i] * dims[i], ranks[i + 1]), backend=backend ) if i > 0: C, _ = ar.do("linalg.qr", C) # QR to create O(1) sing vals C = ar.do("reshape", C, (ranks[i], dims[i], ranks[i + 1])) cores.append(C) tt = cls(cores, mode=mode, is_orth=True) if auto_rank: tt.orthogonalize(force_rank=False) else: tt.orthogonalize(force_rank=True) return tt
[docs] def gather(self, idx): """Gather entries of dense tensor according to indices. For each row of `idx` this returns one number. This number is obtained by multiplying the slices of each core corresponding to each index (in a left-to-right fashion). Parameters ---------- idx : np.ndarray<int> Array of indices of shape `(len(self.dims), -1)`. Returns ------- np.ndarray Result of contraction. """ # TODO: use caching for further optimization # TODO: improve memory efficiency for large idx if self.backend == "numpy": return self.fast_gather(idx) else: gather = ar.do("take", self[0], idx[:, 0], axis=1) result = ar.do( "reshape", gather[0], (gather.shape[1], gather.shape[2]) ) for i in range(1, self.order): gather = ar.do("take", self[i], idx[:, i], axis=1) result = opt_einsum.contract("ij,jik->ik", result, gather) return ar.do("reshape", result, (-1,))
[docs] def fast_gather(self, idx): """Faster version of standard gather. Only works with numpy backend""" idx = idx.T N = idx.shape[1] result = np.take(self[0].reshape(self[0].shape[1:]), idx[0], axis=0) for i in range(1, self.order): r = self[i].shape[2] next_step = np.zeros((N, r)) for j in range(self.dims[i]): idx_mask = np.where(idx[i] == j) mat = self[i][:, j, :] next_step[idx_mask] = result[idx_mask] @ mat result = next_step return result.reshape(-1)
[docs] def idx_env(self, alpha, idx, num_cores=1, flatten=True): """Gather the left and right environment of a TT-core. Parameters ---------- - cores : list<tensor of order 3> - alpha : int left site of (super)core - idx : array<int> positions of data to gather - num_cores : int (default: 1) number of cores in the supercore - flatten : bool, optional (default: True) If True, always flatten result to 2D array. Otherwise result can be 2D or 3D depending on alpha. """ if (alpha + num_cores > self.order) or (alpha < 0): raise ValueError("The value of alpha is out of range") N = len(idx) # Gather indices of all cores to the left / right of supercore left = self[:alpha] right = self[alpha + num_cores :] left_gather = [ ar.do("take", left[i], idx[:, i], axis=1) for i in range(len(left)) ] right_gather = [ ar.do("take", right[i], idx[:, alpha + num_cores + i], axis=1) for i in range(len(right)) ] # Contract all the cores to the left / right of the supercore if alpha != 0: left_env = ar.do( "reshape", left_gather[0], (left_gather[0].shape[1], left_gather[0].shape[2]), ) for M in left_gather[1:]: left_env = opt_einsum.contract("ij,jik->ik", left_env, M) if alpha != len(self) - num_cores: right_env = ar.do( "reshape", right_gather[-1], (right_gather[-1].shape[0], right_gather[-1].shape[1]), ) for M in right_gather[-2::-1]: right_env = opt_einsum.contract("jik,ki->ji", M, right_env) # Tensor left and right environments of the supercore if alpha == 0: env = ar.do("transpose", right_env) if not flatten: env = ar.do("reshape", env, (1,) + env.shape) elif alpha == len(self) - num_cores: env = left_env if not flatten: env = ar.do("transpose", env) env = ar.do("reshape", env, env.shape + (1,)) else: if flatten: env = opt_einsum.contract("bi,jb->bij", left_env, right_env) env = ar.do("reshape", env, (N, -1)) else: env = opt_einsum.contract("bi,jb->ibj", left_env, right_env) return env
def __mul__(self, other): if not self.is_orth: self._orth_cores() new_cores = deepcopy(self.cores) new_cores[self.mode] *= other return TensorTrain(new_cores, mode=self.mode, is_orth=True) __rmul__ = __mul__ def __imul__(self, other): if not self.is_orth: self._orth_cores() self.cores[self.mode] *= other return self def __truediv__(self, other): return self.__mul__(1 / other) def __itruediv__(self, other): self.__imul__(1 / other) return self def __add__(self, other): # Take direct sum for each tensor slice new_cores = [np.concatenate((self.cores[0], other.cores[0]), axis=2)] for C1, C2 in zip(self.cores[1:-1], other.cores[1:-1]): r1, d, r2 = C1.shape r3, _, r4 = C2.shape zeros1 = ar.do( "zeros", (r1, d, r4), like=self.backend, dtype=ar.to_backend_dtype("float64", self.backend), ) zeros2 = ar.do( "zeros", (r3, d, r2), like=self.backend, dtype=ar.to_backend_dtype("float64", self.backend), ) row1 = np.concatenate((C1, zeros1), axis=2) row2 = np.concatenate((zeros2, C2), axis=2) new_cores.append(np.concatenate((row1, row2), axis=0)) new_cores.append( np.concatenate((self.cores[-1], other.cores[-1]), axis=0) ) new_tt = TensorTrain(new_cores, is_orth=False) new_tt.orthogonalize(force_rank=False) return new_tt def __iadd__(self, other): new_tt = self.__add__(other) return new_tt # self.cores = new_tt.cores # self.mode = new_tt.mode # return self def __neg__(self): return (-1) * self def __sub__(self, other): return self + (-1) * other def __isub__(self, other): new_tt = self.__sub__(other) return new_tt def __len__(self): return self.order def __getitem__(self, index): return self.cores[index] def __setitem__(self, index, data): self.cores[index] = data def __repr__(self): return ( f"<TensorTrain of order {self.order} " f"with outer dimensions {self.dims}, TT-rank " f"{self.tt_rank}, and orthogonalized at mode {self.mode}>" )
[docs] def round(self, max_rank=None, eps=None, inplace=True): """ Truncate the tensor train. * If ``max_rank`` is specified, truncate to this rank. * If ``eps`` is specified, truncate with precision ``eps`` (relative to largest singular value of each unfolding) Parameters ---------- max_rank : int or tuple<int> (optional) eps : float (optional) inplace : bool If True, round the tensor train in place. Otherwise performI truncation on a copy. Returns ------- TensorTrain """ if inplace: tt = self else: tt = self.copy() if max_rank is None and eps is None: raise ValueError( "At least one of `max_rank` or `eps` has to be specified" ) # scale epsilon relative to norm if eps is not None: eps *= tt.norm() if isinstance(max_rank, int): max_rank = [max_rank] * tt.order # sweep left to right, last core there's nothing to do tt.orthogonalize(mode="r") for i in range(tt.order - 1): # truncated SVD C = tt.cores[i] shape = C.shape U, S, V = np.linalg.svd( np.reshape(C, (shape[0] * shape[1], shape[2])) ) if max_rank is not None: S = S[: max_rank[i]] if eps is not None: if S[0] > eps: S = S[S > eps] else: S = S[:1] r = len(S) tt.cores[i] = np.reshape(U[:, :r], (shape[0], shape[1], r)) # update next core SV = np.diag(S) @ V[:r, :] next_core = tt.cores[i + 1] next_shape = next_core.shape next_core = np.reshape( next_core, (next_shape[0], next_shape[1] * next_shape[2]) ) next_core = SV @ next_core tt.cores[i + 1] = np.reshape( next_core, (r, next_shape[1], next_shape[2]) ) tt.mode = tt.order - 1 tt.tt_rank = tuple(c.shape[0] for c in tt.cores[1:]) return tt
[docs] def increase_rank(self, inc, i=None): """Increase the rank of the edge between core `i` and `i+1` by `inc`. This operation leaves the dense tensor invariant. To instead decrease the rank, use `truncate`.""" # Increase the rank of all nodes if i is None: for i in range(len(self.tt_rank)): self.increase_rank(inc, i) return self # Random isometry of shape (r+inc, r) r = self.tt_rank[i] A = ar.do( "random.normal", size=(r + inc, r), like=self.backend, dtype="float64", ) Q, _ = ar.do("linalg.qr", A) # Apply isometry self[i] = ar.do("einsum", "ijk,lk->ijl", self[i], Q) self[i + 1] = ar.do("einsum", "ij,jkl->ikl", Q, self[i + 1]) # Update rank information new_tt_rank = list(self.tt_rank) new_tt_rank[i] += inc self.tt_rank = tuple(new_tt_rank) return self
[docs] def sing_vals(self): """Compute singular values of each unfolding. As a side effect the TT is left-orthogonalized. One array of size ``self.tt_rank[i]`` is returned for each core ``i``. """ if self.mode != 0: self.orthogonalize(mode=0) sing_vals = [] for i in range(self.order - 1): # SVD C = self.cores[i] shape = C.shape U, S, V = np.linalg.svd( np.reshape(C, (shape[0] * shape[1], shape[2])) ) sing_vals.append(ar.to_numpy(S)) self.cores[i] = np.reshape(U, shape) # update next core SV = np.diag(S) @ V next_core = self.cores[i + 1] next_shape = next_core.shape next_core = np.reshape( next_core, (next_shape[0], next_shape[1] * next_shape[2]) ) next_core = SV @ next_core self.cores[i + 1] = np.reshape(next_core, next_shape) self.mode = self.order - 1 self.tt_rank = tuple(c.shape[0] for c in self.cores[1:]) return sing_vals
[docs] def rgrad_sparse(self, grad, idx): """Project sparse euclidean gradient to tangent space. Parameters ---------- grad : array<float64> Array containing the values of the sparse gradient. idx : array<int64> of shape `(len(grad),self.order)` Array containing the indices of the dense tensor corresponding to the values of the sparse euclidean gradient Returns ------- TensorTrainTangentVector """ if self.mode != self.order - 1: self.orthogonalize() right_cores = self._orth_cores(mode="r", inplace=False) left_cores = self.cores for C1, C2 in zip(left_cores, right_cores): assert C1.shape == C2.shape, ( [C.shape for C in left_cores], [C.shape for C in right_cores], ) N = len(idx) backend = self.backend # Compute left vectors left_vectors = [None] * self.order for mu in range(self.order - 1): left_vectors[mu] = ar.do( "zeros", (N, self.tt_rank[mu]), like=backend, dtype=ar.to_backend_dtype("float64", backend), ) for i in range(self.dims[mu]): inds = ar.do("where", idx[:, mu] == i)[0] if len(inds) == 0: continue if backend == "tensorflow": inds = ar.do("reshape", inds, (-1, 1)) if mu == 0: update = ar.do( "stack", [left_cores[0][0, i, :]] * len(inds) ) if backend == "tensorflow": left_vectors[mu] = tf.tensor_scatter_nd_update( left_vectors[mu], inds, update ) else: left_vectors[mu][inds] = update else: update = ( ar.do("take", left_vectors[mu - 1], inds, axis=0)
[docs] @ left_cores[mu][:, i, :] ) if backend == "tensorflow": update = ar.do( "reshape", update, (len(inds), left_vectors[mu].shape[1]), ) left_vectors[mu] = tf.tensor_scatter_nd_update( left_vectors[mu], inds, update ) else: left_vectors[mu][inds] = update # Compute right vectors right_vectors = [None] * self.order for mu in range(self.order - 1, 0, -1): right_vectors[mu] = ar.do( "zeros", (N, self.tt_rank[mu - 1]), like=backend, dtype=ar.to_backend_dtype("float64", backend), ) for i in range(self.dims[mu]): inds = ar.do("where", idx[:, mu] == i)[0] if len(inds) == 0: continue if backend == "tensorflow": inds = ar.do("reshape", inds, (-1, 1)) if mu == self.order - 1: update = ar.do( "stack", [right_cores[mu][:, i, 0]] * len(inds) ) if backend == "tensorflow": right_vectors[mu] = tf.tensor_scatter_nd_update( right_vectors[mu], inds, update ) else: right_vectors[mu][inds] = update else: update = ar.do( "take", right_vectors[mu + 1], inds, axis=0 ) @ ar.transpose(right_cores[mu][:, i, :]) if backend == "tensorflow": update = ar.do( "reshape", update, (len(inds), right_vectors[mu].shape[1]), ) right_vectors[mu] = tf.tensor_scatter_nd_update( right_vectors[mu], inds, update ) else: right_vectors[mu][inds] = update # Compute the gradient cores grad_cores = [[None] * d for d in self.dims] for mu in range(self.order - 1, -1, -1): for i in range(self.dims[mu]): inds = ar.do("where", idx[:, mu] == i)[0] if len(inds) == 0: if mu == 0: grad_cores[mu][i] = ar.do( "zeros", (1, self.tt_rank[0]), like=backend, dtype=ar.to_backend_dtype("float64", backend), ) elif mu == self.order - 1: grad_cores[mu][i] = ar.do( "zeros", (self.tt_rank[-1], 1), like=backend, dtype=ar.to_backend_dtype("float64", backend), ) else: grad_cores[mu][i] = ar.do( "zeros", (self.tt_rank[mu - 1], self.tt_rank[mu]), like=backend, dtype=ar.to_backend_dtype("float64", backend), ) continue Z = ar.do("take", grad, inds, axis=0) if mu > 0: U = ar.do("take", left_vectors[mu - 1], inds, axis=0) if mu < self.order - 1: V = ar.transpose( ar.do("take", right_vectors[mu + 1], inds, axis=0) ) if mu == self.order - 1: Z = ar.reshape(Z, (1, -1)) G = ar.do("reshape", Z @ U, (-1, 1)) elif mu == 0: Z = ar.reshape(Z, (-1, 1)) G = ar.do("reshape", V @ Z, (1, -1)) else: G = ar.do("einsum", "ji,j,kj->ik", U, Z, V) grad_cores[mu][i] = G grad_cores = [ar.do("stack", G_list, axis=1) for G_list in grad_cores] # Apply gauge conditions to the gradient cores self.apply_gradient_gauge_conditions(left_cores, grad_cores) return TensorTrainTangentVector(grad_cores, left_cores, right_cores)
def apply_gradient_gauge_conditions(self, left_cores, grad_cores): """Apply gauge conditions to gradient cores inplace""" for mu in range(self.order - 1): r1, r2, r3 = left_cores[mu].shape U = np.reshape(left_cores[mu], (r1 * r2, r3)) grad_cores[mu] -= np.reshape( U
[docs] @ (np.transpose(U) @ np.reshape(grad_cores[mu], (r1 * r2, r3))), (r1, r2, r3), )
def grad_proj(self, tangent_vector, right_cores=None): """Project TensorTrainTangentVector to tangent space. This implements parallel transport of `tangent_vector` to this point. """ return self.tt_proj( tangent_vector.to_tt(round=False), right_cores=right_cores )
[docs] def tt_proj(self, tt, right_cores=None, proj_U=True): """Project a TT to tangent space of self. This needs both left- and right-orthogonal cores. If `right_cores` is not specified, the right cores are computed first.""" if self.mode != self.order - 1: self.orthogonalize() left_cores = self.cores if right_cores is None: right_cores = self._orth_cores(mode="r", inplace=False) # List of partial contractions with left/right-orthogonal cores and tt right = contract_cores(right_cores, tt, "RL", store_parts=True) left = contract_cores(left_cores, tt, "LR", store_parts=True) # Project grad cores using left and right environments grad_cores = [] grad_cores.append(opt_einsum.contract("ijk,bk->ijb", tt[0], right[1])) for i in range(1, self.order - 1): grad_cores.append( opt_einsum.contract( "ai,ijk,bk->ajb", left[i - 1], tt[i], right[i + 1], ) ) grad_cores.append(opt_einsum.contract("ai,ijk->ajk", left[-2], tt[-1])) # Apply gauge conditions self.apply_gradient_gauge_conditions(left_cores, grad_cores) return TensorTrainTangentVector(grad_cores, left_cores, right_cores)
[docs] def apply_grad(self, tangent_vector, alpha=1.0, round=True, inplace=False): """Compute retract of tangent vector. Parameters ---------- tangent_vector : TensorTrainTangentVector Tangent vector, assumed to lie in tangent space at current point. alpha : float (default: 1.0) Stepsize of retract. Using this parameter is equivalent but more efficient to supplying `alpha*tangent_vector` as first argument. round : bool (default: True) After applying tangent vector we end up with a TT of double the tt-rank. This bool controls whether to project back to original tt-rank, which is necessary if we want to compute the retract. inplace : bool (default: False) If false, return a new TensorTrain, otherwise update this TT inplace TODO: implement efficient reorthogonalization. """ left_cores = tangent_vector.left_cores right_cores = tangent_vector.right_cores grad_cores = tangent_vector.grad_cores # Formula of Steinlechner "Riemannian Optimization for High-Dimensional # Tensor Completion" (published version), end of page 10. new_cores = [ np.concatenate((alpha * grad_cores[0], left_cores[0]), axis=2) ] for U, V, dU in zip( left_cores[1:-1], right_cores[1:-1], grad_cores[1:-1] ): first_row = np.concatenate((V, np.zeros_like(V)), axis=2) second_row = np.concatenate((alpha * dU, U), axis=2) new_cores.append(np.concatenate((first_row, second_row), axis=0)) new_cores.append( np.concatenate( (right_cores[-1], left_cores[-1] + alpha * grad_cores[-1]), axis=0, ) ) new_tt = TensorTrain(new_cores) if round: new_tt.round(max_rank=self.tt_rank) if inplace: self.cores = new_tt.cores self.tt_rank = new_tt.tt_rank self.mode = new_tt.mode self.is_orth = new_tt.is_orth else: return new_tt
def norm(self): if not self.is_orth: self.orthogonalize() return np.linalg.norm(self.cores[self.mode])
[docs] def dot(self, other): """Compute dot product with other TT with same outer dimensions""" return contract_cores(self.cores, other.cores)
def __matmul__(self, other): return self.dot(other) def copy(self, deep=True): if deep: return deepcopy(self) else: return copy(self) def num_params(self): return sum([np.prod(C.shape) for C in self.cores])
[docs]class TensorTrainTangentVector: """Class for storing a tangent vector to TT manifold. A tangent vector at a point on the TT-manifold is a list of cores with the same shape as the TT, satisfying a certain gauge condition. This class stores both the left-orthogonal and right orthogonal cores of the original TT. This is because both are needed for most computations. parameters ---------- grad_cores : list<order 3 tensors> list of first-order variation cores left_cores : list<order 3 tensors> list of left-orthogonal cores right_cores : list<order 3 tensors> list of right-orthogonal cores""" def __init__(self, grad_cores, left_cores, right_cores): self.grad_cores = grad_cores self.left_cores = left_cores self.right_cores = right_cores self.backend = ar.infer_backend(grad_cores[0]) self.order = len(grad_cores) self.tt_rank = tuple(c.shape[0] for c in grad_cores[1:]) self.dims = tuple(c.shape[1] for c in grad_cores)
[docs] def inner(self, other): """Compute inner product between two tangent vectors. Due to orthogonalization this is just inner product of first-order variations""" result = 0.0 for core1, core2 in zip(self.grad_cores, other.grad_cores): if self.backend != "tensorflow": result += ar.do( "dot", np.reshape(core1, (-1,)), np.reshape(core2, (-1,)) ) else: result += ar.do( "tensordot", np.reshape(core1, (-1,)), np.reshape(core2, (-1,)), 1, ) return result
def __matmul__(self, other): return self.inner(other) def norm(self): result = 0 for c in self.grad_cores: result += ar.do("linalg.norm", c) ** 2 return ar.do("sqrt", result)
[docs] @classmethod def random(cls, left_cores, right_cores): """Random tangent vector with unit norm gradients""" order = len(left_cores) backend = ar.infer_backend(left_cores[0]) grad_cores = [] for i in range(order): # C = ar.do("random.normal", size=left_cores[i].shape, like=backend) C = random_normal(left_cores[i].shape, backend=backend) C = C / (sqrt(order) * np.linalg.norm(C)) grad_cores.append(C) # Poject to range of U for i in range(order - 1): Y_mat = grad_cores[i] shape = Y_mat.shape Y_mat = np.reshape(grad_cores[i], (shape[0] * shape[1], shape[2])) U_mat = left_cores[i] U_mat = np.reshape(U_mat, (shape[0] * shape[1], shape[2])) Y_mat -= U_mat @ (np.transpose(U_mat) @ Y_mat) grad_cores[i] = np.reshape(Y_mat, shape) return cls(grad_cores, left_cores, right_cores)
[docs] def to_tt(self, round=False): """Convert to TensorTrain. If `round=True` then round to TT of rank same TT-rank as point, if `round=False` it will have double the TT-rank of the point.""" new_cores = [ np.concatenate((self.grad_cores[0], self.left_cores[0]), axis=2) ] for U, V, dU in zip( self.left_cores[1:-1], self.right_cores[1:-1], self.grad_cores[1:-1], ): first_row = np.concatenate((V, np.zeros_like(U)), axis=2) second_row = np.concatenate((dU, U), axis=2) new_cores.append(np.concatenate((first_row, second_row), axis=0)) new_cores.append( np.concatenate( (self.right_cores[-1], self.grad_cores[-1]), axis=0, ) ) new_tt = TensorTrain(new_cores) if round: new_tt.round(max_rank=self.tt_rank) return new_tt
[docs] def to_eucl(self, idx): """Return sparse Euclidean gradient with indices `idx`. *NB: `idx` should not contain any repeated indices.*""" return self.to_tt().gather(idx)
[docs] def convert_backend(self, backend): """Change the backend to target backend inplace.""" if self.backend == backend: pass self.grad_cores = convert_backend_cores(self.grad_cores, backend) self.left_cores = convert_backend_cores(self.left_cores, backend) self.right_cores = convert_backend_cores(self.right_cores, backend) self.backend = backend
def __repr__(self): return ( f"<TensorTrainTangentVector of order {self.order}, " f"outer dimensions {self.dims}, and TT-rank " f"{self.tt_rank}>" ) def __getitem__(self, index): return self.grad_cores[index] def __setitem__(self, index, data): self.grad_cores[index] = data def __mul__(self, other): new_grad_cores = [C * other for C in self.grad_cores] return TensorTrainTangentVector( new_grad_cores, self.left_cores, self.right_cores ) __rmul__ = __mul__ def __imul__(self, other): self.grad_cores = [C * other for C in self.grad_cores] return self def __truediv__(self, other): return self.__mul__(1 / other) def __itruediv__(self, other): self.__imul__(1 / other) return self def __neg__(self): return (-1) * self def __sub__(self, other): return self + (-1) * other def __isub__(self, other): self.__iadd__(-other) return self def __add__(self, other): new_grad_cores = [ C1 + C2 for C1, C2 in zip(self.grad_cores, other.grad_cores) ] return TensorTrainTangentVector( new_grad_cores, self.left_cores, self.right_cores ) def __iadd__(self, other): self.grad_cores = [ C1 + C2 for C1, C2 in zip(self.grad_cores, other.grad_cores) ] return self def __len__(self): return self.order def copy(self, deep=True): if deep: return deepcopy(self) else: return copy(self)
[docs]def contract_cores( cores1, cores2, dir="LR", upto=None, store_parts=False, return_float=True ): """Compute contraction of one list of TT cores with another. Cores should be of outer dimensions If `dir=LR`, then first cores should have first shape 1, if `dir=RL` then last shape of last core should be 1. Parameters ---------- cores1 : list of TT cores. cores2 : list of TT cores. dir : str, default='LR' Direction of contraction; LR (left-to-right), RL (right-to-left) upto : int, optional Contract only up to this mode store_parts : bool, default=False If `True`, return list of all intermediate contractions. return_float : bool, default=True If `True` and the result has total dimension 1, then compress result down to a float. Ignored if `store_parts=True`. """ result_list = [] if dir == "LR": result = opt_einsum.contract("ijk,ajc->iakc", cores1[0], cores2[0]) result = ar.do( "reshape", result, (cores1[0].shape[-1], cores2[0].shape[-1]) ) for core1, core2 in zip(cores1[1:upto], cores2[1:upto]): if store_parts: result_list.append(result) result = opt_einsum.contract("ij,ika,jkb->ab", result, core1, core2) total_dim = 1 for d in result.shape: # if result is number, reformat to float. total_dim *= d if return_float and d == 1 and not store_parts: result = float(np.reshape(result, (-1,))) if store_parts: result_list.append(result) return result_list else: return result elif dir == "RL": result = opt_einsum.contract("ijk,ajc->iakc", cores1[-1], cores2[-1]) result = ar.do( "reshape", result, (cores1[-1].shape[0], cores2[-1].shape[0]) ) for core1, core2 in zip(cores1[-2:upto:-1], cores2[-2:upto:-1]): if store_parts: result_list.append(result) result = opt_einsum.contract("ab,ika,jkb->ij", result, core1, core2) total_dim = 1 for d in result.shape: # if result is number, reformat to float. total_dim *= d if return_float and d == 1 and not store_parts: result = float(np.reshape(result, (-1,))) if store_parts: result_list.append(result) return result_list[::-1] # reverse result list for RL direction else: return result else: raise ValueError(f"Unknown direction '{dir}'")