Source code for Compocyte.core.base.data_base

import numpy as np
import scanpy as sc
import pandas as pd
from Compocyte.core.tools import is_counts
from scipy import sparse
from warnings import warn

class DataBase:
    """Data-handling helpers that store an AnnData object and prepare its X matrix.

    The methods read and write the following instance attributes, none of
    which is initialised in this class (they are expected to be provided by
    the owning class before ``load_adata`` is called):

    - ``adata``: the AnnData object currently held.
    - ``var_names``: list of variable names fixed by the first loaded
      dataset, or ``None`` if nothing has been loaded yet.
    - ``ignore_counts``: if truthy, the raw-count check is skipped.
    - ``default_input_data``: when equal to ``'normlog'``, the data is
      ensured to be normalised and log1p-transformed after loading.
    """

    @staticmethod
    def _append_zero_columns(X, n_new):
        """Return ``X`` as a CSR matrix with ``n_new`` all-zero columns appended.

        The padding is created directly as an empty sparse matrix so that no
        dense ``rows x n_new`` array is ever allocated. The padding dtype is
        float64, so the result is upcast exactly as with a dense float64
        zero block.
        """
        padding = sparse.csr_matrix((X.shape[0], n_new), dtype=np.float64)
        return sparse.csr_matrix(sparse.hstack([X, padding]))

    @staticmethod
    def _restore_counts(adata):
        """Make ``adata.X`` hold raw counts if they can be found.

        Looks, in order, at ``adata.X``, ``adata.raw`` and
        ``adata.layers['raw']`` and overwrites ``adata.X`` in place with the
        first candidate for which ``is_counts`` is true.

        Returns:
            ``True`` if ``adata.X`` holds counts after the call, else ``False``.
        """
        if is_counts(adata.X):
            return True

        raw = getattr(adata, 'raw', None)
        if raw is not None and is_counts(raw.X):
            # Select by name so the matrix taken from .raw has the same
            # observations and variables as the current object.
            adata.X = raw[adata.obs_names, adata.var_names].X
            return True

        layers = getattr(adata, 'layers', None)
        if layers is not None and 'raw' in layers and is_counts(layers['raw']):
            adata.X = layers['raw']
            return True

        return False

    def load_adata(self, adata):
        """Store ``adata`` on the instance and bring it into the expected form.

        If ``self.var_names`` is already set, the new object is first matched
        to those variables (see ``variable_match_adata``); otherwise the
        variable names of ``adata`` become ``self.var_names``. Afterwards the
        object is de-viewed, checked for raw counts, its ``X`` is converted to
        ``scipy.sparse.csr_matrix`` and, if ``self.default_input_data`` is
        ``'normlog'``, normalised and log1p-transformed.

        Args:
            adata: AnnData object to load.

        Raises:
            ValueError: propagated from ``check_for_counts`` when no raw
                counts are found and ``self.ignore_counts`` is falsy.
        """
        self.adata = adata
        if self.var_names is not None:
            # Load new adata for transfer learning/prediction
            self.adata = self.variable_match_adata(adata)
        else:
            self.var_names = self.adata.var_names.tolist()

        # variable_match_adata returns a column-subset view, so copy first.
        self.ensure_not_view()
        self.check_for_counts()
        if not isinstance(self.adata.X, sparse.csr_matrix):
            self.adata.X = sparse.csr_matrix(self.adata.X)

        if self.default_input_data == 'normlog':
            self.ensure_normlog()

    def variable_match_adata(self, new_adata):
        """Return ``new_adata`` restricted to and ordered by ``self.var_names``.

        Variables listed in ``self.var_names`` but absent from ``new_adata``
        are appended as all-zero columns. Before that, ``new_adata.X`` is
        replaced in place by raw counts from ``.raw`` or ``.layers['raw']``
        when ``X`` itself does not hold counts; if no counts are found, ``X``
        is left unchanged and no error is raised.

        Note that when columns have to be added, a new AnnData object is built
        from ``X``, ``var`` names and ``obs`` only.

        Args:
            new_adata: AnnData object to align; its ``X`` may be overwritten.

        Returns:
            ``new_adata`` (or its zero-padded replacement) indexed by
            ``self.var_names``.
        """
        missing = [v for v in self.var_names if v not in new_adata.var_names]
        self._restore_counts(new_adata)

        if missing:
            padded_X = self._append_zero_columns(new_adata.X, len(missing))
            padded_var = pd.DataFrame(
                index=list(new_adata.var_names) + missing)
            new_adata = sc.AnnData(
                X=padded_X,
                var=padded_var,
                obs=new_adata.obs)

        return new_adata[:, self.var_names]

    def add_variables(self, new_variables):
        """Append ``new_variables`` to ``self.adata`` as all-zero columns.

        ``self.adata`` is replaced by a new AnnData object carrying the
        widened ``X`` together with the previous ``obs``, ``obsm`` and
        ``uns``. ``self.var_names`` is not modified by this method.

        Args:
            new_variables: iterable of variable names to add.
        """
        # Accept any iterable (tuple, pandas Index, ...), not only lists.
        new_variables = list(new_variables)
        widened_X = self._append_zero_columns(
            self.adata.X, len(new_variables))
        widened_var = pd.DataFrame(
            index=list(self.adata.var_names) + new_variables)
        self.adata = sc.AnnData(
            X=widened_X,
            var=widened_var,
            obs=self.adata.obs,
            obsm=self.adata.obsm,
            uns=self.adata.uns)

    def ensure_not_view(self):
        """Ensures that the AnnData object saved within is not a view.

        A view is replaced by a copy of itself; anything else is left as is.
        """
        if self.adata.is_view:
            self.adata = self.adata.copy()

    def check_for_counts(self):
        """Ensure ``self.adata.X`` holds raw count data.

        Does nothing when ``self.ignore_counts`` is truthy. Otherwise
        ``self.adata.X``, ``self.adata.raw`` and ``self.adata.layers['raw']``
        are inspected in that order and ``X`` is overwritten with the first
        one that contains counts.

        Raises:
            ValueError: if none of the three locations contains raw counts.
        """
        if self.ignore_counts:
            return

        if not self._restore_counts(self.adata):
            raise ValueError('No raw counts found in adata.X or adata.raw.X or adata.layers["raw"].')

    def ensure_normlog(self):
        """Ensure ``self.adata.X`` is normalised to 10000 per cell and log1p'd.

        Heuristic: the first (up to) 100 rows are back-transformed with
        ``expm1`` and summed per row. If all rounded row sums are identical
        the data is treated as already normalised and log-transformed and only
        a warning is emitted. The check does not verify that the common sum
        is actually 10000. Otherwise the data is normalised with
        ``sc.pp.normalize_total(target_sum=10000)`` followed by
        ``sc.pp.log1p``; a warning precedes this when ``self.ignore_counts``
        is truthy.
        """
        sample = self.adata.X[:min(100, len(self.adata)), :]
        if hasattr(sample, 'todense'):
            sample = sample.todense()

        # Extended precision for the expm1 round trip before rounding.
        sample = np.asarray(sample, dtype=np.longdouble)
        row_totals = np.round(np.sum(np.expm1(sample), axis=1))
        is_normlog = np.unique(row_totals).size == 1

        if is_normlog:
            warn(
                'You have supplied normalized, log-transformed data. '
                'Please ensure that this is intended and data is normalized '
                'to 10_000 counts per cell prior to log1p transformation.')

        # NOTE(review): presumably removed so a later sc.pp.log1p call does
        # not consider the data already log-transformed -- confirm.
        if 'log1p' in self.adata.uns:
            del self.adata.uns['log1p']

        if not is_normlog:
            if self.ignore_counts:
                warn(
                    'Data is not tc-count normalized to 10_000 counts per '
                    'cell and log-transformed, thus renormalizing and '
                    're-log-transforming data. If this is not intended, '
                    'please set default_input_data to "counts".')

            sc.pp.normalize_total(self.adata, target_sum=10000)
            sc.pp.log1p(self.adata)