Source code for dynamo.plot.time_series

# include pseudotime and predict cell trajectory
from typing import Optional, Union

import numpy as np
from anndata import AnnData
from scipy.interpolate import interp1d
from scipy.sparse import issparse

from ..docrep import DocstringProcessor
from ..external.hodge import ddhodge
from ..prediction.utils import fetch_exprs
from ..tools.utils import update_dict
from .utils import _to_hex, save_fig

docstrings = DocstringProcessor()


[docs]@docstrings.get_sectionsf("kin_curves") def kinetic_curves( adata: AnnData, genes: list, mode: str = "vector_field", basis: Optional[str] = None, layer: str = "X", project_back_to_high_dim: bool = True, tkey: str = "potential", dist_threshold: float = 1e-10, ncol: int = 4, color: Union[list, None] = "ntr", c_palette: str = "Set2", standard_scale: int = 0, traj_ind: int = 0, log: bool = True, save_show_or_return: str = "show", save_kwargs: dict = {}, ): """Plot the gene expression dynamics over time (pseudotime or inferred real time) as kinetic curves. Note that by default `potential` estimated with the diffusion graph built from reconstructed vector field will be used as the measure of pseudotime. Parameters ---------- adata: :class:`~anndata.AnnData` an Annodata object. genes: `list` The gene names whose gene expression will be faceted. mode: `str` (default: `vector_field`) Which data mode will be used, either vector_field, lap or pseudotime. if mode is vector_field, the trajectory predicted by vector field function will be used; if mode is lap, the trajectory predicted by least action path will be used otherwise pseudotime trajectory (defined by time argument) will be used. By default `potential` estimated with the diffusion graph built from reconstructed vector field will be used as pseudotime when mode is pseudotime. basis: `str` or None (default: `None`) The embedding data used for drawing the kinetic gene expression curves, only used when mode is `vector_field`. layer: `str` (default: X) Which layer of expression value will be used. Not used if mode is `vector_field`. project_back_to_high_dim: `bool` (default: `False`) Whether to map the coordinates in low dimension back to high dimension to visualize the gene expression curves, only used when mode is `vector_field` and basis is not `X`. Currently only works when basis is 'pca' and 'umap'. color: `list` or None (default: 'ntr') A list of attributes of cells (column names in the adata.obs) will be used to color cells. tkey: `str` (default: `potential`) The .obs column that will be used for timing each cell, only used when mode is not `vector_field`. dist_threshold: `float` (default: 1e-10) The threshold for the distance between two points in the gene expression state, i.e, x(t), x(t+1). If below this threshold, we assume steady state is achieved and those data points will not be considered. This argument is ignored when mode is `pseudotime`. ncol: `int` (default: 4) Number of columns in each facet grid. c_palette: Name of color_palette supported in seaborn color_palette function (default: None) The color map function to use. standard_scale: `int` (default: 1) Either 0 (rows) or 1 (columns). Whether or not to standardize that dimension, meaning for each row or column, subtract the minimum and divide each by its maximum. traj_ind: `int` (default: 0) If the element from the dictionary is a list (obtained from a list of trajectories), the index of trajectory that will be selected for visualization. log: `bool` (default: True) Whether to log1p transform your data before data visualization. If expression data is from adata object, it is generally already log1p transformed. When the data is from predicted either from traj simulation or LAP, the data is generally in the original gene expression space and needs to be log1p transformed. Note: when predicted data is not inverse transformed back to original expression space, no transformation will be applied. save_show_or_return: {'show', 'save_fig', 'return'} (default: `show`) Whether to save_fig, show or return the figure. save_kwargs: `dict` (default: `{}`) A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the save_fig function will use the {"path": None, "prefix": 'kinetic_curves', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a dictionary that properly modify those keys according to your needs. Returns ------- Nothing but plots the kinetic curves that shows the gene expression dynamics over time. """ import matplotlib.pyplot as plt import pandas as pd import seaborn as sns if mode == "pseudotime" and tkey == "potential" and "potential" not in adata.obs_keys(): ddhodge(adata) exprs, valid_genes, time = fetch_exprs( adata, basis, layer, genes, tkey, mode, project_back_to_high_dim, traj_ind, ) Color = np.empty((0, 1)) if color is not None and mode not in ["lap", "vector_field"]: color = list(set(color).intersection(adata.obs.keys())) Color = adata.obs[color].values.T.flatten() if len(color) > 0 else np.empty((0, 1)) exprs = exprs.A if issparse(exprs) else exprs if len(set(genes).intersection(valid_genes)) > 0: # by default, expression values are log1p tranformed if using the expression from adata. exprs = np.expm1(exprs) if not log else exprs if standard_scale is not None: exprs = (exprs - np.min(exprs, axis=standard_scale)) / np.ptp(exprs, axis=standard_scale) time = np.sort(time) exprs = exprs[np.argsort(time), :] if dist_threshold is not None and mode in ["lap", "vector_field"]: valid_ind = list(np.where(np.sum(np.diff(exprs, axis=0) ** 2, axis=1) > dist_threshold)[0] + 1) valid_ind.insert(0, 0) exprs = exprs[valid_ind, :] time = time[valid_ind] exprs_df = pd.DataFrame( { "Time": np.repeat(time, len(valid_genes)), "Expression": exprs.flatten(), "Gene": np.tile(valid_genes, len(time)), } ) if exprs_df.shape[0] == 0: raise Exception( "No genes you provided are detected. Please make sure the genes provided are from the genes " "used for vector field reconstructed when layer is set." ) # https://stackoverflow.com/questions/43920341/python-seaborn-facetgrid-change-titles if len(Color) > 0: exprs_df["Color"] = np.repeat(Color, len(valid_genes)) g = sns.relplot( x="Time", y="Expression", data=exprs_df, col="Gene", hue="Color", palette=sns.color_palette(c_palette), col_wrap=ncol, kind="line", facet_kws={"sharex": True, "sharey": False}, ) else: g = sns.relplot( x="Time", y="Expression", data=exprs_df, col="Gene", col_wrap=ncol, kind="line", facet_kws={"sharex": True, "sharey": False}, ) if save_show_or_return == "save": s_kwargs = { "path": None, "prefix": "kinetic_curves", "dpi": None, "ext": "pdf", "transparent": True, "close": True, "verbose": True, } s_kwargs = update_dict(s_kwargs, save_kwargs) save_fig(**s_kwargs) elif save_show_or_return == "show": plt.tight_layout() plt.show() elif save_show_or_return == "return": return g
docstrings.delete_params("kin_curves.parameters", "ncol", "color", "c_palette")
[docs]@docstrings.with_indent(4) def kinetic_heatmap( adata: AnnData, genes: list, mode: str = "vector_field", basis: Optional[str] = None, layer: str = "X", project_back_to_high_dim: bool = True, tkey: str = "potential", dist_threshold: float = 1e-10, color_map: int = "BrBG", gene_order_method: str = "maximum", show_colorbar: bool = False, cluster_row_col: list = [False, False], figsize: tuple = (11.5, 6), standard_scale: int = 1, n_convolve: int = 30, spaced_num: int = 100, traj_ind: int = 0, log: bool = True, gene_group: Union[None, list] = None, gene_group_cmap: Union[None, list] = None, cell_group: Union[None, list] = None, cell_group_cmap: Union[None, list] = None, enforce: bool = False, hline_rows: Union[None, list] = None, hlines_kwargs: dict = {}, vline_cols: Union[None, list] = None, vlines_kwargs: dict = {}, save_show_or_return: str = "show", save_kwargs: dict = {}, transpose: bool = False, **kwargs, ): """Plot the gene expression dynamics over time (pseudotime or inferred real time) in a heatmap. Note that by default `potential` estimated with the diffusion graph built from reconstructed vector field will be used as the measure of pseudotime. Parameters ---------- %(kin_curves.parameters.no_ncol|color|c_palette)s color_map: `str` (default: `BrBG`) Color map that will be used to color the gene expression. If `half_max_ordering` is True, the color map need to be divergent, good examples, include `BrBG`, `RdBu_r` or `coolwarm`, etc. gene_order_method: `str` (default: `half_max_ordering`) [`half_max_ordering`, `maximum`, `raw`] Supports three different methods for ordering genes when plotting the heatmap: either `half_max_ordering`, `maximum` or `raw`. For `half_max_ordering`, it will order genes into up, down and transit groups by the half max ordering algorithm (HA Pliner, et. al, Molecular cell 71 (5), 858-871. e8). While for `maximum`, it will order by the position of the highest gene expression. `raw` means just use the original order from the input gene list. show_colorbar: `bool` (default: `False`) Whether to show the color bar. cluster_row_col: `[bool, bool]` (default: `[False, False]`) Whether to cluster the row or columns. figsize: `str` (default: `(11.5, 6)` Size of figure standard_scale: `int` (default: 1) Either 0 (rows, cells) or 1 (columns, genes). Whether or not to standardize that dimension, meaning for each row or column, subtract the minimum and divide each by its maximum. n_convolve: `int` (default: 30) Number of cells for convolution. traj_ind: `int` (default: 0) If the element from the dictionary is a list (obtained from a list of trajectories), the index of trajectory that will be selected for visualization. log: `bool` (default: True) Whether to log1p transform your data before data visualization. If expression data is from adata object, it is generally already log1p transformed. When the data is from predicted either from traj simulation or LAP, the data is generally in the original gene expression space and needs to be log1p transformed. Note: when predicted data is not inverse transformed back to original expression space, no transformation will be applied. gene_group: The key of the gene groups in .var. gene_group_cmap: The str of the colormap for gene groups. cell_group: The key of the cell groups in .obs. cell_group_cmap: The str of the colormap for cell groups. enforce: Whether to recalculate the dataframe that will be used to create the kinetic heatmap. If this is set to be False and the the .uns['kinetic_heatmap'] is in the adata object, we will use data from `.uns['kinetic_heatmap']` directly. hline_rows: The indices of rows that we can place a line on the heatmap. hlines_kwargs: The dictionary of arguments that will be passed into sns_heatmap.ax_heatmap.hlines. vline_cols: The indices of column that we can place a line on the heatmap. vlines_kwargs: The dictionary of arguments that will be passed into sns_heatmap.ax_heatmap.vlines. save_show_or_return: {'show', 'save_fig', 'return'} (default: `show`) Whether to save_fig, show or return the figure. save_kwargs: `dict` (default: `{}`) A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the save_fig function will use the {"path": None, "prefix": 'kinetic_heatmap', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a dictionary that properly modify those keys according to your needs. transpose: Whether to transpose the dataframe and swap X-Y in heatmap. In single cell case, `transpose=True` results in gene on the x-axis. kwargs: All other keyword arguments are passed to heatmap(). Currently `xticklabels=False, yticklabels='auto'` is passed to heatmap() by default. Returns ------- Nothing but plots a heatmap that shows the gene expression dynamics over time. """ import matplotlib.pyplot as plt import pandas as pd import seaborn as sns if enforce or "kinetic_heatmap" not in adata.uns_keys(): if mode == "pseudotime" and tkey == "potential" and "potential" not in adata.obs_keys(): ddhodge(adata) exprs, valid_genes, time = fetch_exprs( adata, basis, layer, genes, tkey, mode, project_back_to_high_dim, traj_ind, ) valid_genes = [x for x in genes if x in valid_genes] exprs = exprs.A if issparse(exprs) else exprs if mode != "pseudotime": exprs = np.log1p(exprs) if log else exprs spaced_num = None # don't need to get further smoothed. if len(set(genes).intersection(valid_genes)) > 0: # by default, expression values are log1p tranformed if using the expression from adata. exprs = np.expm1(exprs) if not log else exprs if dist_threshold is not None and mode == "vector_field": valid_ind = list(np.where(np.sum(np.diff(exprs, axis=0) ** 2, axis=1) > dist_threshold)[0] + 1) valid_ind.insert(0, 0) exprs = exprs[valid_ind, :] time = time[valid_ind] if gene_order_method == "half_max_ordering": time, all, valid_ind, gene_idx = _half_max_ordering( exprs.T, time, mode=mode, interpolate=True, spaced_num=spaced_num ) all, genes = ( all[np.isfinite(all.sum(1)), :], np.array(valid_genes)[gene_idx][np.isfinite(all.sum(1))], ) df = pd.DataFrame(all, index=genes) elif gene_order_method in ["maximum", "raw"]: exprs = lowess_smoother(time, exprs.T, spaced_num=spaced_num, n_convolve=n_convolve) exprs = exprs[np.isfinite(exprs.sum(1)), :] if standard_scale is not None: exprs = (exprs - np.min(exprs, axis=standard_scale)[:, None]) / np.ptp(exprs, axis=standard_scale)[ :, None ] if gene_order_method == "maximum": max_sort = np.argsort(np.argmax(exprs, axis=1)) else: max_sort = np.arange(exprs.shape[0]) if spaced_num is None and mode == "pseudotime": df = pd.DataFrame( exprs[max_sort, :], index=np.array(valid_genes)[max_sort], columns=adata.obs_names, ) else: df = pd.DataFrame(exprs[max_sort, :], index=np.array(valid_genes)[max_sort]) else: raise Exception("gene order_method can only be either half_max_ordering or maximum") adata.uns["kinetics_heatmap"] = df else: df = adata.uns["kinetics_heatmap"] row_colors, col_colors = None, None if gene_group is not None: color_key_cmap = "tab20" if gene_group_cmap is None else gene_group_cmap uniq_gene_grps = adata.var[gene_group].unique().tolist() num_labels = len(uniq_gene_grps) color_key = _to_hex(plt.get_cmap(color_key_cmap)(np.linspace(0, 1, num_labels))) gene_lut = dict(zip(map(str, uniq_gene_grps), color_key)) row_colors = adata.var[gene_group].map(gene_lut) else: uniq_gene_grps, gene_lut = [], {} if cell_group is not None: color_key_cmap = "tab20" if cell_group_cmap is None else cell_group_cmap uniq_cell_grps = adata.obs[cell_group].unique().tolist() num_labels = len(uniq_cell_grps) color_key = _to_hex(plt.get_cmap(color_key_cmap)(np.linspace(0, 1, num_labels))) cell_lut = dict(zip(map(str, uniq_cell_grps), color_key)) col_colors = adata.obs[cell_group].map(cell_lut) else: uniq_cell_grps, cell_lut = [], {} if transpose: row_colors, col_colors = col_colors, row_colors cluster_row_col[0], cluster_row_col[1] = cluster_row_col[1], cluster_row_col[0] df = df.T heatmap_kwargs = dict( xticklabels=False, yticklabels=1, row_colors=row_colors, col_colors=col_colors, row_linkage=None, col_linkage=None, method="average", metric="euclidean", z_score=None, standard_scale=None, ) if kwargs is not None: heatmap_kwargs = update_dict(heatmap_kwargs, kwargs) sns_heatmap = sns.clustermap( df, col_cluster=cluster_row_col[0], row_cluster=cluster_row_col[1], cmap=color_map, figsize=figsize, **heatmap_kwargs, ) if not show_colorbar: sns_heatmap.cax.set_visible(False) if cell_group is not None or gene_group is not None: # https://stackoverflow.com/questions/27988846/how-to-express-classes-on-the-axis-of-a-heatmap-in-seaborn # answer from mwaskom uniq_grps = uniq_cell_grps + uniq_gene_grps lut = cell_lut.copy() lut.update(gene_lut) for label in uniq_grps: sns_heatmap.ax_col_dendrogram.bar(0, 0, color=lut[label], label=label, linewidth=0) cell_group_num, gene_group_num = len(cell_lut), len(gene_lut) if cell_group_num > 0 and gene_group_num > 0: ncol = min([cell_group_num, gene_group_num]) else: ncol = 5 if cell_group is None: title = gene_group elif gene_group is None: title = cell_group else: title = gene_group + cell_group sns_heatmap.ax_col_dendrogram.legend(title=title, loc="center", ncol=ncol) sns_heatmap.cax.set_position([0.15, 0.2, 0.03, 0.45]) if hline_rows is not None: hl_kwargs = update_dict({"linestyles": "dashdot"}, hlines_kwargs) sns_heatmap.ax_heatmap.hlines(hline_rows, *sns_heatmap.ax_heatmap.get_xlim(), **hl_kwargs) if vline_cols is not None: vline_kwargs = update_dict({"linestyles": "dashdot"}, vlines_kwargs) sns_heatmap.ax_heatmap.vlines(vline_cols, *sns_heatmap.ax_heatmap.get_ylim(), **vline_kwargs) if save_show_or_return == "save": s_kwargs = { "path": None, "prefix": "kinetic_heatmap", "dpi": None, "ext": "pdf", "transparent": True, "close": True, "verbose": True, } s_kwargs = update_dict(s_kwargs, save_kwargs) save_fig(**s_kwargs) elif save_show_or_return == "show": if show_colorbar: plt.subplots_adjust(right=0.85) plt.tight_layout() plt.show() elif save_show_or_return == "return": return sns_heatmap
def _half_max_ordering(exprs, time, mode, interpolate=False, spaced_num=100): """Implement the half-max ordering algorithm from HA Pliner, Molecular Cell, 2018. Parameters ---------- exprs: `np.ndarray` The gene expression matrix (ngenes x ncells) ordered along time (either pseudotime or inferred real time). time: `np.ndarray` Pseudotime or inferred real time. mode: `str` (default: `vector_field`) Which data mode will be used, either vector_field or pseudotime. if mode is vector_field, the trajectory predicted by vector field function will be used, otherwise pseudotime trajectory (defined by time argument) will be used. interpolate: `bool` (default: `False`) Whether to interpolate the data when performing the loess fitting. spaced_num: `float` (default: `100`) The number of points on the loess fitting curve. Returns ------- time: `np.ndarray` The time at which the loess is evaluated. all: `np.ndarray` The ordered smoothed, scaled expression matrix, the first group is up, then down, followed by the transient gene groups. valid_ind: `np.ndarray` The indices of valid genes that Loess smoothed. gene_idx: `np.ndarray` The indices of genes that are used for the half-max ordering plot. """ if mode == "vector_field": interpolate = False gene_num = exprs.shape[0] cell_num = spaced_num if interpolate else exprs.shape[1] if interpolate: hm_mat_scaled, hm_mat_scaled_z = ( np.zeros((gene_num, cell_num)), np.zeros((gene_num, cell_num)), ) else: hm_mat_scaled, hm_mat_scaled_z = ( np.zeros_like(exprs), np.zeros_like(exprs), ) transient, trans_max, half_max = ( np.zeros(gene_num), np.zeros(gene_num), np.zeros(gene_num), ) tmp = lowess_smoother(time, exprs, spaced_num) if interpolate else exprs for i in range(gene_num): hm_mat_scaled[i] = tmp[i] - np.min(tmp[i]) hm_mat_scaled[i] = hm_mat_scaled[i] / np.max(hm_mat_scaled[i]) scale_tmp = (tmp[i] - np.mean(tmp[i])) / np.std(tmp[i]) # scale in R hm_mat_scaled_z[i] = scale_tmp count, current = 0, hm_mat_scaled_z[i, 0] < 0 # check this for j in range(cell_num): if not (scale_tmp[j] < 0 == current): count = count + 1 current = scale_tmp[j] < 0 half_max[i] = np.argmax(np.abs(scale_tmp - 0.5)) transient[i] = count trans_max[i] = np.argsort(-scale_tmp)[0] begin = np.arange(max([5, 0.05 * cell_num])) end = np.arange(exprs.shape[1] - max([5, 0.05 * cell_num]), cell_num) trans_indx = np.logical_and( transient > 1, not [i in np.concatenate((begin, end)) for i in trans_max], ) trans_idx, trans, half_max_trans = ( np.where(trans_indx)[0], hm_mat_scaled[trans_indx, :], half_max[trans_indx], ) nt_idx, nt = np.where(~trans_indx)[0], hm_mat_scaled[~trans_indx, :] up_idx, up, half_max_up = ( np.where(nt[:, 0] < nt[:, -1])[0], nt[nt[:, 0] < nt[:, -1], :], half_max[nt[:, 0] < nt[:, -1]], ) down_indx, down, half_max_down = ( np.where(nt[:, 0] >= nt[:, -1])[0], nt[nt[:, 0] >= nt[:, -1], :], half_max[nt[:, 0] >= nt[:, -1]], ) trans, up, down = ( trans[np.argsort(half_max_trans), :], up[np.argsort(half_max_up), :], down[np.argsort(half_max_down), :], ) all = np.vstack((up, down, trans)) gene_idx = np.hstack( ( nt_idx[up_idx][np.argsort(half_max_up)], nt_idx[down_indx][np.argsort(half_max_down)], trans_idx, ) ) return time, all, np.isfinite(nt[:, 0]) & np.isfinite(nt[:, -1]), gene_idx def lowess_smoother(time, exprs, spaced_num=None, n_convolve=30): gene_num = exprs.shape[0] if spaced_num is None: res = exprs.copy() if exprs.shape[1] < 300: return res else: res = np.zeros((gene_num, spaced_num)) for i in range(gene_num): x = exprs[i] if spaced_num is None: x_convolved = np.convolve(x[np.argsort(time)], np.ones(30) / 30, mode="same") res[i, :] = x_convolved else: # lowess = sm.nonparametric.lowess # tmp = lowess(x, time, frac=.3) # # run scipy's interpolation. # f = interp1d(tmp[:, 0], tmp[:, 1], bounds_error=False) x_convolved = np.convolve( x[np.argsort(time)], np.ones(n_convolve) / n_convolve, mode="same", ) # check: is any difference between interpld and np.convolve? if len(time) == len(x_convolved): f = interp1d(time[np.argsort(time)], x_convolved, bounds_error=False) time_linspace = np.linspace(np.min(time), np.max(time), spaced_num) res[i, :] = f(time_linspace) else: res[i, :] = np.convolve( x[np.argsort(time)], np.ones(spaced_num) / spaced_num, mode="same", ) return res
[docs]@docstrings.with_indent(4) def jacobian_kinetics( adata: AnnData, basis: str = "umap", regulators: Optional[list] = None, effectors: Optional[list] = None, mode: str = "pseudotime", tkey: str = "potential", color_map: str = "bwr", gene_order_method: str = "raw", show_colorbar: bool = False, cluster_row_col: list = [False, True], figsize: tuple = (11.5, 6), standard_scale: int = 1, n_convolve: int = 30, save_show_or_return: str = "show", save_kwargs: dict = {}, **kwargs, ): """Plot the Jacobian dynamics over time (pseudotime or inferred real time) in a heatmap. Note that by default `potential` estimated with the diffusion graph built from reconstructed vector field will be used as the measure of pseudotime. Parameters ---------- adata: :class:`~anndata.AnnData` an Annodata object. basis: `str` The reduced dimension basis. regulators: `list` or `None` (default: `None`) The list of genes that will be used as regulators for plotting the Jacobian heatmap, only limited to genes that have already performed Jacobian analysis. effectors: `List` or `None` (default: `None`) The list of genes that will be used as targets for plotting the Jacobian heatmap, only limited to genes that have already performed Jacobian analysis. mode: `str` (default: `vector_field`) Which data mode will be used, either vector_field or pseudotime. if mode is vector_field, the trajectory predicted by vector field function will be used, otherwise pseudotime trajectory (defined by time argument) will be used. By default `potential` estimated with the diffusion graph built reconstructed vector field will be used as pseudotime. tkey: `str` (default: `potential`) The .obs column that will be used for timing each cell, only used when mode is not `vector_field`. color_map: `str` (default: `BrBG`) Color map that will be used to color the gene expression. If `half_max_ordering` is True, the color map need to be divergent, good examples, include `BrBG`, `RdBu_r` or `coolwarm`, etc. gene_order_method: `str` (default: `half_max_ordering`) [`half_max_ordering`, `maximum`] Supports two different methods for ordering genes when plotting the heatmap: either `half_max_ordering`, or `maximum`. For `half_max_ordering`, it will order genes into up, down and transit groups by the half max ordering algorithm (HA Pliner, et. al, Molecular cell 71 (5), 858-871. e8). While for `maximum`, it will order by the position of the highest gene expression. show_colorbar: `bool` (default: `False`) Whether to show the color bar. cluster_row_col: `[bool, bool]` (default: `[False, False]`) Whether to cluster the row or columns. figsize: `str` (default: `(11.5, 6)` Size of figure standard_scale: `int` (default: 1) Either 0 (rows, cells) or 1 (columns, genes). Whether or not to standardize that dimension, meaning for each row or column, subtract the minimum and divide each by its maximum. n_convolve: `int` (default: 30) Number of cells for convolution. save_show_or_return: {'show', 'save_fig', 'return'} (default: `show`) Whether to save_fig, show or return the figure. save_kwargs: `dict` (default: `{}`) A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the save_fig function will use the {"path": None, "prefix": 'kinetic_curves', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a dictionary that properly modify those keys according to your needs. kwargs: All other keyword arguments are passed to heatmap(). Currently `xticklabels=False, yticklabels='auto'` is passed to heatmap() by default. Returns ------- Nothing but plots a heatmap that shows the element of Jacobian matrix dynamics over time (potential decreasing). Examples -------- >>> import dynamo as dyn >>> adata = dyn.sample_data.hgForebrainGlutamatergic() >>> adata = dyn.pp.recipe_monocle(adata) >>> dyn.tl.dynamics(adata) >>> dyn.vf.VectorField(adata, basis='pca') >>> valid_gene_list = adata[:, adata.var.use_for_transition].var.index[:2] >>> dyn.vf.jacobian(adata, regulators=valid_gene_list[0], effectors=valid_gene_list[1]) >>> dyn.pl.jacobian_kinetics(adata) """ import matplotlib.pyplot as plt import pandas as pd import seaborn as sns Jacobian_ = "jacobian" if basis is None else "jacobian_" + basis Der, cell_indx, _, regulators_, effectors_ = ( adata.uns[Jacobian_].get("jacobian"), adata.uns[Jacobian_].get("cell_idx"), adata.uns[Jacobian_].get("jacobian_gene"), adata.uns[Jacobian_].get("regulators"), adata.uns[Jacobian_].get("effectors"), ) if tkey == "potential" and "potential" not in adata.obs_keys(): ddhodge(adata) adata_ = adata[cell_indx, :] time = adata_.obs[tkey] jacobian_mat = Der.reshape((-1, Der.shape[2])) if Der.ndim == 3 else Der[None, :] n_source_targets_ = Der.shape[0] * Der.shape[1] if Der.ndim == 3 else 1 targets_, sources_ = ( ( np.repeat(effectors_, Der.shape[1]), np.tile(regulators_, Der.shape[0]), ) if Der.ndim == 3 else ( np.repeat(effectors_, Der.shape[0]), np.repeat(effectors_, Der.shape[0]), ) ) source_targets_ = [sources_[i] + "->" + targets_[i] for i in range(n_source_targets_)] regulators = regulators_ if regulators is None else regulators effectors = effectors_ if effectors is None else effectors if type(regulators) == str: regulators = [regulators] if type(effectors) == str: effectors = [effectors] regulators = list(set(regulators_).intersection(regulators)) effectors = list(set(effectors_).intersection(effectors)) if len(regulators) == 0 or len(effectors) == 0: raise ValueError( f"Jacobian related to source genes {regulators} and target genes {effectors}" f"you provided are existed. Available source genes includes {regulators_} while " f"available target genes includes {effectors_}" ) n_source_targets = len(regulators) * len(effectors) targets, sources = ( np.repeat(effectors, len(regulators)), np.tile(regulators, len(effectors)), ) source_targets = [sources[i] + "->" + targets[i] for i in range(n_source_targets)] jacobian_mat = jacobian_mat[:, np.argsort(time)] if gene_order_method == "half_max_ordering": time, all, valid_ind, gene_idx = _half_max_ordering( jacobian_mat, time, mode=mode, interpolate=True, spaced_num=100 ) all, source_targets = ( all[np.isfinite(all.sum(1)), :], np.array(source_targets)[gene_idx][np.isfinite(all.sum(1))], ) df = pd.DataFrame(all, index=source_targets_) elif gene_order_method == "maximum": jacobian_mat = lowess_smoother(time, jacobian_mat, spaced_num=None, n_convolve=n_convolve) jacobian_mat = jacobian_mat[np.isfinite(jacobian_mat.sum(1)), :] if standard_scale is not None: exprs = (jacobian_mat - np.min(jacobian_mat, axis=standard_scale)[:, None]) / np.ptp( jacobian_mat, axis=standard_scale )[:, None] max_sort = np.argsort(np.argmax(exprs, axis=1)) df = pd.DataFrame( exprs[max_sort, :], index=np.array(source_targets_)[max_sort], columns=adata.obs_names, ) elif gene_order_method == "raw": jacobian_mat /= np.abs(jacobian_mat).max(1)[:, None] df = pd.DataFrame( jacobian_mat, index=np.array(source_targets_), columns=adata.obs_names, ) else: raise Exception("gene order_method can only be either half_max_ordering or maximum") heatmap_kwargs = dict( xticklabels=False, yticklabels=1, row_colors=None, col_colors=None, row_linkage=None, col_linkage=None, method="average", metric="euclidean", z_score=None, standard_scale=None, ) if kwargs is not None: heatmap_kwargs = update_dict(heatmap_kwargs, kwargs) sns_heatmap = sns.clustermap( df.loc[source_targets, :], col_cluster=cluster_row_col[0], row_cluster=cluster_row_col[1] if len(source_targets) > 2 else False, cmap=color_map, figsize=figsize, center=0, **heatmap_kwargs, ) if not show_colorbar: sns_heatmap.cax.set_visible(False) if save_show_or_return == "save_fig": s_kwargs = { "path": None, "prefix": "jacobian_kinetics", "dpi": None, "ext": "pdf", "transparent": True, "close": True, "verbose": True, } s_kwargs = update_dict(s_kwargs, save_kwargs) save_fig(**s_kwargs) elif save_show_or_return == "show": if show_colorbar: plt.subplots_adjust(right=0.85) plt.tight_layout() plt.show() elif save_show_or_return == "return": return sns_heatmap
[docs]@docstrings.with_indent(4) def sensitivity_kinetics( adata, basis="umap", regulators=None, effectors=None, mode="pseudotime", tkey="potential", color_map="bwr", gene_order_method="raw", show_colorbar=False, cluster_row_col=[False, True], figsize=(11.5, 6), standard_scale=1, n_convolve=30, save_show_or_return="show", save_kwargs={}, **kwargs, ): """Plot the Sensitivity dynamics over time (pseudotime or inferred real time) in a heatmap. Note that by default `potential` estimated with the diffusion graph built from reconstructed vector field will be used as the measure of pseudotime. Parameters ---------- adata: :class:`~anndata.AnnData` an Annodata object. basis: `str` The reduced dimension basis. regulators: `list` or `None` (default: `None`) The list of genes that will be used as regulators for plotting the Jacobian heatmap, only limited to genes that have already performed Jacobian analysis. effectors: `List` or `None` (default: `None`) The list of genes that will be used as targets for plotting the Jacobian heatmap, only limited to genes that have already performed Jacobian analysis. mode: `str` (default: `vector_field`) Which data mode will be used, either vector_field or pseudotime. if mode is vector_field, the trajectory predicted byvector field function will be used, otherwise pseudotime trajectory (defined by time argument) will be used. By default `potential` estimated with the diffusion graph built reconstructed vector field will be used as pseudotime. tkey: `str` (default: `potential`) The .obs column that will be used for timing each cell, only used when mode is not `vector_field`. color_map: `str` (default: `BrBG`) Color map that will be used to color the gene expression. If `half_max_ordering` is True, the color map need to be divergent, good examples, include `BrBG`, `RdBu_r` or `coolwarm`, etc. gene_order_method: `str` (default: `half_max_ordering`) [`half_max_ordering`, `maximum`] Supports two different methods for ordering genes when plotting the heatmap: either `half_max_ordering`, or `maximum`. For `half_max_ordering`, it will order genes into up, down and transit groups by the half max ordering algorithm (HA Pliner, et. al, Molecular cell 71 (5), 858-871. e8). While for `maximum`, it will order by the position of the highest gene expression. show_colorbar: `bool` (default: `False`) Whether to show the color bar. cluster_row_col: `[bool, bool]` (default: `[False, False]`) Whether to cluster the row or columns. figsize: `str` (default: `(11.5, 6)` Size of figure standard_scale: `int` (default: 1) Either 0 (rows, cells) or 1 (columns, genes). Whether or not to standardize that dimension, meaning for each row or column, subtract the minimum and divide each by its maximum. n_convolve: `int` (default: 30) Number of cells for convolution. save_show_or_return: {'show', 'save_fig', 'return'} (default: `show`) Whether to save_fig, show or return the figure. save_kwargs: `dict` (default: `{}`) A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the save_fig function will use the {"path": None, "prefix": 'kinetic_curves', "dpi": None, "ext": 'pdf', "transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a dictionary that properly modify those keys according to your needs. kwargs: All other keyword arguments are passed to heatmap(). Currently `xticklabels=False, yticklabels='auto'` is passed to heatmap() by default. Returns ------- Nothing but plots a heatmap that shows the element of Jacobian matrix dynamics over time (potential decreasing). Examples -------- >>> import dynamo as dyn >>> adata = dyn.sample_data.hgForebrainGlutamatergic() >>> adata = dyn.pp.recipe_monocle(adata) >>> dyn.tl.dynamics(adata) >>> dyn.vf.VectorField(adata, basis='pca') >>> valid_gene_list = adata[:, adata.var.use_for_transition].var.index[:2] >>> dyn.vf.sensitivity(adata, regulators=valid_gene_list[0], effectors=valid_gene_list[1]) >>> dyn.pl.sensitivity_kinetics(adata) """ import matplotlib.pyplot as plt import pandas as pd import seaborn as sns Sensitivity_ = "sensitivity" if basis is None else "sensitivity_" + basis Der, cell_indx, _, regulators_, effectors_ = ( adata.uns[Sensitivity_].get("sensitivity"), adata.uns[Sensitivity_].get("cell_idx"), adata.uns[Sensitivity_].get("sensitivity_gene"), adata.uns[Sensitivity_].get("regulators"), adata.uns[Sensitivity_].get("effectors"), ) if tkey == "potential" and "potential" not in adata.obs_keys(): ddhodge(adata) adata_ = adata[cell_indx, :] time = adata_.obs[tkey] sensitivity_mat = Der.reshape((-1, Der.shape[2])) if Der.ndim == 3 else Der[None, :] n_source_targets_ = Der.shape[0] * Der.shape[1] if Der.ndim == 3 else 1 targets_, sources_ = ( ( np.repeat(effectors_, Der.shape[1]), np.tile(regulators_, Der.shape[0]), ) if Der.ndim == 3 else ( np.repeat(effectors_, Der.shape[0]), np.repeat(effectors_, Der.shape[0]), ) ) source_targets_ = [sources_[i] + "->" + targets_[i] for i in range(n_source_targets_)] regulators = regulators_ if regulators is None else regulators effectors = effectors_ if effectors is None else effectors if type(regulators) == str: regulators = [regulators] if type(effectors) == str: effectors = [effectors] regulators = list(set(regulators_).intersection(regulators)) effectors = list(set(effectors_).intersection(effectors)) if len(regulators) == 0 or len(effectors) == 0: raise ValueError( f"Sensitivity related to source genes {regulators} and target genes {effectors}" f"you provided are existed. Available source genes includes {regulators_} while " f"available target genes includes {effectors_}" ) n_source_targets = len(regulators) * len(effectors) targets, sources = ( np.repeat(effectors, len(regulators)), np.tile(regulators, len(effectors)), ) source_targets = [sources[i] + "->" + targets[i] for i in range(n_source_targets)] sensitivity_mat = sensitivity_mat[:, np.argsort(time)] if gene_order_method == "half_max_ordering": time, all, valid_ind, gene_idx = _half_max_ordering( sensitivity_mat, time, mode=mode, interpolate=True, spaced_num=100 ) all, source_targets = ( all[np.isfinite(all.sum(1)), :], np.array(source_targets)[gene_idx][np.isfinite(all.sum(1))], ) df = pd.DataFrame(all, index=source_targets_) elif gene_order_method == "maximum": sensitivity_mat = lowess_smoother(time, sensitivity_mat, spaced_num=None, n_convolve=n_convolve) sensitivity_mat = sensitivity_mat[np.isfinite(sensitivity_mat.sum(1)), :] if standard_scale is not None: exprs = (sensitivity_mat - np.min(sensitivity_mat, axis=standard_scale)[:, None]) / np.ptp( sensitivity_mat, axis=standard_scale )[:, None] max_sort = np.argsort(np.argmax(exprs, axis=1)) df = pd.DataFrame( exprs[max_sort, :], index=np.array(source_targets_)[max_sort], columns=adata.obs_names, ) elif gene_order_method == "raw": sensitivity_mat /= np.abs(sensitivity_mat).max(1)[:, None] df = pd.DataFrame( sensitivity_mat, index=np.array(source_targets_), columns=adata.obs_names, ) else: raise Exception("gene order_method can only be either half_max_ordering or maximum") heatmap_kwargs = dict( xticklabels=False, yticklabels=1, row_colors=None, col_colors=None, row_linkage=None, col_linkage=None, method="average", metric="euclidean", z_score=None, standard_scale=None, ) if kwargs is not None: heatmap_kwargs = update_dict(heatmap_kwargs, kwargs) sns_heatmap = sns.clustermap( df.loc[source_targets, :], col_cluster=cluster_row_col[0], row_cluster=cluster_row_col[1] if len(source_targets) > 2 else False, cmap=color_map, figsize=figsize, center=0, **heatmap_kwargs, ) if not show_colorbar: sns_heatmap.cax.set_visible(False) if save_show_or_return == "save_fig": s_kwargs = { "path": None, "prefix": "sensitivity_kinetics", "dpi": None, "ext": "pdf", "transparent": True, "close": True, "verbose": True, } s_kwargs = update_dict(s_kwargs, save_kwargs) save_fig(**s_kwargs) elif save_show_or_return == "show": if show_colorbar: plt.subplots_adjust(right=0.85) plt.tight_layout() plt.show() elif save_show_or_return == "return": return sns_heatmap