Source code for dynamo.preprocessing.external.sctransform

# =================================================================
# Original Code Repository Author: Lause, Berens & Kobak
# Adapted to Dynamo by: dynamo authors
# Created Date: 12/16/2021
# Description: pearson residuals based method for preprocessing single cell expression data
# Original Code Repository: https://github.com/atarashansky/SCTransformPy
# Sctransform Paper: Hafemeister, C., Satija, R. Normalization and variance stabilization of single-cell RNA-seq data using regularized negative binomial regression.
# =================================================================

import os
from typing import Dict, Optional, Union

import numpy as np
import pandas as pd
import scipy
import scipy as sp
import statsmodels.discrete.discrete_model
import statsmodels.nonparametric.kernel_regression
from anndata import AnnData
from scipy import stats

from ...configuration import DKM
from ...dynamo_logger import main_info, main_info_insert_adata_layer, main_warning
from ..utils import get_gene_selection_filter

_EPS = np.finfo(float).eps


def robust_scale_binned(
    y: np.ndarray,
    x: np.ndarray,
    breaks: np.ndarray,
) -> np.ndarray:
    """Scale the values of y based on their medians and absolute deviations in each bin defined by the breaks of x.

    The scaling factor for each bin is determined by the median absolute deviation (MAD) of the residuals, which are
    defined as the differences between the y values and their median in each bin, divided by a constant factor of
    1.4826. This scaling is robust to the presence of outliers in each bin.

    Args:
        y: the array of values to be scaled. Must have the same length as x.
        x: the array of values to define the bins for scaling. Must have the same length as y.
        breaks: the sequence of break points for binning the x values. Must be sorted in increasing order.

    Returns:
        An array of the same shape as y, with the scaled values.
    """
    bins = np.digitize(x, breaks)
    binsu = np.unique(bins)
    res = np.zeros(bins.size)
    for i in range(binsu.size):
        yb = y[bins == binsu[i]]
        res[bins == binsu[i]] = (yb - np.median(yb)) / (1.4826 * np.median(np.abs(yb - np.median(yb))) + _EPS)

    return res


def is_outlier(
    y: np.ndarray,
    x: np.ndarray,
    th: Union[int, float] = 10,
) -> np.ndarray:
    """Identify outliers in a dataset using robust scaling and a FFTKDE density estimate.

    Args:
       y: the array of values to test for outliers.
       x: an array of values corresponding to the locations of `y`.
       th: threshold value for outlier detection.

    Returns:
       Boolean array indicating whether each value in `y` is an outlier (`True`) or not (`False`).
    """
    try:
        from KDEpy import FFTKDE
    except ImportError:
        raise ImportError("Please install KDEpy for sctransform.")
    z = FFTKDE(kernel="gaussian", bw="ISJ").fit(x)
    z.evaluate()
    bin_width = (max(x) - min(x)) * z.bw / 2
    eps = _EPS * 10

    breaks1 = np.arange(min(x), max(x) + bin_width, bin_width)
    breaks2 = np.arange(min(x) - eps - bin_width / 2, max(x) + bin_width, bin_width)
    score1 = robust_scale_binned(y, x, breaks1)
    score2 = robust_scale_binned(y, x, breaks2)
    return np.abs(np.vstack((score1, score2))).min(0) > th


def _parallel_init(
    igenes_bin_regress: np.ndarray,
    iumi_bin: np.ndarray,
    ign: np.ndarray,
    imm: np.ndarray,
    ips: Dict,
) -> None:
    """Initialize global variables used in parallel processing."""
    global genes_bin_regress
    global umi_bin
    global gn
    global mm
    global ps
    genes_bin_regress = igenes_bin_regress
    umi_bin = iumi_bin
    gn = ign
    mm = imm
    ps = ips


def _parallel_wrapper(j: int) -> None:
    """Helper function to fit Poisson regression to UMI counts."""
    name = gn[genes_bin_regress[j]]
    y = umi_bin[:, j].A.flatten()
    pr = statsmodels.discrete.discrete_model.Poisson(y, mm)
    res = pr.fit(disp=False)
    mu = res.predict()
    theta = theta_ml(y, mu)
    ps[name] = np.append(res.params, theta)


