Source code for dynamo.preprocessing.transform

import warnings
from typing import Union

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

import anndata
import numpy as np
from anndata import AnnData
from scipy.sparse.base import issparse

from ..configuration import DKM
from ..dynamo_logger import main_debug, main_info_insert_adata_uns
from ..utils import copy_adata
from .utils import is_integer_arr


def _Freeman_Tukey(X: np.ndarray, inverse=False) -> np.ndarray:
    """perform Freeman-Tukey transform or inverse transform on the given array.

    Args:
        X: a matrix.
        inverse: whether to perform inverse Freeman-Tukey transform. Defaults to False.

    Returns:
        The transformed array.
    """

    if inverse:
        res = (X**2 - 1) ** 2 / (4 * X**2)
    else:
        res = np.sqrt(X) + np.sqrt((X + 1))

    return res


def _log_inplace(data: np.ndarray) -> np.ndarray:
    """Calculate the natural logarithm `log(exp(x)) = x` of an array and update the array inplace.

    Args:
        data: the array for calculation.

    Returns:
        The updated array.
    """

    return np.log(data + 1, out=data)


def _log1p_inplace(data: np.ndarray) -> np.ndarray:
    """Calculate log1p (log(1+x)) of an array and update the array inplace.

    Args:
        data: the array for calculation.

    Returns:
        The updated array.
    """

    return np.log1p(data, out=data)


def _log2_inplace(data: np.ndarray) -> np.ndarray:
    """Calculate Base-2 logarithm of `x` of an array and update the array inplace.

    Args:
        data: the array for calculation.

    Returns:
        The updated array.
    """

    return np.log2(data + 1, out=data)


def Freeman_Tukey_inplace(adata: AnnData, layer: str = DKM.X_LAYER) -> None:
    """Calculate Freeman-Tukey transform for a layer of an AnnData object inplace.

    Args:
        adata: an AnnData object.
        layer: the layer to operate on. Defaults to DKM.X_LAYER.
    """
    mat = DKM.select_layer_data(adata, layer, copy=False)
    if issparse(mat):
        if is_integer_arr(mat.data):
            mat = mat.asfptype()
            DKM.set_layer_data(adata, layer, mat)
        mat.data = _Freeman_Tukey(mat.data)
    else:
        mat = mat.astype(np.float64)
        mat = _Freeman_Tukey(mat)

    DKM.set_layer_data(adata, layer, mat)


def log_inplace(adata: AnnData, layer: str = DKM.X_LAYER) -> None:
    """Calculate the natural logarithm `log(exp(x)) = x` for a layer of an AnnData object inplace.

    Args:
        adata: an AnnData object.
        layer: the layer to operate on. Defaults to DKM.X_LAYER.
    """

    mat = DKM.select_layer_data(adata, layer, copy=False)
    if issparse(mat):
        if is_integer_arr(mat.data):
            mat = mat.asfptype()
            DKM.set_layer_data(adata, layer, mat)
        _log_inplace(mat.data)
    else:
        mat = mat.astype(np.float64)
        _log_inplace(mat)
        DKM.set_layer_data(adata, layer, mat)


def log1p_inplace(adata: AnnData, layer: str = DKM.X_LAYER) -> None:
    """Calculate log1p (log(1+x)) for a layer of an AnnData object inplace.

    Args:
        adata: an AnnData object.
        layer: the layer to operate on. Defaults to DKM.X_LAYER.
    """

    mat = DKM.select_layer_data(adata, layer, copy=False)
    if issparse(mat):
        if is_integer_arr(mat.data):
            mat = mat.asfptype()
            DKM.set_layer_data(adata, layer, mat)
        _log1p_inplace(mat.data)
    else:
        mat = mat.astype(np.float64)
        _log1p_inplace(mat)
        DKM.set_layer_data(adata, layer, mat)


