Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions dextrademixer/test/TestSimulation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import unittest

import muon as mu
import pandas as pd

from matplotlib import pyplot as plt

Expand Down Expand Up @@ -222,3 +223,29 @@ def test_simulation_TCR_covariance(self):
plt.savefig('cov_test.pdf')

plt.show()

def test_p_outlier_prob(self):
import seaborn as sns
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

sim = DextramerSimulator()
mdat = sim.simulate_pmhc_data_from_distribution(total_cells=5000,
binding_ratio=0.1,
nof_clones=150,
binding_fold_increase_range=[100],
variance_fold_increase_range=[2],
simulate_neg_control=False,
p_binding_outlier=0.1,
p_nonbinding_outlier=0.1,
use_clonotype_cov=False,
nof_clonotype_cluster=None,
plot_data=False)
neg_out = mdat.mod["gex"].X[(mdat.mod["gex"].obs["is_outlier"] == 1) & (mdat.mod["airr"].obs["is_binder"] == 0),0]
true_neg = mdat.mod["gex"].X[(mdat.mod["gex"].obs["is_outlier"] == 0) & (mdat.mod["airr"].obs["is_binder"] == 0),0]
pos_out = mdat.mod["gex"].X[(mdat.mod["gex"].obs["is_outlier"] == 1) & (mdat.mod["airr"].obs["is_binder"] == 1),0]
true_pos = mdat.mod["gex"].X[(mdat.mod["gex"].obs["is_outlier"] == 0) & (mdat.mod["airr"].obs["is_binder"] == 1),0]