def gmean(
    X: scipy.sparse.spmatrix,
    axis: int = 0,
    eps: Union[int, float] = 1,
) -> np.ndarray:
    """Compute the geometric mean of the non-zero elements in each column (or row) of a sparse matrix.

    Args:
        X: the sparse matrix of shape (n_samples, n_features) whose columns (or rows) are to be averaged.
        axis: the axis along which to average the columns (or rows). By default, the function averages over columns
            (axis=0).
        eps: A small positive number added to the elements of the sparse matrix before taking the logarithm. This is
            necessary to avoid taking the logarithm of zero.

    Returns:
        An array of shape (n_features,) containing the geometric means of the non-zero elements in each column (or row)
        of the input sparse matrix X.
    """
    X = X.copy()
    X = X.asfptype()
    assert np.all(X.sum(0) > 0)
    assert np.all(X.data > 0)
    X.data[:] = np.log(X.data + eps)
    res = np.exp(X.mean(axis).A.flatten()) - eps

    assert np.all(res > 0)
    return res


def theta_ml(y: np.ndarray, mu: np.ndarray) -> float:
    """Compute the maximum likelihood estimator of theta parameter.

    This function uses an iterative algorithm to optimize the log-likelihood of the Poisson distribution with respect to
    the theta parameter. The algorithm stops when the change in theta estimate is smaller than a threshold value.

    Args:
        y: the observed count data.
        mu: the mean of the Poisson distribution.

    Returns:
        The maximum likelihood estimate of the theta parameter.
    """
    n = y.size
    weights = np.ones(n)
    limit = 10
    eps = (_EPS) ** 0.25

    from scipy.special import polygamma, psi

    def score(n, th, mu, y, w):
        return sum(w * (psi(th + y) - psi(th) + np.log(th) + 1 - np.log(th + mu) - (y + th) / (mu + th)))

    def info(n, th, mu, y, w):
        return sum(w * (-polygamma(1, th + y) + polygamma(1, th) - 1 / th + 2 / (mu + th) - (y + th) / (mu + th) ** 2))

    t0 = n / sum(weights * (y / mu - 1) ** 2)
    it = 0
    de = 1

    while it + 1 < limit and abs(de) > eps:
        it += 1
        t0 = abs(t0)
        i = info(n, t0, mu, y, weights)
        de = score(n, t0, mu, y, weights) / i
        t0 += de
    t0 = max(t0, 0)

    return t0


