# include pseudotime and predict cell trajectory
import statsmodels.api as sm
import numpy as np
from scipy.sparse import issparse
from scipy.interpolate import interp1d
from import update_dict
from ..prediction.utils import fetch_exprs
from .utils import save_fig

from ..docrep import DocstringProcessor
from ..external.hodge import ddhodge

docstrings = DocstringProcessor()

[docs]@docstrings.get_sectionsf("kin_curves") def kinetic_curves( adata, genes, mode="vector_field", basis=None, layer="X", project_back_to_high_dim=True, tkey="potential", dist_threshold=1e-10, ncol=4, color='ntr', c_palette="Set2", standard_scale=0, save_show_or_return='show', save_kwargs={}, ): """Plot the gene expression dynamics over time (pseudotime or inferred real time) as kinetic curves. Note that by default `potential` estimated with the diffusion graph built from reconstructed vector field will be used as the measure of pseudotime. Parameters ---------- adata: :class:`~anndata.AnnData` an Annodata object. genes: `list` The gene names whose gene expression will be faceted. mode: `str` (default: `vector_field`) Which data mode will be used, either vector_field or pseudotime. if mode is vector_field, the trajectory predicted by vector field function will be used, otherwise pseudotime trajectory (defined by time argument) will be used. By default `potential` estimated with the diffusion graph built from reconstructed vector field will be used as pseudotime. basis: `str` or None (default: `None`) The embedding data used for drawing the kinetic gene expression curves, only used when mode is `vector_field`. layer: `str` (default: X) Which layer of expression value will be used. Not used if mode is `vector_field`. project_back_to_high_dim: `bool` (default: `False`) Whether to map the coordinates in low dimension back to high dimension to visualize the gene expression curves, only used when mode is `vector_field` and basis is not `X`. Currently only works when basis is 'pca' and 'umap'. color: `list` or None (default: 'ntr') A list of attributes of cells (column names in the adata.obs) will be used to color cells. tkey: `str` (default: `potential`) The .obs column that will be used for timing each cell, only used when mode is not `vector_field`. dist_threshold: `float` or None (default: 1e-10) The threshold for the distance between two points in the gene expression state, i.e, x(t), x(t+1). If below this threshold, we assume steady state is achieved and those data points will not be considered. This argument is ignored when mode is `pseudotime`. ncol: `int` (default: 4) Number of columns in each facet grid. c_palette: Name of color_palette supported in seaborn color_palette function (default: None) The color map function to use. standard_scale: `int` (default: 1) Either 0 (rows) or 1 (columns). Whether or not to standardize that dimension, meaning for each row or column, subtract the minimum and divide each by its maximum. save_show_or_return: {'show', 'save_fig', 'return'} (default: `show`) Whether to save_fig, show or return the figure. save_kwargs: `dict` (default: `{}`) A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the save_fig function will use the {"path": None, "prefix": 'kinetic_curves', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a dictionary that properly modify those keys according to your needs. Returns ------- Nothing but plots the kinetic curves that shows the gene expression dynamics over time. """ import pandas as pd import seaborn as sns import matplotlib.pyplot as plt if tkey == "potential" and "potential" not in adata.obs_keys(): ddhodge(adata) exprs, valid_genes, time = fetch_exprs( adata, basis, layer, genes, tkey, mode, project_back_to_high_dim ) Color = np.empty((0, 1)) if color is not None and mode != "vector_field": color = list(set(color).intersection(adata.obs.keys())) Color = ( adata.obs[color].values.T.flatten() if len(color) > 0 else np.empty((0, 1)) ) exprs = exprs.A if issparse(exprs) else exprs if standard_scale is not None: exprs = (exprs - np.min(exprs, axis=standard_scale)) / np.ptp( exprs, axis=standard_scale ) time = np.sort(time) exprs = exprs[np.argsort(time), :] if dist_threshold is not None and mode == 'vector_field': valid_ind = list( np.where(np.sum(np.diff(exprs, axis=0) ** 2, axis=1) > dist_threshold)[0] + 1 ) valid_ind.insert(0, 0) exprs = exprs[valid_ind, :] time = time[valid_ind] exprs_df = pd.DataFrame( { "Time": np.repeat(time, len(valid_genes)), "Expression": exprs.flatten(), "Gene": np.tile(valid_genes, len(time)), } ) exprs_df = exprs_df.query("Gene in @genes") if exprs_df.shape[0] == 0: raise Exception( "No genes you provided are detected. Please make sure the genes provided are from the genes " "used for vector field reconstructed when layer is set." ) # if len(Color) > 0: exprs_df["Color"] = np.repeat(Color, len(valid_genes)) g = sns.relplot( x="Time", y="Expression", data=exprs_df, col="Gene", hue="Color", palette=sns.color_palette(c_palette), col_wrap=ncol, kind="line", facet_kws={"sharex": True, "sharey": False}, ) else: g = sns.relplot( x="Time", y="Expression", data=exprs_df, col="Gene", col_wrap=ncol, kind="line", facet_kws={"sharex": True, "sharey": False}, ) if save_show_or_return == "save_fig": s_kwargs = {"path": None, "prefix": 'kinetic_curves', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} s_kwargs = update_dict(s_kwargs, save_kwargs) save_fig(**s_kwargs) elif save_show_or_return == "show": plt.tight_layout() elif save_show_or_return == "return": return g
docstrings.delete_params("kin_curves.parameters", "ncol", "color", "c_palette")
[docs]@docstrings.with_indent(4) def kinetic_heatmap( adata, genes, mode="vector_field", basis=None, layer="X", project_back_to_high_dim=True, tkey="potential", dist_threshold=1e-10, color_map="BrBG", gene_order_method='half_max_ordering', show_colorbar=False, cluster_row_col=[False, False], figsize=(11.5, 6), standard_scale=1, save_show_or_return='show', save_kwargs={}, **kwargs ): """Plot the gene expression dynamics over time (pseudotime or inferred real time) in a heatmap. Note that by default `potential` estimated with the diffusion graph built from reconstructed vector field will be used as the measure of pseudotime. Parameters ---------- %(kin_curves.parameters.no_ncol|color|c_palette)s color_map: `str` (default: `BrBG`) Color map that will be used to color the gene expression. If `half_max_ordering` is True, the color map need to be divergent, good examples, include `BrBG`, `RdBu_r` or `coolwarm`, etc. gene_order_method: `str` (default: `half_max_ordering`) [`half_max_ordering`, `maximum`] Supports two different methods for ordering genes when plotting the heatmap: either `half_max_ordering`, or `maximum`. For `half_max_ordering`, it will order genes into up, down and transit groups by the half max ordering algorithm (HA Pliner, et. al, Molecular cell 71 (5), 858-871. e8). While for `maximum`, it will order by the position of the highest gene expression. show_colorbar: `bool` (default: `False`) Whether to show the color bar. cluster_row_col: `[bool, bool]` (default: `[False, False]`) Whether to cluster the row or columns. figsize: `str` (default: `(11.5, 6)` Size of figure standard_scale: `int` (default: 1) Either 0 (rows, cells) or 1 (columns, genes). Whether or not to standardize that dimension, meaning for each row or column, subtract the minimum and divide each by its maximum. save_show_or_return: {'show', 'save_fig', 'return'} (default: `show`) Whether to save_fig, show or return the figure. save_kwargs: `dict` (default: `{}`) A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the save_fig function will use the {"path": None, "prefix": 'kinetic_heatmap', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a dictionary that properly modify those keys according to your needs. kwargs: All other keyword arguments are passed to heatmap(). Currently `xticklabels=False, yticklabels='auto'` is passed to heatmap() by default. Returns ------- Nothing but plots a heatmap that shows the gene expression dynamics over time. """ import pandas as pd import seaborn as sns import matplotlib.pyplot as plt if tkey == "potential" and "potential" not in adata.obs_keys(): ddhodge(adata) exprs, valid_genes, time = fetch_exprs( adata, basis, layer, genes, tkey, mode, project_back_to_high_dim ) exprs = exprs.A if issparse(exprs) else exprs if dist_threshold is not None and mode == 'vector_field': valid_ind = list( np.where(np.sum(np.diff(exprs, axis=0) ** 2, axis=1) > dist_threshold)[0] + 1 ) valid_ind.insert(0, 0) exprs = exprs[valid_ind, :] time = time[valid_ind] if gene_order_method == "half_max_ordering": time, all, valid_ind, gene_idx = _half_max_ordering( exprs.T, time, mode=mode, interpolate=True, spaced_num=100 ) all, genes = all[np.isfinite(all.sum(1)), :], np.array(valid_genes)[gene_idx][np.isfinite(all.sum(1))] df = pd.DataFrame(all, index=genes) elif gene_order_method == 'maximum': exprs = lowess_smoother(time, exprs.T, spaced_num=100) exprs = exprs[np.isfinite(exprs.sum(1)), :] if standard_scale is not None: exprs = (exprs - np.min(exprs, axis=standard_scale)[:, None]) / np.ptp( exprs, axis=standard_scale )[:, None] max_sort = np.argsort(np.argmax(exprs, axis=1)) df = pd.DataFrame(exprs[max_sort, :], index=np.array(valid_genes)[max_sort]) else: raise Exception('gene order_method can only be either half_max_ordering or maximum') heatmap_kwargs = dict(xticklabels=False, yticklabels=1) if kwargs is not None: heatmap_kwargs = update_dict(heatmap_kwargs, kwargs) sns_heatmap = sns.clustermap( df, col_cluster=cluster_row_col[0], row_cluster=cluster_row_col[1], cmap=color_map, figsize=figsize, **heatmap_kwargs ) if not show_colorbar: sns_heatmap.cax.set_visible(False) if save_show_or_return == "save_fig": s_kwargs = {"path": None, "prefix": 'kinetic_heatmap', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} s_kwargs = update_dict(s_kwargs, save_kwargs) save_fig(**s_kwargs) elif save_show_or_return == "show": if show_colorbar: plt.subplots_adjust(right=0.85) plt.tight_layout() elif save_show_or_return == "return": return sns_heatmap
def _half_max_ordering(exprs, time, mode, interpolate=False, spaced_num=100): """Implement the half-max ordering algorithm from HA Pliner, Molecular Cell, 2018. Parameters ---------- exprs: `np.ndarray` The gene expression matrix (ngenes x ncells) ordered along time (either pseudotime or inferred real time). time: `np.ndarray` Pseudotime or inferred real time. mode: `str` (default: `vector_field`) Which data mode will be used, either vector_field or pseudotime. if mode is vector_field, the trajectory predicted by vector field function will be used, otherwise pseudotime trajectory (defined by time argument) will be used. interpolate: `bool` (default: `False`) Whether to interpolate the data when performing the loess fitting. spaced_num: `float` (default: `100`) The number of points on the loess fitting curve. Returns ------- time: `np.ndarray` The time at which the loess is evaluated. all: `np.ndarray` The ordered smoothed, scaled expression matrix, the first group is up, then down, followed by the transient gene groups. valid_ind: `np.ndarray` The indices of valid genes that Loess smoothed. gene_idx: `np.ndarray` The indices of genes that are used for the half-max ordering plot. """ if mode == "vector_field": interpolate = False gene_num = exprs.shape[0] cell_num = spaced_num if interpolate else exprs.shape[1] if interpolate: hm_mat_scaled, hm_mat_scaled_z = ( np.zeros((gene_num, cell_num)), np.zeros((gene_num, cell_num)), ) else: hm_mat_scaled, hm_mat_scaled_z = np.zeros_like(exprs), np.zeros_like(exprs) transient, trans_max, half_max = ( np.zeros(gene_num), np.zeros(gene_num), np.zeros(gene_num), ) tmp = lowess_smoother(time, exprs, spaced_num) if interpolate else exprs for i in range(gene_num): hm_mat_scaled[i] = tmp[i] - np.min(tmp[i]) hm_mat_scaled[i] = hm_mat_scaled[i] / np.max(hm_mat_scaled[i]) scale_tmp = (tmp[i] - np.mean(tmp[i])) / np.std(tmp[i]) # scale in R hm_mat_scaled_z[i] = scale_tmp count, current = 0, hm_mat_scaled_z[i, 0] < 0 # check this for j in range(cell_num): if not (scale_tmp[j] < 0 == current): count = count + 1 current = scale_tmp[j] < 0 half_max[i] = np.argmax(np.abs(scale_tmp - 0.5)) transient[i] = count trans_max[i] = np.argsort(-scale_tmp)[0] begin = np.arange(max([5, 0.05 * cell_num])) end = np.arange(exprs.shape[1] - max([5, 0.05 * cell_num]), cell_num) trans_indx = np.logical_and( transient > 1, not [i in np.concatenate((begin, end)) for i in trans_max] ) trans_idx, trans, half_max_trans = ( np.where(trans_indx)[0], hm_mat_scaled[trans_indx, :], half_max[trans_indx], ) nt_idx, nt = np.where(~trans_indx)[0], hm_mat_scaled[~trans_indx, :] up_idx, up, half_max_up = ( np.where(nt[:, 0] < nt[:, -1])[0], nt[nt[:, 0] < nt[:, -1], :], half_max[nt[:, 0] < nt[:, -1]], ) down_indx, down, half_max_down = ( np.where(nt[:, 0] >= nt[:, -1])[0], nt[nt[:, 0] >= nt[:, -1], :], half_max[nt[:, 0] >= nt[:, -1]], ) trans, up, down = ( trans[np.argsort(half_max_trans), :], up[np.argsort(half_max_up), :], down[np.argsort(half_max_down), :], ) all = np.vstack((up, down, trans)) gene_idx = np.hstack( ( nt_idx[up_idx][np.argsort(half_max_up)], nt_idx[down_indx][np.argsort(half_max_down)], trans_idx, ) ) return time, all, np.isfinite(nt[:, 0]) & np.isfinite(nt[:, -1]), gene_idx def lowess_smoother(time, exprs, spaced_num): gene_num = exprs.shape[0] res = np.zeros((gene_num, spaced_num)) for i in range(gene_num): x = exprs[i] lowess = sm.nonparametric.lowess tmp = lowess(x, time, frac=.3) # run scipy's interpolation. f = interp1d(tmp[:, 0], tmp[:, 1], bounds_error=False) time_linspace = np.linspace(np.min(time), np.max(time), spaced_num) res[i, :] = f(time_linspace) return res
[docs]@docstrings.with_indent(4) def jacobian_kinetics( adata, source_genes=None, target_genes=None, mode="pseudotime", tkey="potential", color_map="bwr", gene_order_method='raw', show_colorbar=False, cluster_row_col=[False, True], figsize=(11.5, 6), standard_scale=1, save_show_or_return='show', save_kwargs={}, **kwargs ): """Plot the gene expression dynamics over time (pseudotime or inferred real time) in a heatmap. Note that by default `potential` estimated with the diffusion graph built from reconstructed vector field will be used as the measure of pseudotime. Parameters ---------- adata: :class:`~anndata.AnnData` an Annodata object. source_genes: `list` or `None` (default: `None`) The list of genes that will be used as regulators for plotting the Jacobian heatmap, only limited to genes that have already performed Jacobian analysis. target_genes: `List` or `None` (default: `None`) The list of genes that will be used as targets for plotting the Jacobian heatmap, only limited to genes that have already performed Jacobian analysis. mode: `str` (default: `vector_field`) Which data mode will be used, either vector_field or pseudotime. if mode is vector_field, the trajectory predicted by vector field function will be used, otherwise pseudotime trajectory (defined by time argument) will be used. By default `potential` estimated with the diffusion graph built reconstructed vector field will be used as pseudotime. tkey: `str` (default: `potential`) The .obs column that will be used for timing each cell, only used when mode is not `vector_field`. color_map: `str` (default: `BrBG`) Color map that will be used to color the gene expression. If `half_max_ordering` is True, the color map need to be divergent, good examples, include `BrBG`, `RdBu_r` or `coolwarm`, etc. gene_order_method: `str` (default: `half_max_ordering`) [`half_max_ordering`, `maximum`] Supports two different methods for ordering genes when plotting the heatmap: either `half_max_ordering`, or `maximum`. For `half_max_ordering`, it will order genes into up, down and transit groups by the half max ordering algorithm (HA Pliner, et. al, Molecular cell 71 (5), 858-871. e8). While for `maximum`, it will order by the position of the highest gene expression. show_colorbar: `bool` (default: `False`) Whether to show the color bar. cluster_row_col: `[bool, bool]` (default: `[False, False]`) Whether to cluster the row or columns. figsize: `str` (default: `(11.5, 6)` Size of figure standard_scale: `int` (default: 1) Either 0 (rows, cells) or 1 (columns, genes). Whether or not to standardize that dimension, meaning for each row or column, subtract the minimum and divide each by its maximum. save_show_or_return: {'show', 'save_fig', 'return'} (default: `show`) Whether to save_fig, show or return the figure. save_kwargs: `dict` (default: `{}`) A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the save_fig function will use the {"path": None, "prefix": 'kinetic_curves', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a dictionary that properly modify those keys according to your needs. kwargs: All other keyword arguments are passed to heatmap(). Currently `xticklabels=False, yticklabels='auto'` is passed to heatmap() by default. Returns ------- Nothing but plots a heatmap that shows the element of Jacobian matrix dynamics over time (potential decreasing). Examples -------- >>> import dynamo as dyn >>> adata = dyn.sample_data.hgForebrainGlutamatergic() >>> adata = dyn.pp.recipe_monocle(adata) >>> >>>, basis='pca') >>> valid_gene_list = adata[:, adata.var.use_for_transition].var.index[:2] >>>, source_genes=valid_gene_list[0], target_genes=valid_gene_list[1]) >>> """ import pandas as pd import seaborn as sns import matplotlib.pyplot as plt Jacobian_ = "jacobian" #f basis is None else "jacobian_" + basis Der, source_genes_, target_genes_, cell_indx, _ = adata.uns[Jacobian_].values() if tkey == "potential" and "potential" not in adata.obs_keys(): ddhodge(adata) adata_ = adata[cell_indx, :] time = adata_.obs[tkey] jacobian_mat = Der.reshape((-1, Der.shape[2])) if Der.ndim == 3 else Der[None, :] n_source_targets_ = Der.shape[0] * Der.shape[1] if Der.ndim == 3 else 1 targets_, sources_ = (np.repeat(target_genes_, Der.shape[1]), np.tile(source_genes_, Der.shape[0])) if Der.ndim == 3 \ else (np.repeat(target_genes_, Der.shape[0]), np.repeat(target_genes_, Der.shape[0])) source_targets_ = [sources_[i] + '->' + targets_[i] for i in range(n_source_targets_)] source_genes = source_genes_ if source_genes is None else source_genes target_genes = target_genes_ if target_genes is None else target_genes if type(source_genes) == str: source_genes = [source_genes] if type(target_genes) == str: target_genes = [target_genes] source_genes = list(set(source_genes_).intersection(source_genes)) target_genes = list(set(target_genes_).intersection(target_genes)) if len(source_genes) == 0 or len(target_genes) == 0: raise ValueError(f"Jacobian related to source genes {source_genes} and target genes {target_genes}" f"you provided are existed. Available source genes includes {source_genes_} while " f"available target genes includes {target_genes_}") n_source_targets = len(source_genes) * len(target_genes) targets, sources = np.repeat(target_genes, len(source_genes)), np.tile(source_genes, len(target_genes)) source_targets = [sources[i] + '->' + targets[i] for i in range(n_source_targets)] jacobian_mat = jacobian_mat[:, np.argsort(time)] if gene_order_method == "half_max_ordering": time, all, valid_ind, gene_idx = _half_max_ordering( jacobian_mat, time, mode=mode, interpolate=True, spaced_num=100 ) all, source_targets = all[np.isfinite(all.sum(1)), :], np.array(source_targets)[gene_idx][np.isfinite(all.sum(1))] df = pd.DataFrame(all, index=source_targets_) elif gene_order_method == 'maximum': jacobian_mat = lowess_smoother(time, jacobian_mat, spaced_num=100) jacobian_mat = jacobian_mat[np.isfinite(jacobian_mat.sum(1)), :] if standard_scale is not None: exprs = (jacobian_mat - np.min(jacobian_mat, axis=standard_scale)[:, None]) / np.ptp( jacobian_mat, axis=standard_scale )[:, None] max_sort = np.argsort(np.argmax(exprs, axis=1)) df = pd.DataFrame(exprs[max_sort, :], index=np.array(source_targets_)[max_sort]) elif gene_order_method == "raw": jacobian_mat /= np.abs(jacobian_mat).max(1)[:, None] df = pd.DataFrame(jacobian_mat, index=np.array(source_targets_)) else: raise Exception('gene order_method can only be either half_max_ordering or maximum') heatmap_kwargs = dict(xticklabels=False, yticklabels=1) if kwargs is not None: heatmap_kwargs = update_dict(heatmap_kwargs, kwargs) sns_heatmap = sns.clustermap( df.loc[source_targets, :], col_cluster=cluster_row_col[0], row_cluster=cluster_row_col[1] if len(source_targets) > 2 else False, cmap=color_map, figsize=figsize, center=0, **heatmap_kwargs ) if not show_colorbar: sns_heatmap.cax.set_visible(False) if save_show_or_return == "save_fig": s_kwargs = {"path": None, "prefix": 'jacobian_kinetics', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} s_kwargs = update_dict(s_kwargs, save_kwargs) save_fig(**s_kwargs) elif save_show_or_return == "show": if show_colorbar: plt.subplots_adjust(right=0.85) plt.tight_layout() elif save_show_or_return == "return": return sns_heatmap