"""Data handling base class utilities.
This module contains base functionality for reading, validating and
preprocessing AnnData objects used by the rest of the package.
"""
import numpy as np
import scanpy as sc
import pandas as pd
from Compocyte.core.tools import is_counts
from scipy import sparse
from warnings import warn
[docs]
class DataBase():
"""Add explanation
"""
[docs]
def load_adata(
self,
adata):
self.adata = adata
if self.var_names is not None: # Load new adata for transfer learning/prediction
self.adata = self.variable_match_adata(adata)
else:
self.var_names = self.adata.var_names.tolist()
self.ensure_not_view()
self.check_for_counts()
if not isinstance(self.adata.X, sparse.csr_matrix):
self.adata.X = sparse.csr_matrix(self.adata.X)
if self.default_input_data == 'normlog':
self.ensure_normlog()
[docs]
def variable_match_adata(
self,
new_adata):
new_var_names = [v for v in self.var_names if v not in new_adata.var_names]
if is_counts(new_adata.X):
pass
else:
if hasattr(new_adata, 'raw') and new_adata.raw is not None and is_counts(new_adata.raw.X):
new_adata.X = new_adata.raw[new_adata.obs_names, new_adata.var_names].X #new_adata.raw.X
elif hasattr(new_adata, 'layers') and 'raw' in new_adata.layers and is_counts(new_adata.layers['raw']):
new_adata.X = new_adata.layers['raw']
if len(new_var_names) > 0:
new_values = np.empty(
(len(new_adata.obs_names), len(new_var_names))
)
new_values[:] = 0
new_X = sparse.csr_matrix(
sparse.hstack(
[new_adata.X, sparse.csr_matrix(new_values)]))
new_var = pd.DataFrame(index=list(new_adata.var_names) + new_var_names)
new_adata = sc.AnnData(
X=new_X,
var=new_var,
obs=new_adata.obs)
return new_adata[:, self.var_names]
[docs]
def add_variables(
self,
new_variables):
new_values = np.empty((
len(self.adata.obs_names),
len(new_variables)))
new_values[:] = 0
new_X = sparse.csr_matrix(
sparse.hstack(
[self.adata.X, sparse.csr_matrix(new_values)]))
new_var = pd.DataFrame(index=list(self.adata.var_names) + new_variables)
self.adata = sc.AnnData(
X=new_X,
var=new_var,
obs=self.adata.obs,
obsm=self.adata.obsm,
uns=self.adata.uns)
[docs]
def ensure_not_view(self):
"""Ensures that the AnnData object saved within is not a view.
"""
if self.adata.is_view:
self.adata = self.adata.copy()
else:
pass
[docs]
def check_for_counts(self):
"""Checks self.adata.X and self.adata.raw.X for presence of raw count data.
"""
if self.ignore_counts:
return
if is_counts(self.adata.X):
pass
else:
if hasattr(self.adata, 'raw') and self.adata.raw is not None and is_counts(self.adata.raw.X):
self.adata.X = self.adata.raw[self.adata.obs_names, self.adata.var_names].X #self.adata.raw.X
elif hasattr(self.adata, 'layers') and 'raw' in self.adata.layers and is_counts(self.adata.layers['raw']):
self.adata.X = self.adata.layers['raw']
else:
raise ValueError('No raw counts found in adata.X or adata.raw.X or adata.layers["raw"].')
[docs]
def ensure_normlog(self):
test_x = self.adata.X[:min(100, len(self.adata)), :]
if hasattr(test_x, 'todense'):
test_x = test_x.todense()
test_x = np.asarray(test_x)
test_x = np.longdouble(test_x)
is_normlog = len(np.unique(np.array(np.round(
np.sum(np.expm1(test_x), axis=1)
)))) == 1
if is_normlog:
warn('You have supplied normalized, log-transformed data. Please ensure that \
this is intended and data is normalized to 10_000 counts per cell prior \
to log1p transformation.')
if 'log1p' in self.adata.uns:
del self.adata.uns['log1p']
if not is_normlog:
if self.ignore_counts:
warn('Data is not tc-count normalized to 10_000 counts per cell and log-\
transformed, thus renormalizing and re-log-transforming data. If this \
is not intended, please set default_input_data to "counts".')
sc.pp.normalize_total(self.adata, target_sum=10000)
sc.pp.log1p(self.adata)