from typing import Union
import numpy as np
import pandas as pd
from anndata import AnnData
from scipy.sparse import csr_matrix
from scipy.stats import mode
from sklearn.neighbors import NearestNeighbors
from ..dynamo_logger import main_info
from ..preprocessing.utils import pca_monocle
from ..tools.clustering import hdbscan, infomap, leiden, louvain
from ..tools.Markov import (
grid_velocity_filter,
prepare_velocity_grid_data,
velocity_on_grid,
)
from ..utils import LoggerManager, copy_adata
from .scVectorField import SvcVectorField
from .utils import vecfld_from_adata


def cluster_field(
    adata: AnnData,
    basis: str = "pca",
    features: list = ["speed", "potential", "divergence", "acceleration", "curvature", "curl"],
    add_embedding_basis: bool = True,
    embedding_basis: Union[str, None] = None,
    normalize: bool = False,
    method: str = "leiden",
    cores: int = 1,
    copy: bool = False,
    **kwargs,
):
"""Cluster cells based on vector field features.
We would like to see whether the vector field can be used to better define cell state/types. This can be accessed
via characterizing critical points (attractor/saddle/repressor, etc.) and characteristic curves (nullcline,
separatrix). However, the calculation of those is not easy, for example, a strict definition of an attractor is
states where velocity is 0 and the eigenvalue of the jacobian matrix at that point is all negative. Under this
strict definition, we may sometimes find the attractors are very far away from our sampled cell states which makes
them less meaningful although this can be largely avoided when we decide to remove the density correction during the
velocity projection. This is not unexpected as the vector field we learned is defined via a set of basis functions
based on gaussian kernels and thus it is hard to satisfy that strict definition.
Fortunately, we can handle this better with the help of a different set of ideas. Instead of using critical points
by the classical dynamic system methods, we can use some machine learning approaches that are based on extracting
geometric features of streamline to "cluster vector field space" for define cell states/type. This requires
calculating, potential (ordered pseudotime), speed, curliness, divergence, acceleration, curvature, etc. Thanks to
the fact that we can analytically calculate Jacobian matrix matrix, those quantities of the vector field function
can be conveniently and efficiently calculated.
    Parameters
    ----------
    adata: :class:`~anndata.AnnData`
        adata object with the vector field function reconstructed in the space of `basis`.
    basis: `str` (default: `pca`)
        The space that will be used for calculating vector field features. Valid names include, for example, `pca`,
        `umap`, etc.
    features: `list` (default: `['speed', 'potential', 'divergence', 'acceleration', 'curvature', 'curl']`)
        The vector field features used for clustering; must be a subset of the default list.
    add_embedding_basis: `bool` (default: `True`)
        Whether to append the embedding coordinates (`adata.obsm['X_' + embedding_basis]`) to the feature matrix
        before clustering.
    embedding_basis: `str` or None (default: `None`)
        The embedding basis that will be combined with the vector field feature space for clustering. If None,
        `basis` is used.
    normalize: `bool` (default: `False`)
        Whether to mean center and scale each feature across all cells.
    method: `str` (default: `leiden`)
        The method that will be used for clustering, one of `{'kmeans', 'hdbscan', 'louvain', 'leiden', 'infomap'}`.
        If `louvain` or `leiden` is used, you need to have `cdlib` installed.
    cores: `int` (default: 1)
        The number of parallel jobs to run for the neighbors search. ``None`` means 1 unless in a
        :obj:`joblib.parallel_backend` context. ``-1`` means using all processors.
    copy: `bool` (default: `False`)
        Whether to return a new deep copy of `adata` instead of updating the `adata` object passed in.
    kwargs:
        Any additional arguments that will be passed to the kmeans, hdbscan, louvain, leiden or infomap clustering
        algorithm.

    Returns
    -------
    adata: :class:`~anndata.AnnData` or None
        A deep-copied adata object updated with a `field_<method>` column in `.obs` storing the cluster labels,
        returned only when `copy` is True; otherwise the input `adata` is updated in place and nothing is returned.
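
    Examples
    --------
    A minimal usage sketch. It assumes a vector field has already been reconstructed in the `umap` basis; the
    preprocessing calls (and the `zebrafish` sample loader) below follow dynamo's standard pipeline and are
    illustrative rather than prescriptive:

    >>> import dynamo as dyn
    >>> adata = dyn.sample_data.zebrafish()
    >>> dyn.pp.recipe_monocle(adata)
    >>> dyn.tl.dynamics(adata)
    >>> dyn.tl.reduceDimension(adata)
    >>> dyn.tl.cell_velocities(adata, basis="umap")
    >>> dyn.vf.VectorField(adata, basis="umap")
    >>> cluster_field(adata, basis="umap", method="leiden")
    >>> adata.obs["field_leiden"]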
"""
logger = LoggerManager.gen_logger("dynamo-cluster_field")
logger.log_time()
adata = copy_adata(adata) if copy else adata
if method in ["louvain", "leiden"]:
try:
            from cdlib import algorithms  # noqa: F401 -- imported only to verify that cdlib is installed
except ImportError:
raise ImportError(
"You need to install the excellent package `cdlib` if you want to use louvain or leiden "
"for clustering."
)
features = list(
set(features).intersection(["speed", "potential", "divergence", "acceleration", "curvature", "curl"])
)
if len(features) < 1:
raise ValueError(
"features has to be selected from ['speed', 'potential', 'divergence', 'acceleration', "
f"'curvature', 'curl']. your feature is {features}"
)
feature_key = [
"speed_" + basis,
basis + "_ddhodge_potential",
"divergence_" + basis,
"acceleration_" + basis,
"curvature_" + basis,
"curl_" + basis,
]
feature_list = [i + "_" + basis if i != "potential" else basis + "_ddhodge_" + i for i in features]
if feature_key[0] not in adata.obs.keys() and feature_key[0] in feature_list:
from ..vectorfield import speed
speed(adata, basis=basis)
if feature_key[1] not in adata.obs.keys() and feature_key[1] in feature_list:
from ..ext import ddhodge
ddhodge(adata, basis=basis)
if feature_key[2] not in adata.obs.keys() and feature_key[2] in feature_list:
from ..vectorfield import divergence
divergence(adata, basis=basis)
if feature_key[3] not in adata.obs.keys() and feature_key[3] in feature_list:
from ..vectorfield import acceleration
acceleration(adata, basis=basis)
if feature_key[4] not in adata.obs.keys() and feature_key[4] in feature_list:
from ..vectorfield import curvature
curvature(adata, basis=basis)
if feature_key[5] not in adata.obs.keys() and feature_key[5] in feature_list:
from ..vectorfield import curl
curl(adata, basis=basis)
feature_data = adata.obs.loc[:, feature_list].values
if embedding_basis is None:
embedding_basis = basis
if add_embedding_basis:
X = np.hstack((feature_data, adata.obsm["X_" + embedding_basis]))
else:
X = feature_data
if normalize:
# X = (X - X.min(0)) / X.ptp(0)
X = (X - X.mean(0)) / X.std(0)
if method in ["hdbscan", "kmeans"]:
if method == "hdbscan":
key = "field_hdbscan"
hdbscan(adata, X_data=X, result_key=key, **kwargs)
elif method == "kmeans":
from sklearn.cluster import KMeans
key = "field_kmeans"
kmeans = KMeans(random_state=0, **kwargs).fit(X)
adata.obs[key] = kmeans.labels_.astype("str")
# clusters need to be categorical variables
            adata.obs[key] = adata.obs[key].astype("category")
elif method in ["louvain", "leiden", "infomap"]:
if X.shape[0] > 200000 and X.shape[1] > 2:
from pynndescent import NNDescent
nbrs = NNDescent(
X,
metric="euclidean",
n_neighbors=31,
n_jobs=cores,
random_state=19491001,
)
nbrs_idx, dist = nbrs.query(X, k=31)
else:
nbrs = NearestNeighbors(n_neighbors=31, n_jobs=cores).fit(X)
dist, nbrs_idx = nbrs.kneighbors(X)
        # build a binary kNN graph: each cell (column 0 of nbrs_idx is the cell itself)
        # is connected to its 30 nearest neighbors
        row = np.repeat(nbrs_idx[:, 0], 30)
        col = nbrs_idx[:, 1:].flatten()
        graph = csr_matrix(
            (np.repeat(1, len(col)), (row, col)),
            shape=(adata.n_obs, adata.n_obs),
        )
adata.obsp["vf_feature_knn"] = graph
if method == "leiden":
leiden(
adata,
adj_matrix_key="vf_feature_knn",
result_key="field_leiden",
)
elif method == "louvain":
louvain(
adata,
adj_matrix_key="vf_feature_knn",
result_key="field_louvain",
)
elif method == "infomap":
infomap(
adata,
adj_matrix_key="vf_feature_knn",
result_key="field_infomap",
)
logger.finish_progress(progress_name="clustering_field")
if copy:
return adata
return None


def streamline_clusters(
adata: AnnData,
basis: str = "umap",
features: list = ["speed", "divergence", "acceleration", "curvature", "curl"],
method: str = "sparsevfc",
xy_grid_nums: list = [50, 50],
density: float = 5,
curvature_method: int = 1,
feature_bins: int = 10,
clustering_method: str = "leiden",
assign_fixedpoints: bool = False,
reversed_fixedpoints: bool = False,
**kwargs,
):
"""
Parameters
----------
adata
basis
features
method
xy_grid_nums
density
curvature_method
feature_bins
clustering_method
Returns
-------
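
    Examples
    --------
    A minimal usage sketch, assuming the same illustrative dynamo pipeline as in `cluster_field` has produced a
    vector field in the `umap` basis:

    >>> import dynamo as dyn
    >>> adata = dyn.sample_data.zebrafish()
    >>> dyn.pp.recipe_monocle(adata)
    >>> dyn.tl.dynamics(adata)
    >>> dyn.tl.reduceDimension(adata)
    >>> dyn.tl.cell_velocities(adata, basis="umap")
    >>> dyn.vf.VectorField(adata, basis="umap")
    >>> streamline_clusters(adata, basis="umap", clustering_method="leiden")
    >>> adata.uns["streamline_clusters_umap"]["clusters"]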
"""
import matplotlib.pyplot as plt
if method in ["louvain", "leiden"]:
try:
from cdlib import algorithms
"leiden" in dir(algorithms)
except ImportError:
raise ImportError(
"You need to install the excellent package `cdlib` if you want to use louvain or leiden "
"for clustering."
)
vf_dict, func = vecfld_from_adata(adata, basis=basis)
grid_kwargs_dict = {
"density": None,
"smooth": None,
"n_neighbors": None,
"min_mass": None,
"autoscale": False,
"adjust_for_stream": True,
"V_threshold": None,
}
if method.lower() == "sparsevfc":
X, V = adata.obsm["X_" + basis], adata.obsm["velocity_" + basis]
X_grid, p_mass, neighs, weight = prepare_velocity_grid_data(
X,
xy_grid_nums,
density=grid_kwargs_dict["density"],
smooth=grid_kwargs_dict["smooth"],
n_neighbors=grid_kwargs_dict["n_neighbors"],
)
for i in ["density", "smooth", "n_neighbors"]:
grid_kwargs_dict.pop(i)
V_emb = func(X)
V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) / np.maximum(1, p_mass)[:, None]
X_grid, V_grid = grid_velocity_filter(
V_emb=V,
neighs=neighs,
p_mass=p_mass,
X_grid=X_grid,
V_grid=V_grid,
**grid_kwargs_dict,
)
elif method.lower() == "gaussian":
X_grid, V_grid, D = velocity_on_grid(
vf_dict["X"],
vf_dict["Y"],
xy_grid_nums,
cut_off_velocity=True,
**grid_kwargs_dict,
)
else:
raise ValueError(f"only `sparsevfc` and `gaussian` method supported")
strm = plt.streamplot(
X_grid[0],
X_grid[1],
V_grid[0],
V_grid[1],
density=density,
)
    strm_res = strm.lines.get_segments()  # get streamline segments
# split segments into different streamlines
line_list_ori = {}
line_ind = 0
for i, seg in enumerate(strm_res):
if i == 0:
line_list_ori[0] = [seg]
else:
            # consecutive segments of the same streamline share a point: the second point of the
            # previous segment equals the first point of the current segment
            if all(strm_res[i - 1][1] == seg[0]):
line_list_ori[line_ind].append(seg)
else:
line_ind += 1
line_list_ori[line_ind] = [seg]
line_list = line_list_ori.copy()
    # concatenate each streamline's segments into a single (n, 2) array of points.
    for key, values in line_list_ori.items():
        line_list_ori[key] = np.array(values).reshape((-1, 2))
# remove duplicated rows from the numpy arrays.
for key, values in line_list.items():
line_list[key] = np.unique(np.array(values).reshape((-1, 2)), axis=0)
vector_field_class = SvcVectorField()
vector_field_class.from_adata(adata, basis=basis)
    has_acc = "acceleration" in features
    has_curv = "curvature" in features
    has_div = "divergence" in features
    has_speed = "speed" in features
    has_curl = "curl" in features
if has_acc:
acc_dict = {}
if has_curv:
cur_1_dict = {}
cur_2_dict = {}
if has_div:
div_dict = {}
if has_speed:
speed_dict = {}
if has_curl:
curl_dict = {}
    # save features along each streamline and create a histogram for each feature
    bins = feature_bins  # number of feature bins
line_len = []
feature_df = np.zeros((len(line_list), len(features) * bins))
for key, values in line_list.items():
line_len.append(values.shape[0])
tmp = None
        if has_acc:
            acceleration_val, acceleration_vec = vector_field_class.compute_acceleration(values)
            acc_dict[key] = acceleration_val
            # np.histogram returns (counts, bin_edges); keep the density histogram as the feature vector
            acc_hist, _ = np.histogram(acceleration_val, bins=bins, density=True)
            if tmp is None:
                tmp = acc_hist
        if has_curv:
            curvature_val_1 = vector_field_class.compute_curvature(values, formula=1)[0]
            cur_1_dict[key] = curvature_val_1
            curvature_val_2, curvature_vec = vector_field_class.compute_curvature(values)
            cur_2_dict[key] = curvature_val_2
            cur_1_hist, _ = np.histogram(curvature_val_1, bins=bins, density=True)
            cur_2_hist, _ = np.histogram(curvature_val_2, bins=bins, density=True)
            if tmp is None:
                tmp = cur_1_hist if curvature_method == 1 else cur_2_hist
            else:
                tmp = np.hstack((tmp, cur_1_hist if curvature_method == 1 else cur_2_hist))
        if has_div:
            divergence_val = vector_field_class.compute_divergence(values)
            div_dict[key] = divergence_val
            div_hist, _ = np.histogram(divergence_val, bins=bins, density=True)
            if tmp is None:
                tmp = div_hist
            else:
                tmp = np.hstack((tmp, div_hist))
        if has_speed:
            speed_vec = vector_field_class.func(values)
            # per-point speed along the streamline rather than a single norm of the whole velocity matrix
            speed_val = np.linalg.norm(speed_vec, axis=1)
            speed_dict[key] = speed_val
            speed_hist, _ = np.histogram(speed_val, bins=bins, density=True)
            if tmp is None:
                tmp = speed_hist
            else:
                tmp = np.hstack((tmp, speed_hist))
        if has_curl:
            curl_val = vector_field_class.compute_curl(values)
            curl_dict[key] = curl_val
            curl_hist, _ = np.histogram(curl_val, bins=bins, density=True)
            if tmp is None:
                tmp = curl_hist
            else:
                tmp = np.hstack((tmp, curl_hist))
feature_df[key, :] = tmp
# clustering
feature_adata = AnnData(feature_df)
pca_monocle(feature_adata, X_data=feature_df, pca_key="X_pca")
if clustering_method == "louvain":
louvain(feature_adata, obsm_key="X_pca")
elif clustering_method == "leiden":
leiden(feature_adata, obsm_key="X_pca")
elif clustering_method == "infomap":
infomap(feature_adata, obsm_key="X_pca")
elif method in ["hdbscan", "kmeans"]:
key = "field_hdbscan"
hdbscan(feature_adata, X_data=feature_df, result_key=key, **kwargs)
elif method == "kmeans":
from sklearn.cluster import KMeans
key = "field_kmeans"
kmeans = KMeans(random_state=0, **kwargs).fit(X)
feature_adata.obs[key] = kmeans.labels_.astype("str")
# clusters need to be categorical variables
feature_adata.obs[key] = adata.obs.obs[key].astype("category")
else:
raise ValueError(
"only louvain, leiden, infomap, hdbscan and kmeans clustering supported but your requested "
f"method is {method}"
)
if assign_fixedpoints or reversed_fixedpoints:
tmp = np.array(strm.lines.get_segments()).reshape((-1, 2))
vector_field_class.data["X"] = np.unique(tmp, axis=0)
if assign_fixedpoints:
(
X,
valid_fps_type_assignment,
assignment_id,
) = vector_field_class.assign_fixed_points(cores=1)
feature_adata.obs["fixed_point"] = -1
if reversed_fixedpoints:
        # reverse the vector field to identify sources; bind the original function first so the
        # lambda does not recursively call itself
        original_func = vector_field_class.func
        vector_field_class.func = lambda x: -original_func(x)
(
X_rev,
valid_fps_type_assignment_rev,
assignment_id_rev,
) = vector_field_class.assign_fixed_points(cores=1)
feature_adata.obs["rev_fixed_point"] = -1
data_X = vector_field_class.data["X"]
for key, values in line_list.items():
indices = [np.where(np.logical_and(data_X[:, 0] == val[0], data_X[:, 1] == val[1]))[0][0] for val in values]
# assign fixed point to the most frequent point
if assign_fixedpoints:
mode_val = mode(assignment_id[indices])[0][0]
if not np.isnan(mode_val):
feature_adata.obs.loc[str(key), "fixed_point"] = mode_val
if reversed_fixedpoints:
mode_val = mode(assignment_id_rev[indices])[0][0]
if not np.isnan(mode_val):
feature_adata.obs.loc[str(key), "rev_fixed_point"] = mode_val
adata.uns["streamline_clusters_" + basis] = {
"feature_df": feature_df,
"segments": line_list_ori,
"X_pca": feature_adata.obsm["X_pca"],
"clustering_method": clustering_method,
"distances": feature_adata.obsp["X_pca_distances"],
"connectivities": feature_adata.obsp["X_pca_connectivities"],
"clusters": feature_adata.obs[clustering_method].values,
}
if assign_fixedpoints:
adata.uns["streamline_clusters_" + basis]["fixed_point"] = feature_adata.obs["fixed_point"]
if reversed_fixedpoints:
adata.uns["streamline_clusters_" + basis]["rev_fixed_point"] = feature_adata.obs["rev_fixed_point"]