import re
from functools import partial
from warnings import warn
from pathlib import Path
from types import MappingProxyType
from typing import Callable, Type, TypeVar, Union, Literal
from typing import Collection, Sequence, Mapping
import h5py
import numpy as np
import pandas as pd
from scipy import sparse
from .._core.sparse_dataset import SparseDataset
from .._core.file_backing import AnnDataFileManager
from .._core.anndata import AnnData
from ..compat import (
_from_fixed_length_strings,
_decode_structured_array,
_clean_uns,
)
from ..experimental import read_dispatched
from .utils import (
H5PY_V3,
report_read_key_on_error,
report_write_key_on_error,
idx_chunks_along_axis,
_read_legacy_raw,
)
from .specs import read_elem, write_elem
from anndata._warnings import OldFormatWarning
T = TypeVar("T")
def write_h5ad(
filepath: Union[Path, str],
adata: AnnData,
*,
as_dense: Sequence[str] = (),
dataset_kwargs: Mapping = MappingProxyType({}),
**kwargs,
) -> None:
from anndata.experimental import write_dispatched
if isinstance(as_dense, str):
as_dense = [as_dense]
if "raw.X" in as_dense:
as_dense = list(as_dense)
as_dense[as_dense.index("raw.X")] = "raw/X"
if any(val not in {"X", "raw/X"} for val in as_dense):
raise NotImplementedError(
"Currently, only `X` and `raw/X` are supported values in `as_dense`"
)
if "raw/X" in as_dense and adata.raw is None:
raise ValueError("Cannot specify writing `raw/X` to dense if it doesn’t exist.")
adata.strings_to_categoricals()
if adata.raw is not None:
adata.strings_to_categoricals(adata.raw.var)
dataset_kwargs = {**dataset_kwargs, **kwargs}
filepath = Path(filepath)
mode = "a" if adata.isbacked else "w"
if adata.isbacked: # close so that we can reopen below
adata.file.close()
with h5py.File(filepath, mode) as f:
# TODO: Use spec writing system for this
# Currently can't use write_dispatched here because this function is also called to do an
# inplace update of a backed object, which would delete "/"
f = f["/"]
f.attrs.setdefault("encoding-type", "anndata")
f.attrs.setdefault("encoding-version", "0.1.0")
if "X" in as_dense and isinstance(adata.X, (sparse.spmatrix, SparseDataset)):
write_sparse_as_dense(f, "X", adata.X, dataset_kwargs=dataset_kwargs)
elif not (adata.isbacked and Path(adata.filename) == Path(filepath)):
# If adata.isbacked, X should already be up to date
write_elem(f, "X", adata.X, dataset_kwargs=dataset_kwargs)
if "raw/X" in as_dense and isinstance(
adata.raw.X, (sparse.spmatrix, SparseDataset)
):
write_sparse_as_dense(
f, "raw/X", adata.raw.X, dataset_kwargs=dataset_kwargs
)
write_elem(f, "raw/var", adata.raw.var, dataset_kwargs=dataset_kwargs)
write_elem(
f, "raw/varm", dict(adata.raw.varm), dataset_kwargs=dataset_kwargs
)
elif adata.raw is not None:
write_elem(f, "raw", adata.raw, dataset_kwargs=dataset_kwargs)
write_elem(f, "obs", adata.obs, dataset_kwargs=dataset_kwargs)
write_elem(f, "var", adata.var, dataset_kwargs=dataset_kwargs)
write_elem(f, "obsm", dict(adata.obsm), dataset_kwargs=dataset_kwargs)
write_elem(f, "varm", dict(adata.varm), dataset_kwargs=dataset_kwargs)
write_elem(f, "obsp", dict(adata.obsp), dataset_kwargs=dataset_kwargs)
write_elem(f, "varp", dict(adata.varp), dataset_kwargs=dataset_kwargs)
write_elem(f, "layers", dict(adata.layers), dataset_kwargs=dataset_kwargs)
write_elem(f, "uns", dict(adata.uns), dataset_kwargs=dataset_kwargs)
@report_write_key_on_error
def write_sparse_as_dense(f, key, value, dataset_kwargs=MappingProxyType({})):
real_key = None # Flag for if temporary key was used
if key in f:
if (
isinstance(value, (h5py.Group, h5py.Dataset, SparseDataset))
and value.file.filename == f.file.filename
): # Write to temporary key before overwriting
real_key = key
# Transform key to temporary, e.g. raw/X -> raw/_X, or X -> _X
key = re.sub(r"(.*)(\w(?!.*/))", r"\1_\2", key.rstrip("/"))
else:
del f[key] # Wipe before write
dset = f.create_dataset(key, shape=value.shape, dtype=value.dtype, **dataset_kwargs)
compressed_axis = int(isinstance(value, sparse.csc_matrix))
for idx in idx_chunks_along_axis(value.shape, compressed_axis, 1000):
dset[idx] = value[idx].toarray()
if real_key is not None:
del f[real_key]
f[real_key] = f[key]
del f[key]
def read_h5ad_backed(filename: Union[str, Path], mode: Literal["r", "r+"]) -> AnnData:
d = dict(filename=filename, filemode=mode)
f = h5py.File(filename, mode)
attributes = ["obsm", "varm", "obsp", "varp", "uns", "layers"]
df_attributes = ["obs", "var"]
if "encoding-type" in f.attrs:
attributes.extend(df_attributes)
else:
for k in df_attributes:
if k in f: # Backwards compat
d[k] = read_dataframe(f[k])
d.update({k: read_elem(f[k]) for k in attributes if k in f})
d["raw"] = _read_raw(f, attrs={"var", "varm"})
adata = AnnData(**d)
# Backwards compat to <0.7
if isinstance(f["obs"], h5py.Dataset):
_clean_uns(adata)
return adata
[docs]def read_h5ad(
filename: Union[str, Path],
backed: Union[Literal["r", "r+"], bool, None] = None,
*,
as_sparse: Sequence[str] = (),
as_sparse_fmt: Type[sparse.spmatrix] = sparse.csr_matrix,
chunk_size: int = 6000, # TODO, probably make this 2d chunks
) -> AnnData:
"""\
Read `.h5ad`-formatted hdf5 file.
Parameters
----------
filename
File name of data file.
backed
If `'r'`, load :class:`~anndata.AnnData` in `backed` mode
instead of fully loading it into memory (`memory` mode).
If you want to modify backed attributes of the AnnData object,
you need to choose `'r+'`.
Currently, `backed` only support updates to `X`. That means any
changes to other slots like `obs` will not be written to disk in
`backed` mode. If you would like save changes made to these slots
of a `backed` :class:`~anndata.AnnData`, write them to a new file
(see :meth:`~anndata.AnnData.write`). For an example, see
[here] (https://anndata-tutorials.readthedocs.io/en/latest/getting-started.html#Partial-reading-of-large-data).
as_sparse
If an array was saved as dense, passing its name here will read it as
a sparse_matrix, by chunk of size `chunk_size`.
as_sparse_fmt
Sparse format class to read elements from `as_sparse` in as.
chunk_size
Used only when loading sparse dataset that is stored as dense.
Loading iterates through chunks of the dataset of this row size
until it reads the whole dataset.
Higher size means higher memory consumption and higher (to a point)
loading speed.
"""
if backed not in {None, False}:
mode = backed
if mode is True:
mode = "r+"
assert mode in {"r", "r+"}
return read_h5ad_backed(filename, mode)
if as_sparse_fmt not in (sparse.csr_matrix, sparse.csc_matrix):
raise NotImplementedError(
"Dense formats can only be read to CSR or CSC matrices at this time."
)
if isinstance(as_sparse, str):
as_sparse = [as_sparse]
else:
as_sparse = list(as_sparse)
for i in range(len(as_sparse)):
if as_sparse[i] in {("raw", "X"), "raw.X"}:
as_sparse[i] = "raw/X"
elif as_sparse[i] not in {"raw/X", "X"}:
raise NotImplementedError(
"Currently only `X` and `raw/X` can be read as sparse."
)
rdasp = partial(
read_dense_as_sparse, sparse_format=as_sparse_fmt, axis_chunk=chunk_size
)
with h5py.File(filename, "r") as f:
def callback(func, elem_name: str, elem, iospec):
if iospec.encoding_type == "anndata" or elem_name.endswith("/"):
return AnnData(
**{
# This is covering up backwards compat in the anndata initializer
# In most cases we should be able to call `func(elen[k])` instead
k: read_dispatched(elem[k], callback)
for k in elem.keys()
if not k.startswith("raw.")
}
)
elif elem_name.startswith("/raw."):
return None
elif elem_name == "/X" and "X" in as_sparse:
return rdasp(elem)
elif elem_name == "/raw":
return _read_raw(f, as_sparse, rdasp)
elif elem_name in {"/obs", "/var"}:
# Backwards compat
return read_dataframe(elem)
return func(elem)
adata = read_dispatched(f, callback=callback)
# Backwards compat (should figure out which version)
if "raw.X" in f:
raw = AnnData(**_read_raw(f, as_sparse, rdasp))
raw.obs_names = adata.obs_names
adata.raw = raw
# Backwards compat to <0.7
if isinstance(f["obs"], h5py.Dataset):
_clean_uns(adata)
return adata
def _read_raw(
f: Union[h5py.File, AnnDataFileManager],
as_sparse: Collection[str] = (),
rdasp: Callable[[h5py.Dataset], sparse.spmatrix] = None,
*,
attrs: Collection[str] = ("X", "var", "varm"),
) -> dict:
if as_sparse:
assert rdasp is not None, "must supply rdasp if as_sparse is supplied"
raw = {}
if "X" in attrs and "raw/X" in f:
read_x = rdasp if "raw/X" in as_sparse else read_elem
raw["X"] = read_x(f["raw/X"])
for v in ("var", "varm"):
if v in attrs and f"raw/{v}" in f:
raw[v] = read_elem(f[f"raw/{v}"])
return _read_legacy_raw(f, raw, read_dataframe, read_elem, attrs=attrs)
@report_read_key_on_error
def read_dataframe_legacy(dataset) -> pd.DataFrame:
"""Read pre-anndata 0.7 dataframes."""
warn(
f"'{dataset.name}' was written with a very old version of AnnData. "
"Consider rewriting it.",
OldFormatWarning,
)
if H5PY_V3:
df = pd.DataFrame(
_decode_structured_array(
_from_fixed_length_strings(dataset[()]), dtype=dataset.dtype
)
)
else:
df = pd.DataFrame(_from_fixed_length_strings(dataset[()]))
df.set_index(df.columns[0], inplace=True)
return df
def read_dataframe(group) -> pd.DataFrame:
"""Backwards compat function"""
if not isinstance(group, h5py.Group):
return read_dataframe_legacy(group)
else:
return read_elem(group)
@report_read_key_on_error
def read_dataset(dataset: h5py.Dataset):
if H5PY_V3:
string_dtype = h5py.check_string_dtype(dataset.dtype)
if (string_dtype is not None) and (string_dtype.encoding == "utf-8"):
dataset = dataset.asstr()
value = dataset[()]
if not hasattr(value, "dtype"):
return value
elif isinstance(value.dtype, str):
pass
elif issubclass(value.dtype.type, np.string_):
value = value.astype(str)
# Backwards compat, old datasets have strings as one element 1d arrays
if len(value) == 1:
return value[0]
elif len(value.dtype.descr) > 1: # Compound dtype
# For backwards compat, now strings are written as variable length
dtype = value.dtype
value = _from_fixed_length_strings(value)
if H5PY_V3:
value = _decode_structured_array(value, dtype=dtype)
if value.shape == ():
value = value[()]
return value
@report_read_key_on_error
def read_dense_as_sparse(
dataset: h5py.Dataset, sparse_format: sparse.spmatrix, axis_chunk: int
):
if sparse_format == sparse.csr_matrix:
return read_dense_as_csr(dataset, axis_chunk)
elif sparse_format == sparse.csc_matrix:
return read_dense_as_csc(dataset, axis_chunk)
else:
raise ValueError(f"Cannot read dense array as type: {sparse_format}")
def read_dense_as_csr(dataset, axis_chunk=6000):
sub_matrices = []
for idx in idx_chunks_along_axis(dataset.shape, 0, axis_chunk):
dense_chunk = dataset[idx]
sub_matrix = sparse.csr_matrix(dense_chunk)
sub_matrices.append(sub_matrix)
return sparse.vstack(sub_matrices, format="csr")
def read_dense_as_csc(dataset, axis_chunk=6000):
sub_matrices = []
for idx in idx_chunks_along_axis(dataset.shape, 1, axis_chunk):
sub_matrix = sparse.csc_matrix(dataset[idx])
sub_matrices.append(sub_matrix)
return sparse.hstack(sub_matrices, format="csc")