assert(np.mean(neg_out) > np.mean(true_neg))
assert(np.mean(pos_out) < np.mean(true_pos))
116 changes: 72 additions & 44 deletions dextrademixer/utils/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,11 @@ def simulate_pmhc_data_from_distribution(self,
binding_ratio: float = 0.05,
binding_fold_increase_range: list[float] = None,
variance_fold_increase_range: list[float] = None,
p_nonbinding_clone_outlier=0.0,
p_binding_outlier=0.0,
nof_clonotype_cluster=None,
p_nonbinding_outlier=0.0,
use_clonotype_cov: bool = False,
nof_clonotype_cluster=None,
p_nonbinding_clone_outlier=0.0,
simulate_neg_control: bool = False,
plot_data: bool = False,
rng_key: int = 42
Expand All @@ -422,19 +423,38 @@ def simulate_pmhc_data_from_distribution(self,
binding_fold_increase_range: list of fold increase for pMHC binding cells
variance_fold_increase_range: list of fold increase of the variance to the mean of a negative binomial
(i.e. variance_fold_increase_range=[1] => mean = var).
If not specified estimated inverdispersion of clonotypes will be used
p_nonbinding_clone_outlier: The probability that a non-binding clone is clustered together with binding clones
If not specified estimated invers dispersion of clonotypes will be used
p_binding_outlier: the probability of a cell of binding clonotype to have low pMHC counts
p_nonbinding_outlier: the probability of a cell of non-binding clonotype to have high pMHC counts
nof_clonotype_cluster: number of clonotype clusters to simulate in case covariance matrix should be
simulated (if None than randomly sampled between [2, nof_clones]
use_clonotype_cov: whether to use clonotype covariance to assign binding or randomly (default: False)
p_nonbinding_clone_outlier: The probability that a non-binding clone is clustered together with binding clones
simulate_neg_control: whether to simulate a negative control pMHC for each cell (default: False)
plot_data: boolean whether to plot simulated data (default: False)
rng_key: random seed.

Returns:
An Anndata object containing all generated count data and clonal information, and binder status
"""
def __simulate_pos_umi(n_cells, neg_mean, binding_fold_increase_range,concentration_param,
variance_fold_increase_range, rng, rng_key):
fold_change = float(rng.choice(binding_fold_increase_range))
mean = fold_change * neg_mean
if variance_fold_increase_range is None:
concentration = stats.gamma.rvs(*concentration_param, random_state=rng)
else:
concentration = convert_to_invdispersion(mean, mean * rng.choice(variance_fold_increase_range))
return self.generate_nb_val(mean, concentration, size=n_cells, rng_key=rng_key), fold_change

def __simulate_neg_umi(n_cells, neg_mean, neg_concentration, rng, rng_key):
# add some noise to neg_concentration
a = (0.001 - neg_concentration) / (neg_concentration / 3)
concentration = stats.truncnorm.rvs(a, np.inf, loc=neg_concentration, scale=neg_concentration / 3,
random_state=rng)
return self.generate_nb_val(neg_mean, concentration, size=n_cells, rng_key=rng_key), 0.0


rng = np.random.RandomState(seed=rng_key)

if self.dist_params is not None:
Expand Down Expand Up @@ -469,19 +489,17 @@ def simulate_pmhc_data_from_distribution(self,

# simulate TCR similarity clusters
if use_clonotype_cov:

cc_assignment = self.__cc_assignment(binder_assignment,
nof_clones,
nof_clonotype_cluster,
p_nonbinding_clone_outlier, rng)

K = self.__construct_tcr_kernel(nof_clones, cc_assignment, params, rng)


# generate cell per clonotype following a discrete exponentially decreasing distribution normalized to
# specified total cell count
total_le = total_cells - nof_clones
raw_cells_per_clone = np.array([stats.boltzmann.rvs(*cells_per_clonotype,random_state=rng) for _ in range(nof_clones)])
raw_cells_per_clone = np.array([stats.boltzmann.rvs(*cells_per_clonotype,random_state=rng)
for _ in range(nof_clones)])
cells_per_clone_p = raw_cells_per_clone/raw_cells_per_clone.sum()
cells_per_clone = (rng.multinomial(total_le, cells_per_clone_p) + np.ones(nof_clones)).astype("int32")

Expand All @@ -492,50 +510,42 @@ def simulate_pmhc_data_from_distribution(self,
n_cells = cells_per_clone[i]

if is_binder:
fold_change = float(rng.choice(binding_fold_increase_range))
mean = fold_change * neg_mean
if variance_fold_increase_range is None:
concentration = stats.gamma.rvs(*concentration_param, random_state=rng)
else:
concentration = convert_to_invdispersion(mean, mean*rng.choice(variance_fold_increase_range))
primary_sim = __simulate_pos_umi
primary_params = (n_cells, neg_mean, binding_fold_increase_range, concentration_param,
variance_fold_increase_range, rng, rng_key)
outlier_sim = __simulate_neg_umi
outlier_params = (neg_mean, neg_concentration, rng, rng_key)
p_outlier = p_binding_outlier
else:
fold_change = 0.0
mean = neg_mean
# add some noise to neg_concentration
a = (0.001 - neg_concentration) / (neg_concentration / 3)
concentration = stats.truncnorm.rvs(a, np.inf, loc=neg_concentration, scale=neg_concentration / 3,
random_state=rng)

x = self.generate_nb_val(mean, concentration, size=n_cells, rng_key=rng_key)
primary_sim = __simulate_neg_umi
primary_params = (n_cells, neg_mean, neg_concentration, rng, rng_key)
outlier_sim = __simulate_pos_umi
outlier_params = (neg_mean, binding_fold_increase_range, concentration_param,
variance_fold_increase_range, rng, rng_key)
p_outlier = p_nonbinding_outlier

if p_binding_outlier > 0 and is_binder:
outlier = stats.binom.rvs(1, p_binding_outlier, size=n_cells, random_state=rng)
outlier_idx = np.where(outlier)
x, fold_change = primary_sim(*primary_params)

a = (0.001 - neg_concentration) / (neg_concentration / 3)
concentration = stats.truncnorm.rvs(a, np.inf, loc=neg_concentration, scale=neg_concentration / 3,
random_state=rng)

x = x.at[outlier_idx].set(self.generate_nb_val(mean, concentration, size=np.sum(outlier),
rng_key=rng_key))
if simulate_neg_control:
x_neg = self.generate_nb_val(neg_mean, neg_concentration, size=n_cells, rng_key=rng_key)
d["x_neg"].extend(x_neg.tolist())

if p_outlier > 0:
outlier = stats.binom.rvs(1, p_outlier, size=n_cells, random_state=rng)
x_out, _ = outlier_sim(np.sum(outlier), *outlier_params)
x = x.at[outlier.astype(bool)].set(x_out)
d["outlier"].extend(outlier.tolist())
else:
d["outlier"].extend([0]*n_cells)

if simulate_neg_control:
mean = neg_mean
x_neg = self.generate_nb_val(mean, neg_concentration, size=n_cells, rng_key=rng_key)
d["x_neg"].extend(x_neg.tolist())
d["outlier"].extend([0] * n_cells)

d["x"].extend(x.tolist())
d["binder"].extend([is_binder] * n_cells)
d["clone"].extend([i] * n_cells)
d["fold_increase"].extend([fold_change] * n_cells)

mdat = DextramerSimulator.__generate_mdata(d, simulate_neg_control, K, cc_assignment)
mdat = self.__generate_mdata(d, simulate_neg_control, K, cc_assignment)
if plot_data:
return mdat, DextramerSimulator.__plot_simulated_data(d)
return mdat, self.__plot_simulated_data(d)
else:
return mdat

Expand All @@ -544,9 +554,11 @@ def simulate_pmhc_data_from_sample(self,
nof_clones: int = 150,
binding_ratio: float = 0.05,
binding_fold_increase_range: list[float] = None,
p_binding_outlier=0.0,
p_nonbinding_outlier=0.0,
use_clonotype_cov: bool = False,
nof_clonotype_cluster = None,
p_nonbinding_clone_outlier = 0.0,
nof_clonotype_cluster=None,
p_nonbinding_clone_outlier=0.0,
simulate_neg_control: bool = False,
plot_data: bool = False,
rng_key: int = 42
Expand All @@ -560,7 +572,13 @@ def simulate_pmhc_data_from_sample(self,
nof_clones: number of clones measured in experiments.
binding_ratio: ratio of binder vs non-binder
binding_fold_increase_range: list of fold increase for pMHC binding cells
p_binding_outlier: the probability of a cell of binding clonotype to have low pMHC counts
p_nonbinding_outlier: the probability of a cell of non-binding clonotype to have high pMHC counts
nof_clonotype_cluster: number of clonotype clusters to simulate in case covariance matrix should be
simulated (if None than randomly sampled between [2, nof_clones]
use_clonotype_cov: whether to use clonotype covariance to assign binding or randomly (default: False)
p_nonbinding_clone_outlier: The probability that a non-binding clone is clustered together with binding
clones
simulate_neg_control: whether to simulate a negative control pMHC for each cell (default: False)
plot_data: boolean whether to plot simulated data (default: False)
rng_key: random seed.
Expand Down Expand Up @@ -590,7 +608,7 @@ def simulate_pmhc_data_from_sample(self,
else:
nof_clonotype_cluster = rng.randint(2, nof_clones)

d = {"x": [], "x_neg": [], "binder": [], "clone": [], "fold_increase": []}
d = {"x": [], "x_neg": [], "binder": [], "clone": [], "fold_increase": [], "outlier":[]}

binder_assignment = rng.binomial(1, binding_ratio, size=nof_clones)
K = None
Expand Down Expand Up @@ -619,6 +637,15 @@ def simulate_pmhc_data_from_sample(self,
nx = rng.choice(neg_x, size=n_cells)
x = fold_change*nx if is_binder else nx

#outlier infusion:
prob = p_binding_outlier if is_binder else p_nonbinding_outlier
if prob > 0:
outlier = stats.binom.rvs(1, prob, size=n_cells, random_state=rng)
x[outlier.astype(bool)] = nx if is_binder else fold_change * nx
d["outlier"].extend(outlier.tolist())
else:
d["outlier"].extend([0] * n_cells)

if simulate_neg_control:
d["x_neg"].extend(rng.choice(neg_x, size=n_cells).tolist())

Expand All @@ -627,10 +654,10 @@ def simulate_pmhc_data_from_sample(self,
d["clone"].extend([i] * n_cells)
d["fold_increase"].extend([fold_change] * n_cells)

mdat = DextramerSimulator.__generate_mdata(d, simulate_neg_control, K, cc_assignment)
mdat = self.__generate_mdata(d, simulate_neg_control, K, cc_assignment)

if plot_data:
return mdat, DextramerSimulator.__plot_simulated_data(d)
return mdat, self.__plot_simulated_data(d)
else:
return mdat

Expand Down Expand Up @@ -679,6 +706,7 @@ def __generate_mdata(d, simulate_neg_control, cov, cc_assignment) -> MuData:
adata.var["feature_types"] = ["Antigen Capture"]

adata.obs["fold_increase"] = d["fold_increase"]
adata.obs["is_outlier"] = d["outlier"]
adata.obs.index = adata.obs.index.astype("int64")

adata_tcr = ad.AnnData()
Expand Down