def log2_inplace(adata: AnnData, layer: str = DKM.X_LAYER) -> None:
    """Calculate Base-2 logarithm of `x` for a layer of an AnnData object inplace.

    Args:
        adata: an AnnData object.
        layer: the layer to operate on. Defaults to DKM.X_LAYER.
    """

    mat = DKM.select_layer_data(adata, layer, copy=False)
    if issparse(mat):
        if is_integer_arr(mat.data):
            mat = mat.asfptype()
            DKM.set_layer_data(adata, layer, mat)
        _log2_inplace(mat.data)
    else:
        mat = mat.astype(np.float64)
        _log2_inplace(mat)
        DKM.set_layer_data(adata, layer, mat)


def is_log1p_transformed_adata(adata: anndata.AnnData) -> bool:
    """check if adata data is log transformed by checking a small subset of adata observations.

    Args:
        adata: an AnnData object

    Returns:
        A flag shows whether the adata object is log transformed.
    """

    chosen_gene_indices = np.random.choice(adata.n_vars, 10)
    _has_log1p_transformed = not np.allclose(
        np.array(adata.X[:, chosen_gene_indices].sum(1)),
        np.array(adata.layers["spliced"][:, chosen_gene_indices].sum(1)),
        atol=1e-4,
    )
    return _has_log1p_transformed


def Freeman_Tukey_adata_layer(adata: AnnData, layer: str = DKM.X_LAYER, copy: bool = False) -> AnnData:
    """Calculate Freeman_Tukey of adata's specific layer.

    Args:
        adata: an AnnData object.
        layer: the layer to operate on. Defaults to DKM.X_LAYER.
        copy: whether operate on the original object or on a copied one and return it. Defaults to False.

    Returns:
        The updated AnnData object.
    """

    _adata = adata
    if copy:
        _adata = copy_adata(adata)
    Freeman_Tukey_inplace(_adata, layer=layer)
    return _adata


def log_adata_layer(adata: AnnData, layer: str = DKM.X_LAYER, copy: bool = False) -> AnnData:
    """Calculate log of adata's specific layer.

    Args:
        adata: an AnnData object.
        layer: the layer to operate on. Defaults to DKM.X_LAYER.
        copy: whether operate on the original object or on a copied one and return it. Defaults to False.

    Returns:
        The updated AnnData object.
    """

    _adata = adata
    if copy:
        _adata = copy_adata(adata)
    log_inplace(_adata, layer=layer)
    return _adata


def log1p_adata_layer(adata: AnnData, layer: str = DKM.X_LAYER, copy: bool = False) -> AnnData:
    """Calculate log1p of adata's specific layer.

    Args:
        adata: an AnnData object.
        layer: the layer to operate on. Defaults to DKM.X_LAYER.
        copy: whether operate on the original object or on a copied one and return it. Defaults to False.

    Returns:
        The updated AnnData object.
    """

    _adata = adata
    if copy:
        _adata = copy_adata(adata)
    log1p_inplace(_adata, layer=layer)
    return _adata


def log2_adata_layer(adata: AnnData, layer: str = DKM.X_LAYER, copy: bool = False) -> AnnData:
    """Calculate log2 of adata's specific layer.

    Args:
        adata: an AnnData object.
        layer: the layer to operate on. Defaults to DKM.X_LAYER.
        copy: whether operate on the original object or on a copied one and return it. Defaults to False.

    Returns:
        The updated AnnData object.
    """

    _adata = adata
    if copy:
        _adata = copy_adata(adata)
    log2_inplace(_adata, layer=layer)
    return _adata


def Freeman_Tukey(adata: AnnData, layers: list = [DKM.X_LAYER], copy: bool = False) -> AnnData:
    """Perform Freeman_Tukey transform on selected adata layers

    Args:
        adata: an AnnData object.
        layers: the layers to operate on. Defaults to [DKM.X_LAYER].
        copy: whether operate on the original object or on a copied one and return it. Defaults to False.

    Returns:
        The updated AnnData object.
    """

    _adata = adata
    if copy:
        _adata = copy_adata(adata)

    main_debug("[Freeman_Tukey] transform applied to layers: %s" % (str(layers)))
    for layer in layers:
        Freeman_Tukey_adata_layer(_adata, layer=layer)

        if layer == DKM.X_LAYER:
            main_info_insert_adata_uns("pp.X_norm_method")
            adata.uns["pp"]["X_norm_method"] = Freeman_Tukey.__name__
        else:
            main_info_insert_adata_uns("pp.layers_norm_method")
            adata.uns["pp"]["layers_norm_method"] = Freeman_Tukey.__name__

    return _adata


