import numpy as np
import matplotlib.pyplot as plt
import GPy as gpy
from scipy.stats import chi2
from scipy.stats import spearmanr
from scipy.interpolate import interp1d
import sys
[docs]class WarpFunction:
""" Class to learn a (possibly nonlinear) function warping from
observed distribution to uniform (0,1) distribution and back.
:param t: pseudotime values
:type t: Nx1 array of float
:param quantiles: Number of quantiles to use in learning warp
:type quantiles: integer
:param method: Gaussian process regression or linear interpolation?
:type method: string
:param reverse: Reverse uniform quantiles before warping?
:type reverse: boolean
"""
def __init__(self,t,quantiles=50,method='gp',reverse=False):
""" Constructor for MATCHER class """
self.method = method
self.quantiles = quantiles
self.reverse = reverse
N = t.shape[0]
if N < self.quantiles:
x = np.array(sorted(t))
y = np.arange(1.,float(N+1))/float(N)
else:
x = np.percentile(t,range(0,100+100//quantiles,100//quantiles))
y = np.arange(0,1.+1./quantiles,1./quantiles)
if self.reverse:
y = y[::-1]
if self.method == 'linear':
self._warp = interp1d(x, y)
self._inverse_warp = interp1d(y, x)
else:
x = x.reshape(-1,1)
y = y.reshape(-1,1)
self._warp = gpy.models.GPRegression(x,y,gpy.kern.RBF(1))
self._warp.optimize()
[docs] def warp(self,t):
""" Warp from pseudotime to master time.
:param t: pseudotime values
:type t: Nx1 array of float
:returns: inferred master time values
:rtype: Nx1 array of float
"""
if self.method == 'linear':
return self._warp(t)
else:
return self._warp.predict(t.reshape(-1,1))[0]
[docs] def inverse_warp(self,tm):
""" Warp from master time to pseudotime .
:param tm: master time values
:type tm: Nx1 array of float
:returns: inferred pseudotime values
:rtype: Nx1 array of float
"""
if self.method == 'linear':
return self._inverse_warp(np.clip(tm,0,1))
else:
return self._warp.infer_newX(tm.reshape(-1,1))[0]
def plot(self):
if self.method == 'linear':
xnew = np.linspace(min(self._warp.x),max(self._warp.x))
f = plt.figure()
return plt.plot(self._warp.x,self._warp.y,'kx',xnew,self._warp(xnew),'b-')
else:
return self._warp.plot()
[docs]class MATCHER:
""" Manifold Alignment to Characterize Experimental Relationships (MATCHER)
:param X: One or more single cell datasets. To learn a joint trajectory from two datasets with known correspondences, add a second level list.
:type X: list, possibly nested
:param num_inducing: Number of "inducing inputs" to use in variational approximation
:type missing_data: integer
"""
def __init__(self,X,num_inducing=10):
""" Constructor for MATCHER class """
self.X = X
self.model = []
for i in range(len(self.X)):
if isinstance(X[i],list):
self.model.append(gpy.models.mrd.MRD(self.X[i], 1, num_inducing=num_inducing, kernel=gpy.kern.RBF(1)))
else:
self.model.append(gpy.models.bayesian_gplvm.BayesianGPLVM(self.X[i], input_dim=1, kernel=gpy.kern.RBF(1)))
self.warp_functions = []
self.master_time = []
[docs] def infer(self,quantiles=50,method=None,reverse=None):
""" Infer pseudotime and master time values.
:param quantiles: How many quantiles to use when computing warp functions
:type quantiles: int
:param method: Gaussian process regression or linear interpolation?
:type method: list of strings
:param reverse: Reverse pseudotime?
:type quantiles: list of booleans
"""
num_datasets = len(self.X)
self.warp_functions = [None] * num_datasets
self.master_time = [None] * num_datasets
if method is None:
method = ['gp']
method = np.repeat(method,[num_datasets])
if reverse is None:
reverse = [False]
reverse = np.repeat(reverse,[num_datasets])
for i in range(num_datasets):
self.model[i].optimize(messages=1)
self.learn_warp_function(i,quantiles,method[i],reverse[i])
def learn_warp_function(self,ind,quantiles=50,method='gp',reverse=False):
t = self.model[ind].latent_space.mean
self.warp_functions[ind] = WarpFunction(t,quantiles,method,reverse)
self.master_time[ind] = self.warp_functions[ind].warp(t)
[docs] def sample_master_time(self, ind, samples=10):
""" Sample from the posterior for the inferred master time values for the specified model.
:param ind: Index of model to sample
:type ind: int
:param samples: Number of samples
:type samples: int
:returns: Posterior samples from inferred master_time values for each cell
:rtype: SxN array, where S is the number of samples and N is the number of cells
"""
means = self.model[ind].latent_space.mean.flatten()
variances = self.model[ind].latent_space.variance.flatten()
sampled = np.random.multivariate_normal(means,np.diag(variances),samples)
mapped = self.warp_functions[ind].warp(sampled.reshape(-1,1)).reshape(samples,-1)
return mapped
def _p_adjust(self,p):
""" Helper function to perform Benjamini-Hochberg FDR control.
:param p: p-values to adjust
:type p: array
:returns: adjusted p-values
:rtype: list of floats
"""
p = np.asfarray(p)
by_descend = p.argsort()[::-1]
by_orig = by_descend.argsort()
steps = float(len(p)) / np.arange(len(p), 0, -1)
q = np.minimum(1, np.minimum.accumulate(steps * p[by_descend]))
return q[by_orig]
[docs] def find_markers(self,inds,fdr_correction=True):
""" Find features that are significantly related to pseudotime from each specified data type.
:param inds: Indices of data types to use
:type inds: list
:param fdr_correction: Perform multiple hypothesis test correction to control FDR?
:type fdr_correction: bool
:returns: list of indices for each data type
:rtype: list of lists
"""
p_vals = []
for ind in inds:
num_cells, num_features = self.X[ind].shape
raw_p_vals = []
t = self.model[ind].latent_space.mean
F = self.model[ind].predict(t)[0]
# Fit GP model with constant (bias) kernel
null_model = gpy.models.GPRegression(t,self.X[ind],gpy.kern.Bias(1))
null_model.optimize()
F_null = null_model.predict(t)[0]
for j in range(num_features):
f = F[:,j]
f_null = F_null[:,j]
y = self.X[ind][:,j]
null_loglik = sum(null_model.likelihood.logpdf_link(f_null,y))
alt_loglik = sum(self.model[ind].likelihood.logpdf_link(f,y))
df = 1
# Compute likelihood ratio statistic using chisquare with one df
D = 2*(alt_loglik - null_loglik)
p = chi2.sf(D,df)
raw_p_vals.append(p)
if fdr_correction:
p_vals.append(self._p_adjust(raw_p_vals))
else:
p_vals.append(raw_p_vals)
return p_vals
[docs] def correlation(self,model1,model2,inds1,inds2,method="Spearman"):
""" Approximate correlation between the specified features of two different data types.
:param model1: Index of first data type
:type model1: int
:param model2: Index of second data type
:type model2: int
:param inds1: Indices of features
:type inds1: list
:param inds2: Indices of features
:type inds2: list
:param method: Type of correlation coefficient to compute ("Spearman" or "Pearson")
:type method: string
:returns: Correlation matrix
:rytpe: 2D array
"""
t = self.master_time[model1]
model2_internal = self.warp_functions[model2].inverse_warp(t.reshape(-1,1))
vals1 = self.X[model1][:,inds1]
vals2 = self.model[model2].predict(model2_internal)[0][:,inds2]
n1 = len(inds1)
n2 = len(inds2)
n = n1 + n2
if method=="Spearman":
corr_res = spearmanr(vals1,vals2)
corr_res = corr_res[0]
elif method == "Pearson":
corr_res = np.corrcoef(vals1.transpose(),vals2.transpose())
else:
corr_res = np.corrcoef(vals1.transpose(),vals2.transpose())
corr_mat = corr_res[range(n1),n1:n]
return corr_mat
[docs] def plot_warp_functions(self,inds):
""" Plot the functions for each data type that map from domain-specific pseudotime to master time.
:param inds: Indices of data types
:type inds: list
"""
fig, axes = plt.subplots(1,len(inds))
for i in range(len(inds)):
self.warp_functions[inds[i]].plot(ax=axes[i])
t = self.model[inds[i]].latent_space.mean
axes[i].set_xlim([min(t),max(t)])
axes[i].set_ylim([0,1])
axes[i].legend(loc='best')
[docs] def plot_feature(self,model_ind,feature_ind):
""" Plot the specified feature and its model fit
:param model_ind: Index for data type
:type model_ind: int
:param feature_ind: Index of feature
:type feature_ind: int
"""
fig = self.model[model_ind].plot_f(which_data_ycols=[feature_ind],plot_inducing=False)
fig.scatter(self.model[model_ind].latent_space.mean,self.X[model_ind][:,feature_ind],marker='x',c="black")
[docs] def plot_master_time(self,inds):
""" Plot inferred master time values and uncertainty for models specified by inds.
:param inds: Indices of data types
:type inds: list
"""
fig, axes = plt.subplots(1,len(inds))
lines = []
fills = []
bg_lines = []
for i in range(len(self.model)):
means = self.model[i].latent_space.mean.flatten()
x = range(len(means))
vars = self.model[i].latent_space.variance.flatten()
axis_order = np.argsort(means)
ci_up = means[axis_order] + 2 * np.sqrt(vars[axis_order])
ci_up = self.warp_functions[i].warp(ci_up.reshape(-1,1)).flatten()
ci_down = means[axis_order] - 2 * np.sqrt(vars[axis_order])
ci_down = self.warp_functions[i].warp(ci_down.reshape(-1,1)).flatten()
means = self.warp_functions[i].warp(means.reshape(-1,1)).flatten()
lines.extend(axes[i].plot(x, means[axis_order], c='b', label=r"$\mathbf{{T_{{{}}}}}$".format(i+1)))
fills.append(axes[i].fill_between(x, ci_down, ci_up, facecolor=lines[i].get_color(), alpha=.3))
axes[i].legend(borderaxespad=0.,loc=0)
axes[i].set_xlim(min(x), max(x))
axes[i].set_ylim(min(means), max(means))
plt.draw()
axes[len(self.model)-1].figure.tight_layout(h_pad=.01)
return dict(lines=lines, fills=fills, bg_lines=bg_lines)