# Source code for dynamo.vectorfield.vector_calculus

```
# from tqdm import tqdm
# from anndata._core.views import ArrayView
# import scipy.sparse as sp
from typing import Dict, List, Optional, Union
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
import numpy as np
import pandas as pd
from anndata._core.anndata import AnnData
from ..dynamo_logger import (
LoggerManager,
main_info,
main_info_insert_adata,
main_info_insert_adata_uns,
main_warning,
)
from ..tools.sampling import sample
from ..tools.utils import (
create_layer,
fetch_states,
get_pd_row_column_idx,
get_rank_array,
index_gene,
list_top_genes,
list_top_interactions,
table_top_genes,
)
from ..utils import isarray, ismatrix
from ..vectorfield import scVectorField
from .scVectorField import SvcVectorField
from .utils import (
average_jacobian_by_group,
elementwise_hessian_transformation,
elementwise_jacobian_transformation,
get_vf_dict,
hessian_transformation,
intersect_sources_targets,
subset_jacobian_transformation,
vecfld_from_adata,
vector_field_function,
vector_transformation,
)
try:
import dynode
use_dynode = "vectorfield" in dir(dynode)
except ImportError:
use_dynode = False
if use_dynode:
from .scVectorField import dynode_vectorfield
from .utils import dynode_vector_field_function
def get_vf_class(adata: AnnData, basis: str = "pca") -> SvcVectorField:
"""Get the corresponding vector field class according to different methods.
Args:
adata: AnnData object that contains the reconstructed vector field in the `uns` attribute.
basis: The embedding data in which the vector field was reconstructed.
Returns:
SvcVectorField object that is extracted from the `uns` attribute of adata.
"""
vf_dict = get_vf_dict(adata, basis=basis)
if "method" not in vf_dict.keys():
vf_dict["method"] = "sparsevfc"
if vf_dict["method"].lower() == "sparsevfc":
vector_field_class = SvcVectorField()
vector_field_class.from_adata(adata, basis=basis)
elif vf_dict["method"].lower() == "dynode":
vf_dict["parameters"]["load_model_from_buffer"] = True
vector_field_class = vf_dict["dynode_object"] # dynode_vectorfield(**vf_dict["parameters"])
else:
raise ValueError("current only support two methods, SparseVFC and dynode")
return vector_field_class
[docs]def velocities(
adata: AnnData,
init_cells: Optional[List] = None,
init_states: Optional[list] = None,
basis: Optional[str] = None,
vector_field_class: Optional[scVectorField.BaseVectorField] = None,
layer: Optional[str] = "X",
dims: Optional[Union[int, list]] = None,
Qkey: str = "PCs",
) -> AnnData:
"""Calculate the velocities for any cell state with the reconstructed vector field function.
Args:
adata: AnnData object that contains the reconstructed vector field function in the `uns` attribute.
init_cells: Cell name or indices of the initial cell states for the historical or future cell state prediction with
numerical integration. If the names in init_cells are not found in the adata.obs_name, they will be treated as
cell indices and must be integers.
init_states: Initial cell states for the historical or future cell state prediction with numerical integration.
basis: The embedding data to use for calculating velocities. If `basis` is either `umap` or `pca`, the
reconstructed trajectory will be projected back to high dimensional space via the `inverse_transform`
function.
vector_field_class: If not None, the speed will be computed using this class instead of the vector field stored in adata. You
can set up the class with a known ODE function, useful when the data is generated through simulation.
layer: Which layer of the data will be used for predicting cell fate with the reconstructed vector field function.
The layer once provided, will override the `basis` argument and this function will then predict cell fate in high
dimensional space.
dims: The dimensions that will be selected for velocity calculation.
Qkey: The key of the PCA loading matrix in `.uns`. Only used when basis is `pca`.
Returns:
AnnData object that is updated with the `"velocities"` related key in the `.uns`.
"""
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
init_states, _, _, _ = fetch_states(adata, init_states, init_cells, basis, layer, False, None)
if vector_field_class.vf_dict["normalize"]:
xm, xscale = vector_field_class.norm_dict["xm"][None, :], vector_field_class.norm_dict["xscale"]
init_states = (init_states - xm) / xscale
vec_mat = vector_field_class.func(init_states)
vec_key = "velocities" if basis is None else "velocities_" + basis
if np.isscalar(dims):
vec_mat = vec_mat[:, :dims]
elif dims is not None:
vec_mat = vec_mat[:, dims]
if basis == "pca":
adata.uns["velocities_pca"] = vec_mat
Qkey = "PCs" if Qkey is None else Qkey
if Qkey in adata.uns.keys():
Q = adata.uns[Qkey]
elif Qkey in adata.varm.keys():
Q = adata.varm[Qkey]
else:
raise Exception(f"No PC matrix {Qkey} found in neither .uns nor .varm.")
vel = adata.uns["velocities_pca"].copy()
vel_hi = vector_transformation(vel, Q)
create_layer(
adata,
vel_hi,
layer_key="velocity_VecFld",
genes=adata.var.use_for_pca,
)
adata.uns[vec_key] = vec_mat
[docs]def speed(
adata: AnnData,
basis: Optional[str] = "umap",
vector_field_class: Optional[scVectorField.BaseVectorField] = None,
method: str = "analytical",
) -> AnnData:
"""Calculate the speed for each cell with the reconstructed vector field function.
Args:
adata: AnnData object that contains the reconstructed vector field function in the `uns` attribute.
basis: The embedding data in which the vector field was reconstructed.
vector_field_class: If not None, the speed will be computed using this class instead of the vector field stored in adata. You
can set up the class with a known ODE function, useful when the data is generated through simulation.
method: The method that will be used for calculating speed, either `analytical` or `numeric`. `analytical`
method will use the analytical form of the reconstructed vector field for calculating Jacobian. Otherwise,
raw velocity vectors are used.
Returns:
AnnData object that is updated with the `'speed'` key in the `.obs`.
"""
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
X, V = vector_field_class.get_data()
if method == "analytical":
vec_mat = vector_field_class.func(X)
else:
vec_mat = adata.obsm["velocity_" + basis] if basis is not None else vector_field_class.vf_dict["Y"]
speed = np.array([np.linalg.norm(i) for i in vec_mat])
speed_key = "speed" if basis is None else "speed_" + basis
adata.obs[speed_key] = speed
[docs]def jacobian(
adata: AnnData,
regulators: Optional[List] = None,
effectors: Optional[List] = None,
cell_idx: Optional[List] = None,
sampling: Optional[Literal['random', 'velocity', 'trn']] = None,
sample_ncells: int = 1000,
basis: str = "pca",
Qkey: str = "PCs",
vector_field_class: Optional[scVectorField.BaseVectorField] = None,
method: str = "analytical",
store_in_adata: bool = True,
**kwargs,
):
"""Calculate Jacobian for each cell with the reconstructed vector field.
If the vector field was reconstructed from the reduced PCA space, the Jacobian matrix will then be inverse
transformed back to high dimension. Note that this should also be possible for reduced UMAP space and will be
supported shortly. Note that we compute the Jacobian for the RKHS kernel vector field analytically,
which is much more computationally efficient than the numerical method.
Args:
adata: AnnData object that contains the reconstructed vector field in `.uns`.
regulators: The list of genes that will be used as regulators when calculating the cell-wise Jacobian matrix. The
Jacobian is the matrix consisting of partial derivatives of the vector field wrt gene expressions. It can be
used to evaluate the change in velocities of effectors (see below) as the expressions of regulators
increase. The regulators are the denominators of the partial derivatives.
effectors: The list of genes that will be used as effectors when calculating the cell-wise Jacobian matrix. The
effectors are the numerators of the partial derivatives.
cell_idx: A list of cell index (or boolean flags) for which the jacobian is calculated.
If `None`, all or a subset of sampled cells are used.
sampling: {None, 'random', 'velocity', 'trn'}, (default: None)
See specific information on these methods in `.tl.sample`.
If `None`, all cells are used.
sample_ncells: The number of cells to be sampled. If `sampling` is None, this parameter is ignored.
basis: The embedding data in which the vector field was reconstructed. If `None`, use the vector field function
that was reconstructed directly from the original unreduced gene expression space.
Qkey: The key of the PCA loading matrix in `.uns`.
vector_field_class: If not `None`, the jacobian will be computed using this class instead of the vector field stored in adata.
method: The method that will be used for calculating Jacobian, either `'analytical'` or `'numerical'`.
`'analytical'` method uses the analytical expressions for calculating Jacobian while `'numerical'` method
uses numdifftools, a numerical differentiation tool, for computing Jacobian. `'analytical'` method is much
more efficient.
cores: Number of cores to calculate Jacobian. If cores is set to be > 1, multiprocessing will be used to
parallel the Jacobian calculation.
kwargs: Any additional keys that will be passed to `elementwise_jacobian_transformation` function.
Returns:
AnnData object that is updated with the `'jacobian'` key in the `.uns`. This is a 3-dimensional tensor with
dimensions n_effectors x n_regulators x n_obs.
"""
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
if basis == "umap":
cell_idx = np.arange(adata.n_obs)
X, V = vector_field_class.get_data()
if cell_idx is None:
if sampling is None or sampling == "all":
cell_idx = np.arange(adata.n_obs)
else:
cell_idx = sample(np.arange(adata.n_obs), sample_ncells, sampling, X, V)
Jac_func = vector_field_class.get_Jacobian(method=method)
Js = Jac_func(X[cell_idx])
if regulators is None and effectors is not None:
regulators = effectors
elif effectors is None and regulators is not None:
effectors = regulators
if regulators is not None and effectors is not None:
if type(regulators) is str:
if regulators in adata.var.keys():
regulators = adata.var.index[adata.var[regulators]]
else:
regulators = [regulators]
if type(effectors) is str:
if effectors in adata.var.keys():
effectors = adata.var.index[adata.var[effectors]]
else:
effectors = [effectors]
regulators = np.unique(regulators)
effectors = np.unique(effectors)
var_df = adata[:, adata.var.use_for_dynamics].var
regulators = var_df.index.intersection(regulators)
effectors = var_df.index.intersection(effectors)
reg_idx, eff_idx = (
get_pd_row_column_idx(var_df, regulators, "row"),
get_pd_row_column_idx(var_df, effectors, "row"),
)
if len(regulators) == 0 or len(effectors) == 0:
raise ValueError(
"Either the regulator or the effector gene list provided is not in the dynamics gene list!"
)
if basis == "pca":
if Qkey in adata.uns.keys():
Q = adata.uns[Qkey]
elif Qkey in adata.varm.keys():
Q = adata.varm[Qkey]
else:
raise Exception(f"No PC matrix {Qkey} found in neither .uns nor .varm.")
Q = Q[:, : X.shape[1]]
if len(regulators) == 1 and len(effectors) == 1:
Jacobian = elementwise_jacobian_transformation(
Js, Q[eff_idx, :].flatten(), Q[reg_idx, :].flatten(), **kwargs
)
else:
Jacobian = subset_jacobian_transformation(Js, Q[eff_idx, :], Q[reg_idx, :], **kwargs)
else:
Jacobian = Js.copy()
else:
Jacobian = None
ret_dict = {"jacobian": Js, "cell_idx": cell_idx}
# use 'str_key' in dict.keys() to check if these items are computed, or use dict.get('str_key')
if Jacobian is not None:
ret_dict["jacobian_gene"] = Jacobian
if regulators is not None:
ret_dict["regulators"] = regulators.to_list()
if effectors is not None:
ret_dict["effectors"] = effectors.to_list()
Js_det = [np.linalg.det(Js[:, :, i]) for i in np.arange(Js.shape[2])]
jacobian_det_key = "jacobian_det" if basis is None else "jacobian_det_" + basis
adata.obs[jacobian_det_key] = np.nan
adata.obs.loc[adata.obs_names[cell_idx], jacobian_det_key] = Js_det
if store_in_adata:
jkey = "jacobian" if basis is None else "jacobian_" + basis
adata.uns[jkey] = ret_dict
return adata
else:
return ret_dict
def hessian(
adata: AnnData,
regulators: List,
coregulators: List,
effector: Optional[List] = None,
cell_idx: Optional[List] = None,
sampling: Optional[Literal['random', 'velocity', 'trn']] = None,
sample_ncells: int = 1000,
basis: str = "pca",
Qkey: str = "PCs",
vector_field_class: Optional[scVectorField.BaseVectorField] = None,
method: str = "analytical",
store_in_adata: bool = True,
**kwargs,
):
"""Calculate Hessian for each cell with the reconstructed vector field.
If the vector field was reconstructed from the reduced PCA space, the Hessian matrix will then be inverse
transformed back to high dimension. Note that this should also be possible for reduced UMAP space and will be
supported shortly. Note that we compute the Hessian for the RKHS kernel vector field analytically, which is much
more computationally efficient than the numerical method.
Args:
adata: AnnData object that contains the reconstructed vector field in `.uns`.
regulators: The list of genes that will be used as regulators when calculating the cell-wise Hessian matrix. The
Hessian is the matrix consisting of secondary partial derivatives of the vector field wrt gene expressions.
It can be used to evaluate the change in velocities of effectors (see below) as the expressions of
regulators and co-regulators increase. The regulators/co-regulators are the denominators of the partial
derivatives.
coregulators: The list of genes that will be used as regulators when calculating the cell-wise Hessian matrix. The
Hessian is the matrix consisting of secondary partial derivatives of the vector field wrt gene expressions.
It can be used to evaluate the change in velocities of effectors (see below) as the expressions of
regulators and co-regulators increase. The regulators/co-regulators are the denominators of the partial
derivatives.
effector: The target gene that will be used as effector when calculating the cell-wise Hessian matrix. Effector
must be a single gene. The effector is the numerator of the partial derivatives.
cell_idx: A list of cell index (or boolean flags) for which the Hessian is calculated.
If `None`, all or a subset of sampled cells are used.
sampling: {None, 'random', 'velocity', 'trn'}, (default: None)
See specific information on these methods in `.tl.sample`.
If `None`, all cells are used.
sample_ncells: The number of cells to be sampled. If `sampling` is None, this parameter is ignored.
basis: The embedding data in which the vector field was reconstructed. If `None`, use the vector field function
that was reconstructed directly from the original unreduced gene expression space.
Qkey: The key of the PCA loading matrix in `.uns`.
vector_field_class: If not `None`, the Hessian will be computed using this class instead of the vector field stored in adata.
method: The method that will be used for calculating Hessian, either `'analytical'` or `'numerical'`.
`'analytical'` method uses the analytical expressions for calculating Hessian while `'numerical'` method
uses numdifftools, a numerical differentiation tool, for computing Hessian. `'analytical'` method is much
more efficient.
cores: Number of cores to calculate Hessian. Currently note used.
kwargs: Any additional keys that will be passed to elementwise_hessian_transformation function.
Returns:
AnnData object that is updated with the `'Hessian'` key in the `.uns`. This is a 4-dimensional tensor with
dimensions 1 (n_effector) x n_regulators x n_coregulators x n_obs.
"""
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
if basis == "umap":
cell_idx = np.arange(adata.n_obs)
X, V = vector_field_class.get_data()
if cell_idx is None:
if sampling is None or sampling == "all":
cell_idx = np.arange(adata.n_obs)
else:
cell_idx = sample(np.arange(adata.n_obs), sample_ncells, sampling, X, V)
Hessian_func = vector_field_class.get_Hessian(method=method)
Hs = np.zeros([X.shape[1], X.shape[1], X.shape[1], X.shape[0]])
for ind, i in enumerate(cell_idx):
Hs[:, :, :, ind] = Hessian_func(X[i])
if regulators is not None and coregulators is not None and effector is not None:
if type(regulators) is str:
if regulators in adata.var.keys():
regulators = adata.var.index[adata.var[regulators]]
else:
regulators = [regulators]
if type(coregulators) is str:
if coregulators in adata.var.keys():
coregulators = adata.var.index[adata.var[coregulators]]
else:
coregulators = [coregulators]
if type(effector) is str:
if effector in adata.var.keys():
effector = adata.var.index[adata.var[effector]]
else:
effector = [effector]
if len(effector) > 1:
raise Exception(f"effector must be a single gene but you have {effector}. ")
regulators = np.unique(regulators)
coregulators = np.unique(coregulators)
effector = np.unique(effector)
var_df = adata[:, adata.var.use_for_dynamics].var
regulators = var_df.index.intersection(regulators)
coregulators = var_df.index.intersection(coregulators)
effector = var_df.index.intersection(effector)
reg_idx, coreg_idx, eff_idx = (
get_pd_row_column_idx(var_df, regulators, "row"),
get_pd_row_column_idx(var_df, coregulators, "row"),
get_pd_row_column_idx(var_df, effector, "row"),
)
if len(regulators) == 0 or len(coregulators) == 0 or len(effector) == 0:
raise ValueError(
"Either the regulator, coregulator or the effector gene list provided is not in the dynamics gene list!"
)
if basis == "pca":
if Qkey in adata.uns.keys():
Q = adata.uns[Qkey]
elif Qkey in adata.varm.keys():
Q = adata.varm[Qkey]
else:
raise Exception(f"No PC matrix {Qkey} found in neither .uns nor .varm.")
Q = Q[:, : X.shape[1]]
if len(regulators) == 1 and len(coregulators) == 1 and len(effector) == 1:
Hessian = [
elementwise_hessian_transformation(
Hs[:, :, :, i],
Q[eff_idx, :].flatten(),
Q[reg_idx, :].flatten(),
Q[coreg_idx, :].flatten(),
**kwargs,
)
for i in np.arange(Hs.shape[-1])
]
else:
Hessian = [
hessian_transformation(Hs[:, :, :, i], Q[eff_idx, :], Q[reg_idx, :], Q[coreg_idx, :], **kwargs)
for i in np.arange(Hs.shape[-1])
]
else:
Hessian = Hs.copy()
else:
Hessian = None
ret_dict = {"hessian": Hs, "cell_idx": cell_idx}
# use 'str_key' in dict.keys() to check if these items are computed, or use dict.get('str_key')
if Hessian is not None:
ret_dict["hessian_gene"] = Hessian
if regulators is not None:
ret_dict["regulators"] = regulators.to_list() if type(regulators) != list else regulators
if coregulators is not None:
ret_dict["coregulators"] = coregulators.to_list() if type(coregulators) != list else coregulators
if effector is not None:
ret_dict["effectors"] = effector.to_list() if type(effector) != list else effector
if store_in_adata:
hkey = "hessian" if basis is None else "hessian_" + basis
adata.uns[hkey] = ret_dict
return adata
else:
return ret_dict
def laplacian(
adata: AnnData,
hkey: str = "hessian_pca",
basis: str = "pca",
Qkey: str = "PCs",
vector_field_class: Optional[scVectorField.BaseVectorField] = None,
method: str = "analytical",
**kwargs,
):
"""Calculate Laplacian for each target gene in each cell with the reconstructed vector field.
If the vector field was reconstructed from the reduced PCA space, the Lapalacian matrix will then be inverse
transformed back to high dimension. Note that this should also be possible for reduced UMAP space and will be
supported shortly. Note we compute the Lapalacian for the RKHS kernel vector field analytically, which is much
more computationally efficient than the numerical method.
Args:
adata: AnnData object that contains the reconstructed vector field in `.uns` and the `hkey` (the hessian matrix).
basis: The embedding data in which the vector field was reconstructed. If `None`, use the vector field function
that was reconstructed directly from the original unreduced gene expression space.
Qkey: The key of the PCA loading matrix in `.uns`.
vector_field_class: If not `None`, the Hessian will be computed using this class instead of the vector field stored in adata.
method: The method that will be used for calculating Laplacian, either `'analytical'` or `'numerical'`.
`'analytical'` method uses the analytical expressions for calculating Laplacian while `'numerical'` method
uses numdifftools, a numerical differentiation tool, for computing Laplacian. `'analytical'` method is much
more efficient.
kwargs: Any additional keys that will be passed to elementwise_hessian_transformation function.
Returns:
AnnData object that is updated with the `'Laplacian'` key in the `.obs` and `obsm`. The first one is the
norm of the Laplacian for all target genes in a cell while the second one is the vector of Laplacian for all
target genes in each cell.
"""
if hkey not in adata.uns_keys():
raise Exception(
f"{hkey} is not in adata.uns_keys(). Please first run dyn.vf.hessian(adata) properly before "
f"calculating Laplacian. This can be done by calculating Hessian between any three dynamical "
f"genes which will generate the Hessian matrix."
)
else:
H = adata.uns[hkey]["hessian"]
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
Laplacian_func = vector_field_class.get_Laplacian(method=method)
Ls = Laplacian_func(H)
L_key = "Laplacian" if basis is None else "Laplacian_" + basis
adata.obsm[L_key] = Ls.T
adata.obs[L_key] = np.linalg.norm(Ls, axis=0)
if basis == "pca":
if Qkey in adata.uns.keys():
Q = adata.uns[Qkey]
elif Qkey in adata.varm.keys():
Q = adata.varm[Qkey]
else:
raise Exception(f"No PC matrix {Qkey} found in neither .uns nor .varm.")
Ls_hi = vector_transformation(Ls.T, Q)
create_layer(
adata,
Ls_hi,
layer_key="laplacian",
genes=adata.var.use_for_pca,
)
elif basis is None:
create_layer(
adata,
Ls.T,
layer_key="laplacian",
genes=adata.var.use_for_pca,
)
[docs]def sensitivity(
adata: AnnData,
regulators: Optional[List] = None,
effectors: Optional[List] = None,
cell_idx: Optional[List] = None,
sampling: Optional[Literal['random', 'velocity', 'trn']] = None,
sample_ncells: int = 1000,
basis: str = "pca",
Qkey: str = "PCs",
vector_field_class: Optional[scVectorField.BaseVectorField] = None,
method: str = "analytical",
projection_method: str = "from_jacobian",
store_in_adata: bool = True,
**kwargs,
) -> Union[AnnData, Dict]:
"""Calculate Sensitivity matrix for each cell with the reconstructed vector field.
If the vector field was reconstructed from the reduced PCA space, the Sensitivity matrix will then be inverse
transformed back to high dimension. Note that this should also be possible for reduced UMAP space and will be
supported shortly. Note that we compute the Sensitivity for the RKHS kernel vector field analytically,
which is much more computationally efficient than the numerical method.
Args:
adata: AnnData object that contains the reconstructed vector field in `.uns`.
regulators: The list of genes that will be used as regulators when calculating the cell-wise Jacobian matrix. The
Jacobian is the matrix consisting of partial derivatives of the vector field wrt gene expressions. It can be
used to evaluate the change in velocities of effectors (see below) as the expressions of regulators
increase. The regulators are the denominators of the partial derivatives.
effectors: The list of genes that will be used as effectors when calculating the cell-wise Jacobian matrix. The
effectors are the numerators of the partial derivatives.
cell_idx: A list of cell index (or boolean flags) for which the jacobian is calculated.
If `None`, all or a subset of sampled cells are used.
sampling: {None, 'random', 'velocity', 'trn'}, (default: None)
See specific information on these methods in `.tl.sample`.
If `None`, all cells are used.
sample_ncells: The number of cells to be sampled. If `sampling` is None, this parameter is ignored.
basis: The embedding data in which the vector field was reconstructed. If `None`, use the vector field function
that was reconstructed directly from the original unreduced gene expression space.
Qkey: The key of the PCA loading matrix in `.uns`.
vector_field_class: If not `None`, the jacobian will be computed using this class instead of the vector field stored in adata.
method: The method that will be used for calculating Jacobian, either `'analytical'` or `'numerical'`.
`'analytical'` method uses the analytical expressions for calculating Jacobian while `'numerical'` method
uses numdifftools, a numerical differentiation tool, for computing Jacobian. `'analytical'` method is much
more efficient.
projection_method: The method that will be used to project back to original gene expression space for calculating gene-wise
sensitivity matrix:
(1) 'from_jacobian': first calculate jacobian matrix and then calculate sensitivity matrix. This method
will take the combined regulator + effectors gene set for calculating a square Jacobian matrix
required for the sensitivity matrix calculation.
(2) 'direct': The sensitivity matrix on low dimension will first calculated and then projected back to
original gene expression space in a way that is similar to the gene-wise jacobian calculation.
cores: Number of cores to calculate Jacobian. If cores is set to be > 1, multiprocessing will be used to
parallel the Jacobian calculation.
kwargs: Any additional keys that will be passed to elementwise_jacobian_transformation function.
Returns:
adata: AnnData object that is updated with the `'sensitivity'` key in the `.uns`. This is a 3-dimensional tensor
with dimensions n_obs x n_regulators x n_effectors.
"""
regulators, effectors = (
list(np.unique(regulators)) if regulators is not None else None,
list(np.unique(effectors)) if effectors is not None else None,
)
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
if basis == "umap":
cell_idx = np.arange(adata.n_obs)
X, V = vector_field_class.get_data()
if cell_idx is None:
if sampling is None or sampling == "all":
cell_idx = np.arange(adata.n_obs)
else:
cell_idx = sample(np.arange(adata.n_obs), sample_ncells, sampling, X, V)
S = vector_field_class.compute_sensitivity(method=method)
if regulators is None and effectors is not None:
regulators = effectors
elif effectors is None and regulators is not None:
effectors = regulators
if regulators is not None and effectors is not None:
if type(regulators) is str:
if regulators in adata.var.keys():
regulators = adata.var.index[adata.var[regulators]]
else:
regulators = [regulators]
if type(effectors) is str:
if effectors in adata.var.keys():
effectors = adata.var.index[adata.var[effectors]]
else:
effectors = [effectors]
var_df = adata[:, adata.var.use_for_dynamics].var
regulators = var_df.index.intersection(regulators)
effectors = var_df.index.intersection(effectors)
if projection_method == "direct":
reg_idx, eff_idx = (
get_pd_row_column_idx(var_df, regulators, "row"),
get_pd_row_column_idx(var_df, effectors, "row"),
)
if len(regulators) == 0 or len(effectors) == 0:
raise ValueError(
"Either the regulator or the effector gene list provided is not in the dynamics gene list!"
)
Q = adata.uns[Qkey][:, : X.shape[1]]
if len(regulators) == 1 and len(effectors) == 1:
Sensitivity = elementwise_jacobian_transformation(
S,
Q[eff_idx, :].flatten(),
Q[reg_idx, :].flatten(),
**kwargs,
)
else:
Sensitivity = subset_jacobian_transformation(S, Q[eff_idx, :], Q[reg_idx, :], **kwargs)
elif projection_method == "from_jacobian":
Js = jacobian(
adata,
regulators=list(regulators) + list(effectors),
effectors=list(regulators) + list(effectors),
cell_idx=cell_idx,
sampling=sampling,
sample_ncells=sample_ncells,
basis=basis,
Qkey=Qkey,
vector_field_class=vector_field_class,
method=method,
store_in_adata=False,
**kwargs,
)
J, regulators, effectors = (
Js.get("jacobian_gene"),
Js.get("regulators"),
Js.get("effectors"),
)
Sensitivity = np.zeros_like(J)
n_genes, n_genes_, n_cells = J.shape
idenity = np.eye(n_genes)
for i in LoggerManager.progress_logger(
np.arange(n_cells), progress_name="Calculating sensitivity matrix with precomputed gene-wise Jacobians"
):
s = np.linalg.inv(idenity - J[:, :, i]) # np.transpose(J)
Sensitivity[:, :, i] = s.dot(np.diag(1 / np.diag(s)))
else:
raise ValueError("`projection_method` can only be `from_jacoian` or `direct`!")
else:
Sensitivity = None
ret_dict = {"sensitivity": S, "cell_idx": cell_idx}
# use 'str_key' in dict.keys() to check if these items are computed, or use dict.get('str_key')
if Sensitivity is not None:
ret_dict["sensitivity_gene"] = Sensitivity
if regulators is not None:
ret_dict["regulators"] = regulators if type(regulators) == list else regulators.to_list()
if effectors is not None:
ret_dict["effectors"] = effectors if type(effectors) == list else effectors.to_list()
S_det = [np.linalg.det(S[:, :, i]) for i in np.arange(S.shape[2])]
adata.obs["sensitivity_det_" + basis] = np.nan
adata.obs["sensitivity_det_" + basis][cell_idx] = S_det
if store_in_adata:
skey = "sensitivity" if basis is None else "sensitivity_" + basis
adata.uns[skey] = ret_dict
return adata
else:
return ret_dict
[docs]def acceleration(
adata: AnnData,
basis: str = "umap",
vector_field_class: Optional[scVectorField.BaseVectorField] = None,
Qkey: str = "PCs",
method: str = "analytical",
**kwargs,
):
"""Calculate acceleration for each cell with the reconstructed vector field function. AnnData object is updated with the `'acceleration'` key in the `.obs` as well as .obsm. If basis is `pca`, acceleration matrix will be inverse transformed back to original high dimension space.
Args:
adata: AnnData object that contains the reconstructed vector field function in the `uns` attribute.
basis: The embedding data in which the vector field was reconstructed.
vector_field_class: If not None, the divergene will be computed using this class instead of the vector field stored in adata.
Qkey: The key of the PCA loading matrix in `.uns`.
method: The method that will be used for calculating acceleration field, either `'analytical'` or `'numerical'`.
`'analytical'` method uses the analytical expressions for calculating acceleration field while `'numerical'`
method uses numdifftools, a numerical differentiation tool, for computing acceleration. `'analytical'`
method is much more efficient.
kwargs: Any additional keys that will be passed to vector_field_class.compute_acceleration function.
"""
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
X, V = vector_field_class.get_data()
acce_norm, acce = vector_field_class.compute_acceleration(X=X, method=method, **kwargs)
acce_key = "acceleration" if basis is None else "acceleration_" + basis
adata.obsm[acce_key] = acce
adata.obs[acce_key] = acce_norm
if basis == "pca":
if Qkey in adata.uns.keys():
Q = adata.uns[Qkey]
elif Qkey in adata.varm.keys():
Q = adata.varm[Qkey]
else:
raise Exception(f"No PC matrix {Qkey} found in neither .uns nor .varm.")
acce_hi = vector_transformation(acce, Q)
create_layer(
adata,
acce_hi,
layer_key="acceleration",
genes=adata.var.use_for_pca,
)
elif basis is None:
create_layer(
adata,
acce,
layer_key="acceleration",
genes=adata.var.use_for_pca,
)
[docs]def curvature(
adata: AnnData,
basis: str = "pca",
vector_field_class: Optional[scVectorField.BaseVectorField] = None,
formula: int = 2,
Qkey: str = "PCs",
method: str = "analytical",
**kwargs,
):
"""Calculate curvature for each cell with the reconstructed vector field function. AnnData object that is updated with the `curvature` key in the `.obs`.
Args:
adata: AnnData object that contains the reconstructed vector field function in the `uns` attribute.
basis: The embedding data in which the vector field was reconstructed.
vector_field_class: If not None, the divergene will be computed using this class instead of the vector field stored in adata.
formula: Which formula of curvature will be used, there are two formulas, so formula can be either `{1, 2}`. By
default it is 2 and returns both the curvature vectors and the norm of the curvature. The formula one only
gives the norm of the curvature.
Qkey: The key of the PCA loading matrix in `.uns`.
method: The method that will be used for calculating curvature field, either `'analytical'` or `'numerical'`.
`'analytical'` method uses the analytical expressions for calculating curvature while `'numerical'` method
uses numdifftools, a numerical differentiation tool, for computing curvature. `'analytical'` method is much
more efficient.
kwargs: Any additional keys that will be passed to vector_field_class.compute_curvature function.
"""
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
if formula not in [1, 2]:
raise ValueError(
f"There are only two available formulas (formula can be either `{1, 2}`) to calculate "
f"curvature, but your formula argument is {formula}."
)
X, V = vector_field_class.get_data()
curv, curv_mat = vector_field_class.compute_curvature(X=X, formula=formula, method=method, **kwargs)
curv_key = "curvature" if basis is None else "curvature_" + basis
main_info_insert_adata(curv_key, adata_attr="obs", indent_level=1)
adata.obs[curv_key] = curv
main_info_insert_adata(curv_key, adata_attr="obsm", indent_level=1)
adata.obsm[curv_key] = curv_mat
if basis == "pca":
curv_hi = vector_transformation(curv_mat, adata.uns[Qkey])
create_layer(adata, curv_hi, layer_key="curvature", genes=adata.var.use_for_pca)
elif basis is None:
create_layer(
adata,
curv_mat,
layer_key="curvature",
genes=adata.var.use_for_pca,
)
[docs]def torsion(
adata: AnnData, basis: str = "umap", vector_field_class: Optional[scVectorField.BaseVectorField] = None, **kwargs
):
"""Calculate torsion for each cell with the reconstructed vector field function. AnnData object that is updated with the `torsion` key in the .obs.
Args:
adata: :class:`~anndata.AnnData`
AnnData object that contains the reconstructed vector field function in the `uns` attribute.
basis: str or None (default: `umap`)
The embedding data in which the vector field was reconstructed.
vector_field_class: dict
The true ODE function, useful when the data is generated through simulation.
kwargs:
Any additional keys that will be passed to vector_field_class.compute_torsion function.
Examples
--------
>>> adata = dyn.sample_data.hematopoiesis()
>>> dyn.tl.reduceDimension(adata, n_components=3, enforce=True, embedding_key='X_umap_3d')
>>> adata
>>> dyn.tl.cell_velocities(adata,
>>> X=adata.layers["M_t"],
>>> V=adata.layers["velocity_alpha_minus_gamma_s"],
>>> basis='umap_3d',
>>> )
>>> dyn.vf.VectorField(adata, basis='umap_3d')
>>> dyn.vf.torsion(adata, basis='umap_3d')
>>> dyn.pl.streamline_plot(adata, color='torsion_umap_3d', basis='umap_3d')
>>> dyn.pl.streamline_plot(adata, color='torsion_umap_3d')
"""
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
X, V = vector_field_class.get_data()
torsion_mat = vector_field_class.compute_torsion(X=X, **kwargs)
torsion = np.array([np.linalg.norm(i) for i in torsion_mat])
torsion_key = "torsion" if basis is None else "torsion_" + basis
adata.obs[torsion_key] = torsion
adata.uns[torsion_key] = torsion_mat
[docs]def curl(
adata: AnnData,
basis: str = "umap",
vector_field_class: Optional[scVectorField.BaseVectorField] = None,
method: str = "analytical",
**kwargs,
):
"""Calculate Curl for each cell with the reconstructed vector field function. AnnData object is updated with the `'curl'` information in the `.
obs`. When vector field has three dimension, adata.obs['curl'] (magnitude of curl) and adata.obsm['curl'] (curl vector) will be added; when vector field has two dimension, only adata.obs['curl'] (magnitude of curl) will be provided.
Args:
adata: AnnData object that contains the reconstructed vector field function in the `uns` attribute.
basis: The embedding data in which the vector field was reconstructed.
vector_field_class: If not None, the divergene will be computed using this class instead of the vector field stored in adata.
method: The method that will be used for calculating curl, either `analytical` or `numeric`. `analytical`
method will use the analytical form of the reconstructed vector field for calculating curl while
`numeric` method will use numdifftools for calculation. `analytical` method is much more efficient.
kwargs: Any additional keys that will be passed to vector_field_class.compute_curl function.
"""
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
X, V = vector_field_class.get_data()
curl = vector_field_class.compute_curl(X=X, method=method, **kwargs)
curl_key = "curl" if basis is None else "curl_" + basis
if X.shape[1] == 3:
curl_mag = np.array([np.linalg.norm(i) for i in curl])
adata.obs[curl_key] = curl_mag
adata.obsm[curl_key] = curl
else:
adata.obs[curl_key] = curl
[docs]def divergence(
adata: AnnData,
cell_idx: Optional[List] = None,
sampling: Optional[Literal['random', 'velocity', 'trn']] = None,
sample_ncells: int = 1000,
basis: str = "pca",
vector_field_class=None,
method: str = "analytical",
store_in_adata: bool = True,
**kwargs,
) -> Optional[np.ndarray]:
"""Calculate divergence for each cell with the reconstructed vector field function. Either AnnData object is updated with the `'divergence'` key in the `.obs` or the divergence is returned as a numpy array.
Args:
adata: AnnData object that contains the reconstructed vector field function in the `uns` attribute.
cell_idx: A list of cell index (or boolean flags) for which the jacobian is calculated.
sampling: {None, 'random', 'velocity', 'trn'}, (default: None)
See specific information on these methods in `.tl.sample`.
If `None`, all cells are used.
sample_ncells: The number of cells to be sampled. If `sampling` is None, this parameter is ignored.
basis: The embedding data in which the vector field was reconstructed.
vector_field_class: If not None, the divergene will be computed using this class instead of the vector field stored in adata.
method: The method that will be used for calculating divergence, either `analytical` or `numeric`. `analytical`
method will use the analytical form of the reconstructed vector field for calculating divergence while
`numeric` method will use numdifftools for calculation. `analytical` method is much more efficient.
store_in_adata: Whether to store the divergence result in adata.
kwargs: Any additional keys that will be passed to vector_field_class.compute_divergence function.
Returns:
the divergence is returned as an np.ndarray if store_in_adata is False.
"""
if vector_field_class is None:
vector_field_class = get_vf_class(adata, basis=basis)
if basis == "umap":
cell_idx = np.arange(adata.n_obs)
X, V = vector_field_class.get_data()
if cell_idx is None:
if sampling is None or sampling == "all":
cell_idx = np.arange(adata.n_obs)
else:
cell_idx = sample(np.arange(adata.n_obs), sample_ncells, sampling, X, V)
jkey = "jacobian" if basis is None else "jacobian_" + basis
div = np.zeros(len(cell_idx))
calculated = np.zeros(len(cell_idx), dtype=bool)
if jkey in adata.uns_keys():
Js = adata.uns[jkey]["jacobian"]
cidx = adata.uns[jkey]["cell_idx"]
for i, c in enumerate(
LoggerManager.progress_logger(cell_idx, progress_name="Calculating divergence with precomputed Jacobians")
):
if c in cidx:
calculated[i] = True
div[i] = np.trace(Js[:, :, i]) if Js.shape[2] == len(cell_idx) else np.trace(Js[:, :, c])
div[~calculated] = vector_field_class.compute_divergence(X[cell_idx[~calculated]], method=method, **kwargs)
if store_in_adata:
div_key = "divergence" if basis is None else "divergence_" + basis
Div = np.array(adata.obs[div_key]) if div_key in adata.obs.keys() else np.ones(adata.n_obs) * np.nan
Div[cell_idx] = div
adata.obs[div_key] = Div
else:
return div
```