Source code for nmf_models.nmf_models_mod_updates

"""Module containing the inNMF class and tf-idf function."""

import numpy as np
import sklearn
from  sklearn.utils.extmath import randomized_svd, squared_norm, safe_sparse_dot
from sklearn.utils import check_random_state, check_array
import time
import scipy
from scipy import sparse
import sys
import logging
from typing import Optional, Union, Mapping, List  # Special


[docs]class intNMF(): """Class to run int NMF on multiome data Attributes ---------- theta : array-like cell x topic matrix (joint low dimensional embedding) phi_rna : array-like topic x gene matrix. Gives the loading matrix to define topics. phi_atac : array-like topic x region matrix. Gives the loading matrix to define topics. loss : float l2 norm of the reconstruction error i.e. ||X_rna - WH_rna||_2 + ||X_atac - WH_atac||_2 loss_atac : float l2 norm for the reconstruction error of just the atac matrix i.e. ||X_atac - WH_atac||_2 loss_rna : float l2 norm for the reconstruction error of just the rna matrix i.e. ||X_rna - WH_rna||_2 Parameters ---------- n_topics : int The number of latent topics epochs : int Number of interations during optimisation. Defaults to 15. init : string Method of initialising W and H. Defaults to random. mod1_skew : float Relative weighting of two modalities between 0-1. Defaults to 0.5. reg : string Include l1 or l2 regularisation or not. (This is TODO). Default None seed : int Random seed to use. Defaults to None ,i.e., no control of random seed (Useful for reproducability when using random initilisation) """ def __init__(self, n_topics: int, epochs: Optional[int] = 15, init: Optional[str]='random', mod1_skew: Optional[float]=0.5, seed: Optional[int] = None, reg = None): if (mod1_skew > 1) or (mod1_skew < 0): raise ValueError self.k = n_topics self.epochs = epochs self.init = init self.mod1_skew = mod1_skew self.loss = [] self.loss_atac = [] self.loss_rna = [] self.epoch_times = [] self.epoch_iter = [] self.rand = seed self.rna_features = None self.atac_features = None def _add_feature_names(self, rna_names: List[str], atac_names: Optional[List[str]] = None): """ Add feature names to the nmf model. This is useful for plotting functions Parameters ---------- rna_names: list of gene names must be same length as columns in rna_mat atac_names: Optional list. Must be the same length as columns in atac_mat """ if len(rna_names) != self.phi_rna.shape[1]: print('rna dims dont match. Features not added') return self.rna_features = rna_names if atac_names is not None: if len(atac_names) != self.phi_atac.shape[1]: print('atac deatures not added. Dims dont match') self.atac_features = atac_names
[docs] def fit(self, rna_mat: Union[np.array, sparse.csr_matrix], atac_mat: Union[np.array, sparse.csr_matrix], rna_names: Optional[List[str]] = None, atac_names = None): """ Optimise NMF. Uses accelerated Hierarchical alternating least squares algorithm proposed here, but modified to joint factorise two matrices. https://arxiv.org/pdf/1107.5194.pdf. Only required arguments are the matrices to use for factorisation. GEX and ATAC matrices are expected in cell by feature format. Matrices should be scipy sparse matrices. min ||X_rna - (theta . phi_rna)||_2 and min ||X_atac - (theta . phi_atac)||_2 s.t. theta, phi_rna, phi_atac > 0. So that theta hold the latent topic scores for a cell. And phi the allows recontruction of X Parameters ---------- rna_mat: scipy sparse matrix (or coercible) of single cell gene expression atac_mat: scipy sparse matrix (or coercible) of single cell gene expression rna_names: Optional list of gene names must be same length as columns in rna_mat atac_names: Optional list. Must be the same length as columns in atac_mat Returns -------- self Access low dim embed with self.theta or the loadings with self.phi_rna or self.phi_atac """ # if input matrices not sparse convert to sparse if not scipy.sparse.issparse(rna_mat): rna_mat = sparse.csr_matrix(rna_mat) if not scipy.sparse.issparse(atac_mat): atac_mat = sparse.csr_matrix(atac_mat) start = time.perf_counter() cells = atac_mat.shape[0] regions = atac_mat.shape[1] genes = rna_mat.shape[1] RNA_mat = rna_mat ATAC_mat = atac_mat nM_rna = sparse.linalg.norm(RNA_mat, ord='fro')**2 nM_atac = sparse.linalg.norm(ATAC_mat, ord='fro')**2 # Intialise matrices. Default is random. Dense numpy arrays. theta, phi_rna, phi_atac = self._initialize_nmf(RNA_mat, ATAC_mat, self.k, init=self.init) early_stopper = 0 counter = 0 interval = round(self.epochs/10) progress = 0 self.loss = [] #perform the optimisation. A, B, theta and phi matrices are modified to fit the update function for i in range(self.epochs): epoch_start =time.perf_counter() #update theta/W eit1 = time.perf_counter() rnaMHt = safe_sparse_dot(RNA_mat, phi_rna.T) rnaHHt = phi_rna.dot(phi_rna.T) atacMHt = safe_sparse_dot(ATAC_mat, phi_atac.T) atacHHt = phi_atac.dot(phi_atac.T) eit1 = time.perf_counter() - eit1 if i == 0: scale = ((np.sum(rnaMHt*theta)/np.sum(rnaHHt*(theta.T.dot(theta)))) + np.sum(atacMHt*theta)/np.sum(atacHHt*(theta.T.dot(theta))))/2 theta=theta*scale theta, theta_it = self._HALS_W(theta, rnaHHt, rnaMHt, atacHHt, atacMHt, eit1) #update phi_rna/H eit1 = time.perf_counter() A_rna = safe_sparse_dot(theta.T, RNA_mat) B_rna = (theta.T).dot(theta) eit1 = time.perf_counter() - eit1 phi_rna, phi_rna_it = self._HALS(phi_rna, B_rna, A_rna, eit1) #update phi_atac/H eit1 = time.perf_counter() A_atac = safe_sparse_dot(theta.T, ATAC_mat) B_atac = (theta.T).dot(theta) eit1 = time.perf_counter() - eit1 phi_atac, phi_atac_it = self._HALS(phi_atac, B_atac, A_atac, eit1) error_rna = np.sqrt(nM_rna - 2*np.sum(phi_rna*A_rna) + np.sum(B_rna*(phi_rna.dot(phi_rna.T)))) error_atac = np.sqrt(nM_atac - 2*np.sum(phi_atac*A_atac) + np.sum(B_atac*(phi_atac.dot(phi_atac.T)))) epoch_end =time.perf_counter() epoch_duration = epoch_end - epoch_start logging.info('epoch duration: {}\nloss: {}'.format(epoch_duration, error_rna+error_atac)) logging.info('theta iter: {}\nphi rna iter: {}\nphi atac iter: {}\n'.format(theta_it, phi_rna_it, phi_atac_it)) self.epoch_times.append(epoch_duration) self.loss.append(error_rna+error_atac) self.loss_atac.append(error_atac) self.loss_rna.append(error_rna) self.epoch_iter.append(theta_it + phi_rna_it + phi_atac_it) #print('total number of iterations in epoch {} is {}'.format(i, theta_rna_it, phi_rna_it + theta_atac_it + phi_atac_it)) #early stopping condition requires 50 consecutive iterations with no change. try: if self.loss[-2] < self.loss[-1]: early_stopper += 1 elif early_stopper > 0: early_stopper = 0 if early_stopper > 50: break except IndexError: continue counter += 1 self.theta = theta self.phi_rna = phi_rna self.phi_atac = phi_atac end = time.perf_counter() if rna_names is not None: self._add_feature_names(rna_names, atac_names) self.total_time = end - start logging.info(self.total_time) del RNA_mat del ATAC_mat
def _HALS(self, H, WtW, WtM, eit1, alpha=0.5, delta=0.1): """Optimizing min_{V >= 0} ||M-UV||_F^2 with an exact block-coordinate descent. UtU and UtM are exprensive to compute so multiple updates are calcuated. stop condition (epoch run time ) < (precompute time + current iteration time)/2 AND sum of squares of updates to V at current epoch > 0.01 * sum of squares of updates to V at first epoch Parameters: ----------- H: Array-like mat to update WtW : Array-like precomputed dense array WtM : Array-like precomputed dense array eit1 : int precompute time alpha : float control time based stop criteria delta : float control loss based stop criteria Returns --------------------- Array-like H (loading matrix) int number of iterations (usually 6/7) """ r, n = H.shape eit2 = time.perf_counter() # start time cnt = 1 eps = 1 eps0 = 1 eit3 = 0 # iteration time n_it = 0 while cnt == 1 or ((time.perf_counter()-eit2 < (eit1+eit3)*alpha) and (eps >= (delta**2)*eps0)): nodelta = 0 if cnt == 1: eit3 = time.perf_counter() for k in range(r): deltaH = np.maximum((WtM[k, :]-WtW[k, :].dot(H))/WtW[k, k], -H[k, :]) H[k, :] = H[k, :] + deltaH nodelta = nodelta + deltaH.dot(deltaH.T) H[k, H[k, :] == 0] = 1e-16*np.max(H) if cnt == 1: eps0 = nodelta eit3 = time.perf_counter() - eit3 eps = nodelta cnt = 0 n_it += 1 return H, n_it def _HALS_W(self, W, rnaHHt, rnaMHt, atacHHt, atacMHt, eit1, alpha=0.5, delta=0.1): """Optimizing min_{W >= 0} ||X1-WH1||_F^2 + ||X2-WH2||_F^2. An exact block-coordinate descent scheme is applied to update W. HHt and MHt (X) are exprensive to compute so multiple updates are pre-calcuated. stop condition (epoch run time ) < (precompute time + current iteration time)/2 AND sum of squares of updates to V at current epoch > 0.01 * sum of squares of updates to V at first epoch Parameters: ----------- W : array-like mat to update atacHHt : array-like precomputed dense array atacMHt : array-like precomputed dense array rnaHHt : array-like precomputed dense array rnaMHt : array-like precomputed dense array eit1 : int precompute time alpha : float Control time based stop criteria delta : float control loss based stop criteria Returns --------------------- array-like W optimised for current H_atac and H_rna values int number of iterations (usually 6/7) """ n, K = W.shape eit2 = time.perf_counter() # start time cnt = 1 eps = 1 eps0 = 1 eit3 = 0 # iteration time n_it = 0 mod1_skew = self.mod1_skew while cnt == 1 or ((time.perf_counter()-eit2 < (eit1+eit3)*alpha) and (eps >= (delta**2)*eps0)): nodelta=0 if cnt == 1: eit3 = time.perf_counter() for k in range(K): #print(W.shape, atacHHt.shape) #print(W.dot(atacHHt[:,k]).shape) deltaW = np.maximum(((mod1_skew*(rnaMHt[:,k] -W.dot(rnaHHt[:,k]) + (1-mod1_skew)*(atacMHt[:,k]-W.dot(atacHHt[:,k]))) )/ ((mod1_skew*rnaHHt[k, k]) + ((1-mod1_skew)*atacHHt[k,k]))), -W[:, k]) W[:,k] = W[:,k] + deltaW nodelta = nodelta + deltaW.dot(deltaW.T) W[W[:,k] == 0, k] = 1e-16*np.max(W) if cnt == 1: eps0 = nodelta eit3 = time.perf_counter() - eit3 eps = nodelta cnt = 0 n_it += 1 return W, n_it ### PLAN OT CHANGE THIS TO GILLIS METHOD WITH AUTOMATIC TOPIC DETECTION #https://github.com/CostaLab/scopen/blob/6be56fac6470e5b6ecdc5a2def25eb60ed6a1bcc/scopen/MF.py#L696 def _initialize_nmf(self, X, ATAC_mat, n_components, eps=1e-6, init='random'): """Algorithms for NMF initialization. Computes an initial guess for the non-negative rank k matrix approximation for X: X = WH Parameters ---------- X : array-like, shape (n_samples, n_features) The data matrix to be decomposed. n_components : integer The number of components desired in the approximation. init : None | 'random' | 'nndsvd' Method used to initialize the procedure. Default: 'random'. Valid options: - 'random': non-negative random matrices, scaled with: sqrt(X.mean() / n_components) - 'nndsvd': Nonnegative Double Singular Value Decomposition (NNDSVD) eps : float Truncate all values less then this in output to zero. random_state : int, RandomState instance or None, optional, default: None If int, random_state is the seed used by the random number generator; If RandomState instance, random_state is the random number generator; If None, the random number generator is the RandomState instance used by `np.random`. Used when ``random`` == 'nndsvdar' or 'random'. Returns ------- array-like, shape (n_samples, n_components) Initial guesses of W for solving X_rna ~= WH_rna array-like, shape (n_components, n_features) Initial guesses of H_rna for solving X_rna ~= WH_rna array-like, shape (n_components, n_features) Initial guesses of H_atac for solving X_atac ~= WH_atac References ---------- C. Boutsidis, E. Gallopoulos: SVD based initialization: A head start for nonnegative matrix factorization - Pattern Recognition, 2008, http://tinyurl.com/nndsvd """ n_samples, n_features = X.shape if not sparse.issparse(X) and np.any(np.isnan(X)): raise ValueError("NMF initializations with NNDSVD are not available " "with missing values (np.nan).") if init == 'random': X_mean = X.mean() avg = np.sqrt(X_mean / n_components) avg_atac = np.sqrt(ATAC_mat.mean() / n_components) rng = check_random_state(self.rand) H = avg * rng.randn(n_components, n_features) W = avg * rng.randn(n_samples, n_components) H_atac = avg_atac * rng.randn(n_components, ATAC_mat.shape[1]) # we do not write np.abs(H, out=H) to stay compatible with # numpy 1.5 and earlier where the 'out' keyword is not # supported as a kwarg on ufuncs np.abs(H, H) np.abs(W, W) np.abs(H_atac, H_atac) return W, H, H_atac # NNDSVD initialization U, S, V = randomized_svd(X, n_components, random_state=self.rand) W, H = np.zeros(U.shape), np.zeros(V.shape) # The leading singular triplreget is non-negative # so it can be used as is for initialization. W[:, 0] = np.sqrt(S[0]) * np.abs(U[:, 0]) H[0, :] = np.sqrt(S[0]) * np.abs(V[0, :]) for j in range(1, n_components): x, y = U[:, j], V[j, :] # extract positive and negative parts of column vectors x_p, y_p = np.maximum(x, 0), np.maximum(y, 0) x_n, y_n = np.abs(np.minimum(x, 0)), np.abs(np.minimum(y, 0)) # and their norms x_p_nrm, y_p_nrm = np.sqrt(squared_norm(x_p)), np.sqrt(squared_norm(y_p)) x_n_nrm, y_n_nrm = np.sqrt(squared_norm(x_n)), np.sqrt(squared_norm(y_n)) m_p, m_n = x_p_nrm * y_p_nrm, x_n_nrm * y_n_nrm # choose update if m_p > m_n: u = x_p / x_p_nrm v = y_p / y_p_nrm sigma = m_p else: u = x_n / x_n_nrm v = y_n / y_n_nrm sigma = m_n lbd = np.sqrt(S[j] * sigma) W[:, j] = lbd * u H[j, :] = lbd * v W[W < eps] = 0 H[H < eps] = 0 H_atac = safe_sparse_dot(np.linalg.pinv(W), ATAC_mat) H_atac[H_atac < eps] = 0 return W, H, H_atac
[docs]def log_tf_idf(mat_in, scale=10000): """ Return a TF-IDF transformed matrix. Parameters ---------- mat_in : `csr matrix type <https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_matrix.html>`_ Positional (required) matrix to be TF-IDF transformed. Should be cell x feature. scale : int scale values by this number (optional) Raises ------ nmf_models.InvalidMatType: If the mat type is invalid Returns ------- csr matrix TF-IDF transformed matrix """ mat = sparse.csr_matrix(mat_in) cell_counts = mat.sum(axis=1) for row in range(len(cell_counts)): mat.data[mat.indptr[row]:mat.indptr[row+1]] = (mat.data[mat.indptr[row]:mat.indptr[row+1]]*scale)/cell_counts[row] mat = sparse.csc_matrix(mat) [rows, cols] = mat.nonzero() feature_count = np.zeros(mat.shape[1]) unique, counts = np.unique(cols, return_counts=True) feature_count[unique] = counts for col in range(len(feature_count)): mat.data[mat.indptr[col]:mat.indptr[col+1]] = mat.data[mat.indptr[col]:mat.indptr[col+1]]*(mat.shape[0]/(feature_count[col]+1)) #idf Number of cells/Number of cells region is open in + 1 mat = mat.log1p() return mat
[docs]class InvalidMatType(Exception): """Raised if the mat is invalid.""" pass