import warnings
from typing import Callable, List, Optional, Tuple, Union
import numpy as np
from anndata import AnnData
from scipy.sparse import csr_matrix, diags, issparse, lil_matrix
from tqdm import tqdm
from ..configuration import DKM, DynamoAdataKeyManager
from ..dynamo_logger import LoggerManager
from ..preprocessing.normalization import normalize_mat_monocle, sz_util
from ..preprocessing.pca import pca
from ..utils import copy_adata
from .connectivity import mnn, normalize_knn_graph, umap_conn_indices_dist_embedding
from .utils import elem_prod, get_mapper, inverse_norm
# ---------------------------------------------------------------------------------------------------
# use for calculating moments for stochastic model:
def moments(
adata: AnnData,
X_data: Optional[np.ndarray] = None,
genes: Optional[list] = None,
group: Optional[str] = None,
conn: Optional[csr_matrix] = None,
use_gaussian_kernel: bool = False,
normalize: bool = True,
use_mnn: bool = False,
layers: Union[List[str], str] = "all",
n_pca_components: int = 30,
n_neighbors: int = 30,
copy: bool = False,
) -> Optional[AnnData]:
"""Calculate kNN based first and second moments (including uncentered covariance) for different layers of data.
Args:
adata: an AnnData object.
X_data: the user-supplied data that will be used for constructing the nearest neighbor graph directly. Defaults
to None.
genes: the one-dimensional numpy array of the genes to use for the pca analysis (if adata.obsm['X'] is not
available). The `X` keyname (instead of `X_pca`) is used so that a different set of genes can be supplied
for flexible connectivity graph construction. If `None`, genes will be selected based on the `use_for_pca`
key in the .var attribute if it exists; otherwise all genes stored in adata.X will be used. Defaults to None.
group: the column key/name that identifies the grouping information (for example, clusters that correspond to
different cell types or different time points) of cells. This will be used to compute a kNN graph for each
group (i.e. cell type/time point). This is important because, for example, we don't want cells from different
labeling time points to be mixed when constructing the kNN graph for calculating the moments. Defaults to
None.
conn: the connectivity graph that will be used for moment calculations. Defaults to None.
use_gaussian_kernel: whether to weight the kNN graph with a Gaussian kernel (otherwise the binary kNN graph is
simply row-normalized). Defaults to False.
normalize: whether to normalize the connectivity matrix so that each row sums up to 1. When
`use_gaussian_kernel` is False, this will be reset to False because the connectivity matrix is already
normalized by dividing each row by its total number of connections. Defaults to True.
use_mnn: whether to use a mutual kNN graph across different layers for the moment calculation. Defaults to False.
layers: the layers that will be used for calculating the moments. Defaults to "all".
n_pca_components: the number of pca components to use for constructing the nearest neighbor graph and
calculating first/second moments. Defaults to 30.
n_neighbors: the number of nearest neighbors to use for constructing the nearest neighbor graph and calculating
first/second moments. Defaults to 30.
copy: whether to return a new updated AnnData object or update inplace. Defaults to False.
Raises:
Exception: `group` is invalid.
ValueError: `conn` is invalid. It should be a square array with dimension equal to the cell number.
Returns:
The updated AnnData object if `copy` is true. Otherwise, the AnnData object passed in would be updated inplace
and None would be returned.
"""
logger = LoggerManager.gen_logger("dynamo-moments")
logger.info("calculating first/second moments...", indent_level=1)
logger.log_time()
adata = copy_adata(adata) if copy else adata
mapper = get_mapper()
(
only_splicing,
only_labeling,
splicing_and_labeling,
) = DKM.allowed_X_layer_names()
if conn is None:
if genes is None and "use_for_pca" in adata.var.keys():
genes = adata.var_names[adata.var.use_for_pca]
if use_mnn:
if "mnn" not in adata.uns.keys():
adata = mnn(
adata,
n_pca_components=n_pca_components,
layers="all",
use_pca_fit=True,
save_all_to_adata=False,
)
conn = adata.uns["mnn"]
else:
if X_data is not None:
X = X_data
else:
if DKM.X_PCA not in adata.obsm.keys():
if not any([i.startswith("X_") for i in adata.layers.keys()]):
from ..preprocessing import Preprocessor
genes_to_use = adata.var_names[genes] if genes.dtype == "bool" else genes
preprocessor = Preprocessor(force_gene_list=genes_to_use)
preprocessor.preprocess_adata(adata, recipe="monocle")
else:
CM = adata.X if genes is None else adata[:, genes].X
cm_genesums = CM.sum(axis=0)
valid_ind = np.logical_and(np.isfinite(cm_genesums), cm_genesums != 0)
valid_ind = np.array(valid_ind).flatten()
CM = CM[:, valid_ind]
adata, fit, _ = pca(
adata,
CM,
n_pca_components=n_pca_components,
return_all=True,
)
adata.uns["explained_variance_ratio_"] = fit.explained_variance_ratio_[1:]
X = adata.obsm[DKM.X_PCA][:, :n_pca_components]
with warnings.catch_warnings():
warnings.simplefilter("ignore")
if group is None:
(kNN, knn_indices, knn_dists, _,) = umap_conn_indices_dist_embedding(
X,
n_neighbors=np.min((n_neighbors, adata.n_obs - 1)),
return_mapper=False,
)
if use_gaussian_kernel and not use_mnn:
conn = gaussian_kernel(X, knn_indices, sigma=10, k=None, dists=knn_dists)
else:
conn = normalize_knn_graph(kNN > 0)
normalize = False
else:
if group not in adata.obs.keys():
raise Exception(f"the group {group} provided is not a column name in .obs attribute.")
conn = csr_matrix((adata.n_obs, adata.n_obs))
cells_group = adata.obs[group]
uniq_grp = np.unique(cells_group)
for cur_grp in uniq_grp:
cur_cells = cells_group == cur_grp
cur_X = X[cur_cells, :]
(cur_kNN, cur_knn_indices, cur_knn_dists, _,) = umap_conn_indices_dist_embedding(
cur_X,
n_neighbors=np.min((n_neighbors, sum(cur_cells) - 1)),
return_mapper=False,
)
if use_gaussian_kernel and not use_mnn:
cur_conn = gaussian_kernel(
cur_X,
cur_knn_indices,
sigma=10,
k=None,
dists=cur_knn_dists,
)
else:
cur_conn = normalize_knn_graph(cur_kNN > 0)
cur_cells_ = np.where(cur_cells)[0]
conn[cur_cells_[:, None], cur_cells_] = cur_conn
else:
if conn.shape[0] != conn.shape[1] or conn.shape[0] != adata.n_obs:
raise ValueError(
"The connectivity data `conn` you provided should a square array with dimension equal to "
"the cell number!"
)
layers = DynamoAdataKeyManager.get_available_layer_keys(adata, layers, False, False)
layers = [
layer
for layer in layers
if layer.startswith("X_") and (not layer.endswith("matrix") and not layer.endswith("ambiguous"))
]
layers.sort(reverse=True) # ensure we get M_us, M_tn, etc (instead of M_su or M_nt).
for i, layer in enumerate(layers):
layer_x = adata.layers[layer].copy()
matched_x_group_indices = np.where([layer in x for x in [only_splicing, only_labeling, splicing_and_labeling]])
if len(matched_x_group_indices[0]) == 0:
logger.warning(
f"layer {layer} is not in any of the {only_splicing, only_labeling, splicing_and_labeling} groups, skipping..."
)
continue
layer_x_group = matched_x_group_indices[0][0]
if mapper[layer] not in adata.layers.keys():
adata.layers[mapper[layer]], conn = (
calc_1nd_moment(layer_x, conn, normalize_W=normalize)
if use_gaussian_kernel
else (conn.dot(layer_x), conn)
)
for layer2 in layers[i:]:
matched_y_group_indices = np.where(
[layer2 in x for x in [only_splicing, only_labeling, splicing_and_labeling]]
)
if len(matched_y_group_indices[0]) == 0:
logger.warning(
f"layer {layer2} is not in any of the {only_splicing, only_labeling, splicing_and_labeling} groups, skipping..."
)
continue
layer_y = adata.layers[layer2].copy()
layer_y_group = matched_y_group_indices[0][0]
# don't calculate 2nd moments among uu, ul, su, sl -
# they should be time-dependent moments and
# those calculations are model specific
if (layer_x_group != layer_y_group) or layer_x_group == 2:
continue
if mapper[layer2] not in adata.layers.keys():
adata.layers[mapper[layer2]], conn = (
calc_1nd_moment(layer_y, conn, normalize_W=normalize)
if use_gaussian_kernel
else (conn.dot(layer_y), conn)
)
adata.layers["M_" + layer[2] + layer2[2]] = calc_2nd_moment(
layer_x, layer_y, conn, normalize_W=normalize, mX=None, mY=None
)
if "X_protein" in adata.obsm.keys(): # may need to update with mnn or just use knn from protein layer itself.
adata.obsm[mapper["X_protein"]] = conn.dot(adata.obsm["X_protein"])
adata.obsp["moments_con"] = conn
logger.finish_progress("moments calculation")
if copy:
return adata
return None
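# Hypothetical usage sketch (not part of the original API): run `moments` on an already
# preprocessed AnnData and inspect what it adds. Per the code above, smoothed layers are
# written with an "M_" prefix and the connectivity graph to `.obsp["moments_con"]`.
def _demo_run_moments(adata: AnnData) -> AnnData:
    """Compute kNN-based moments in place and report the smoothed layers that were added."""
    moments(adata, n_neighbors=30, use_gaussian_kernel=False)
    smoothed = [key for key in adata.layers.keys() if key.startswith("M_")]
    print(f"smoothed layers: {smoothed}; graph stored in adata.obsp['moments_con']")
    return adata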
def time_moment(
adata: AnnData,
tkey: Optional[str],
has_splicing: bool,
has_labeling: bool = True,
t_label_keys: Union[List[str], str, None] = None,
) -> AnnData:
"""Calculate time based first and second moments (including uncentered covariance) for different layers of data.
Args:
adata: an AnnData object.
tkey: the column key for the time label of cells in .obs. Used for either the "ss" or "kinetic" model.
has_splicing: whether the data has splicing information.
has_labeling: whether the data has labeling information. Defaults to True.
t_label_keys: (not used for now) The column key(s) for the labeling time label of cells in .obs. Used for either
"ss" or "kinetic" model. `tkey` is implicitly assumed as `t_label_key` (however, `tkey` should just be the
time of the experiment). Defaults to None.
Returns:
An updated AnnData object with calculated first/second moments (including uncentered covariance) for each time
point for each layer included.
"""
if has_labeling:
if has_splicing:
layers = ["uu", "ul", "su", "sl"]
else:
layers = ["new", "total"]
else:
layers = ["unspliced", "spliced"]
time = adata.obs[tkey]
m, v, _ = prepare_data_deterministic(adata, adata.var.index, time, layers, use_total_layers=True, log=False)
adata.uns["time_moments"] = {"time": time}
adata.varm["m_t"] = m
adata.varm["v_t"] = v
return adata
# ---------------------------------------------------------------------------------------------------
# use for kinetic assumption
def get_layer_pair(layer: str) -> Optional[str]:
"""Get the layer in pair for the input layer.
Args:
layer: the key for the input layer.
Returns:
The key of the corresponding paired layer, or None if the input layer has no pair.
"""
pair = {
"new": "total",
"total": "new",
"X_new": "X_total",
"X_total": "X_new",
"M_t": "M_n",
"M_n": "M_t",
}
return pair[layer] if layer in pair.keys() else None
def get_layer_group(layer: str) -> Optional[str]:
"""Get the layer group in pair for the input layer group.
Args:
layer: the key for the input layer group.
Returns:
The key of the corresponding counterpart layer, or None if the input layer has no counterpart.
"""
group = {
"uu": "ul",
"ul": "uu",
"su": "sl",
"sl": "su",
"X_uu": "X_ul",
"X_ul": "X_uu",
"X_su": "X_sl",
"X_sl": "X_su",
"M_uu": "M_ul",
"M_ul": "M_uu",
"M_su": "M_sl",
"M_sl": "M_su",
}
return group[layer] if layer in group.keys() else None
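# Illustrative sanity checks (not part of the original module); the expected values follow
# directly from the dictionaries in `get_layer_pair` and `get_layer_group` above.
def _demo_layer_lookups() -> None:
    """Spot-check the pair/counterpart mappings."""
    assert get_layer_pair("X_new") == "X_total"
    assert get_layer_pair("M_t") == "M_n"
    assert get_layer_group("X_ul") == "X_uu"
    assert get_layer_pair("spliced") is None  # unknown keys fall back to None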
def prepare_data_deterministic(
adata: AnnData,
genes: List[str],
time: np.ndarray,
layers: List[str],
use_total_layers: bool = True,
total_layers: List[str] = ["X_ul", "X_sl", "X_uu", "X_su"],
log: bool = False,
return_ntr: bool = False,
) -> Tuple[List[np.ndarray], List[np.ndarray], List[Union[np.ndarray, csr_matrix]]]:
"""Prepare the data for kinetic calculation based on deterministic model.
Args:
adata: an AnnData object.
genes: the genes to be estimated.
time: the array containing the time stamps.
layers: the layer keys in adata object to be processed.
use_total_layers: whether to use total layers embedded in the AnnData object. Defaults to True.
total_layers: the layer(s) that can be summed up to get the total mRNA. Defaults to ["X_ul", "X_sl", "X_uu",
"X_su"].
log: whether to perform log1p (i.e. log(1+x)) on result data. Defaults to False.
return_ntr: whether to return the new/total ratio instead of the absolute expression values. Defaults to False.
Returns:
A tuple (m, v, raw), where `m` is the first moment, `v` is the second moment, and `raw` is the normalized
expression data.
"""
if return_ntr:
use_total_layers = True
if use_total_layers:
if "total_Size_Factor" not in adata.obs.keys():
# total_layers = ["uu", "ul", "su", "sl"] if 'uu' in adata.layers.keys() else ['total']
tot_sfs, _ = sz_util(
adata,
"_total_",
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=total_layers,
)
else:
tot_sfs = adata.obs.total_Size_Factor
sfs_x, sfs_y = tot_sfs[:, None], tot_sfs[:, None]
m = [None] * len(layers)
v = [None] * len(layers)
raw = [None] * len(layers)
for i, layer in enumerate(layers):
if layer in ["X_total", "total", "M_t"]:
if (layer == "X_total" and adata.uns["pp"]["layers_norm_method"] is None) or layer == "M_t":
x_layer = adata[:, genes].layers[layer]
if return_ntr:
T_genes = adata[:, genes].layers[get_layer_pair(layer)]
T_genes = T_genes.A if issparse(T_genes) else T_genes
x_layer = x_layer / (T_genes + 1e-5)
else:
x_layer = x_layer - adata[:, genes].layers[get_layer_pair(layer)]
else:
x_layer = adata.layers[layer]
group_pair_x_layer_ = get_layer_group(get_layer_pair(layer))
pair_x_layer, group_x_layer, group_pair_x_layer = (
adata.layers[get_layer_pair(layer)],
adata.layers[get_layer_group(layer)],
None if group_pair_x_layer_ is None else adata.layers[group_pair_x_layer_],
)
if layer.startswith("X_"):
x_layer, pair_x_layer, group_x_layer, group_pair_x_layer = (
inverse_norm(adata, x_layer),
inverse_norm(adata, pair_x_layer),
inverse_norm(adata, group_x_layer),
0 if group_pair_x_layer_ is None else inverse_norm(adata, group_pair_x_layer),
)
if layer.startswith("M_"):
t_layer_key = "M_t"
elif layer.startswith("X_"):
t_layer_key = "X_total"
else:
t_layer_key = "total"
if not use_total_layers:
sfs_x, _ = sz_util(
adata,
layer,
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=None,
CM=x_layer + group_x_layer,
)
sfs_y, _ = sz_util(
adata,
get_layer_pair(layer),
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=None,
CM=pair_x_layer + group_pair_x_layer,
)
sfs_x, sfs_y = sfs_x[:, None], sfs_y[:, None]
x_layer = normalize_mat_monocle(
x_layer[:, adata.var_names.isin(genes)],
sfs_x,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
y_layer = normalize_mat_monocle(
pair_x_layer[:, adata.var_names.isin(genes)],
sfs_y,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
if return_ntr:
T_genes = adata[:, genes].layers[t_layer_key]
T_genes = T_genes.A if issparse(T_genes) else T_genes
x_layer = (x_layer - y_layer) / (T_genes + 1e-5)
else:
x_layer = x_layer - y_layer
else:
if (layer == ["X_new"] and adata.uns["pp"]["layers_norm_method"] is None) or layer == "M_n":
total_layer = "X_total" if layer == ["X_new"] else "M_t"
if return_ntr:
T_genes = adata[:, genes].layers[total_layer]
T_genes = T_genes.A if issparse(T_genes) else T_genes
x_layer = adata[:, genes].layers[layer] / (T_genes + 1e-5)
else:
x_layer = adata[:, genes].layers[layer]
else:
x_layer = adata.layers[layer]
total_layer = adata.layers["X_total"]
if layer.startswith("X_"):
x_layer = inverse_norm(adata, x_layer)
total_layer = inverse_norm(adata, total_layer)
if not use_total_layers:
tot_sfs, _ = sz_util(
adata,
layer,
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=None,
CM=x_layer,
)
x_layer = normalize_mat_monocle(
x_layer[:, adata.var_names.isin(genes)],
szfactors=tot_sfs[:, None],
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
if return_ntr:
total_layer = normalize_mat_monocle(
total_layer[:, adata.var_names.isin(genes)],
szfactors=tot_sfs[:, None],
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
total_layer = total_layer.A if issparse(total_layer) else total_layer
x_layer /= total_layer + 1e-5
if log:
if issparse(x_layer):
x_layer.data = np.log1p(x_layer.data)
else:
x_layer = np.log1p(x_layer)
m[i], v[i], _ = calc_12_mom_labeling(x_layer.T, time)
raw[i] = x_layer
return m, v, raw # each list element corresponds to a layer
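# Hypothetical sketch (not from the original module): calling `prepare_data_deterministic` on the
# smoothed "M_t"/"M_n" layers. It assumes `moments` has already produced those layers, that
# `adata.obs[tkey]` holds labeling times, and that size factors are available (e.g. either
# `adata.obs["total_Size_Factor"]` or the labeled "X_*" layers exist for `sz_util` to use).
def _demo_prepare_deterministic(adata: AnnData, genes: List[str], tkey: str = "time"):
    time = adata.obs[tkey].values
    m, v, raw = prepare_data_deterministic(adata, genes, time, layers=["M_t", "M_n"])
    # m[i] / v[i] are (n_genes x n_time) first/second moments for the i-th requested layer
    return m, v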
def prepare_data_has_splicing(
adata: AnnData,
genes: List[str],
time: np.ndarray,
layer_u: str,
layer_s: str,
use_total_layers: bool = True,
total_layers: List[str] = ["X_ul", "X_sl", "X_uu", "X_su"],
total_layer: str = "X_total",
return_cov: bool = True,
return_ntr: bool = False,
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
"""Prepare data when assumption is kinetic and data has splicing.
Args:
adata: an AnnData object.
genes: the genes to be estimated.
time: the array containing time stamps.
layer_u: the layer key for unspliced data.
layer_s: the layer key for spliced data.
use_total_layers: whether to use total layers embedded in the AnnData object. Defaults to True.
total_layers: the layer(s) that can be summed up to get the total mRNA. Defaults to ["X_ul", "X_sl", "X_uu",
"X_su"].
total_layer: the layer key for the precalculated total mRNA data. Defaults to "X_total".
return_cov: whether to calculate the covariance between spliced and unspliced data. Defaults to True.
return_ntr: whether to return the new to total ratio or original expression data. Defaults to False.
Returns:
A tuple (res, raw) where `res` is the calculated moment data and `raw` is the normalized expression data.
"""
res = [0] * len(genes)
raw = [0] * len(genes)
U, S = (
adata[:, genes].layers[layer_u] if layer_u == "M_ul" else None,
adata[:, genes].layers[layer_s] if layer_s == "M_sl" else None,
)
T = adata[:, genes].layers[total_layer] if total_layer == "M_t" else None
layer_ul_data, layer_sl_data = adata.layers[layer_u], adata.layers[layer_s]
layer_uu_data, layer_su_data = (
adata.layers[total_layers[2]],
adata.layers[total_layers[3]],
)
layer_ul_data, layer_sl_data = (
layer_ul_data if layer_u == "M_ul" else inverse_norm(adata, layer_ul_data),
layer_sl_data if layer_s == "M_sl" else inverse_norm(adata, layer_sl_data),
)
layer_uu_data, layer_su_data = (
layer_uu_data if total_layers[2] == "M_uu" else inverse_norm(adata, layer_uu_data),
layer_su_data if total_layers[3] == "M_su" else inverse_norm(adata, layer_su_data),
)
total_layer_data = adata.layers[total_layer]
total_layer_data = total_layer_data if total_layer == "M_t" else inverse_norm(adata, total_layer_data)
if use_total_layers:
if "total_Size_Factor" not in adata.obs.keys():
tot_sfs, _ = sz_util(
adata,
"_total_",
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=total_layers,
CM=layer_ul_data + layer_sl_data + layer_uu_data + layer_su_data,
)
sfs_u, sfs_s = tot_sfs[:, None], tot_sfs[:, None]
else:
tot_sfs = adata.obs.total_Size_Factor
sfs_u, sfs_s = tot_sfs[:, None], tot_sfs[:, None]
else:
sfs_u, _ = sz_util(
adata,
layer_u,
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=None,
CM=layer_ul_data + layer_uu_data,
)
sfs_s, _ = sz_util(
adata,
layer_s,
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=None,
CM=layer_sl_data + layer_su_data,
)
sfs_u, sfs_s = sfs_u[:, None], sfs_s[:, None]
if U is None:
U = normalize_mat_monocle(
layer_ul_data[:, adata.var_names.isin(genes)],
sfs_u,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
if S is None:
S = normalize_mat_monocle(
layer_sl_data[:, adata.var_names.isin(genes)],
sfs_s,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
if "total_Size_Factor" not in adata.obs.keys():
tot_sfs, _ = sz_util(
adata,
"_total_",
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=total_layer,
CM=total_layer_data,
)
else:
tot_sfs = adata.obs.total_Size_Factor
tot_sfs = tot_sfs[:, None]
if T is None:
T = normalize_mat_monocle(
total_layer_data[:, adata.var_names.isin(genes)],
tot_sfs,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
for i, g in enumerate(genes):
if return_ntr:
T_i = T[:, i].A if issparse(T[:, i]) else T[:, i]
u = U[:, i] / (T_i + 1e-5)
s = S[:, i] / (T_i + 1e-5)
else:
u = U[:, i]
s = S[:, i]
ut = strat_mom(u, time, np.mean)
st = strat_mom(s, time, np.mean)
uut = strat_mom(elem_prod(u, u), time, np.mean)
ust = strat_mom(elem_prod(u, s), time, np.mean)
sst = strat_mom(elem_prod(s, s), time, np.mean)
x = np.vstack([ut, st, uut, sst, ust]) if return_cov else np.vstack([ut, st, uut, sst])
res[i] = x
raw[i] = np.vstack((u, s))
return res, raw
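# Hypothetical sketch: kinetic preparation with splicing on smoothed layers, assuming `moments`
# has already produced "M_ul"/"M_sl"/"M_uu"/"M_su"/"M_t", that `adata.obs[tkey]` holds labeling
# times, and that `adata.obs["total_Size_Factor"]` (or the labeled layers) is available.
def _demo_prepare_has_splicing(adata: AnnData, genes: List[str], tkey: str = "time"):
    time = adata.obs[tkey].values
    res, raw = prepare_data_has_splicing(
        adata, genes, time,
        layer_u="M_ul", layer_s="M_sl",
        total_layers=["M_ul", "M_sl", "M_uu", "M_su"], total_layer="M_t",
    )
    # res[i] stacks [<u>, <s>, <uu>, <ss>, <us>] over time points for the i-th gene
    return res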
def prepare_data_no_splicing(
adata: AnnData,
genes: List[str],
time: np.ndarray,
layer: str,
use_total_layers: bool = True,
total_layer: str = "X_total",
return_old: bool = False,
return_ntr: bool = False,
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
"""Prepare the data when assumption is kinetic and data has no splicing.
Args:
adata: an AnnData object.
genes: the genes to be estimated.
time: the array containing time stamps.
layer: the layer containing the expression data.
use_total_layers: whether to use the total data embedded in the AnnData object. Defaults to True.
total_layer: the layer key for the precalculated total mRNA data. Defaults to "X_total".
return_old: whether to return the old expression data together or the newly expressed gene data only. Defaults
to False.
return_ntr: whether to return the new to total ratio or the original expression data. Defaults to False.
Returns:
A tuple (res, raw) where `res` is the calculated moment data and `raw` is the normalized expression data.
"""
from ..preprocessing.normalization import normalize_mat_monocle, sz_util
res = [0] * len(genes)
raw = [0] * len(genes)
U, T = (
adata[:, genes].layers[layer] if layer == "M_n" else None,
adata[:, genes].layers[total_layer] if total_layer == "M_t" else None,
)
layer_data = adata.layers[layer]
total_layer_data = adata.layers[total_layer]
layer_data, total_layer_data = (
layer_data if layer == "M_n" else inverse_norm(adata, layer_data),
total_layer_data if total_layer == "M_t" else inverse_norm(adata, total_layer_data),
)
if use_total_layers:
if "total_Size_Factor" not in adata.obs.keys():
sfs, _ = sz_util(
adata,
"_total_",
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=total_layer,
CM=total_layer_data,
)
else:
sfs = adata.obs.total_Size_Factor
sfs, tot_sfs = sfs[:, None], sfs[:, None]
else:
sfs, _ = sz_util(
adata,
layer,
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=None,
CM=layer_data,
)
tot_sfs, _ = sz_util(
adata,
layer,
round_exprs=False,
method="median",
locfunc=np.nanmean,
total_layers=None,
CM=total_layer_data,
)
sfs, tot_sfs = sfs[:, None], tot_sfs[:, None]
if U is None:
U = normalize_mat_monocle(
layer_data[:, adata.var_names.isin(genes)],
sfs,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
if T is None:
T = normalize_mat_monocle(
total_layer_data[:, adata.var_names.isin(genes)],
tot_sfs,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
for i, g in enumerate(genes):
if return_ntr:
T_i = T[:, i].A if issparse(T[:, i]) else T[:, i]
u, t = U[:, i] / (T_i + 1e-5), T[:, i]
else:
u, t = U[:, i], T[:, i]
ut = strat_mom(u, time, np.mean)
uut = strat_mom(elem_prod(u, u), time, np.mean)
res[i] = np.vstack([ut, uut])
raw[i] = np.vstack([u, t - u]) if return_old else u
return res, raw
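# Hypothetical sketch: the no-splicing analogue of the example above, using the smoothed
# "M_n"/"M_t" layers and `adata.obs[tkey]` as labeling time.
def _demo_prepare_no_splicing(adata: AnnData, genes: List[str], tkey: str = "time"):
    time = adata.obs[tkey].values
    res, raw = prepare_data_no_splicing(adata, genes, time, layer="M_n", total_layer="M_t")
    # res[i] stacks [<u>, <uu>] over time points for the i-th gene
    return res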
def prepare_data_mix_has_splicing(
adata: AnnData,
genes: List[str],
time: np.ndarray,
layer_u: str = "X_uu",
layer_s: str = "X_su",
layer_ul: str = "X_ul",
layer_sl: str = "X_sl",
use_total_layers: bool = True,
total_layers: List[str] = ["X_ul", "X_sl", "X_uu", "X_su"],
mix_model_indices: Optional[List[int]] = None,
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
"""Prepare data for mixture modeling when assumption is kinetic and data has splicing.
Note that the mix_model_indices is indexed on 10 total species, which can be used to specify the data required for
different mixture models.
Args:
adata: an AnnData object.
genes: the genes to be estimated.
time: the array containing time stamps.
layer_u: the layer key for unspliced mRNA count data. Defaults to "X_uu".
layer_s: the layer key for spliced mRNA count data. Defaults to "X_su".
layer_ul: the layer key for unspliced, labeled mRNA count data. Defaults to "X_ul".
layer_sl: the layer key for spliced, labeled mRNA count data. Defaults to "X_sl".
use_total_layers: whether to use total layers embedded in the AnnData object. Defaults to True.
total_layers: the layer(s) that can be summed up to get the total mRNA. Defaults to ["X_ul", "X_sl", "X_uu",
"X_su"].
mix_model_indices: the indices for data required by the mixture model. If None, all data would be returned.
Defaults to None.
Returns:
A tuple (res, raw) where `res` is the calculated moment data and `raw` is the normalized expression data.
"""
from ..preprocessing.normalization import normalize_mat_monocle, sz_util
res = [0] * len(genes)
raw = [0] * len(genes)
U, S = (
adata[:, genes].layers[layer_u] if layer_u == "M_uu" else None,
adata[:, genes].layers[layer_s] if layer_s == "M_su" else None,
)
Ul, Sl = (
adata[:, genes].layers[layer_ul] if layer_ul == "M_ul" else None,
adata[:, genes].layers[layer_sl] if layer_sl == "M_sl" else None,
)
layer_u_data, layer_s_data = adata.layers[layer_u], adata.layers[layer_s]
layer_ul_data, layer_sl_data = (
adata.layers[layer_ul],
adata.layers[layer_sl],
)
layer_u_data, layer_s_data = (
layer_u_data if layer_u == "M_uu" else inverse_norm(adata, layer_u_data),
layer_s_data if layer_s == "M_su" else inverse_norm(adata, layer_s_data),
)
layer_ul_data, layer_sl_data = (
layer_ul_data if layer_ul == "M_ul" else inverse_norm(adata, layer_ul_data),
layer_sl_data if layer_sl == "M_sl" else inverse_norm(adata, layer_sl_data),
)
if use_total_layers:
if "total_Size_Factor" not in adata.obs.keys():
sfs, _ = sz_util(
adata,
"_total_",
False,
"median",
np.nanmean,
total_layers=total_layers,
CM=layer_u_data + layer_s_data + layer_ul_data + layer_sl_data,
)
sfs_u, sfs_s = sfs[:, None], sfs[:, None]
else:
sfs = adata.obs.total_Size_Factor
sfs_u, sfs_s = sfs[:, None], sfs[:, None]
else:
sfs_u, _ = sz_util(
adata,
layer_u,
False,
"median",
np.nanmean,
total_layers=None,
CM=layer_u_data + layer_ul_data,
)
sfs_s, _ = sz_util(
adata,
layer_s,
False,
"median",
np.nanmean,
total_layers=None,
CM=layer_s_data + layer_sl_data,
)
sfs_u, sfs_s = sfs_u[:, None], sfs_s[:, None]
if U is None:
U = normalize_mat_monocle(
layer_u_data[:, adata.var_names.isin(genes)],
sfs_u,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
if S is None:
S = normalize_mat_monocle(
layer_s_data[:, adata.var_names.isin(genes)],
sfs_s,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
if Ul is None:
Ul = normalize_mat_monocle(
layer_ul_data[:, adata.var_names.isin(genes)],
sfs_u,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
if Sl is None:
Sl = normalize_mat_monocle(
layer_sl_data[:, adata.var_names.isin(genes)],
sfs_s,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
for i, g in enumerate(genes):
ul = Ul[:, i]
sl = Sl[:, i]
ult = strat_mom(ul, time, np.mean)
slt = strat_mom(sl, time, np.mean)
ul_ult = strat_mom(elem_prod(ul, ul), time, np.mean)
ul_slt = strat_mom(elem_prod(ul, sl), time, np.mean)
sl_slt = strat_mom(elem_prod(sl, sl), time, np.mean)
u = U[:, i]
s = S[:, i]
ut = strat_mom(u, time, np.mean)
st = strat_mom(s, time, np.mean)
uut = strat_mom(elem_prod(u, u), time, np.mean)
ust = strat_mom(elem_prod(u, s), time, np.mean)
sst = strat_mom(elem_prod(s, s), time, np.mean)
x = np.vstack([ult, slt, ul_ult, sl_slt, ul_slt, ut, st, uut, sst, ust])
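# the 10 stacked species, in order: <ul>, <sl>, <ul*ul>, <sl*sl>, <ul*sl>, <u>, <s>, <uu>, <ss>, <us>;
# mix_model_indices (if given) selects rows from this stack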
if mix_model_indices is not None:
x = x[mix_model_indices]
res[i] = x
raw[i] = np.vstack((ul, sl, u, s))
return res, raw
def prepare_data_mix_no_splicing(
adata: AnnData,
genes: List[str],
time: np.ndarray,
layer_n: str,
layer_t: str,
use_total_layers: bool = True,
total_layer: str = "X_total",
mix_model_indices: Optional[List[int]] = None,
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
"""Prepare data for mixture modeling when assumption is kinetic and data has NO splicing.
Note that the mix_model_indices is indexed on 4 total species, which can be used to specify
the data required for different mixture models.
Args:
adata: an AnnData object.
genes: the genes to be estimated.
time: the array containing time stamps.
layer_n: the layer key for new mRNA count.
layer_t: the layer key for total mRNA count.
use_total_layers: whether to use total layers embedded in the AnnData object. Defaults to True.
total_layer: the layer key for the precalculated total mRNA data. Defaults to "X_total".
mix_model_indices: the indices for data required by the mixture model. If None, all data would be returned. Defaults to None.
Returns:
A tuple (res, raw) where `res` is the calculated moment data and `raw` is the normalized expression data.
"""
from ..preprocessing.normalization import normalize_mat_monocle, sz_util
res = [0] * len(genes)
raw = [0] * len(genes)
N, T = (
adata[:, genes].layers[layer_n] if layer_n == "M_n" else None,
adata[:, genes].layers[layer_t] if layer_t == "M_t" else None,
)
layer_n_data = adata.layers[layer_n]
layer_t_data = adata.layers[layer_t]
layer_n_data, layer_t_data = (
layer_n_data if layer_n == "M_n" else inverse_norm(adata, layer_n_data),
layer_t_data if layer_t == "M_t" else inverse_norm(adata, layer_t_data),
)
if use_total_layers:
if "total_Size_Factor" not in adata.obs.keys():
sfs, _ = sz_util(
adata,
total_layer,
False,
"median",
np.nanmean,
total_layers="total",
CM=layer_t_data,
)
sfs_n, sfs_t = sfs[:, None], sfs[:, None]
else:
sfs = adata.obs.total_Size_Factor
sfs_n, sfs_t = sfs[:, None], sfs[:, None]
else:
sfs_n, _ = sz_util(
adata,
layer_n,
False,
"median",
np.nanmean,
total_layers=None,
CM=layer_n_data,
)
sfs_t, _ = sz_util(
adata,
layer_t,
False,
"median",
np.nanmean,
total_layers=None,
CM=layer_t_data,
)
sfs_n, sfs_t = sfs_n[:, None], sfs_t[:, None]
if N is None:
N = normalize_mat_monocle(
layer_n_data[:, adata.var_names.isin(genes)],
sfs_n,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
if T is None:
T = normalize_mat_monocle(
layer_t_data[:, adata.var_names.isin(genes)],
sfs_t,
relative_expr=True,
pseudo_expr=0,
norm_method=None,
)
for i, g in enumerate(genes):
n = N[:, i]
nt = strat_mom(n, time, np.mean)
nnt = strat_mom(elem_prod(n, n), time, np.mean)
o = T[:, i] - n
ot = strat_mom(o, time, np.mean)
oot = strat_mom(elem_prod(o, o), time, np.mean)
x = np.vstack([nt, nnt, ot, oot])
if mix_model_indices is not None:
x = x[mix_model_indices]
res[i] = x
raw[i] = np.vstack((n, o))
return res, raw
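# Hypothetical sketch: the 4 stacked species in `prepare_data_mix_no_splicing` are
# [<n>, <nn>, <o>, <oo>], so mix_model_indices=[0, 2] keeps only the first moments of new and
# old mRNA. Layer names below assume smoothed "M_n"/"M_t" layers and `adata.obs[tkey]` exist.
def _demo_mix_no_splicing_first_moments(adata: AnnData, genes: List[str], tkey: str = "time"):
    time = adata.obs[tkey].values
    res, raw = prepare_data_mix_no_splicing(
        adata, genes, time, layer_n="M_n", layer_t="M_t", mix_model_indices=[0, 2]
    )
    return res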
# ---------------------------------------------------------------------------------------------------
# moment related:
def stratify(arr: np.ndarray, strata: np.ndarray) -> List[np.ndarray]:
"""Stratify the given array with the given reference strata.
Args:
arr: The array to be stratified.
strata: The reference strata vector.
Returns:
A list with one array per unique value in `strata`; the i-th element contains the entries of `arr` whose
stratum equals the i-th unique value of `strata`.
"""
s = np.unique(strata)
return [arr[strata == s[i]] for i in range(len(s))]
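# Tiny worked example (illustrative only): `stratify` groups values by the unique entries of `strata`.
def _demo_stratify() -> None:
    arr = np.array([1.0, 2.0, 3.0, 4.0])
    strata = np.array([0, 1, 0, 1])
    groups = stratify(arr, strata)
    assert np.allclose(groups[0], [1.0, 3.0]) and np.allclose(groups[1], [2.0, 4.0])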
def strat_mom(arr: Union[np.ndarray, csr_matrix], strata: np.ndarray, fcn_mom: Callable) -> np.ndarray:
"""Stratify the mRNA expression data and calculate its momentum.
Args:
arr: the mRNA expression data.
strata: the time stamp array used to stratify `arr`.
fcn_mom: the function used to calculate the momentum.
Returns:
The momentum for each stratum.
"""
arr = arr.A if issparse(arr) else arr
x = stratify(arr, strata)
return np.array([fcn_mom(y) for y in x])
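# Tiny worked example (illustrative only): per-time-point means of a toy expression vector.
def _demo_strat_mom() -> None:
    expr = np.array([1.0, 3.0, 2.0, 4.0])
    t = np.array([0, 0, 1, 1])
    means = strat_mom(expr, t, np.mean)  # mean within t==0 and t==1
    assert np.allclose(means, [2.0, 3.0])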
def calc_mom_all_genes(
T: np.ndarray, adata: AnnData, fcn_mom: Callable
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Calculate momentum for all genes in an AnnData object.
Args:
T: the time stamp array.
adata: an AnnData object.
fcn_mom: the function used to calculate momentum.
Returns:
A tuple (Mn, Mo, Mt, Mr), where `Mn` is momentum calculated from labeled (new) mRNA count, `Mo` is from
unlabeled (old) mRNA count, `Mt` is from total mRNA count, and `Mr` is from new to total ratio.
"""
ng = adata.var.shape[0]
nT = len(np.unique(T))
Mn = np.zeros((ng, nT))
Mo = np.zeros((ng, nT))
Mt = np.zeros((ng, nT))
Mr = np.zeros((ng, nT))
for g in tqdm(range(ng), desc="calculating 1/2 moments"):
L = np.array(adata[:, g].layers["X_new"], dtype=float)
U = np.array(adata[:, g].layers["X_total"], dtype=float) - L
rho = L / (L + U + 0.01)
Mn[g] = strat_mom(L, T, fcn_mom)
Mo[g] = strat_mom(U, T, fcn_mom)
Mt[g] = strat_mom(L + U, T, fcn_mom)
Mr[g] = strat_mom(rho, T, fcn_mom)
return Mn, Mo, Mt, Mr
def _calc_1nd_moment(X, W, normalize_W=True):
"""deprecated"""
if normalize_W:
d = np.sum(W, 1)
W = np.diag(1 / d) @ W
return W @ X
def _calc_2nd_moment(X, Y, W, normalize_W=True, center=False, mX=None, mY=None):
"""deprecated"""
if normalize_W:
d = np.sum(W, 1)
W = np.diag(1 / d) @ W
XY = np.multiply(W @ Y, X)
if center:
mX = calc_1nd_moment(X, W, False) if mX is None else mX
mY = calc_1nd_moment(Y, W, False) if mY is None else mY
XY = XY - np.multiply(mX, mY)
return XY
def gaussian_kernel(
X: np.ndarray, nbr_idx: np.ndarray, sigma: int, k: Optional[int] = None, dists: Optional[np.ndarray] = None
) -> csr_matrix:
"""Normalize connectivity map with Gaussian kernel.
Args:
X: the mRNA expression data.
nbr_idx: the indices of nearest neighbors of each cell.
sigma: the standard deviation (bandwidth) of the Gaussian kernel.
k: the number of nearest neighbors to be considered. Defaults to None.
dists: the distances to the n_neighbors closest points in knn graph. Defaults to None.
Returns:
The normalized connectivity map.
"""
n = X.shape[0]
if dists is None:
dists = []
for i in range(n):
d = X[nbr_idx[i][:k]] - X[i]
dists.append(np.sum(elem_prod(d, d), 1).flatten())
W = lil_matrix((n, n))
s2_inv = 1 / (2 * sigma**2)
for i in range(n):
W[i, nbr_idx[i][:k]] = np.exp(-s2_inv * dists[i][:k] ** 2)
return csr_matrix(W)
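# Hypothetical sketch: building a Gaussian-kernel weighted kNN matrix from toy 2-D points using
# a brute-force neighbor search (the real pipeline obtains `nbr_idx`/`dists` from umap above).
def _demo_gaussian_kernel() -> csr_matrix:
    rng = np.random.default_rng(0)
    X = rng.normal(size=(20, 2))
    d2 = ((X[:, None, :] - X[None, :, :]) ** 2).sum(-1)  # pairwise squared distances
    nbr_idx = np.argsort(d2, axis=1)[:, :5]  # 5 nearest neighbors (including self)
    return gaussian_kernel(X, nbr_idx, sigma=10)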
def calc_12_mom_labeling(
data: np.ndarray, t: np.ndarray, calculate_2_mom: bool = True
) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]:
"""Calculate 1st and 2nd momentum for given data.
Args:
data: the normalized mRNA expression data.
t: the time stamp array.
calculate_2_mom: whether to calculate 2nd momentum. Defaults to True.
Returns:
A tuple (m, [v], t_uniq) where `m` is the first momentum, `v` is the second momentum which would be returned only
if `calculate_2_mom` is true, and `t_uniq` is the unique time stamps.
"""
t_uniq = np.unique(t)
m = np.zeros((data.shape[0], len(t_uniq)))
if calculate_2_mom:
v = np.zeros((data.shape[0], len(t_uniq)))
for i in range(data.shape[0]):
data_ = (
np.array(data[i].A.flatten(), dtype=float) if issparse(data) else np.array(data[i], dtype=float)
) # consider using the `adata.obs_vector`, `adata.var_vector` methods or accessing the array directly.
m[i] = strat_mom(data_, t, np.nanmean)
if calculate_2_mom:
v[i] = strat_mom(data_, t, np.nanvar)
return (m, v, t_uniq) if calculate_2_mom else (m, t_uniq)
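# Tiny worked example (illustrative only): moments per unique time point for a 2-gene toy matrix.
def _demo_calc_12_mom_labeling() -> None:
    data = np.array([[1.0, 3.0, 2.0, 4.0],
                     [0.0, 2.0, 1.0, 3.0]])  # genes x cells
    t = np.array([0, 0, 1, 1])
    m, v, t_uniq = calc_12_mom_labeling(data, t)
    assert m.shape == (2, 2) and np.allclose(t_uniq, [0, 1])
    assert np.allclose(m[0], [2.0, 3.0])  # per-time means of gene 0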
def calc_1nd_moment(
X: np.ndarray, W: np.ndarray, normalize_W: bool = True
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""Calculate first moment for the layers.
Args:
X: the layer to calculate the moment.
W: the connectivity graph that will be used for moment calculations.
normalize_W: whether to normalize W before calculation. Defaults to True.
Returns:
The first moment of the layer; when `normalize_W` is True, the row-normalized `W` is returned as well.
"""
if normalize_W:
if type(W) == np.ndarray:
d = np.sum(W, 1).flatten()
else:
d = np.sum(W, 1).A.flatten()
W = diags(1 / d) @ W if issparse(W) else np.diag(1 / d) @ W
return W @ X, W
else:
return W @ X
def calc_2nd_moment(
X: np.ndarray,
Y: np.ndarray,
W: np.ndarray,
normalize_W: bool = True,
center: bool = False,
mX: np.ndarray = None,
mY: np.ndarray = None,
) -> np.ndarray:
"""Calculate the 2nd moment for the layers.
Args:
X: the first layer to be used.
Y: the second layer to be used.
W: the connectivity graph that will be used for moment calculations.
normalize_W: whether to normalize W before calculation. Defaults to True.
center: whether to center the second moment (i.e. subtract the product of first moments to obtain a
covariance). Defaults to False.
mX: the precomputed first moment of `X` used for centering. Defaults to None.
mY: the precomputed first moment of `Y` used for centering. Defaults to None.
Returns:
The second moment of the layers.
"""
if normalize_W:
if type(W) == np.ndarray:
d = np.sum(W, 1).flatten()
else:
d = W.sum(1).A.flatten()
W = diags(1 / d) @ W if issparse(W) else np.diag(1 / d) @ W
XY = W @ elem_prod(Y, X)
if center:
mX = calc_1nd_moment(X, W, False) if mX is None else mX
mY = calc_1nd_moment(Y, W, False) if mY is None else mY
XY = XY - elem_prod(mX, mY)
return XY
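# Tiny worked example (illustrative only): smoothing a toy layer with a small symmetric kNN graph
# using `calc_1nd_moment`/`calc_2nd_moment` (dense inputs; sparse ones follow the same path).
def _demo_knn_moments() -> None:
    X = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])  # 3 cells x 2 genes
    W = np.array([[0.0, 1.0, 1.0], [1.0, 0.0, 1.0], [1.0, 1.0, 0.0]])  # kNN adjacency
    mX, W_norm = calc_1nd_moment(X, W, normalize_W=True)  # neighbor average of X
    mXX = calc_2nd_moment(X, X, W, normalize_W=True)  # uncentered second moment <x*x>
    assert mX.shape == X.shape and mXX.shape == X.shape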
# ---------------------------------------------------------------------------------------------------
# old moment estimation code
class MomData(AnnData):
"""deprecated"""
def __init__(self, adata, time_key="Time", has_nan=False):
# self.data = adata
self.__dict__ = adata.__dict__
# calculate first and second moments from data
self.times = np.array(self.obs[time_key].values, dtype=float)
self.uniq_times = np.unique(self.times)
nT = self.get_n_times()
ng = self.get_n_genes()
self.M = np.zeros((ng, nT)) # first moments (data)
self.V = np.zeros((ng, nT)) # second moments (data)
for g in tqdm(range(ng), desc="calculating 1/2 moments"):
tmp = self[:, g].layers["new"]
L = (
np.array(tmp.A, dtype=float) if issparse(tmp) else np.array(tmp, dtype=float)
) # consider using the `adata.obs_vector`, `adata.var_vector` methods or accessing the array directly.
if has_nan:
self.M[g] = strat_mom(L, self.times, np.nanmean)
self.V[g] = strat_mom(L, self.times, np.nanvar)
else:
self.M[g] = strat_mom(L, self.times, np.mean)
self.V[g] = strat_mom(L, self.times, np.var)
def get_n_genes(self):
return self.var.shape[0]
def get_n_cell(self):
return self.obs.shape[0]
def get_n_times(self):
return len(self.uniq_times)
class Estimation:
"""deprecated"""
def __init__(
self,
adata,
adata_u=None,
time_key="Time",
normalize=True,
param_ranges=None,
has_nan=False,
):
# initialize Estimation
self.data = MomData(adata, time_key, has_nan)
self.data_u = MomData(adata_u, time_key, has_nan) if adata_u is not None else None
if param_ranges is None:
param_ranges = {
"a": [0, 10],
"b": [0, 10],
"alpha_a": [10, 1000],
"alpha_i": [0, 10],
"beta": [0, 10],
"gamma": [0, 10],
}
self.normalize = normalize
self.param_ranges = param_ranges
self.n_params = len(param_ranges)
def param_array2dict(self, parr):
if parr.ndim == 1:
return {
"a": parr[0],
"b": parr[1],
"alpha_a": parr[2],
"alpha_i": parr[3],
"beta": parr[4],
"gamma": parr[5],
}
else:
return {
"a": parr[:, 0],
"b": parr[:, 1],
"alpha_a": parr[:, 2],
"alpha_i": parr[:, 3],
"beta": parr[:, 4],
"gamma": parr[:, 5],
}
def fit_gene(self, gene_no, n_p0=10):
from ..estimation.tsc.utils_moments import estimation
estm = estimation(list(self.param_ranges.values()))
if self.data_u is None:
m = self.data.M[gene_no, :].T
v = self.data.V[gene_no, :].T
x_data = np.vstack((m, v))
popt, cost = estm.fit_lsq(
self.data.uniq_times,
x_data,
p0=None,
n_p0=n_p0,
normalize=self.normalize,
experiment_type="nosplice",
)
else:
mu = self.data_u.M[gene_no, :].T
ms = self.data.M[gene_no, :].T
vu = self.data_u.V[gene_no, :].T
vs = self.data.V[gene_no, :].T
x_data = np.vstack((mu, ms, vu, vs))
popt, cost = estm.fit_lsq(
self.data.uniq_times,
x_data,
p0=None,
n_p0=n_p0,
normalize=self.normalize,
experiment_type=None,
)
return popt, cost
def fit(self, n_p0=10):
ng = self.data.get_n_genes()
params = np.zeros((ng, self.n_params))
costs = np.zeros(ng)
for i in tqdm(range(ng), desc="fitting genes"):
params[i], costs[i] = self.fit_gene(i, n_p0)
return params, costs
# ---------------------------------------------------------------------------------------------------
# use for kinetic assumption with full data, deprecated
def moment_model(adata, subset_adata, _group, cur_grp, log_unnormalized, tkey):
"""deprecated"""
# a few hard code to set up data for moment mode:
if "uu" in subset_adata.layers.keys() or "X_uu" in subset_adata.layers.keys():
if log_unnormalized and "X_uu" not in subset_adata.layers.keys():
if issparse(subset_adata.layers["uu"]):
(
subset_adata.layers["uu"].data,
subset_adata.layers["ul"].data,
subset_adata.layers["su"].data,
subset_adata.layers["sl"].data,
) = (
np.log1p(subset_adata.layers["uu"].data),
np.log1p(subset_adata.layers["ul"].data),
np.log1p(subset_adata.layers["su"].data),
np.log1p(subset_adata.layers["sl"].data),
)
else:
(
subset_adata.layers["uu"],
subset_adata.layers["ul"],
subset_adata.layers["su"],
subset_adata.layers["sl"],
) = (
np.log1p(subset_adata.layers["uu"]),
np.log1p(subset_adata.layers["ul"]),
np.log1p(subset_adata.layers["su"]),
np.log1p(subset_adata.layers["sl"]),
)
subset_adata_u, subset_adata_s = (
subset_adata.copy(),
subset_adata.copy(),
)
del (
subset_adata_u.layers["su"],
subset_adata_u.layers["sl"],
subset_adata_s.layers["uu"],
subset_adata_s.layers["ul"],
)
(
subset_adata_u.layers["new"],
subset_adata_u.layers["old"],
subset_adata_s.layers["new"],
subset_adata_s.layers["old"],
) = (
subset_adata_u.layers.pop("ul"),
subset_adata_u.layers.pop("uu"),
subset_adata_s.layers.pop("sl"),
subset_adata_s.layers.pop("su"),
)
Moment, Moment_ = MomData(subset_adata_s, tkey), MomData(subset_adata_u, tkey)
if cur_grp == _group[0]:
t_ind = 0
g_len, t_len = len(_group), len(np.unique(adata.obs[tkey]))
(adata.uns["M_sl"], adata.uns["V_sl"], adata.uns["M_ul"], adata.uns["V_ul"]) = (
np.zeros((Moment.M.shape[0], g_len * t_len)),
np.zeros((Moment.M.shape[0], g_len * t_len)),
np.zeros((Moment.M.shape[0], g_len * t_len)),
np.zeros((Moment.M.shape[0], g_len * t_len)),
)
inds = np.arange((t_len * t_ind), (t_len * (t_ind + 1)))
(
adata.uns["M_sl"][:, inds],
adata.uns["V_sl"][:, inds],
adata.uns["M_ul"][:, inds],
adata.uns["V_ul"][:, inds],
) = (Moment.M, Moment.V, Moment_.M, Moment_.V)
del Moment_
Est = Estimation(Moment, adata_u=subset_adata_u, time_key=tkey, normalize=True)  # data is already normalized
else:
if log_unnormalized and "X_total" not in subset_adata.layers.keys():
if issparse(subset_adata.layers["total"]):
(subset_adata.layers["new"].data, subset_adata.layers["total"].data,) = (
np.log1p(subset_adata.layers["new"].data),
np.log1p(subset_adata.layers["total"].data),
)
else:
subset_adata.layers["total"], subset_adata.layers["total"] = (
np.log1p(subset_adata.layers["new"]),
np.log1p(subset_adata.layers["total"]),
)
Moment = MomData(subset_adata, tkey)
if cur_grp == _group[0]:
t_ind = 0
g_len, t_len = len(_group), len(np.unique(adata.obs[tkey]))
adata.uns["M"], adata.uns["V"] = (
np.zeros((adata.shape[1], g_len * t_len)),
np.zeros((adata.shape[1], g_len * t_len)),
)
inds = np.arange((t_len * t_ind), (t_len * (t_ind + 1)))
(
adata.uns["M"][:, inds],
adata.uns["V"][:, inds],
) = (Moment.M, Moment.V)
Est = Estimation(Moment, time_key=tkey, normalize=True)  # data is already normalized
return adata, Est, t_ind