def sctransform_core(
    adata: AnnData,
    layer: Optional[str] = DKM.X_LAYER,
    min_cells: int = 5,
    gmean_eps: Union[int, float] = 1,
    n_genes: int = 2000,
    n_cells: Optional[int] = None,
    bin_size: int = 500,
    bw_adjust: int = 3,
    inplace: bool = True,
) -> Optional[AnnData]:
    """A re-implementation of SCTransform from the Satija lab.

    Args:
        adata: an Annotated data matrix.
        layer: the name of the layer to perform sctransform
        min_cells: minimum number of cells expressing a gene to be included in sctransform.
        gmean_eps: epsilon value to add to the geometric mean to avoid log(0) when calculating the log of geometric
            mean.
        n_genes: number of genes to be used for sctransform.
        n_cells: number of cells to be used for sctransform. If None, use all cells.
        bin_size: size of bins in which to group genes during sctransform.
        bw_adjust: bandwidth adjustment factor for kernel density estimation.
        inplace: whether to perform the sctransform in-place or return a copy of the original data matrix.

    Returns:
        If inplace=True, adata is updated with results in layer `layer`.
        If inplace=False, a new AnnData object with results will be returned.
    """
    import multiprocessing
    import sys
    try:
        from KDEpy import FFTKDE
    except ImportError:
        raise ImportError("Please install KDEpy for sctransform.")

    main_info("sctransform adata on layer: %s" % (layer))
    X = DKM.select_layer_data(adata, layer).copy()
    X = sp.sparse.csr_matrix(X)
    X.eliminate_zeros()
    gene_names = np.array(list(adata.var_names))
    cell_names = np.array(list(adata.obs_names))
    genes_cell_count = X.sum(0).A.flatten()
    genes = np.where(genes_cell_count >= min_cells)[0]
    genes_ix = genes.copy()

    X = X[:, genes]
    gene_names = gene_names[genes]
    genes = np.arange(X.shape[1])

    genes_log_gmean = np.log10(gmean(X, axis=0, eps=gmean_eps))

    # sample by n_cells, or use all cells
    if n_cells is not None and n_cells < X.shape[0]:
        cells_step1 = np.sort(np.random.choice(X.shape[0], replace=False, size=n_cells))
        genes_cell_count_step1 = X[cells_step1].sum(0).A.flatten()
        genes_step1 = np.where(genes_cell_count_step1 >= min_cells)[0]
        genes_log_gmean_step1 = np.log10(gmean(X[cells_step1][:, genes_step1], axis=0, eps=gmean_eps))
    else:
        cells_step1 = np.arange(X.shape[0])
        genes_step1 = genes
        genes_log_gmean_step1 = genes_log_gmean

    umi = X.sum(1).A.flatten()
    log_umi = np.log10(umi)
    X2 = X.copy()
    X2.data[:] = 1
    gene = X2.sum(1).A.flatten()
    log_gene = np.log10(gene)
    umi_per_gene = umi / gene
    log_umi_per_gene = np.log10(umi_per_gene)

    cell_attrs = pd.DataFrame(
        index=cell_names,
        data=np.vstack((umi, log_umi, gene, log_gene, umi_per_gene, log_umi_per_gene)).T,
        columns=["umi", "log_umi", "gene", "log_gene", "umi_per_gene", "log_umi_per_gene"],
    )

    data_step1 = cell_attrs.iloc[cells_step1]

    if n_genes is not None and n_genes < len(genes_step1):
        log_gmean_dens = stats.gaussian_kde(genes_log_gmean_step1, bw_method="scott")
        xlo = np.linspace(genes_log_gmean_step1.min(), genes_log_gmean_step1.max(), 512)
        ylo = log_gmean_dens.evaluate(xlo)
        xolo = genes_log_gmean_step1
        sampling_prob = 1 / (np.interp(xolo, xlo, ylo) + _EPS)
        genes_step1 = np.sort(
            np.random.choice(genes_step1, size=n_genes, p=sampling_prob / sampling_prob.sum(), replace=False)
        )
        genes_log_gmean_step1 = np.log10(gmean(X[cells_step1, :][:, genes_step1], eps=gmean_eps))

    bin_ind = np.ceil(np.arange(1, genes_step1.size + 1) / bin_size)
    max_bin = max(bin_ind)

    ps = multiprocessing.Manager().dict()

    # create a process context of fork that copy a Python process from an existing process.
    if sys.platform != "win32":
        ctx = multiprocessing.get_context("fork")  # this fixes loop on MacOS
    else:
        ctx = multiprocessing.get_context("spawn")

    for i in range(1, int(max_bin) + 1):
        genes_bin_regress = genes_step1[bin_ind == i]
        umi_bin = X[cells_step1, :][:, genes_bin_regress]

        mm = np.vstack((np.ones(data_step1.shape[0]), data_step1["log_umi"].values.flatten())).T

        pc_chunksize = umi_bin.shape[1] // os.cpu_count() + 1

        pool = ctx.Pool(os.cpu_count(), _parallel_init, [genes_bin_regress, umi_bin, gene_names, mm, ps])

        try:
            pool.map(_parallel_wrapper, range(umi_bin.shape[1]), chunksize=pc_chunksize)
        finally:
            pool.close()
            pool.join()

    ps = ps._getvalue()

    model_pars = pd.DataFrame(
        data=np.vstack([ps[x] for x in gene_names[genes_step1]]),
        columns=["Intercept", "log_umi", "theta"],
        index=gene_names[genes_step1],
    )

    min_theta = 1e-7
    x = model_pars["theta"].values.copy()
    x[x < min_theta] = min_theta
    model_pars["theta"] = x
    dispersion_par = np.log10(1 + 10**genes_log_gmean_step1 / model_pars["theta"].values.flatten())

    model_pars_theta = model_pars["theta"]
    model_pars = model_pars.iloc[:, model_pars.columns != "theta"].copy()
    model_pars["dispersion"] = dispersion_par

    outliers = (
        np.vstack(
            ([is_outlier(model_pars.values[:, i], genes_log_gmean_step1) for i in range(model_pars.shape[1])])
        ).sum(0)
        > 0
    )

    filt = np.invert(outliers)
    model_pars = model_pars[filt]
    genes_step1 = genes_step1[filt]
    genes_log_gmean_step1 = genes_log_gmean_step1[filt]

    z = FFTKDE(kernel="gaussian", bw="ISJ").fit(genes_log_gmean_step1)
    z.evaluate()
    bw = z.bw * bw_adjust

    x_points = np.vstack((genes_log_gmean, np.array([min(genes_log_gmean_step1)] * genes_log_gmean.size))).max(0)
    x_points = np.vstack((x_points, np.array([max(genes_log_gmean_step1)] * genes_log_gmean.size))).min(0)

    full_model_pars = pd.DataFrame(
        data=np.zeros((x_points.size, model_pars.shape[1])), index=gene_names, columns=model_pars.columns
    )
    for i in model_pars.columns:
        kr = statsmodels.nonparametric.kernel_regression.KernelReg(
            model_pars[i].values, genes_log_gmean_step1[:, None], ["c"], reg_type="ll", bw=[bw]
        )
        full_model_pars[i] = kr.fit(data_predict=x_points)[0]

    theta = 10**genes_log_gmean / (10 ** full_model_pars["dispersion"].values - 1)
    full_model_pars["theta"] = theta
    del full_model_pars["dispersion"]

    d = X.data
    x, y = X.nonzero()
    mud = np.exp(full_model_pars.values[:, 0][y] + full_model_pars.values[:, 1][y] * cell_attrs["log_umi"].values[x])
    vard = mud + mud**2 / full_model_pars["theta"].values.flatten()[y]

    X.data[:] = (d - mud) / vard**0.5
    X.data[X.data < 0] = 0
    X.eliminate_zeros()

    clip = np.sqrt(X.shape[0] / 30)
    X.data[X.data > clip] = clip

    if inplace:
        adata.raw = adata.copy()

        d = dict(zip(np.arange(X.shape[1]), genes_ix))
        x, y = X.nonzero()
        y = np.array([d[i] for i in y])
        data = X.data
        Xnew = sp.sparse.coo_matrix((data, (x, y)), shape=adata.shape).tocsr()
        if layer == DKM.X_LAYER:
            main_info("set sctransform results to adata.X", indent_level=2)
            DKM.set_layer_data(adata, layer, Xnew)  # TODO: add log1p of corrected umi counts to layers
        else:
            new_X_layer = DKM.gen_layer_X_key(layer)
            main_info_insert_adata_layer(new_X_layer, indent_level=2)
            DKM.set_layer_data(adata, new_X_layer, Xnew)  # TODO: add log1p of corrected umi counts to layers

        # TODO: reformat the following output according to adata key standards in dyn.
        for c in full_model_pars.columns:
            adata.var[c + "_sct"] = full_model_pars[c]

        for c in cell_attrs.columns:
            adata.obs[c + "_sct"] = cell_attrs[c]

        for c in model_pars.columns:
            adata.var[c + "_step1_sct"] = model_pars[c]
        adata.var["model_pars_theta_step1"] = model_pars_theta

        z = pd.Series(index=gene_names, data=np.zeros(gene_names.size, dtype="int"))
        z[gene_names[genes_step1]] = 1

        w = pd.Series(index=gene_names, data=np.zeros(gene_names.size, dtype="int"))
        w[gene_names] = genes_log_gmean
        adata.var["genes_step1_sct"] = z
        adata.var["log10_gmean_sct"] = w

    else:
        adata_new = AnnData(X=X)
        adata_new.var_names = pd.Index(gene_names)
        adata_new.obs_names = adata.obs_names
        adata_new.raw = adata.copy()

        for c in full_model_pars.columns:
            adata_new.var[c + "_sct"] = full_model_pars[c]

        for c in cell_attrs.columns:
            adata_new.obs[c + "_sct"] = cell_attrs[c]

        for c in model_pars.columns:
            adata_new.var[c + "_step1_sct"] = model_pars[c]

        z = pd.Series(index=gene_names, data=np.zeros(gene_names.size, dtype="int"))
        z[gene_names[genes_step1]] = 1
        adata_new.var["genes_step1_sct"] = z
        adata_new.var["log10_gmean_sct"] = genes_log_gmean
        return adata_new


[docs]def sctransform( adata: AnnData, layers: str = [DKM.X_LAYER], output_layer: str = None, n_top_genes=2000, **kwargs ) -> Optional[AnnData]: """A wrapper calls sctransform_core and set dynamo style keys in adata""" for layer in layers: if layer != DKM.X_LAYER: main_warning( f"Sctransform is only recommended for X layer while you are applying sctransform on layer: {layer}, " f"This will overwrite existing sctransform params and create negative values in layers, " f"which will cause error in the velocities calculation. Please run the sctransform recipe with default " f"if you plan to perform downstream analysis." ) sctransform_core(adata, layer=layer, n_genes=n_top_genes, **kwargs) if layer == DKM.X_LAYER: if adata.X.shape[1] > n_top_genes: X_squared = adata.X.copy() X_squared.data **= 2 variance = X_squared.mean(0) - np.square(adata.X.mean(0)) adata.var["sct_score"] = variance.A1 adata.var["use_for_pca"] = get_gene_selection_filter(adata.var["sct_score"], n_top_genes=n_top_genes)