Source code for dynamo.tools.metric_velocity

import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy.sparse import issparse, csr_matrix
from sklearn.neighbors import NearestNeighbors
from .connectivity import umap_conn_indices_dist_embedding, mnn_from_list
from .utils import (
    get_finite_inds,
    inverse_norm,
    einsum_correlation,
    fetch_X_data,
)


[docs]def cell_wise_confidence( adata, X_data=None, V_data=None, ekey="M_s", vkey="velocity_S", neighbors_from_basis=False, method="jaccard", ): """Calculate the cell-wise velocity confidence metric. Parameters ---------- adata: :class:`~anndata.AnnData` an Annodata object. X_data: 'np.ndarray' or `sp.csr_matrix` or None (optional, default `None`) The expression states of single cells (or expression states in reduced dimension, like pca, of single cells) V_data: 'np.ndarray' or `sp.csr_matrix` or None (optional, default `None`) The RNA velocity of single cells (or velocity estimates projected to reduced dimension, like pca, of single cells). Note that X, V_mat need to have the exact dimensionalities. ekey: `str` (optional, default `M_s`) The dictionary key that corresponds to the gene expression in the layer attribute. By default, it is the smoothed expression `M_s`. vkey: 'str' (optional, default `velocity_S`) The dictionary key that corresponds to the estimated velocity values in layers attribute. neighbors_from_basis: `bool` (optional, default `False`) Whether to construct nearest neighbors from low dimensional space as defined by the `basis`, instead of using that calculated during UMAP process. method: `str` (optional, default `jaccard`) Which method will be used for calculating the cell wise velocity confidence metric. By default it uses `jaccard` index, which measures how well each velocity vector meets the geometric constraints defined by the local neighborhood structure. Jaccard index is calculated as the fraction of the number of the intersected set of nearest neighbors from each cell at current expression state (X) and that from the future expression state (X + V) over the number of the union of these two sets. The `cosine` or `correlation` method is similar to that used by scVelo (https://github.com/theislab/scvelo). Returns ------- adata: :class:`~anndata.AnnData` Returns an updated `~anndata.AnnData` with `.obs.confidence` as the cell-wise velocity confidence. """ if ekey == "X": X, V = ( adata.X if X_data is None else X_data, adata.layers[vkey] if V_data is None else V_data, ) norm_method = adata.uns["pp"]["norm_method"].copy() adata.uns["pp"]["norm_method"] = "log1p" X = inverse_norm(adata, X) if X_data is None else X_data adata.uns["pp"]["norm_method"] = norm_method else: X, V = ( adata.layers[ekey] if X_data is None else X_data, adata.layers[vkey] if V_data is None else V_data, ) X = inverse_norm(adata, X) if X_data is None else X_data if not neighbors_from_basis: n_neigh, X_neighbors = ( adata.uns["neighbors"]["params"]["n_neighbors"], adata.obsp["connectivities"], ) else: n_neigh = 30 if X.shape[0] > 200000 and X.shape[1] > 2: from pynndescent import NNDescent nbrs = NNDescent( X, metric="euclidean", n_neighbors=n_neigh + 1, n_jobs=-1, random_state=19491001, ) nbrs_idx, dist = nbrs.query(X, k=n_neigh + 1) else: alg = "ball_tree" if X.shape[1] > 10 else "kd_tree" nbrs = NearestNeighbors(n_neighbors=n_neigh + 1, algorithm=alg, n_jobs=-1).fit(X) dist, nbrs_idx = nbrs.kneighbors(X) row = np.repeat(nbrs_idx[:, 0], n_neigh) col = nbrs_idx[:, 1:].flatten() X_neighbors = csr_matrix( (np.repeat(1, len(col)), (row, col)), shape=(adata.n_obs, adata.n_obs), ) n_neigh = n_neigh[0] if type(n_neigh) == np.ndarray else n_neigh n_pca_components = adata.obsm["X"].shape[1] finite_inds = get_finite_inds(V, 0) X, V = X[:, finite_inds], V[:, finite_inds] if method == "jaccard": jac, _, _ = jaccard(X, V, n_pca_components, n_neigh, X_neighbors) confidence = jac elif method == "hybrid": # this is inspired from the locality preservation paper jac, intersect_, _ = jaccard(X, V, n_pca_components, n_neigh, X_neighbors) confidence = np.zeros(adata.n_obs) for i in tqdm( range(adata.n_obs), desc="calculating hybrid method (jaccard + consensus) based cell wise confidence", ): neigh_ids = np.where(intersect_[i].A)[0] if issparse(intersect_) else np.where(intersect_[i])[0] confidence[i] = ( jac[i] * np.mean([consensus(V[i].A.flatten(), V[j].A.flatten()) for j in neigh_ids]) if issparse(V) else jac[i] * np.mean([consensus(V[i].flatten(), V[j].flatten()) for j in neigh_ids]) ) elif method == "cosine": indices = adata.uns["neighbors"]["indices"] confidence = np.zeros(adata.n_obs) for i in tqdm( range(adata.n_obs), desc="calculating cosine based cell wise confidence", ): neigh_ids = indices[i] confidence[i] = ( np.mean([einsum_correlation(V[i].A, V[j].A.flatten(), type="cosine")[0, 0] for j in neigh_ids]) if issparse(V) else np.mean( [einsum_correlation(V[i][None, :], V[j].flatten(), type="cosine")[0, 0] for j in neigh_ids] ) ) elif method == "consensus": indices = adata.uns["neighbors"]["indices"] confidence = np.zeros(adata.n_obs) for i in tqdm( range(adata.n_obs), desc="calculating consensus based cell wise confidence", ): neigh_ids = indices[i] confidence[i] = ( np.mean([consensus(V[i].A.flatten(), V[j].A.flatten()) for j in neigh_ids]) if issparse(V) else np.mean([consensus(V[i], V[j].flatten()) for j in neigh_ids]) ) elif method == "correlation": # this is equivalent to scVelo indices = adata.uns["neighbors"]["indices"] confidence = np.zeros(adata.n_obs) for i in tqdm( range(adata.n_obs), desc="calculating correlation based cell wise confidence", ): neigh_ids = indices[i] confidence[i] = ( np.mean([einsum_correlation(V[i].A, V[j].A.flatten(), type="pearson")[0, 0] for j in neigh_ids]) if issparse(V) else np.mean( [einsum_correlation(V[i][None, :], V[j].flatten(), type="pearson")[0, 0] for j in neigh_ids] ) ) elif method == "divergence": pass else: raise Exception( "The input {} method for cell-wise velocity confidence calculation is not implemented" " yet".format(method) ) adata.obs[method + "_velocity_confidence"] = confidence return adata
def jaccard(X, V, n_pca_components, n_neigh, X_neighbors): from sklearn.decomposition import TruncatedSVD transformer = TruncatedSVD(n_components=n_pca_components + 1, random_state=0) Xt = X + V if issparse(Xt): Xt.data[Xt.data < 0] = 0 Xt.data = np.log2(Xt.data + 1) else: Xt = np.log2(Xt + 1) X_fit = transformer.fit(Xt) Xt_pca = X_fit.transform(Xt)[:, 1:] V_neighbors, _, _, _ = umap_conn_indices_dist_embedding(Xt_pca, n_neighbors=n_neigh, return_mapper=False) X_neighbors_, V_neighbors_ = ( X_neighbors.dot(X_neighbors), V_neighbors.dot(V_neighbors), ) union_ = X_neighbors_ + V_neighbors_ > 0 intersect_ = mnn_from_list([X_neighbors_, V_neighbors_]) > 0 jaccard = (intersect_.sum(1) / union_.sum(1)).A1 if issparse(X) else intersect_.sum(1) / union_.sum(1) return jaccard, intersect_, union_ def consensus(x, y): x_norm, y_norm = np.linalg.norm(x), np.linalg.norm(y) consensus = ( einsum_correlation(x[None, :], y, type="cosine")[0, 0] * np.min([x_norm, y_norm]) / np.max([x_norm, y_norm]) ) return consensus
[docs]def gene_wise_confidence( adata, group, lineage_dict=None, genes=None, ekey="M_s", vkey="velocity_S", X_data=None, V_data=None, V_threshold=1, ): """Diagnostic measure to identify genes contributed to "wrong" directionality of the vector flow. In some scenarios, you may find unexpected "wrong vector backflow" from your dynamo analysis, in order to diagnose those cases, we can identify those genes showing up in the wrong phase portrait position. Then we nay remove those identified genes to "correct" velocity vectors. This requires us to give some priors about what progenitor and terminal cell types are. The rationale behind this basically boils down to understanding the following two scenarios: 1). if the progenitor’s expression is low, starting from time point 0, cells should start to increase expression. There must be progenitors that are above the steady-state line. However, if most of the progenitors are laying below the line (indicated by the red cells), we will have negative velocity and this will lead to reversed vector flow. 2). if progenitors start from high expression, starting from time point 0, cells should start to decrease expression. There must be progenitors that are below the steady-state line. However, if most of the progenitors are laying above the steady state line, we will have positive velocity and this will lead to reversed vector flow. The same rationale can be applied to the mature cell states. Thus, we design an algorithm to access the confidence of each gene obeying the above two constraints: We first check for whether a gene should be in the induction or repression phase from each progenitor to each terminal cell states (based on the shift of the median gene expression between these two states). If it is in induction phase, cells should show mostly at >= small negative velocity; otherwise <= small negative velocity. 1 - ratio of cells with velocity pass those threshold (defined by `V_threshold`) in each state is then defined as a velocity confidence measure. Note that, this heuristic method requires you provide meaningful `progenitors_groups` and `mature_cells_groups`. In particular, the progentitor groups should in principle have cell going out (transcriptomically) while mature groups should end up in a different expression state and there are intermediate cells going to the dead end cells in the each terminal group (or most terminal groups). Parameters ---------- adata: :class:`~anndata.AnnData` an Annodata object group: `str` The column key/name that identifies the cell state grouping information of cells. This will be used for calculating gene-wise confidence score in each cell state. lineage_dict: `dict` A dictionary describes lineage priors. Keys corresponds to the group name from `group` that corresponding to the state of one progenitor type while values correspond to the group names from `group` that corresponding to the states of one or multiple terminal cell states. The best practice for determining terminal cell states are those fully functional cells instead of intermediate cell states. Note that in python a dictionary key cannot be a list, so if you have two progenitor types converge into one terminal cell state, you need to create two records each with the same terminal cell as value but different progenitor as the key. Value can be either a string for one cell group or a list of string for multiple cell groups. genes: `list` or None (default: `None`) The list of genes that will be used to gene-wise confidence score calculation. If `None`, all genes that go through velocity estimation will be used. ekey: `str` or None (default: `M_s`) The layer that will be used to retrieve data for identifying the gene is in induction or repression phase at each cell state. If `None`, .X is used. vkey: `str` or None (default: `velocity_S`) The layer that will be used to retrieve velocity data for calculating gene-wise confidence. If `None`, `velocity_S` is used. X_data: `np.ndarray` (default: `None`) The user supplied data that will be used for identifying the gene is in induction or repression phase at each cell state directly V_data: `np.ndarray` (default: `None`) The user supplied data that will be used for calculating gene-wise confidence directly. V_threshold: `float` (default: `1`) The threshold of velocity to calculate the gene wise confidence. Returns ------- An updated adata object with a new `gene_wise_confidence` key in .uns, which contains gene-wise confidence score in each cell state. .var will also be updated with `avg_prog_confidence` and `avg_mature_confidence` key which correspond to the average gene wise confidence in the progenitor state or the mature cell state. """ if X_data is None: genes, X_data = fetch_X_data(adata, genes, ekey) else: if genes is None or len(genes) != X_data.shape[1]: raise ValueError( f"When providing X_data, a list of genes name that corresponds to the columns of X_data " f"must be provided" ) if V_data is None: genes, V_data = fetch_X_data(adata, genes, vkey) else: if genes is None or len(genes) != X_data.shape[1]: raise ValueError( f"When providing V_data, a list of genes name that corresponds to the columns of X_data " f"must be provided" ) sparse, sparse_v = issparse(X_data), issparse(V_data) confidence = [] for i_gene, gene in tqdm( enumerate(genes), desc="calculating gene velocity vectors confidence based on phase " "portrait location with priors of progenitor/mature cell types", ): all_vals = X_data[:, i_gene].A if sparse else X_data[:, i_gene] all_vals_v = V_data[:, i_gene].A if sparse_v else V_data[:, i_gene] for progenitors_groups, mature_cells_groups in lineage_dict.items(): progenitors_groups = [progenitors_groups] if type(mature_cells_groups) is str: mature_cells_groups = [mature_cells_groups] for i, progenitor in enumerate(progenitors_groups): prog_vals = all_vals[adata.obs[group] == progenitor] prog_vals_v = all_vals_v[adata.obs[group] == progenitor] threshold_val = np.percentile(abs(all_vals_v), V_threshold) for j, mature in enumerate(mature_cells_groups): mature_vals = all_vals[adata.obs[group] == mature] mature_vals_v = all_vals_v[adata.obs[group] == mature] if np.nanmedian(prog_vals) - np.nanmedian(mature_vals) > 0: # repression phase (bottom curve -- phase curve below the linear line indicates steady states) prog_confidence = 1 - sum(prog_vals_v > -threshold_val)[0] / len( prog_vals_v ) # most cells should downregulate / ss mature_confidence = 1 - sum(mature_vals_v > -threshold_val)[0] / len( mature_vals_v ) # most cell should downregulate / ss else: # induction phase (upper curve -- phase curve above the linear line indicates steady states) prog_confidence = 1 - sum(prog_vals_v < threshold_val)[0] / len( prog_vals_v ) # most cells should upregulate / ss mature_confidence = 1 - sum(mature_vals_v < threshold_val)[0] / len( mature_vals_v ) # most cell should upregulate / ss confidence.append( ( gene, progenitor, mature, prog_confidence, mature_confidence, ) ) confidence = pd.DataFrame( confidence, columns=[ "gene", "progenitor", "mature", "prog_confidence", "mature_confidence", ], ) confidence.astype(dtype={"prog_confidence": "float64", "prog_confidence": "float64"}) adata.var["avg_prog_confidence"], adata.var["avg_mature_confidence"] = ( np.nan, np.nan, ) avg = confidence.groupby("gene")["prog_confidence", "mature_confidence"].mean() avg = avg.reset_index().set_index("gene") adata.var.loc[genes, "avg_prog_confidence"] = avg.loc[genes, "prog_confidence"] adata.var.loc[genes, "avg_mature_confidence"] = avg.loc[genes, "mature_confidence"] adata.uns["gene_wise_confidence"] = confidence