Source code for dynamo.tools.growth

"""Mapping Vector Field of Single Cells
"""

# module to deal with reaction/diffusion/advection.
# code was loosely based on PBA, WOT and PRESCIENT.

import numpy as np
import pandas as pd
from scipy.sparse import issparse
from sklearn.neighbors import NearestNeighbors


[docs]def score_cells( adata, genes=None, layer=None, basis=None, n_neighbors=30, beta=0.1, iteration=5, metric="euclidean", metric_kwds=None, cores=1, seed=19491001, return_score=True, **kwargs, ): """Score cells based on a set of genes. Parameters ---------- adata: :class:`~anndata.AnnData` AnnData object that contains the reconstructed vector field function in the `uns` attribute. genes: `list` or None (default: None) The gene names whose gene expression will be used for predicting cell fate. By default (when genes is set to None), the genes used for velocity embedding (var.use_for_transition) will be used for vector field reconstruction. Note that the genes to be used need to have velocity calculated and corresponds to those used in the `dyn.tl.VectorField` function. layer: `str` or None (default: 'X') Which layer of the data will be used for predicting cell fate with the reconstructed vector field function. The layer once provided, will override the `basis` argument and then predicting cell fate in high dimensional space. basis: `str` or None (default: `None`) The embedding data to use for predicting cell fate. If `basis` is either `umap` or `pca`, the reconstructed trajectory will be projected back to high dimensional space via the `inverse_transform` function. n_neighbors: `int` (default: `30`) Number of nearest neighbors. beta: `float` (default: `0.1`) The weight that will apply to the current query cell. iteration: `int` (default: `0.5`) Number of smooth iterations. metric: `str` or callable, default='euclidean' The distance metric to use for the tree. The default metric is , and with p=2 is equivalent to the standard Euclidean metric. See the documentation of :class:`DistanceMetric` for a list of available metrics. If metric is "precomputed", X is assumed to be a distance matrix and must be square during fit. X may be a :term:`sparse graph`, in which case only "nonzero" elements may be considered neighbors. metric_kwds : dict, default=None Additional keyword arguments for the metric function. cores: `int` (default: 1) The number of parallel jobs to run for neighbors search. ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all processors. seed: `int` (default `19491001`) Random seed to ensure the reproducibility of each run. return_score: `bool` (default: `False`) Whether to return the score. If False, save the smoothed score to `cell_scores` column in the `.obs` attribute and also to the dictionary corresponding to the `score_cells` key in the .uns attribute. kwargs: Additional arguments that will be passed to each nearest neighbor search algorithm. Returns ------- Depending on return_score, it either return the cell scores or an updated adata object that contains the cell score information. """ if basis is None and "X_pca" not in adata.obsm.keys(): raise ValueError(f"Your adata doesn't have 'X_pca' basis in .obsm.") elif basis is not None and "X_" + basis not in adata.obsm.keys(): raise ValueError(f"Your adata doesn't have the {basis} you inputted in .obsm attribute of your adata.") if genes is None and "use_for_pca" not in adata.obs.keys(): raise ValueError(f"Your adata doesn't have 'use_for_pca' column in .obs.") if genes is None: genes = adata.var_names[adata.use_for_pca] else: genes = ( list(adata.var_names.intersection(genes)) if adata.var_names[0].isupper() else list(adata.var_names.intersection([i.capitalize() for i in genes])) if adata.var_names[0][0].isupper() and adata.var_names[0][1:].islower() else list(adata.var_names.intersection([i.lower() for i in genes])) ) if len(genes) < 1: raise ValueError(f"Your inputted gene list doesn't overlap any gene in your adata object.") X_basis = adata.obsm["X_pca"] if basis is None else adata.obsm["X_" + basis] if X_basis.shape[0] > 5000 and X_basis.shape[1] > 2: from pynndescent import NNDescent nbrs = NNDescent( X_basis, metric=metric, metric_kwds=metric_kwds, n_neighbors=30, n_jobs=cores, random_state=seed, **kwargs, ) knn, distances = nbrs.query(X_basis, k=n_neighbors) else: alg = "ball_tree" if X_basis.shape[1] > 10 else "kd_tree" nbrs = NearestNeighbors(n_neighbors=n_neighbors, algorithm=alg, n_jobs=cores).fit(X_basis) distances, knn = nbrs.kneighbors(X_basis) X_data = adata[:, genes].X if layer in [None, "X"] else adata[:, genes].layers[layer] prev_score = X_data.mean(1).A1 if issparse(X_data) else X_data.mean(1) cur_score = np.zeros(prev_score.shape) for _ in range(iteration): for i in range(len(prev_score)): xn = prev_score[knn[i]] cur_score[i] = (beta * xn[0]) + ((1 - beta) * xn[1:].mean(axis=0)) prev_score = cur_score smoothed_score = cur_score if return_score: return smoothed_score else: adata.uns["score_cells"] = { "smoothed_score": smoothed_score, "genes": genes, "layer": layer, "basis": basis, } adata.obs["cell_score"] = smoothed_score
[docs]def cell_growth_rate( adata, group, source, target, L0=0.3, L=1.2, k=1e-3, birth_genes=None, death_genes=None, clone_column=None, **kwargs, ): """Estimate the growth rate via clone information or logistic equation of population dynamics. Growth rate is calculated as 1) number_of_cell_at_source_time_in_the_clone / number_of_cell_at_end_time_in_the_clone when there is clone information (`[clone_column, time_column, source_time, target_time]` are all not None); 2) estimate via logistic equation of population growth and death. Parameters ---------- adata: :class:`~anndata.AnnData` AnnData object that contains the reconstructed vector field function in the `uns` attribute. group: str or None (default: `None`) The column key in .obs points to the collection time of each cell, required for calculating growth rate with clone information. source: str or None (default: `None`) The column key in .obs points to the starting point from collection time of each cell, required for calculating growth rate with clone information. target: str or None (default: `None`) The column key in .obs points to the end point from collection time of each cell, required for calculating growth rate with clone information. L0: float (default: `0.3`) The base growth/death rate. L: float (default: `1.2`) The maximum growth/death rate. k: float (default: `0.001) The steepness of the curve. birth_genes: list or None (default: `None`) The gene list associated with the cell cycle process. If None, GSEA's KEGG_CELL_CYCLE will be used. death_genes: list or None (default: `None`) The gene list associated with the cell cycle process. If None, GSEA's KEGG_APOPTOSIS will be used. clone_column: str or None (default: `None`) The column key in .obs points to the clone id if there is any. If a cell doesn't belong to any clone, the clone id of that cell should be assigned as `np.nan` kwargs Additional arguments that will be passed to score_cells function. Returns ------- An updated adata object that includes `growth_rate` column or `growth_rate, birth_score, death_score` in its `.obs` attribute when the clone based or purely expression based growth rate was calculated. """ # calculate growth rate when there is clone information. all_clone_info = [clone_column, group, source, target] obs = adata.obs source_mask_, target_mask_ = ( obs[group].values == source, obs[group].values == target, ) if all(i is not None for i in all_clone_info): if any(i not in adata.obs.keys() for i in all_clone_info[:2]): raise ValueError( f"At least one of your input clone information {clone_column}, {group} " f"is not in your adata .obs attribute." ) if any(i not in adata.obs[group] for i in all_clone_info[2:]): raise ValueError( f"At least one of your input source/target information {source}, {target} " f"is not in your adata.obs[{group}] column." ) clone_time_count = obs.groupby([clone_column])[group].value_counts().unstack().fillna(0).astype(int) source_meta = obs.loc[source_mask_] source_mask = (source_meta[clone_column] != np.nan).values target_meta = obs.loc[target_mask_] target_mask = (target_meta[clone_column] != np.nan).values source_num = clone_time_count.loc[source_meta.loc[source_mask, clone_column], source].values + 1 target_num = clone_time_count.loc[target_meta.loc[target_mask, clone_column], target].values + 1 growth_rates = target_num / source_num else: # calculate growth rate when there is no clone information. if birth_genes is None: birth_genes = pd.read_csv( "https://raw.githubusercontent.com/Xiaojieqiu/jungle/master/Cell_cycle.txt", header=None, dtype=str, ) birth_genes = birth_genes[0].values if death_genes is None: death_genes = pd.read_csv( "https://raw.githubusercontent.com/Xiaojieqiu/jungle/master/Apoptosis.txt", header=None, dtype=str, ) death_genes = death_genes[0].values birth_score = score_cells(adata, genes=birth_genes, **kwargs) death_score = score_cells(adata, genes=death_genes, **kwargs) adata.obs["birth_score"] = birth_score adata.obs["death_score"] = death_score kb = np.log(k) / np.min(birth_score) kd = np.log(k) / np.min(death_score) b = birth_score[source_mask_] d = death_score[source_mask_] b = L0 + L / (1 + np.exp(-kb * b)) d = L0 + L / (1 + np.exp(-kd * d)) growth_rates = b - d adata.obs["growth_rate"] = np.nan adata.obs.loc[source_mask_, "growth_rate"] = growth_rates return adata
def n_descentants(birth, death, dt): return np.exp(dt * (birth - death)) def growth_rate(n, dt): return np.log(n) / dt