Source code for dynamo.external.hodge

# YEAR: 2019
# COPYRIGHT HOLDER: ddhodge

# Code adapted from https://github.com/kazumits/ddhodge.

from typing import Union

import numpy as np
from anndata import AnnData
from scipy.sparse import csr_matrix, issparse

from ..dynamo_logger import main_finish_progress, main_info, main_log_time

# from ..vectorfield.scVectorField import graphize_vecfld
from ..tools.graph_calculus import divergence, graphize_velocity, potential
from ..tools.sampling import sample_by_velocity, trn
from ..vectorfield.utils import vecfld_from_adata, vector_field_function

"""from ..tools.graph_operators import (
    build_graph,
    div,
    potential,
)"""
from ..tools.connectivity import check_and_recompute_neighbors, generate_neighbor_keys


def ddhodge(
    adata: AnnData,
    X_data: Union[np.ndarray, None] = None,
    layer: Union[str, None] = None,
    basis: str = "pca",
    n: int = 30,
    VecFld: Union[dict, None] = None,
    adjmethod: str = "graphize_vecfld",
    distance_free: bool = False,
    n_downsamples: int = 5000,
    up_sampling: bool = True,
    sampling_method: str = "velocity",
    seed: int = 19491001,
    enforce: bool = False,
    cores: int = 1,
    **kwargs,
) -> None:
    """Model the latent flow structure using Hodge decomposition, based on a sparse diffusion graph created from
    the reconstructed vector field function. This method is relevant to curl-free/divergence-free vector field
    reconstruction.

    Args:
        adata: An AnnData object.
        X_data: The user-supplied expression (embedding) data that will be used for graph Hodge decomposition
            directly.
        layer: Which layer of the data will be used for graph Hodge decomposition.
        basis: Which basis of the data will be used for graph Hodge decomposition.
        n: Number of nearest neighbors to use when the nearest neighbor graph is not included.
        VecFld: The reconstructed vector field function.
        adjmethod: The method used to build the adjacency matrix for the sparse diffusion graph, either "naive" or
            "graphize_vecfld". If "naive" is used, the transition matrix created during vector field projection
            will be used; if "graphize_vecfld" is used, a method that guarantees the preservation of divergence
            will be used.
        n_downsamples: Number of cells to downsample to if the cell number is larger than this value. Three
            downsampling methods are available, see `sampling_method`.
        up_sampling: Whether to assign the calculated potential, curl and divergence to cells that were not
            sampled, based on the values of their nearest sampled cells.
        sampling_method: Method used to downsample the dataset to facilitate the calculation. Can be one of
            {`random`, `velocity`, `trn`}, corresponding to random sampling, velocity-magnitude-based sampling and
            topology-representing-network-based sampling, respectively.
        seed: Seed for the random state; must be convertible to a 32-bit unsigned integer. Used when sampling
            control points. The default is fixed to ensure consistency between different runs.
        enforce: Whether to enforce the calculation of the adjacency matrix for estimating the potential, curl and
            divergence of each cell.
        cores: Number of cores used to run the graphize_vecfld function. If cores > 1, multiprocessing will be
            used to parallelize the graphize_vecfld calculation.

    Returns:
        Nothing is returned, but the `AnnData` object will be updated with a `ddhodge` key in the `obsp` attribute
        holding the adjacency matrix of the sparse diffusion graph, and two columns, `ddhodge_potential` and
        `ddhodge_div` (prefixed with the basis), holding the potential and divergence of each cell, will be added
        to `obs`.
    """
    main_log_time()
    prefix = "" if basis is None else basis + "_"
    to_downsample = adata.n_obs > n_downsamples

    if VecFld is None:
        VecFld, func = vecfld_from_adata(adata, basis)
    else:

        def func(x):
            return vector_field_function(x, VecFld)

    if X_data is None:
        X_data_full = VecFld["X"].copy()
    else:
        if X_data.shape[0] != adata.n_obs:
            raise ValueError(f"The X_data you provided doesn't correspond to exactly {adata.n_obs} cells")
        X_data_full = X_data.copy()

    if to_downsample:
        if sampling_method == "trn":
            cell_idx = trn(X_data_full, n_downsamples)
        elif sampling_method == "velocity":
            np.random.seed(seed)
            cell_idx = sample_by_velocity(func(X_data_full), n_downsamples)
        elif sampling_method == "random":
            np.random.seed(seed)
            cell_idx = np.random.choice(np.arange(adata.n_obs), n_downsamples)
        else:
            raise ValueError(
                f"sampling method {sampling_method} is not available. Only `random`, `velocity`, `trn` are "
                f"available."
            )
    else:
        cell_idx = np.arange(adata.n_obs)

    X_data = X_data_full[cell_idx, :]
    adata_ = adata[cell_idx].copy()
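
    # `cell_idx` now indexes the (possibly downsampled) cells and `adata_` is the matching
    # subset copy; the sparse diffusion graph below is built on these cells only.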
    if prefix + "ddhodge" in adata_.obsp.keys() and not enforce and not to_downsample:
        main_info("fetch computation results from adata.obsp[%s]..." % (prefix + "ddhodge"))
        adj_mat = adata_.obsp[prefix + "ddhodge"]
    else:
        if adjmethod == "graphize_vecfld":
            main_info("graphizing vectorfield...")
            V_data = func(X_data)
            neighbor_result_prefix = "" if layer is None else layer
            conn_key, dist_key, neighbor_key = generate_neighbor_keys(neighbor_result_prefix)

            if neighbor_key not in adata_.uns_keys() or to_downsample:
                existing_nbrs_idx = None
            else:
                check_and_recompute_neighbors(adata, result_prefix=neighbor_result_prefix)
                neighbors = adata_.obsp[conn_key]
                existing_nbrs_idx = neighbors.tolil().rows

            adj_mat, nbrs_idx, dists, nbrs = graphize_velocity(
                V_data,
                X_data,
                nbrs_idx=existing_nbrs_idx,
                k=n,
                return_nbrs=True,
            )
            """adj_mat, nbrs = graphize_vecfld(
                func,
                X_data,
                nbrs_idx=existing_nbrs_idx,
                k=n,
                distance_free=distance_free,
                n_int_steps=20,
                cores=cores,
            )"""
        elif adjmethod == "naive":
            main_info(
                'method=naive, get adj_mat from transition matrix in adata directly '
                '(adata.uns["transition_matrix"])'
            )

            if "transition_matrix" not in adata_.uns.keys():
                raise Exception(
                    "Your adata doesn't have a transition matrix created. You need to first "
                    "run dyn.tl.cell_velocities(adata) to get the transition matrix before running"
                    " this function."
                )

            # select the submatrix of the transition matrix for the sampled cells
            adj_mat = adata_.uns["transition_matrix"][cell_idx, :][:, cell_idx]
        else:
            raise ValueError(f"adjmethod can only be one of {'naive', 'graphize_vecfld'}")

    # TODO: transform the type of adj_mat here so that we can maintain one set of APIs (either sparse or numpy)
    # if not issparse(adj_mat):
    #     main_info("adj_mat:%s is not sparse, transforming it to a sparse matrix..." % (str(type(adj_mat))))
    #     adj_mat = csr_matrix(adj_mat)

    # TODO: temp fix; refactor to make adj_mat sparse and adjust all the function calls
    if issparse(adj_mat):
        adj_mat = adj_mat.toarray()

    # if not all cells are used in the graphize_vecfld function, set the diagonal to 1
    if len(np.unique(np.hstack(adj_mat.nonzero()))) != adata.n_obs:
        main_info("not all cells are used, set diag to 1...", indent_level=2)
        # temporary fix for github issue #263
        # https://github.com/aristoteleo/dynamo-release/issues/263
        # support numpy and sparse matrices here
        if issparse(adj_mat):
            adj_mat.setdiag(1)
        else:
            np.fill_diagonal(adj_mat, 1)

    # g = build_graph(adj_mat)
    # TODO: the following line does not work on a sparse matrix.
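    # A is the unweighted (0/1) adjacency matrix of the diffusion graph; it is passed as
    # the weight matrix `W` to the graph `divergence` and `potential` calls below.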
    A = np.abs(np.sign(adj_mat))

    if (prefix + "ddhodge" not in adata.obsp.keys() or enforce) and not to_downsample:
        adata.obsp[prefix + "ddhodge"] = adj_mat

    # ddhodge_div = div(g)
    # potential_ = potential(g, -ddhodge_div)
    ddhodge_div = divergence(adj_mat, W=A)
    potential_ = potential(adj_mat, W=A, div=ddhodge_div, **kwargs)

    if up_sampling and to_downsample:
        main_info(
            "Constructing W matrix according to upsampling=True and downsampling=True options...",
            indent_level=2,
        )
        query_idx = np.array(list(set(np.arange(adata.n_obs)).difference(cell_idx)))
        query_data = X_data_full[query_idx, :]

        # construct nbrs of query points based on two types of nbrs: from NNDescent (pynndescent) or NearestNeighbors
        if hasattr(nbrs, "kneighbors"):
            dist, query_nbrs_idx = nbrs.kneighbors(query_data)
        elif hasattr(nbrs, "query"):
            query_nbrs_idx, dist = nbrs.query(query_data, k=nbrs.n_neighbors)

        k = query_nbrs_idx.shape[1]
        query_W_row = np.repeat(np.arange(len(query_idx)), k)
        query_W_col = query_nbrs_idx.flatten()

        W = csr_matrix(
            (np.repeat(1 / k, len(query_W_row)), (query_W_row, query_W_col)),
            shape=(len(query_idx), len(cell_idx)),
        )

        query_data_div, query_data_potential = (
            W.dot(ddhodge_div),
            W.dot(potential_),
        )
        (
            adata.obs[prefix + "ddhodge_sampled"],
            adata.obs[prefix + "ddhodge_div"],
            adata.obs[prefix + "ddhodge_potential"],
        ) = (False, 0, 0)
        adata.obs.loc[adata.obs_names[cell_idx], prefix + "ddhodge_sampled"] = True
        adata.obs.loc[adata.obs_names[cell_idx], prefix + "ddhodge_div"] = ddhodge_div
        adata.obs.loc[adata.obs_names[cell_idx], prefix + "ddhodge_potential"] = potential_
        adata.obs.loc[adata.obs_names[query_idx], prefix + "ddhodge_div"] = query_data_div
        adata.obs.loc[adata.obs_names[query_idx], prefix + "ddhodge_potential"] = query_data_potential
    else:
        adata.obs[prefix + "ddhodge_div"] = ddhodge_div
        adata.obs[prefix + "ddhodge_potential"] = potential_

    main_finish_progress("ddhodge completed")
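

# A minimal usage sketch (illustrative only, not part of the upstream ddhodge source).
# It assumes one typical dynamo workflow where velocities and a vector field have been
# reconstructed in the "pca" basis; the zebrafish sample data is just an example and
# any suitable AnnData with spliced/unspliced layers should work.
#
#     import dynamo as dyn
#
#     adata = dyn.sample_data.zebrafish()
#     dyn.pp.recipe_monocle(adata)
#     dyn.tl.dynamics(adata)
#     dyn.tl.reduceDimension(adata)
#     dyn.tl.cell_velocities(adata, basis="pca")
#     dyn.vf.VectorField(adata, basis="pca")
#
#     # graph Hodge decomposition on the reconstructed vector field
#     dyn.ext.ddhodge(adata, basis="pca")
#
#     # results (for the "pca" basis): the diffusion graph in adata.obsp["pca_ddhodge"],
#     # plus adata.obs["pca_ddhodge_potential"] and adata.obs["pca_ddhodge_div"]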