"""Doublet detection in single-cell RNA-seq data."""
import collections
import io
import warnings
from contextlib import redirect_stdout
import anndata
import numpy as np
import phenograph
import scanpy as sc
import scipy.sparse as sp_sparse
from scipy.sparse import csr_matrix
from scipy.stats import hypergeom
from sklearn.utils import check_array
from sklearn.utils.sparsefuncs_fast import inplace_csr_row_normalize_l1
from tqdm.auto import tqdm
class BoostClassifier:
"""Classifier for doublets in single-cell RNA-seq data.
Parameters:
boost_rate (float, optional): Proportion of cell population size to
produce as synthetic doublets.
n_components (int, optional): Number of principal components used for
clustering.
n_top_var_genes (int, optional): Number of highest variance genes to
use; other genes discarded. Will use all genes when zero.
replace (bool, optional): If False, a cell will be selected as a
synthetic doublet's parent no more than once.
clustering_algorithm (str, optional): One of `["louvain", "leiden",
"phenograph"]`. `"louvain"` and `"leiden"` refer to the scanpy implementations.
clustering_kwargs (dict, optional): Keyword args to pass directly
to the clustering algorithm. Note that we change the PhenoGraph 'prune' default to
True. We also set `directed=False` and `resolution=4` for Louvain
and Leiden clustering. You must specifically include these params here
to change them. `random_state` and `key_added` should not be overridden
when the clustering algorithm is Louvain or Leiden.
n_iters (int, optional): Number of fit operations from which to collect
p-values. Default value is 10.
normalizer ((sp_sparse) -> ndarray): Method to normalize raw_counts.
Defaults to normalize_counts, included in this package. Note: To use
normalize_counts with its pseudocount parameter changed from the
default pseudocount value to some positive float `new_var`, use:
normalizer=lambda counts: doubletdetection.normalize_counts(counts,
pseudocount=new_var)
pseudocount (int, optional): Pseudocount used in normalize_counts.
If `1` is used, and `standard_scaling=False`, the classifier is
much more memory efficient; however, this may result in fewer doublets
detected.
random_state (int, optional): If provided, passed to PCA and used to
seed numpy's RNG. NOTE: PhenoGraph does not currently
admit a random seed, and so this will not guarantee identical
results across runs.
verbose (bool, optional): Set to False to silence all normal operation
informational messages. Defaults to False.
standard_scaling (bool, optional): Set to True to enable standard scaling
of normalized count matrix prior to PCA. Recommended when not using
Phenograph. Defaults to False.
n_jobs (int, optional): Number of jobs to use. Speeds up neighbor computation.
Attributes:
all_log_p_values_ (ndarray): Hypergeometric test natural log p-value per
cell for cluster enrichment of synthetic doublets. Use for thresholding.
Shape (n_iters, num_cells).
all_scores_ (ndarray): The fraction of a cell's cluster that is
synthetic doublets. Shape (n_iters, num_cells).
communities_ (ndarray): Cluster ID for corresponding cell. Shape
(n_iters, num_cells).
labels_ (ndarray, ndims=1): 0 for singlet, 1 for detected doublet.
parents_ (list of sequences of int): Parent cells' indexes for each
synthetic doublet. A list wrapping the results from each run.
suggested_score_cutoff_ (float): Cutoff used to classify cells when
n_iters == 1 (scores >= cutoff). Not produced when n_iters > 1.
synth_communities_ (sequence of ints): Cluster ID for corresponding
synthetic doublet. Shape (n_iters, num_cells * boost_rate).
top_var_genes_ (ndarray): Indices of the n_top_var_genes used. Not
generated if n_top_var_genes <= 0.
voting_average_ (ndarray): Fraction of iterations each cell is called a
doublet.
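Example:
A minimal sketch of typical use, assuming `raw_counts` is a cell-by-gene
count matrix (for example `adata.X` from an AnnData object holding raw counts):
    clf = BoostClassifier(n_iters=10)
    labels = clf.fit(raw_counts).predict(p_thresh=1e-7, voter_thresh=0.9)
    scores = clf.doublet_score()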
"""
def __init__(
self,
boost_rate=0.25,
n_components=30,
n_top_var_genes=10000,
replace=False,
clustering_algorithm="phenograph",
clustering_kwargs=None,
n_iters=10,
normalizer=None,
pseudocount=0.1,
random_state=0,
verbose=False,
standard_scaling=False,
n_jobs=1,
):
self.boost_rate = boost_rate
self.replace = replace
self.clustering_algorithm = clustering_algorithm
self.n_iters = n_iters
self.normalizer = normalizer
self.random_state = random_state
self.verbose = verbose
self.standard_scaling = standard_scaling
self.n_jobs = n_jobs
self.pseudocount = pseudocount
if self.clustering_algorithm not in ["louvain", "phenograph", "leiden"]:
raise ValueError(
"Clustering algorithm needs to be one of ['louvain', 'phenograph', 'leiden']"
)
if self.clustering_algorithm == "leiden":
warnings.warn("Leiden clustering is experimental and results have not been validated.")
if self.random_state:
np.random.seed(self.random_state)
if n_components == 30 and n_top_var_genes > 0:
# If user did not change n_components, silently cap it by n_top_var_genes if needed
self.n_components = min(n_components, n_top_var_genes)
else:
self.n_components = n_components
# Floor negative n_top_var_genes by 0
self.n_top_var_genes = max(0, n_top_var_genes)
self.clustering_kwargs = (
{} if not isinstance(clustering_kwargs, dict) else clustering_kwargs
)
self._set_clustering_kwargs()
if not self.replace and self.boost_rate > 0.5:
warn_msg = (
"boost_rate is trimmed to 0.5 when replace=False."
+ " Set replace=True to use greater boost rates."
)
warnings.warn(warn_msg)
self.boost_rate = 0.5
assert (self.n_top_var_genes == 0) or (
self.n_components <= self.n_top_var_genes
), "n_components={0} cannot be larger than n_top_var_genes={1}".format(
n_components, n_top_var_genes
)
def fit(self, raw_counts):
"""Fits the classifier on raw_counts.
Args:
raw_counts (array-like): Count matrix, oriented cells by genes.
Sets:
all_scores_, all_log_p_values_, communities_,
top_var_genes_, parents_, synth_communities_
Returns:
The fitted classifier.
"""
raw_counts = check_array(
raw_counts,
accept_sparse="csr",
force_all_finite=True,
ensure_2d=True,
dtype="float32",
)
if not sp_sparse.issparse(raw_counts):
if self.verbose:
print("Sparsifying matrix.")
raw_counts = csr_matrix(raw_counts)
old_n_jobs = sc.settings.n_jobs
sc.settings.n_jobs = self.n_jobs
if self.n_top_var_genes > 0:
if self.n_top_var_genes < raw_counts.shape[1]:
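# Per-gene variance computed as E[x**2] - E[x]**2 without densifying the sparse matrix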
gene_variances = (
np.array(raw_counts.power(2).mean(axis=0))
- (np.array(raw_counts.mean(axis=0))) ** 2
)[0]
top_var_indexes = np.argsort(gene_variances)
self.top_var_genes_ = top_var_indexes[-self.n_top_var_genes :]
# csc is faster for column indexing
raw_counts = raw_counts.tocsc()
raw_counts = raw_counts[:, self.top_var_genes_]
raw_counts = raw_counts.tocsr()
self._raw_counts = raw_counts
(self._num_cells, self._num_genes) = self._raw_counts.shape
if self.normalizer is None:
# Memoize these; default normalizer treats these invariant for all synths
self._lib_size = np.sum(raw_counts, axis=1).A1
self._normed_raw_counts = self._raw_counts.copy()
inplace_csr_row_normalize_l1(self._normed_raw_counts)
self.all_scores_ = np.zeros((self.n_iters, self._num_cells))
self.all_log_p_values_ = np.zeros((self.n_iters, self._num_cells))
all_communities = np.zeros((self.n_iters, self._num_cells))
all_parents = []
all_synth_communities = np.zeros((self.n_iters, int(self.boost_rate * self._num_cells)))
for i in tqdm(range(self.n_iters)):
if self.verbose:
print("Iteration {:3}/{}".format(i + 1, self.n_iters))
self.all_scores_[i], self.all_log_p_values_[i] = self._one_fit()
all_communities[i] = self.communities_
all_parents.append(self.parents_)
all_synth_communities[i] = self.synth_communities_
# Release unneeded large data vars
del self._raw_counts
del self._raw_synthetics
if self.normalizer is None:
del self._normed_raw_counts
del self._lib_size
# reset scanpy n_jobs
sc.settings.n_jobs = old_n_jobs
self.communities_ = all_communities
self.parents_ = all_parents
self.synth_communities_ = all_synth_communities
return self
def predict(self, p_thresh=1e-7, voter_thresh=0.9):
"""Produce doublet calls from fitted classifier
Args:
p_thresh (float, optional): hypergeometric test p-value threshold
that determines per iteration doublet calls
voter_thresh (float, optional): fraction of iterations a cell must
be called a doublet
Sets:
labels_ and voting_average_ if n_iters > 1.
labels_ and suggested_score_cutoff_ if n_iters == 1.
Returns:
labels_ (ndarray, ndims=1): 0 for singlet, 1 for detected doublet
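Example:
A minimal sketch, assuming the classifier has already been fit:
    labels = clf.predict(p_thresh=1e-7, voter_thresh=0.9)
    doublet_mask = labels == 1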
"""
log_p_thresh = np.log(p_thresh)
if self.n_iters > 1:
with np.errstate(invalid="ignore"): # Silence numpy warning about NaN comparison
self.voting_average_ = np.mean(
np.ma.masked_invalid(self.all_log_p_values_) <= log_p_thresh, axis=0
)
self.labels_ = np.ma.filled(
(self.voting_average_ >= voter_thresh).astype(float), np.nan
)
self.voting_average_ = np.ma.filled(self.voting_average_, np.nan)
else:
# Find a cutoff score
potential_cutoffs = np.unique(self.all_scores_[~np.isnan(self.all_scores_)])
if len(potential_cutoffs) > 1:
max_dropoff = np.argmax(potential_cutoffs[1:] - potential_cutoffs[:-1]) + 1
else: # Most likely pathological dataset, only one (or no) clusters
max_dropoff = 0
self.suggested_score_cutoff_ = potential_cutoffs[max_dropoff]
with np.errstate(invalid="ignore"): # Silence numpy warning about NaN comparison
self.labels_ = self.all_scores_[0, :] >= self.suggested_score_cutoff_
self.labels_[np.isnan(self.all_scores_)[0, :]] = np.nan
return self.labels_
def doublet_score(self):
"""Produce doublet scores
The doublet score is the average negative log p-value of doublet enrichment
averaged over the iterations. Higher means more likely to be doublet.
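For cell i, score_i = -mean over iterations t of all_log_p_values_[t, i].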
Returns:
scores (ndarray, ndims=1): Average negative log p-value over iterations
"""
if self.n_iters > 1:
with np.errstate(invalid="ignore"): # Silence numpy warning about NaN comparison
avg_log_p = np.mean(np.ma.masked_invalid(self.all_log_p_values_), axis=0)
else:
avg_log_p = self.all_log_p_values_[0]
return -avg_log_p
def _one_fit(self):
if self.verbose:
print("\nCreating synthetic doublets...")
self._createDoublets()
# Normalize combined augmented set
if self.verbose:
print("Normalizing...")
if self.normalizer is not None:
aug_counts = self.normalizer(
sp_sparse.vstack((self._raw_counts, self._raw_synthetics))
)
else:
# Follows doubletdetection.plot.normalize_counts, but uses memoized normed raw_counts
synth_lib_size = np.sum(self._raw_synthetics, axis=1).A1
aug_lib_size = np.concatenate([self._lib_size, synth_lib_size])
normed_synths = self._raw_synthetics.copy()
inplace_csr_row_normalize_l1(normed_synths)
aug_counts = sp_sparse.vstack((self._normed_raw_counts, normed_synths))
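# Rescale the L1-normalized counts by the median library size, then log-transform
# (adding the pseudocount below when it differs from 1)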
scaled_aug_counts = aug_counts * np.median(aug_lib_size)
if self.pseudocount != 1:
aug_counts = np.log(scaled_aug_counts.A + self.pseudocount)
else:
aug_counts = np.log1p(scaled_aug_counts)
del scaled_aug_counts
aug_counts = anndata.AnnData(aug_counts)
aug_counts.obs["n_counts"] = aug_lib_size
if self.standard_scaling is True:
sc.pp.scale(aug_counts, max_value=15)
if self.verbose:
print("Running PCA...")
# "auto" solver faster for dense matrices
solver = "arpack" if sp_sparse.issparse(aug_counts.X) else "auto"
sc.tl.pca(
aug_counts,
n_comps=self.n_components,
random_state=self.random_state,
svd_solver=solver,
)
if self.verbose:
print("Clustering augmented data set...\n")
if self.clustering_algorithm == "phenograph":
f = io.StringIO()
with redirect_stdout(f):
fullcommunities, _, _ = phenograph.cluster(
aug_counts.obsm["X_pca"], n_jobs=self.n_jobs, **self.clustering_kwargs
)
out = f.getvalue()
if self.verbose:
print(out)
else:
if self.clustering_algorithm == "louvain":
clus = sc.tl.louvain
else:
clus = sc.tl.leiden
sc.pp.neighbors(
aug_counts,
random_state=self.random_state,
method="umap",
n_neighbors=10,
)
clus(
aug_counts,
key_added="clusters",
random_state=self.random_state,
**self.clustering_kwargs,
)
fullcommunities = np.array(aug_counts.obs["clusters"], dtype=int)
min_ID = min(fullcommunities)
self.communities_ = fullcommunities[: self._num_cells]
self.synth_communities_ = fullcommunities[self._num_cells :]
community_sizes = [
np.count_nonzero(fullcommunities == i) for i in np.unique(fullcommunities)
]
if self.verbose:
print(
"Found clusters [{0}, ... {2}], with sizes: {1}\n".format(
min(fullcommunities), community_sizes, max(fullcommunities)
)
)
# Count number of fake doublets in each community and assign score
# Number of synth/orig cells in each cluster.
synth_cells_per_comm = collections.Counter(self.synth_communities_)
orig_cells_per_comm = collections.Counter(self.communities_)
community_IDs = orig_cells_per_comm.keys()
community_scores = {
i: float(synth_cells_per_comm[i]) / (synth_cells_per_comm[i] + orig_cells_per_comm[i])
for i in community_IDs
}
scores = np.array([community_scores[i] for i in self.communities_])
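# Hypergeometric enrichment test per cluster: log survival function of the
# synthetic-cell count, given cluster size, total synthetics, and total cells
# in the augmented set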
community_log_p_values = {
i: hypergeom.logsf(
synth_cells_per_comm[i],
aug_counts.shape[0],
self._raw_synthetics.shape[0],
synth_cells_per_comm[i] + orig_cells_per_comm[i],
)
for i in community_IDs
}
log_p_values = np.array([community_log_p_values[i] for i in self.communities_])
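# Cells PhenoGraph could not assign to a cluster get ID -1; exclude them by setting NaN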
if min_ID < 0:
scores[self.communities_ == -1] = np.nan
log_p_values[self.communities_ == -1] = np.nan
return scores, log_p_values
def _createDoublets(self):
"""Create synthetic doublets.
Sets .parents_
"""
# Number of synthetic doublets to add
num_synths = int(self.boost_rate * self._num_cells)
# Parent indices
choices = np.random.choice(self._num_cells, size=(num_synths, 2), replace=self.replace)
parents = [list(p) for p in choices]
parent0 = self._raw_counts[choices[:, 0], :]
parent1 = self._raw_counts[choices[:, 1], :]
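# Each synthetic doublet is the element-wise sum of its two parents' raw counts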
synthetic = parent0 + parent1
self._raw_synthetics = synthetic
self.parents_ = parents
def _set_clustering_kwargs(self):
"""Sets .clustering_kwargs"""
if self.clustering_algorithm == "phenograph":
if "prune" not in self.clustering_kwargs:
self.clustering_kwargs["prune"] = True
if (self.n_iters == 1) and (self.clustering_kwargs.get("prune") is True):
warn_msg = (
"Using phenograph parameter prune=False is strongly recommended when "
+ "running only one iteration. Otherwise, expect many NaN labels."
)
warnings.warn(warn_msg)
else:
if "directed" not in self.clustering_kwargs:
self.clustering_kwargs["directed"] = False
if "resolution" not in self.clustering_kwargs:
self.clustering_kwargs["resolution"] = 4
if "key_added" in self.clustering_kwargs:
raise ValueError("'key_added' param cannot be overridden")
if "random_state" in self.clustering_kwargs:
raise ValueError(
"'random_state' param cannot be overriden. Please use classifier 'random_state'."
)