from typing import List, Optional, Union
import matplotlib
import numpy as np
import pandas as pd
from anndata import AnnData
# from scipy.sparse import issparse
from matplotlib import cm
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from scipy.sparse import spmatrix
from sklearn.preprocessing import normalize
from ..dynamo_logger import main_debug, main_info
from ..tools.cell_velocities import cell_velocities
from ..tools.dimension_reduction import reduceDimension
from ..tools.Markov import (
grid_velocity_filter,
prepare_velocity_grid_data,
velocity_on_grid,
)
from ..tools.utils import update_dict
from ..vectorfield.topography import VectorField
from ..vectorfield.utils import vecfld_from_adata
from .scatters import docstrings, scatters
from .utils import (
_get_adata_color_vec,
default_quiver_args,
quiver_autoscaler,
save_fig,
set_arrow_alpha,
set_stream_line_alpha,
)
docstrings.delete_params("scatters.parameters", "show_legend", "kwargs", "save_kwargs")
# import scipy as sc
# from licpy.lic import runlic
# moran'I on the transition genes, etc.
# cellranger data, velocyto, comparison and phase diagram
[docs]def cell_wise_vectors_3d(
adata: AnnData,
basis: str = "umap",
x: int = 0,
y: int = 1,
z: int = 2,
ekey: str = None,
vkey: str = "velocity_S",
X: Union[np.array, spmatrix] = None,
V: Union[np.array, spmatrix] = None,
color: Union[str, List[str]] = None,
layer: str = "X",
background: Optional[str] = "white",
ncols: int = 4,
figsize: tuple = (6, 4),
ax: Optional[Axes] = None,
inverse: True = False,
cell_inds: str = "all",
vector: str = "velocity",
save_show_or_return: str = "show",
save_kwargs: dict = {},
quiver_3d_kwargs: dict = {
"zorder": 3,
"length": 2,
"linewidth": 5,
"arrow_length_ratio": 5,
"norm": cm.colors.Normalize(),
"cmap": cm.PRGn,
},
grid_color: Optional[str] = None,
axis_label_prefix: Optional[str] = None,
axis_labels: Optional[list] = None,
elev: float = None,
azim: float = None,
alpha: Optional[float] = None,
show_magnitude=False,
titles: list = None,
**cell_wise_kwargs,
):
"""Plot the velocity or acceleration vector of each cell.
Parameters
----------
%(scatters.parameters.no_show_legend|kwargs|save_kwargs)s
ekey: `str` (default: "M_s")
The expression key
vkey: `str` (default: "velocity_S")
The velocity key
inverse: `bool` (default: False)
Whether to inverse the direction of the velocity vectors.
cell_inds: `str` or `list` (default: all)
the cell index that will be chosen to draw velocity vectors. Can be a list of integers (cell indices) or str
(Cell names).
quiver_size: `float` or None (default: None)
The size of quiver. If None, we will use set quiver_size to be 1. Note that quiver quiver_size is used to
calculate the head_width (10 x quiver_size), head_length (12 x quiver_size) and headaxislength (8 x
quiver_size) of the quiver. This is done via the `default_quiver_args` function which also calculate the
scale of the quiver (1 / quiver_length).
quiver_length: `float` or None (default: None)
The length of quiver. The quiver length which will be used to calculate scale of quiver. Note that befoe
applying `default_quiver_args` velocity values are first rescaled via the quiver_autoscaler function. Scale
of quiver indicates the nuumber of data units per arrow length unit, e.g., m/s per plot width; a smaller
scale parameter makes the arrow longer.
vector: `str` (default: `velocity`)
Which vector type will be used for plotting, one of {'velocity', 'acceleration'} or either velocity field or
acceleration field will be plotted.
frontier: `bool` (default: `False`)
Whether to add the frontier. Scatter plots can be enhanced by using transparency (alpha) in order to show
area of high density and multiple scatter plots can be used to delineate a frontier. See matplotlib tips &
tricks cheatsheet (https://github.com/matplotlib/cheatsheets). Originally inspired by figures from scEU-seq
paper: https://science.sciencemag.org/content/367/6482/1151.
save_kwargs: `dict` (default: `{}`)
A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the
save_fig function will use the {"path": None, "prefix": 'cell_wise_velocity', "dpi": None, "ext": 'pdf',
"transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a
dictionary that properly modify those keys according to your needs.
s_kwargs_dict: `dict` (default: {})
The dictionary of the scatter arguments.
cell_wise_kwargs:
Additional parameters that will be passed to plt.quiver function
Returns
-------
Nothing but a cell wise quiver plot.
"""
import matplotlib.pyplot as plt
from matplotlib import rcParams
from matplotlib.colors import to_hex
def add_axis_label(ax, labels):
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
ax.set_zlabel(labels[2])
projection_dim_indexer = [x, y, z]
# ensure axis_label prefix is not None
if ekey is not None and axis_label_prefix is None:
axis_label_prefix = ekey
elif axis_label_prefix is None:
axis_label_prefix = "dim"
# ensure axis_labels is not None
if axis_labels is None:
axis_labels = [axis_label_prefix + "_" + str(index) for index in projection_dim_indexer]
if type(color) is str:
color = [color]
if titles is None:
titles = color
assert len(color) == len(titles), "#titles does not match #color."
if grid_color:
plt.rcParams["grid.color"] = grid_color
if alpha:
quiver_3d_kwargs = dict(quiver_3d_kwargs)
quiver_3d_kwargs["alpha"] = alpha
if X is not None and V is not None:
X = X[:, [x, y, z]]
V = V[:, [x, y, z]]
elif type(x) == str and type(y) == str and type(z) == str:
if len(adata.var_names[adata.var.use_for_dynamics].intersection([x, y, z])) != 3:
raise ValueError(
"If you want to plot the vector flow of three genes, please make sure those three genes "
"belongs to dynamics genes or .var.use_for_dynamics is True."
)
X = adata[:, projection_dim_indexer].layers[ekey].A
V = adata[:, projection_dim_indexer].layers[vkey].A
layer = ekey
else:
if ("X_" + basis in adata.obsm.keys()) and (vector + "_" + basis in adata.obsm.keys()):
X = adata.obsm["X_" + basis][:, projection_dim_indexer]
V = adata.obsm[vector + "_" + basis][:, projection_dim_indexer]
else:
if "X_" + basis not in adata.obsm.keys():
layer, basis = basis.split("_")
reduceDimension(adata, layer=layer, reduction_method=basis)
if "kmc" not in adata.uns_keys():
cell_velocities(adata, vkey="velocity_S", basis=basis)
X = adata.obsm["X_" + basis][:, projection_dim_indexer]
V = adata.obsm[vector + "_" + basis][:, projection_dim_indexer]
else:
kmc = adata.uns["kmc"]
X = adata.obsm["X_" + basis][:, projection_dim_indexer]
V = kmc.compute_density_corrected_drift(X, kmc.Idx, normalize_vector=True)
adata.obsm[vector + "_" + basis] = V
X, V = X.copy(), V.copy()
if not show_magnitude:
X = normalize(X, axis=0, norm="max")
V = normalize(V, axis=0, norm="l2")
V = normalize(V, axis=1, norm="l2")
V /= 3 * quiver_autoscaler(X, V)
if inverse:
V = -V
main_info("X shape: " + str(X.shape) + " V shape: " + str(V.shape))
df = pd.DataFrame({"x": X[:, 0], "y": X[:, 1], "z": X[:, 2], "u": V[:, 0], "v": V[:, 1], "w": V[:, 2]})
if cell_inds == "all":
ix_choice = np.arange(adata.shape[0])
elif cell_inds == "random":
ix_choice = np.random.choice(np.range(adata.shape[0]), size=1000, replace=False)
elif type(cell_inds) is int:
ix_choice = np.random.choice(np.range(adata.shape[0]), size=cell_inds, replace=False)
elif type(cell_inds) is list:
if type(cell_inds[0]) is str:
cell_inds = [adata.obs_names.to_list().index(i) for i in cell_inds]
ix_choice = cell_inds
else:
ix_choice = np.arange(adata.shape[0])
df = df.iloc[ix_choice, :]
if background is None:
_background = rcParams.get("figure.facecolor")
background = to_hex(_background) if type(_background) is tuple else _background
# single axis output
x0, x1, x2 = df.iloc[:, 0], df.iloc[:, 1], df.iloc[:, 2]
v0, v1, v2 = df.iloc[:, 3], df.iloc[:, 4], df.iloc[:, 5]
nrows = len(color) // ncols
if nrows * ncols < len(color):
nrows += 1
ncols = min(ncols, len(color))
figure, axes = plt.subplots(nrows, ncols, figsize=figsize, subplot_kw=dict(projection="3d"))
axes = np.array(axes)
axes_flatten = axes.flatten()
for i in range(len(color)):
ax = axes_flatten[i]
ax.set_title(color[i])
norm = quiver_3d_kwargs["norm"]
cmap = quiver_3d_kwargs["cmap"]
color_vec = _get_adata_color_vec(adata, layer=layer, col=color[i])
assert len(color_vec) > 0, "color vector or data vector size is 0"
# convet categorical string data colors to labels
if type(color_vec[0]) is str:
unique_vals, color_vec = np.unique(color_vec, return_inverse=True)
color_vec = cmap(norm(color_vec))
# TODO due to matplotlib quiver3 impl, we need to add colors for arrow head segments
# TODO if matplotlib changes its detailed impl, we may not need the following line
color_vec = list(color_vec) + [element for element in list(color_vec) for _ in range(2)]
# color_vec = matplotlib.colors.to_rgba(color_vec, alpha=alpha)
main_debug("color vec len: " + str(len(color_vec)))
ax.view_init(elev=elev, azim=azim)
ax.quiver(
x0,
x1,
x2,
v0,
v1,
v2,
color=color_vec,
# facecolors=color_vec,
**quiver_3d_kwargs,
)
ax.set_title(titles[i])
ax.set_facecolor(background)
add_axis_label(ax, axis_labels)
if save_show_or_return == "save":
s_kwargs = {
"path": None,
"prefix": "cell_wise_vectors_3d",
"dpi": None,
"ext": "pdf",
"transparent": True,
"close": True,
"verbose": True,
}
s_kwargs = update_dict(s_kwargs, save_kwargs)
save_fig(**s_kwargs)
elif save_show_or_return == "show":
plt.show()
elif save_show_or_return == "return":
return axes
def grid_vectors_3d():
pass
# def velocity(adata, type) # type can be either one of the three, cellwise, velocity on grid, streamline plot.
# """
#
# """
#
def plot_LIC_gray(tex):
"""GET_P estimates the posterior probability and part of the energy.
Arguments
---------
Y: 'np.ndarray'
Original data.
V: 'np.ndarray'
Original data.
sigma2: 'float'
sigma2 is defined as sum(sum((Y - V)**2)) / (N * D)
gamma: 'float'
Percentage of inliers in the samples. This is an inital value for EM iteration, and it is not important.
a: 'float'
Paramerter of the model of outliers. We assume the outliers obey uniform distribution, and the volume of
outlier's variation space is a.
Returns
-------
P: 'np.ndarray'
Posterior probability, related to equation 27.
E: `np.ndarray'
Energy, related to equation 26.
"""
import matplotlib.pyplot as plt
tex = tex[:, ::-1]
tex = tex.T
M, N = tex.shape
texture = np.empty((M, N, 4), np.float32)
texture[:, :, 0] = tex
texture[:, :, 1] = tex
texture[:, :, 2] = tex
texture[:, :, 3] = 1
# texture = scipy.ndimage.rotate(texture,-90)
plt.figure()
plt.imshow(texture)
[docs]def line_integral_conv(
adata: AnnData,
basis: str = "umap",
U_grid: Optional[np.ndarray] = None,
V_grid: Optional[np.ndarray] = None,
xy_grid_nums: Union[tuple, list] = [50, 50],
method: str = "yt",
cmap: str = "viridis",
normalize: bool = False,
density: float = 1,
lim=(0, 1),
const_alpha: bool = False,
kernellen: float = 100,
V_threshold: Optional[float] = None,
vector: str = "velocity",
file=None,
save_show_or_return: str = "show",
save_kwargs: dict = {},
g_kwargs_dict: dict = {},
):
"""Visualize vector field with quiver, streamline and line integral convolution (LIC), using velocity estimates on a
grid from the associated data. A white noise background will be used for texture as default. Adjust the bounds of
lim in the range of [0, 1] which applies upper and lower bounds to the values of line integral convolution and
enhance the visibility of plots. When const_alpha=False, alpha will be weighted spatially by the values of line
integral convolution; otherwise a constant value of the given alpha is used.
Arguments
---------
adata: :class:`~anndata.AnnData`
AnnData object that contains U_grid and V_grid data
basis: `str` (default: trimap)
The dimension reduction method to use.
U_grid: 'np.ndarray' (default: None)
Original velocity on the first dimension of a 2 d grid.
V_grid: 'np.ndarray' (default: None)
Original velocity on the second dimension of a 2 d grid.
xy_grid_nums: `tuple` (default: (50, 50))
the number of grids in either x or y axis. The number of grids has to be the same on both dimensions.
method: 'float'
sigma2 is defined as sum(sum((Y - V)**2)) / (N * D)
cmap: 'float'
Percentage of inliers in the samples. This is an inital value for EM iteration, and it is not important.
normalize: 'float'
Paramerter of the model of outliers. We assume the outliers obey uniform distribution, and the volume of
outlier's variation space is a.
density: 'float'
Paramerter of the model of outliers. We assume the outliers obey uniform distribution, and the volume of
outlier's variation space is a.
lim: 'float'
Paramerter of the model of outliers. We assume the outliers obey uniform distribution, and the volume of
outlier's variation space is a.
const_alpha: 'float'
Paramerter of the model of outliers. We assume the outliers obey uniform distribution, and the volume of
outlier's variation space is a.
kernellen: 'float'
Paramerter of the model of outliers. We assume the outliers obey uniform distribution, and the volume of
outlier's variation space is a.
V_threshold: `float` or `None` (default: None)
The threshold of velocity value for visualization
vector: `str` (default: `velocity`)
Which vector type will be used for plotting, one of {'velocity', 'acceleration'} or either velocity field or
acceleration field will be plotted.
save_show_or_return: {'show', 'save', 'return'} (default: `show`)
Whether to save, show or return the figure.
save_kwargs: `dict` (default: `{}`)
A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the
save_fig function will use the {"path": None, "prefix": 'line_integral_conv', "dpi": None, "ext": 'pdf',
"transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a
dictionary that properly modify those keys according to your needs.
Returns
-------
Nothing, but plot the vector field with quiver, streamline and line integral convolution (LIC).
"""
import matplotlib.pyplot as plt
X = adata.obsm["X_" + basis][:, :2] if "X_" + basis in adata.obsm.keys() else None
V = adata.obsm[vector + "_" + basis][:, :2] if vector + "_" + basis in adata.obsm.keys() else None
if X is None:
raise Exception(f"The {basis} dimension reduction is not performed over your data yet.")
if V is None:
raise Exception(f"The {basis}_velocity velocity (or velocity) result does not existed in your data.")
if U_grid is None or V_grid is None:
if "VecFld_" + basis in adata.uns.keys():
# first check whether the sparseVFC reconstructed vector field exists
X_grid_, V_grid = (
adata.uns["VecFld_" + basis]["VecFld"]["grid"],
adata.uns["VecFld_" + basis]["VecFld"]["grid_V"],
)
N = int(np.sqrt(V_grid.shape[0]))
U_grid = np.reshape(V_grid[:, 0], (N, N)).T
V_grid = np.reshape(V_grid[:, 1], (N, N)).T
elif "grid_velocity_" + basis in adata.uns.keys():
# then check whether the Gaussian Kernel vector field exists
X_grid_, V_grid_, _ = (
adata.uns["grid_velocity_" + basis]["X_grid"],
adata.uns["grid_velocity_" + basis]["V_grid"],
adata.uns["grid_velocity_" + basis]["D"],
)
U_grid = V_grid_[0, :, :].T
V_grid = V_grid_[1, :, :].T
else:
# if no VF or Gaussian Kernel vector fields, recreate it
grid_kwargs_dict = {
"density": None,
"smooth": None,
"n_neighbors": None,
"min_mass": None,
"autoscale": False,
"adjust_for_stream": True,
"V_threshold": None,
}
grid_kwargs_dict.update(g_kwargs_dict)
X_grid_, V_grid_, _ = velocity_on_grid(X[:, [0, 1]], V[:, [0, 1]], xy_grid_nums, **grid_kwargs_dict)
U_grid = V_grid_[0, :, :].T
V_grid = V_grid_[1, :, :].T
if V_threshold is not None:
mass = np.sqrt((V_grid**2).sum(0))
if V_threshold is not None:
V_grid[0][mass.reshape(V_grid[0].shape) < V_threshold] = np.nan
if method == "yt":
try:
import yt
except ImportError:
print(
"Please first install yt package to use the line integral convolution plot method. "
"Install instruction is provided here: https://yt-project.org/"
)
velocity_x_ori, velocity_y_ori, velocity_z_ori = (
U_grid,
V_grid,
np.zeros(U_grid.shape),
)
velocity_x = np.repeat(velocity_x_ori[:, :, np.newaxis], V_grid.shape[1], axis=2)
velocity_y = np.repeat(velocity_y_ori[:, :, np.newaxis], V_grid.shape[1], axis=2)
velocity_z = np.repeat(velocity_z_ori[np.newaxis, :, :], V_grid.shape[1], axis=0)
data = {}
data["velocity_x"] = (velocity_x, "km/s")
data["velocity_y"] = (velocity_y, "km/s")
data["velocity_z"] = (velocity_z, "km/s")
data["velocity_sum"] = (
np.sqrt(velocity_x**2 + velocity_y**2),
"km/s",
)
ds = yt.load_uniform_grid(data, data["velocity_x"][0].shape, length_unit=(1.0, "Mpc"))
slc = yt.SlicePlot(ds, "z", ["velocity_sum"])
slc.set_cmap("velocity_sum", cmap)
slc.set_log("velocity_sum", False)
slc.annotate_velocity(normalize=normalize)
slc.annotate_streamlines("velocity_x", "velocity_y", density=density)
slc.annotate_line_integral_convolution(
"velocity_x",
"velocity_y",
lim=lim,
const_alpha=const_alpha,
kernellen=kernellen,
)
slc.set_xlabel(basis + "_1")
slc.set_ylabel(basis + "_2")
slc.show()
if file is not None:
# plt.rc('font', family='serif', serif='Times')
# plt.rc('text', usetex=True)
# plt.rc('xtick', labelsize=8)
# plt.rc('ytick', labelsize=8)
# plt.rc('axes', labelsize=8)
slc.save(file, mpl_kwargs={"figsize": [2, 2]})
elif method == "lic":
# velocyto_tex = runlic(V_grid, V_grid, 100)
# plot_LIC_gray(velocyto_tex)
pass
if save_show_or_return == "save":
s_kwargs = {
"path": None,
"prefix": "line_integral_conv",
"dpi": None,
"ext": "pdf",
"transparent": True,
"close": True,
"verbose": True,
}
s_kwargs = update_dict(s_kwargs, save_kwargs)
save_fig(**s_kwargs)
elif save_show_or_return == "show":
plt.tight_layout()
plt.show()
elif save_show_or_return == "return":
return slc
@docstrings.with_indent(4)
def cell_wise_vectors(
adata: AnnData,
basis: str = "umap",
x: int = 0,
y: int = 1,
z: int = 2,
ekey: str = "M_s",
vkey: str = "velocity_S",
color: Union[str, List[str]] = "ntr",
layer: str = "X",
highlights: Optional[list] = None,
labels: Optional[list] = None,
values: Optional[list] = None,
theme: Optional[str] = None,
cmap: Optional[str] = None,
color_key: Union[dict, list] = None,
color_key_cmap: Optional[str] = None,
background: Optional[str] = "white",
ncols: int = 4,
pointsize: Union[None, float] = None,
figsize: tuple = (6, 4),
show_legend="on data",
use_smoothed: bool = True,
ax: Optional[Axes] = None,
sort: str = "raw",
aggregate: Optional[str] = None,
show_arrowed_spines: bool = False,
inverse: True = False,
cell_inds: str = "all",
quiver_size: Optional[float] = 1,
quiver_length: Optional[float] = None,
vector: str = "velocity",
frontier: bool = False,
save_show_or_return: str = "show",
save_kwargs: dict = {},
s_kwargs_dict: dict = {},
projection: str = "2d",
**cell_wise_kwargs,
):
"""Plot the velocity or acceleration vector of each cell.
Parameters
----------
%(scatters.parameters.no_show_legend|kwargs|save_kwargs)s
ekey: `str` (default: "M_s")
The expression key
vkey: `str` (default: "velocity_S")
The velocity key
inverse: `bool` (default: False)
Whether to inverse the direction of the velocity vectors.
cell_inds: `str` or `list` (default: all)
the cell index that will be chosen to draw velocity vectors. Can be a list of integers (cell indices) or str
(Cell names).
quiver_size: `float` or None (default: None)
The size of quiver. If None, we will use set quiver_size to be 1. Note that quiver quiver_size is used to
calculate the head_width (10 x quiver_size), head_length (12 x quiver_size) and headaxislength (8 x
quiver_size) of the quiver. This is done via the `default_quiver_args` function which also calculate the
scale of the quiver (1 / quiver_length).
quiver_length: `float` or None (default: None)
The length of quiver. The quiver length which will be used to calculate scale of quiver. Note that befoe
applying `default_quiver_args` velocity values are first rescaled via the quiver_autoscaler function. Scale
of quiver indicates the nuumber of data units per arrow length unit, e.g., m/s per plot width; a smaller
scale parameter makes the arrow longer.
vector: `str` (default: `velocity`)
Which vector type will be used for plotting, one of {'velocity', 'acceleration'} or either velocity field or
acceleration field will be plotted.
frontier: `bool` (default: `False`)
Whether to add the frontier. Scatter plots can be enhanced by using transparency (alpha) in order to show
area of high density and multiple scatter plots can be used to delineate a frontier. See matplotlib tips &
tricks cheatsheet (https://github.com/matplotlib/cheatsheets). Originally inspired by figures from scEU-seq
paper: https://science.sciencemag.org/content/367/6482/1151.
save_kwargs: `dict` (default: `{}`)
A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the
save_fig function will use the {"path": None, "prefix": 'cell_wise_velocity', "dpi": None, "ext": 'pdf',
"transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a
dictionary that properly modify those keys according to your needs.
s_kwargs_dict: `dict` (default: {})
The dictionary of the scatter arguments.
cell_wise_kwargs:
Additional parameters that will be passed to plt.quiver function
Returns
-------
Nothing but a cell wise quiver plot.
"""
import matplotlib.pyplot as plt
from matplotlib import rcParams
from matplotlib.colors import to_hex
if projection == "2d":
projection_dim_indexer = [x, y]
elif projection == "3d":
projection_dim_indexer = [x, y, z]
if type(x) == str and type(y) == str:
if len(adata.var_names[adata.var.use_for_dynamics].intersection([x, y])) != 2:
raise ValueError(
"If you want to plot the vector flow of two genes, please make sure those two genes "
"belongs to dynamics genes or .var.use_for_dynamics is True."
)
X = adata[:, projection_dim_indexer].layers[ekey].A
V = adata[:, projection_dim_indexer].layers[vkey].A
layer = ekey
else:
if ("X_" + basis in adata.obsm.keys()) and (vector + "_" + basis in adata.obsm.keys()):
X = adata.obsm["X_" + basis][:, projection_dim_indexer]
V = adata.obsm[vector + "_" + basis][:, projection_dim_indexer]
else:
if "X_" + basis not in adata.obsm.keys():
layer, basis = basis.split("_")
reduceDimension(adata, layer=layer, reduction_method=basis)
if "kmc" not in adata.uns_keys():
cell_velocities(adata, vkey="velocity_S", basis=basis)
X = adata.obsm["X_" + basis][:, projection_dim_indexer]
V = adata.obsm[vector + "_" + basis][:, projection_dim_indexer]
else:
kmc = adata.uns["kmc"]
X = adata.obsm["X_" + basis][:, projection_dim_indexer]
V = kmc.compute_density_corrected_drift(X, kmc.Idx, normalize_vector=True)
adata.obsm[vector + "_" + basis] = V
X, V = X.copy(), V.copy()
V /= 3 * quiver_autoscaler(X, V)
if inverse:
V = -V
df = None
main_info("X shape: " + str(X.shape) + " V shape: " + str(V.shape))
if projection == "2d":
df = pd.DataFrame({"x": X[:, 0], "y": X[:, 1], "u": V[:, 0], "v": V[:, 1]})
elif projection == "3d":
df = pd.DataFrame({"x": X[:, 0], "y": X[:, 1], "z": X[:, 2], "u": V[:, 0], "v": V[:, 1], "w": V[:, 2]})
else:
raise NotImplementedError
if cell_inds == "all":
ix_choice = np.arange(adata.shape[0])
elif cell_inds == "random":
ix_choice = np.random.choice(np.range(adata.shape[0]), size=1000, replace=False)
elif type(cell_inds) is int:
ix_choice = np.random.choice(np.range(adata.shape[0]), size=cell_inds, replace=False)
elif type(cell_inds) is list:
if type(cell_inds[0]) is str:
cell_inds = [adata.obs_names.to_list().index(i) for i in cell_inds]
ix_choice = cell_inds
df = df.iloc[ix_choice, :]
if background is None:
_background = rcParams.get("figure.facecolor")
background = to_hex(_background) if type(_background) is tuple else _background
if quiver_size is None:
quiver_size = 1
if background == "black":
edgecolors = "white"
else:
edgecolors = "black"
head_w, head_l, ax_l, scale = default_quiver_args(quiver_size, quiver_length) #
quiver_kwargs = {
"angles": "xy",
"scale": scale,
"scale_units": "xy",
"width": 0.0005,
"headwidth": head_w,
"headlength": head_l,
"headaxislength": ax_l,
"minshaft": 1,
"minlength": 1,
"pivot": "tail",
"linewidth": 0.1,
"edgecolors": edgecolors,
"alpha": 1,
"zorder": 10,
}
quiver_kwargs = update_dict(quiver_kwargs, cell_wise_kwargs)
quiver_3d_kwargs = {"arrow_length_ratio": scale}
axes_list, color_list, _ = scatters(
adata=adata,
basis=basis,
x=x,
y=y,
z=z,
color=color,
layer=layer,
highlights=highlights,
labels=labels,
values=values,
theme=theme,
cmap=cmap,
color_key=color_key,
color_key_cmap=color_key_cmap,
background=background,
ncols=ncols,
pointsize=pointsize,
figsize=figsize,
show_legend=show_legend,
use_smoothed=use_smoothed,
aggregate=aggregate,
show_arrowed_spines=show_arrowed_spines,
ax=ax,
sort=sort,
save_show_or_return="return",
frontier=frontier,
projection=projection,
**s_kwargs_dict,
return_all=True,
)
# single axis output
if type(axes_list) != list:
axes_list = [axes_list]
x0, x1 = df.iloc[:, 0], df.iloc[:, 1]
v0, v1 = df.iloc[:, 2], df.iloc[:, 3]
if projection == "3d":
x0, x1, x2 = df.iloc[:, 0], df.iloc[:, 1], df.iloc[:, 2]
v0, v1, v2 = df.iloc[:, 3], df.iloc[:, 4], df.iloc[:, 5]
for i in range(len(axes_list)):
ax = axes_list[i]
if projection == "2d":
ax.quiver(
x0,
x1,
v0,
v1,
color=color_list[i],
facecolors=color_list[i],
**quiver_kwargs,
)
elif projection == "3d":
ax.quiver(
x0,
x1,
x2,
v0,
v1,
v2,
# color=color_list[i],
# facecolors=color_list[i],
**quiver_3d_kwargs,
)
ax.set_facecolor(background)
if save_show_or_return == "save":
s_kwargs = {
"path": None,
"prefix": "cell_wise_vector",
"dpi": None,
"ext": "pdf",
"transparent": True,
"close": True,
"verbose": True,
}
s_kwargs = update_dict(s_kwargs, save_kwargs)
save_fig(**s_kwargs)
elif save_show_or_return == "show":
if projection != "3d":
plt.tight_layout()
plt.show()
elif save_show_or_return == "return":
return axes_list
[docs]@docstrings.with_indent(4)
def cell_wise_vectors(
adata: AnnData,
basis: str = "umap",
x: int = 0,
y: int = 1,
z: int = 2,
ekey: str = "M_s",
vkey: str = "velocity_S",
color: Union[str, List[str]] = "ntr",
layer: str = "X",
highlights: Optional[list] = None,
labels: Optional[list] = None,
values: Optional[list] = None,
theme: Optional[str] = None,
cmap: Optional[str] = None,
color_key: Union[dict, list] = None,
color_key_cmap: Optional[str] = None,
background: Optional[str] = "white",
ncols: int = 4,
pointsize: Union[None, float] = None,
figsize: tuple = (6, 4),
show_legend="on data",
use_smoothed: bool = True,
ax: Optional[Axes] = None,
sort: str = "raw",
aggregate: Optional[str] = None,
show_arrowed_spines: bool = False,
inverse: True = False,
cell_inds: str = "all",
quiver_size: Optional[float] = 1,
quiver_length: Optional[float] = None,
vector: str = "velocity",
frontier: bool = False,
save_show_or_return: str = "show",
save_kwargs: dict = {},
s_kwargs_dict: dict = {},
projection: str = "2d",
**cell_wise_kwargs,
):
"""Plot the velocity or acceleration vector of each cell.
Parameters
----------
%(scatters.parameters.no_show_legend|kwargs|save_kwargs)s
ekey: `str` (default: "M_s")
The expression key
vkey: `str` (default: "velocity_S")
The velocity key
inverse: `bool` (default: False)
Whether to inverse the direction of the velocity vectors.
cell_inds: `str` or `list` (default: all)
the cell index that will be chosen to draw velocity vectors. Can be a list of integers (cell indices) or str
(Cell names).
quiver_size: `float` or None (default: None)
The size of quiver. If None, we will use set quiver_size to be 1. Note that quiver quiver_size is used to
calculate the head_width (10 x quiver_size), head_length (12 x quiver_size) and headaxislength (8 x
quiver_size) of the quiver. This is done via the `default_quiver_args` function which also calculate the
scale of the quiver (1 / quiver_length).
quiver_length: `float` or None (default: None)
The length of quiver. The quiver length which will be used to calculate scale of quiver. Note that befoe
applying `default_quiver_args` velocity values are first rescaled via the quiver_autoscaler function. Scale
of quiver indicates the nuumber of data units per arrow length unit, e.g., m/s per plot width; a smaller
scale parameter makes the arrow longer.
vector: `str` (default: `velocity`)
Which vector type will be used for plotting, one of {'velocity', 'acceleration'} or either velocity field or
acceleration field will be plotted.
frontier: `bool` (default: `False`)
Whether to add the frontier. Scatter plots can be enhanced by using transparency (alpha) in order to show
area of high density and multiple scatter plots can be used to delineate a frontier. See matplotlib tips &
tricks cheatsheet (https://github.com/matplotlib/cheatsheets). Originally inspired by figures from scEU-seq
paper: https://science.sciencemag.org/content/367/6482/1151.
save_kwargs: `dict` (default: `{}`)
A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the
save_fig function will use the {"path": None, "prefix": 'cell_wise_velocity', "dpi": None, "ext": 'pdf',
"transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a
dictionary that properly modify those keys according to your needs.
s_kwargs_dict: `dict` (default: {})
The dictionary of the scatter arguments.
cell_wise_kwargs:
Additional parameters that will be passed to plt.quiver function
Returns
-------
Nothing but a cell wise quiver plot.
"""
import matplotlib.pyplot as plt
from matplotlib import rcParams
from matplotlib.colors import to_hex
if projection == "2d":
projection_dim_indexer = [x, y]
elif projection == "3d":
projection_dim_indexer = [x, y, z]
else:
projection_dim_indexer = [x, y]
if type(x) == str and type(y) == str:
if len(adata.var_names[adata.var.use_for_dynamics].intersection([x, y])) != 2:
raise ValueError(
"If you want to plot the vector flow of two genes, please make sure those two genes "
"belongs to dynamics genes or .var.use_for_dynamics is True."
)
X = adata[:, projection_dim_indexer].layers[ekey].A
V = adata[:, projection_dim_indexer].layers[vkey].A
layer = ekey
else:
if ("X_" + basis in adata.obsm.keys()) and (vector + "_" + basis in adata.obsm.keys()):
X = adata.obsm["X_" + basis][:, projection_dim_indexer]
V = adata.obsm[vector + "_" + basis][:, projection_dim_indexer]
else:
if "X_" + basis not in adata.obsm.keys():
layer, basis = basis.split("_")
reduceDimension(adata, layer=layer, reduction_method=basis)
if "kmc" not in adata.uns_keys():
cell_velocities(adata, vkey="velocity_S", basis=basis)
X = adata.obsm["X_" + basis][:, projection_dim_indexer]
V = adata.obsm[vector + "_" + basis][:, projection_dim_indexer]
else:
kmc = adata.uns["kmc"]
X = adata.obsm["X_" + basis][:, projection_dim_indexer]
V = kmc.compute_density_corrected_drift(X, kmc.Idx, normalize_vector=True)
adata.obsm[vector + "_" + basis] = V
X, V = X.copy(), V.copy()
V /= 3 * quiver_autoscaler(X, V)
if inverse:
V = -V
df = None
main_info("X shape: " + str(X.shape) + " V shape: " + str(V.shape))
if projection == "2d":
df = pd.DataFrame({"x": X[:, 0], "y": X[:, 1], "u": V[:, 0], "v": V[:, 1]})
elif projection == "3d":
df = pd.DataFrame({"x": X[:, 0], "y": X[:, 1], "z": X[:, 2], "u": V[:, 0], "v": V[:, 1], "w": V[:, 2]})
else:
raise NotImplementedError
if cell_inds == "all":
ix_choice = np.arange(adata.shape[0])
elif cell_inds == "random":
ix_choice = np.random.choice(np.range(adata.shape[0]), size=1000, replace=False)
elif type(cell_inds) is int:
ix_choice = np.random.choice(np.range(adata.shape[0]), size=cell_inds, replace=False)
elif type(cell_inds) is list:
if type(cell_inds[0]) is str:
cell_inds = [adata.obs_names.to_list().index(i) for i in cell_inds]
ix_choice = cell_inds
df = df.iloc[ix_choice, :]
if background is None:
_background = rcParams.get("figure.facecolor")
background = to_hex(_background) if type(_background) is tuple else _background
if quiver_size is None:
quiver_size = 1
if background == "black":
edgecolors = "white"
else:
edgecolors = "black"
head_w, head_l, ax_l, scale = default_quiver_args(quiver_size, quiver_length) #
quiver_kwargs = {
"angles": "xy",
"scale": scale,
"scale_units": "xy",
"width": 0.0005,
"headwidth": head_w,
"headlength": head_l,
"headaxislength": ax_l,
"minshaft": 1,
"minlength": 1,
"pivot": "tail",
"linewidth": 0.1,
"edgecolors": edgecolors,
"alpha": 1,
"zorder": 10,
}
quiver_kwargs = update_dict(quiver_kwargs, cell_wise_kwargs)
quiver_3d_kwargs = {"arrow_length_ratio": scale}
axes_list, color_list, _ = scatters(
adata=adata,
basis=basis,
x=x,
y=y,
z=z,
color=color,
layer=layer,
highlights=highlights,
labels=labels,
values=values,
theme=theme,
cmap=cmap,
color_key=color_key,
color_key_cmap=color_key_cmap,
background=background,
ncols=ncols,
pointsize=pointsize,
figsize=figsize,
show_legend=show_legend,
use_smoothed=use_smoothed,
aggregate=aggregate,
show_arrowed_spines=show_arrowed_spines,
ax=ax,
sort=sort,
save_show_or_return="return",
frontier=frontier,
projection=projection,
**s_kwargs_dict,
return_all=True,
)
# single axis output
if type(axes_list) != list:
axes_list = [axes_list]
x0, x1 = df.iloc[:, 0], df.iloc[:, 1]
v0, v1 = df.iloc[:, 2], df.iloc[:, 3]
if projection == "3d":
x0, x1, x2 = df.iloc[:, 0], df.iloc[:, 1], df.iloc[:, 2]
v0, v1, v2 = df.iloc[:, 3], df.iloc[:, 4], df.iloc[:, 5]
for i in range(len(axes_list)):
ax = axes_list[i]
if projection == "2d":
ax.quiver(
x0,
x1,
v0,
v1,
color=color_list[i],
facecolors=color_list[i],
**quiver_kwargs,
)
elif projection == "3d":
ax.quiver(
x0,
x1,
x2,
v0,
v1,
v2,
# color=color_list[i],
# facecolors=color_list[i],
**quiver_3d_kwargs,
)
ax.set_facecolor(background)
if save_show_or_return == "save":
s_kwargs = {
"path": None,
"prefix": "cell_wise_vector",
"dpi": None,
"ext": "pdf",
"transparent": True,
"close": True,
"verbose": True,
}
s_kwargs = update_dict(s_kwargs, save_kwargs)
save_fig(**s_kwargs)
elif save_show_or_return == "show":
if projection != "3d":
plt.tight_layout()
plt.show()
elif save_show_or_return == "return":
return axes_list
[docs]@docstrings.with_indent(4)
def grid_vectors(
adata: AnnData,
basis: str = "umap",
x: int = 0,
y: int = 1,
ekey: str = "M_s",
vkey: str = "velocity_S",
color: Union[str, List[str]] = "ntr",
layer: str = "X",
highlights: Optional[list] = None,
labels: Optional[list] = None,
values: Optional[list] = None,
theme: Optional[str] = None,
cmap: Optional[str] = None,
color_key: Union[dict, list] = None,
color_key_cmap: Optional[str] = None,
background: Optional[str] = "white",
ncols: int = 4,
pointsize: Union[None, float] = None,
figsize: tuple = (6, 4),
show_legend="on data",
use_smoothed: bool = True,
ax: Optional[Axes] = None,
sort: str = "raw",
aggregate: Optional[str] = None,
show_arrowed_spines: bool = False,
inverse: bool = False,
cell_inds: Union[str, list] = "all",
method: str = "gaussian",
xy_grid_nums: list = [50, 50],
cut_off_velocity: bool = True,
quiver_size: Optional[float] = None,
quiver_length: Optional[float] = None,
vector: str = "velocity",
frontier: bool = False,
save_show_or_return: str = "show",
save_kwargs: dict = {},
s_kwargs_dict: dict = {},
q_kwargs_dict: dict = {},
**grid_kwargs,
):
"""Plot the velocity or acceleration vector of each cell on a grid.
Parameters
----------
%(scatters.parameters.no_show_legend|kwargs|save_kwargs)s
ekey: `str` (default: "M_s")
The expression key
vkey: `str` (default: "velocity_S")
The velocity key
inverse: `bool` (default: False)
Whether to inverse the direction of the velocity vectors.
cell_inds: `str` or `list` (default: all)
the cell index that will be chosen to draw velocity vectors. Can be a list of integers (cell integer
indices) or str (Cell names).
method: `str` (default: `SparseVFC`)
Method to reconstruct the vector field. Currently it supports either SparseVFC (default) or the empirical
method Gaussian kernel method from RNA velocity (Gaussian).
xy_grid_nums: `tuple` (default: (50, 50))
the number of grids in either x or y axis.
cut_off_velocity: `bool` (default: True)
Whether to remove small velocity vectors from the recovered the vector field grid, either through the simple
Gaussian kernel (applicable to 2D) or the powerful sparseVFC approach.
quiver_size: `float` or None (default: None)
The size of quiver. If None, we will use set quiver_size to be 1. Note that quiver quiver_size is used to
calculate the head_width (10 x quiver_size), head_length (12 x quiver_size) and headaxislength (8 x
quiver_size) of the quiver. This is done via the `default_quiver_args` function which also calculate the
scale of the quiver (1 / quiver_length).
quiver_length: `float` or None (default: None)
The length of quiver. The quiver length which will be used to calculate scale of quiver. Note that befoe
applying `default_quiver_args` velocity values are first rescaled via the quiver_autoscaler function. Scale
of quiver indicates the nuumber of data units per arrow length unit, e.g., m/s per plot width; a smaller
scale parameter makes the arrow longer.
vector: `str` (default: `velocity`)
Which vector type will be used for plotting, one of {'velocity', 'acceleration'} or either velocity field or
acceleration field will be plotted.
frontier: `bool` (default: `False`)
Whether to add the frontier. Scatter plots can be enhanced by using transparency (alpha) in order to show
area of high density and multiple scatter plots can be used to delineate a frontier. See matplotlib tips &
tricks cheatsheet (https://github.com/matplotlib/cheatsheets). Originally inspired by figures from scEU-seq
paper: https://science.sciencemag.org/content/367/6482/1151.
save_kwargs: `dict` (default: `{}`)
A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the
save_fig function will use the {"path": None, "prefix": 'grid_velocity', "dpi": None, "ext": 'pdf',
"transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a
dictionary that properly modify those keys according to your needs.
s_kwargs_dict: `dict` (default: {})
The dictionary of the scatter arguments.
q_kwargs_dict: `dict` (default: {})
The dictionary of the quiver arguments.
grid_kwargs:
Additional parameters that will be passed to velocity_on_grid function.
Returns
-------
Nothing but a quiver plot on the grid.
"""
import matplotlib.pyplot as plt
from matplotlib import rcParams
from matplotlib.colors import to_hex
if type(x) == str and type(y) == str:
if len(adata.var_names[adata.var.use_for_dynamics].intersection([x, y])) != 2:
raise ValueError(
"If you want to plot the vector flow of two genes, please make sure those two genes "
"belongs to dynamics genes or .var.use_for_dynamics is True."
)
X = adata[:, [x, y]].layers[ekey].A
V = adata[:, [x, y]].layers[vkey].A
layer = ekey
else:
if ("X_" + basis in adata.obsm.keys()) and (vector + "_" + basis in adata.obsm.keys()):
X = adata.obsm["X_" + basis][:, [x, y]]
V = adata.obsm[vector + "_" + basis][:, [x, y]]
else:
if "X_" + basis not in adata.obsm.keys():
layer, basis = basis.split("_")
reduceDimension(adata, layer=layer, reduction_method=basis)
if "kmc" not in adata.uns_keys():
cell_velocities(adata, vkey="velocity_S", basis=basis)
X = adata.obsm["X_" + basis][:, [x, y]]
V = adata.obsm[vector + "_" + basis][:, [x, y]]
else:
kmc = adata.uns["kmc"]
X = adata.obsm["X_" + basis][:, [x, y]]
V = kmc.compute_density_corrected_drift(X, kmc.Idx, normalize_vector=True)
adata.obsm[vector + "_" + basis] = V
X, V = X.copy(), V.copy()
if cell_inds == "all":
ix_choice = np.arange(adata.shape[0])
elif cell_inds == "random":
ix_choice = np.random.choice(np.range(adata.shape[0]), size=1000, replace=False)
elif type(cell_inds) is int:
ix_choice = np.random.choice(np.range(adata.shape[0]), size=cell_inds, replace=False)
elif type(cell_inds) is list:
if type(cell_inds[0]) is str:
cell_inds = [adata.obs_names.to_list().index(i) for i in cell_inds]
ix_choice = cell_inds
X, V = X[ix_choice, :], V[ix_choice, :] # 0, 0
grid_kwargs_dict = {
"density": None,
"smooth": None,
"n_neighbors": None,
"min_mass": None,
"autoscale": False,
"adjust_for_stream": True,
"V_threshold": None,
}
grid_kwargs_dict = update_dict(grid_kwargs_dict, grid_kwargs)
if method.lower() == "sparsevfc":
if "VecFld_" + basis not in adata.uns.keys():
VectorField(adata, basis=basis, dims=[x, y])
X_grid, V_grid = (
adata.uns["VecFld_" + basis]["grid"],
adata.uns["VecFld_" + basis]["grid_V"],
)
N = int(np.sqrt(V_grid.shape[0]))
if cut_off_velocity:
X_grid, p_mass, neighs, weight = prepare_velocity_grid_data(
X,
xy_grid_nums,
density=grid_kwargs_dict["density"],
smooth=grid_kwargs_dict["smooth"],
n_neighbors=grid_kwargs_dict["n_neighbors"],
)
for i in ["density", "smooth", "n_neighbors"]:
grid_kwargs_dict.pop(i)
VecFld, func = vecfld_from_adata(adata, basis)
V_emb = func(X)
V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) / np.maximum(1, p_mass)[:, None]
X_grid, V_grid = grid_velocity_filter(
V_emb=V,
neighs=neighs,
p_mass=p_mass,
X_grid=X_grid,
V_grid=V_grid,
**grid_kwargs_dict,
)
else:
X_grid, V_grid = (
np.array([np.unique(X_grid[:, 0]), np.unique(X_grid[:, 1])]),
np.array([V_grid[:, 0].reshape((N, N)), V_grid[:, 1].reshape((N, N))]),
)
elif method.lower() == "gaussian":
X_grid, V_grid, D = velocity_on_grid(
X,
V,
xy_grid_nums,
cut_off_velocity=cut_off_velocity,
**grid_kwargs_dict,
)
elif "grid_velocity_" + basis in adata.uns.keys():
X_grid, V_grid, _ = (
adata.uns["grid_velocity_" + basis]["VecFld"]["X_grid"],
adata.uns["grid_velocity_" + basis]["VecFld"]["V_grid"],
adata.uns["grid_velocity_" + basis]["VecFld"]["D"],
)
else:
raise Exception(
"Vector field learning method {} is not supported or the grid velocity is collected for "
"the current adata object.".format(method)
)
V_grid /= 3 * quiver_autoscaler(X_grid, V_grid)
if inverse:
V_grid = -V_grid
if background is None:
_background = rcParams.get("figure.facecolor")
background = to_hex(_background) if type(_background) is tuple else _background
if quiver_size is None:
quiver_size = 1
if background == "black":
edgecolors = "white"
else:
edgecolors = "black"
head_w, head_l, ax_l, scale = default_quiver_args(quiver_size, quiver_length)
quiver_kwargs = {
"angles": "xy",
"scale": scale,
"scale_units": "xy",
"width": 0.0005,
"headwidth": head_w,
"headlength": head_l,
"headaxislength": ax_l,
"minshaft": 1,
"minlength": 1,
"pivot": "tail",
"edgecolors": edgecolors,
"linewidth": 0.2,
"facecolors": edgecolors,
"color": edgecolors,
"alpha": 1,
"zorder": 3,
}
quiver_kwargs = update_dict(quiver_kwargs, q_kwargs_dict)
# if ax is None:
# plt.figure(facecolor=background)
axes_list, _, font_color = scatters(
adata=adata,
basis=basis,
x=x,
y=y,
color=color,
layer=layer,
highlights=highlights,
labels=labels,
values=values,
theme=theme,
cmap=cmap,
color_key=color_key,
color_key_cmap=color_key_cmap,
background=background,
ncols=ncols,
pointsize=pointsize,
figsize=figsize,
show_legend=show_legend,
use_smoothed=use_smoothed,
aggregate=aggregate,
show_arrowed_spines=show_arrowed_spines,
ax=ax,
sort=sort,
save_show_or_return="return",
frontier=frontier,
**s_kwargs_dict,
return_all=True,
)
if type(axes_list) == list:
for i in range(len(axes_list)):
axes_list[i].quiver(X_grid[0], X_grid[1], V_grid[0], V_grid[1], **quiver_kwargs)
axes_list[i].set_facecolor(background)
else:
axes_list.quiver(X_grid[0], X_grid[1], V_grid[0], V_grid[1], **quiver_kwargs)
axes_list.set_facecolor(background)
if save_show_or_return == "save":
s_kwargs = {
"path": None,
"prefix": "grid_velocity",
"dpi": None,
"ext": "pdf",
"transparent": True,
"close": True,
"verbose": True,
}
s_kwargs = update_dict(s_kwargs, save_kwargs)
save_fig(**s_kwargs)
elif save_show_or_return == "show":
plt.tight_layout()
plt.show()
elif save_show_or_return == "return":
return axes_list
[docs]@docstrings.with_indent(4)
def streamline_plot(
adata: AnnData,
basis: str = "umap",
x: int = 0,
y: int = 1,
ekey: str = "M_s",
vkey: str = "velocity_S",
color: Union[str, List[str]] = "ntr",
layer: str = "X",
highlights: Optional[list] = None,
labels: Optional[list] = None,
values: Optional[list] = None,
theme: Optional[str] = None,
cmap: Optional[str] = None,
color_key: Union[dict, list] = None,
color_key_cmap: Optional[str] = None,
background: Optional[str] = "white",
ncols: int = 4,
pointsize: Union[None, float] = None,
figsize: tuple = (6, 4),
show_legend="on data",
use_smoothed: bool = True,
ax: Optional[Axes] = None,
sort: str = "raw",
aggregate: Optional[str] = None,
show_arrowed_spines: bool = False,
inverse: bool = False,
cell_inds: Union[str, list] = "all",
method: str = "gaussian",
xy_grid_nums: list = [50, 50],
cut_off_velocity: bool = True,
density: float = 1,
linewidth: float = 1,
streamline_alpha: float = 1,
vector: str = "velocity",
frontier: bool = False,
save_show_or_return: str = "show",
save_kwargs: dict = {},
s_kwargs_dict: dict = {},
**streamline_kwargs,
):
"""Plot the velocity vector of each cell.
Parameters
----------
%(scatters.parameters.no_show_legend|kwargs|save_kwargs)s
ekey: `str` (default: "M_s")
The expression key
vkey: `str` (default: "velocity_S")
The velocity key
inverse: `bool` (default: False)
Whether to inverse the direction of the velocity vectors.
cell_inds: `str` or `list` (default: all)
the cell index that will be chosen to draw velocity vectors. Can be a list of integers (cell integer
indices) or str (Cell names).
method: `str` (default: `SparseVFC`)
Method to reconstruct the vector field. Currently it supports either SparseVFC (default) or the empirical
method Gaussian kernel method from RNA velocity (Gaussian).
xy_grid_nums: `tuple` (default: (50, 50))
the number of grids in either x or y axis.
cut_off_velocity: `bool` (default: True)
Whether to remove small velocity vectors from the recovered the vector field grid, either through the simple
Gaussian kernel (applicable only to 2D) or the powerful sparseVFC approach.
density: `float` or None (default: 1)
density of the plt.streamplot function.
linewidth: `float` or None (default: 1)
multiplier of automatically calculated linewidth passed to the plt.streamplot function.
streamline_alpha: `float` or None (default: 1)
The alpha value applied to the vector field stream lines.
vector: `str` (default: `velocity`)
Which vector type will be used for plotting, one of {'velocity', 'acceleration'} or either velocity field or
acceleration field will be plotted.
frontier: `bool` (default: `False`)
Whether to add the frontier. Scatter plots can be enhanced by using transparency (alpha) in order to show
area of high density and multiple scatter plots can be used to delineate a frontier. See matplotlib tips &
tricks cheatsheet (https://github.com/matplotlib/cheatsheets). Originally inspired by figures from scEU-seq
paper: https://science.sciencemag.org/content/367/6482/1151.
save_kwargs: `dict` (default: `{}`)
A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the
save_fig function will use the {"path": None, "prefix": 'streamline_plot', "dpi": None, "ext": 'pdf',
"transparent": True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a
dictionary that properly modify those keys according to your needs.
s_kwargs_dict: `dict` (default: {})
The dictionary of the scatter arguments.
streamline_kwargs:
Additional parameters that will be passed to plt.streamplot function
Returns
-------
Nothing but a streamline plot that integrates paths in the vector field.
"""
import matplotlib.pyplot as plt
from matplotlib import rcParams
from matplotlib.colors import to_hex
if type(x) == str and type(y) == str:
if len(adata.var_names[adata.var.use_for_dynamics].intersection([x, y])) != 2:
raise ValueError(
"If you want to plot the vector flow of two genes, please make sure those two genes "
"belongs to dynamics genes or .var.use_for_dynamics is True."
)
X = adata[:, [x, y]].layers[ekey].A
V = adata[:, [x, y]].layers[vkey].A
layer = ekey
else:
if ("X_" + basis in adata.obsm.keys()) and (vector + "_" + basis in adata.obsm.keys()):
X = adata.obsm["X_" + basis][:, [x, y]]
V = adata.obsm[vector + "_" + basis][:, [x, y]]
else:
if basis not in adata.obsm.keys() or "X_" + basis not in adata.obsm.keys():
layer, basis = basis.split("_") if "_" in basis else ("X", basis)
reduceDimension(adata, layer=layer, reduction_method=basis)
if "kmc" not in adata.uns_keys():
cell_velocities(adata, vkey="velocity_S", basis=basis)
X = adata.obsm["X_" + basis][:, [x, y]]
V = adata.obsm[vector + "_" + basis][:, [x, y]]
else:
kmc = adata.uns["kmc"]
X = adata.obsm["X_" + basis][:, [x, y]]
V = kmc.compute_density_corrected_drift(X, kmc.Idx, normalize_vector=True)
adata.obsm[vector + "_" + basis] = V
X, V = X.copy(), V.copy()
if cell_inds == "all":
ix_choice = np.arange(adata.shape[0])
elif cell_inds == "random":
ix_choice = np.random.choice(np.range(adata.shape[0]), size=1000, replace=False)
elif type(cell_inds) is int:
ix_choice = np.random.choice(np.range(adata.shape[0]), size=cell_inds, replace=False)
elif type(cell_inds) is list:
if type(cell_inds[0]) is str:
cell_inds = [adata.obs_names.to_list().index(i) for i in cell_inds]
ix_choice = cell_inds
X, V = X[ix_choice, :], V[ix_choice, :] # 0, 0
grid_kwargs_dict = {
"density": None,
"smooth": None,
"n_neighbors": None,
"min_mass": None,
"autoscale": False,
"adjust_for_stream": True,
"V_threshold": None,
}
grid_kwargs_dict = update_dict(grid_kwargs_dict, streamline_kwargs)
if method.lower() == "sparsevfc":
if "VecFld_" + basis not in adata.uns.keys():
VectorField(adata, basis=basis, dims=[x, y])
X_grid, V_grid = (
adata.uns["VecFld_" + basis]["grid"],
adata.uns["VecFld_" + basis]["grid_V"],
)
N = int(np.sqrt(V_grid.shape[0]))
if cut_off_velocity:
X_grid, p_mass, neighs, weight = prepare_velocity_grid_data(
X,
xy_grid_nums,
density=grid_kwargs_dict["density"],
smooth=grid_kwargs_dict["smooth"],
n_neighbors=grid_kwargs_dict["n_neighbors"],
)
for i in ["density", "smooth", "n_neighbors"]:
grid_kwargs_dict.pop(i)
VecFld, func = vecfld_from_adata(adata, basis)
V_emb = func(X)
V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) / np.maximum(1, p_mass)[:, None]
X_grid, V_grid = grid_velocity_filter(
V_emb=V,
neighs=neighs,
p_mass=p_mass,
X_grid=X_grid,
V_grid=V_grid,
**grid_kwargs_dict,
)
else:
X_grid, V_grid = (
np.array([np.unique(X_grid[:, 0]), np.unique(X_grid[:, 1])]),
np.array([V_grid[:, 0].reshape((N, N)), V_grid[:, 1].reshape((N, N))]),
)
elif method.lower() == "gaussian":
X_grid, V_grid, D = velocity_on_grid(
X,
V,
xy_grid_nums,
cut_off_velocity=cut_off_velocity,
**grid_kwargs_dict,
)
elif "grid_velocity_" + basis in adata.uns.keys():
X_grid, V_grid, _ = (
adata.uns["grid_velocity_" + basis]["VecFld"]["X_grid"],
adata.uns["grid_velocity_" + basis]["VecFld"]["V_grid"],
adata.uns["grid_velocity_" + basis]["VecFld"]["D"],
)
else:
raise Exception(
"Vector field learning method {} is not supported or the grid velocity is collected for "
"the current adata object.".format(method)
)
if inverse:
V_grid = -V_grid
streamplot_kwargs = {
"density": density * 2,
"linewidth": None,
"cmap": None,
"norm": None,
"arrowsize": 1,
"arrowstyle": "fancy",
"minlength": 0.1,
"transform": None,
"start_points": None,
"maxlength": 4.0,
"integration_direction": "both",
"zorder": 3,
}
mass = np.sqrt((V_grid**2).sum(0))
linewidth *= 2 * mass / mass[~np.isnan(mass)].max()
streamplot_kwargs.update({"linewidth": linewidth * streamline_kwargs.pop("linewidth", 1)})
streamplot_kwargs = update_dict(streamplot_kwargs, streamline_kwargs)
if background is None:
_background = rcParams.get("figure.facecolor")
background = to_hex(_background) if type(_background) is tuple else _background
if background in ["black", "#ffffff"]:
streamline_color = "red"
else:
streamline_color = "black"
# if ax is None:
# plt.figure(facecolor=background)
axes_list, _, _ = scatters(
adata=adata,
basis=basis,
x=x,
y=y,
color=color,
layer=layer,
highlights=highlights,
labels=labels,
values=values,
theme=theme,
cmap=cmap,
color_key=color_key,
color_key_cmap=color_key_cmap,
background=background,
ncols=ncols,
pointsize=pointsize,
figsize=figsize,
show_legend=show_legend,
use_smoothed=use_smoothed,
aggregate=aggregate,
show_arrowed_spines=show_arrowed_spines,
ax=ax,
sort=sort,
save_show_or_return="return",
frontier=frontier,
**s_kwargs_dict,
return_all=True,
)
# single axis case, convert to list
if type(axes_list) != list:
axes_list = [axes_list]
def streamplot_2d(ax):
ax.set_facecolor(background)
s = ax.streamplot(
X_grid[0],
X_grid[1],
V_grid[0],
V_grid[1],
color=streamline_color,
**streamplot_kwargs,
)
set_arrow_alpha(ax, streamline_alpha)
set_stream_line_alpha(s, streamline_alpha)
if type(axes_list) == list:
for i in range(len(axes_list)):
ax = axes_list[i]
streamplot_2d(ax)
if save_show_or_return == "save":
s_kwargs = {
"path": None,
"prefix": "streamline_plot",
"dpi": None,
"ext": "pdf",
"transparent": True,
"close": True,
"verbose": True,
}
s_kwargs = update_dict(s_kwargs, save_kwargs)
save_fig(**s_kwargs)
elif save_show_or_return == "show":
# TODO: fix bug: the following line causing plotting issue
# plt.tight_layout()
plt.show()
elif save_show_or_return == "return":
return axes_list
# refactor line_conv_integration
[docs]def plot_energy(
adata: AnnData,
basis: Optional[str] = None,
vecfld_dict: Optional[dict] = None,
figsize: Optional[tuple] = None,
fig: Optional[Figure] = None,
save_show_or_return: str = "show",
save_kwargs: dict = {},
):
"""Plot the energy and energy change rate over each optimization iteration.
Parameters
----------
adata: :class:`~anndata.AnnData`
an Annodata object with vector field function reconstructed.
basis: `str` or None (default: `None`)
The reduced dimension embedding (pca or umap, for example) of cells from which vector field function was
reconstructed. When basis is None, the velocity vector field function building from the full gene expression
space is used.
vecfld_dict: `str` or None (default: `None`)
The dictionary storing the information for the reconstructed velocity vector field function. If None, the
corresponding dictionary stored in the adata object will be used.
figsize: `[float, float]` or `None` (default: None)
The width and height of the resulting figure when fig is set to be None.
fig: `matplotlib.figure.Figure` or None
The figure object where panels of the energy or energy change rate over iteration plots will be appended to.
save_show_or_return: {'show', 'save', 'return'} (default: `show`)
Whether to save, show or return the figure.
save_kwargs: `dict` (default: `{}`)
A dictionary that will passed to the save_fig function. By default it is an empty dictionary and the
save_fig function will use the {"path": None, "prefix": 'energy', "dpi": None, "ext": 'pdf', "transparent":
True, "close": True, "verbose": True} as its parameters. Otherwise you can provide a dictionary that
properly modify those keys according to your needs.
Returns
-------
Nothing, but plot the energy or energy change rate each optimization iteration.
"""
import matplotlib.pyplot as plt
if vecfld_dict is None:
vf_key = "VecFld" if basis is None else "VecFld_" + basis
if vf_key not in adata.uns.keys():
raise ValueError(
f"Your adata doesn't have the key for Vector Field with {basis} basis."
f"Try firstly running dyn.vf.VectorField(adata, basis={basis})."
)
vecfld_dict = adata.uns[vf_key]
E = vecfld_dict["E_traj"] if "E_traj" in vecfld_dict.keys() else None
tecr = vecfld_dict["tecr_traj"] if "tecr_traj" in vecfld_dict.keys() else None
if E is not None and tecr is not None:
fig = fig or plt.figure(figsize=figsize)
Iterations = np.arange(0, len(E))
ax = fig.add_subplot(1, 2, 1)
E_ = E - np.min(E) + 1
ax.plot(Iterations, E_, "k")
ax.plot(E_, "r.")
ax.set_yscale("log")
plt.xlabel("Iteration")
plt.ylabel("Energy")
ax = fig.add_subplot(1, 2, 2)
ax.plot(Iterations, tecr, "k")
ax.plot(tecr, "r.")
ax.set_yscale("log")
plt.xlabel("Iteration")
plt.ylabel("Energy change rate")
if save_show_or_return == "save":
s_kwargs = {
"path": None,
"prefix": "energy",
"dpi": None,
"ext": "pdf",
"transparent": True,
"close": True,
"verbose": True,
}
s_kwargs = update_dict(s_kwargs, save_kwargs)
save_fig(**s_kwargs)
elif save_show_or_return == "show":
plt.tight_layout()
plt.show()
elif save_show_or_return == "return":
return fig