def log(adata: AnnData, layers: list = [DKM.X_LAYER], copy: bool = False) -> AnnData:
    """Perform log transform on selected adata layers

    Args:
        adata: an AnnData object.
        layers: the layers to operate on. Defaults to [DKM.X_LAYER].
        copy: whether operate on the original object or on a copied one and return it. Defaults to False.

    Returns:
        The updated AnnData object.
    """

    _adata = adata
    if copy:
        _adata = copy_adata(adata)

    main_debug("[log] transform applied to layers: %s" % (str(layers)))
    for layer in layers:
        log_adata_layer(_adata, layer=layer)

        if layer == DKM.X_LAYER:
            main_info_insert_adata_uns("pp.X_norm_method")
            adata.uns["pp"]["X_norm_method"] = log.__name__
        else:
            main_info_insert_adata_uns("pp.layers_norm_method")
            adata.uns["pp"]["layers_norm_method"] = log.__name__

    return _adata


[docs]def log1p(adata: AnnData, layers: list = [DKM.X_LAYER], copy: bool = False) -> AnnData: """Perform log1p transform on selected adata layers Args: adata: an AnnData object. layers: the layers to operate on. Defaults to [DKM.X_LAYER]. copy: whether operate on the original object or on a copied one and return it. Defaults to False. Returns: The updated AnnData object. """ _adata = adata if copy: _adata = copy_adata(adata) main_debug("[log1p] transform applied to layers: %s" % (str(layers))) for layer in layers: log1p_adata_layer(_adata, layer=layer) if layer == DKM.X_LAYER: main_info_insert_adata_uns("pp.X_norm_method") adata.uns["pp"]["X_norm_method"] = log1p.__name__ else: main_info_insert_adata_uns("pp.layers_norm_method") adata.uns["pp"]["layers_norm_method"] = log1p.__name__ return _adata
def log2(adata: AnnData, layers: list = [DKM.X_LAYER], copy: bool = False) -> AnnData: """Perform log2 transform on selected adata layers Args: adata: an AnnData object. layers: the layers to operate on. Defaults to [DKM.X_LAYER]. copy: whether operate on the original object or on a copied one and return it. Defaults to False. Returns: The updated AnnData object. """ _adata = adata if copy: _adata = copy_adata(adata) main_debug("[log2] transform applied to layers: %s" % (str(layers))) for layer in layers: log2_adata_layer(_adata, layer=layer) if layer == DKM.X_LAYER: main_info_insert_adata_uns("pp.X_norm_method") adata.uns["pp"]["X_norm_method"] = log2.__name__ else: main_info_insert_adata_uns("pp.layers_norm_method") adata.uns["pp"]["layers_norm_method"] = log2.__name__ return _adata def vstExprs( adata: anndata.AnnData, expr_matrix: Union[np.ndarray, None] = None, round_vals: bool = True, ) -> np.ndarray: """Variance stabilization transformation of the gene expression. This function is partly based on Monocle R package (https://github.com/cole-trapnell-lab/monocle3). Args: adata: an AnnData object. expr_matrix: an matrix of values to transform. Must be normalized (e.g. by size factors) already. Defaults to None. round_vals: whether to round expression values to the nearest integer before applying the transformation. Defaults to True. Returns: A numpy array of the gene expression after VST. """ fitInfo = adata.uns["dispFitInfo"] coefs = fitInfo["coefs"] if expr_matrix is None: ncounts = adata.X if round_vals: if issparse(ncounts): ncounts.data = np.round(ncounts.data, 0) else: ncounts = ncounts.round().astype("int") else: ncounts = expr_matrix def vst(q): # c( "asymptDisp", "extraPois" ) return np.log( (1 + coefs[1] + 2 * coefs[0] * q + 2 * np.sqrt(coefs[0] * q * (1 + coefs[1] + coefs[0] * q))) / (4 * coefs[0]) ) / np.log(2) res = vst(ncounts.toarray()) if issparse(ncounts) else vst(ncounts) return res