"""
19 Jul 2013
"""
from pytadbit.utils.tadmaths import calinski_harabasz, calc_eqv_rmsd
from pytadbit.utils.tadmaths import calc_consistency, dihedral
from pytadbit.utils.extraviews import color_residues, chimera_view
from pytadbit.utils.extraviews import augmented_dendrogram, plot_hist_box
from pytadbit.imp.impmodel import IMPmodel
from pytadbit.centroid import centroid_wrapper
from cPickle import load, dump
from subprocess import Popen, PIPE
from math import sqrt, acos, degrees, pi
from numpy import median as np_median
from numpy import std as np_std, log2
from numpy import array, cross, dot, ma, isnan
from numpy.linalg import norm
from scipy.cluster.hierarchy import linkage, fcluster
from scipy.stats import spearmanr
from warnings import warn
from string import uppercase as uc, lowercase as lc
from random import random
try:
from matplotlib import pyplot as plt
from matplotlib.cm import jet
except ImportError:
warn('matplotlib not found\n')
def load_structuralmodels(path_f):
"""
:param path_f: path to the pickled StructuralModels object.
:returns: a :class:`pytadbit.imp.structuralmodels.StructuralModels` instance.
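Example (a minimal usage sketch; the file path is illustrative)::

    from pytadbit.imp.structuralmodels import load_structuralmodels

    my_models = load_structuralmodels('models.pick')
    print my_models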
"""
svd = load(open(path_f))
return StructuralModels(
nloci=svd['nloci'], models=svd['models'], bad_models=svd['bad_models'],
resolution=svd['resolution'], original_data=svd['original_data'],
clusters=svd['clusters'], config=svd['config'], zscores=svd['zscore'])
class StructuralModels(object):
"""
This class holds the three-dimensional models generated from Hi-C data.
The final analysis will be performed on the n_keep top models.
:param nloci: number of particles in the selected region
:param models: a dictionary containing the generated
:class:`pytadbit.imp.impmodel.IMPmodel` to be used as 'best models'
:param bad_models: a dictionary of :class:`pytadbit.imp.impmodel.IMPmodel`,
these models will not be used, just stored in case the set of
'best models' needs to be extended later on (
:func:`pytadbit.imp.structuralmodels.StructuralModels.define_best_models`
).
:param resolution: number of nucleotides per Hi-C bin. This will be the
number of nucleotides in each model's particle.
:param None original_data: a list of lists (equivalent to a square matrix) of
the normalized Hi-C data, used to build this list of models.
:param None clusters: a dictionary of type
:class:`pytadbit.imp.structuralmodels.ClusterOfModels`
:param None config: a dictionary containing the parameters used for the
generation of the three-dimensional models.
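Models can be accessed by rank, by initial random number, or iterated over.
A minimal sketch (``my_models`` stands for any StructuralModels instance
returned by the modeling step)::

    best = my_models[0]           # best model (lowest objective function)
    same = my_models['1']         # model fetched by its rand_init number
    for model in my_models:       # iterate over all kept models
        print model['objfun']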
"""
def __init__(self, nloci, models, bad_models, resolution,
original_data=None, zscores=None, clusters=None,
config=None):
self.__models = models
self._bad_models = bad_models
self.nloci = nloci
self.clusters = clusters or ClusterOfModels()
self.resolution = float(resolution)
self._original_data = original_data # only used for correlation
self._zscores = zscores # only used for plotting
self._config = config
def __getitem__(self, nam):
if type(nam) == str:
nam = int(nam)
for m in self.__models:
if self.__models[m]['rand_init'] == nam:
return self.__models[m]
else:
raise KeyError('Model %s not found\n' % (nam))
try:
return self.__models[nam]
except TypeError:
for i, key in enumerate(self.__models):
    if nam == i:
        return self.__models[key]
raise KeyError('Model %s not found\n' % (nam))
def __iter__(self):
for m in self.__models:
yield self.__models[m]
def __len__(self):
return len(self.__models)
def __repr__(self):
return ('StructuralModels with %s models (objective function range: %s - %s)\n' +
' (corresponding to the best models out of %s models).\n' +
' IMP modeling used these parameters:\n' +
'%s\n' +
' Models were clustered into %s clusters') % (
len(self.__models),
int(self.__models[0]['objfun']),
int(self.__models[len(self.__models) - 1]['objfun']),
len(self.__models) + len(self._bad_models),
'\n'.join([' - %-12s: %s' % (k, v)
for k, v in self._config.iteritems()]),
len(self.clusters))
def fetch_model_by_rand_init(self, rand_init, all_models=False):
"""
Models are stored according to their objective function value (first
best), but in order to reproduce a model, we need its initial random
number. This method helps to fetch the model corresponding to a given
initial random number stored under
StructuralModels.models[N]['rand_init'].
:param rand_init: the wanted rand_init number.
:param False all_models: whether or not to use 'bad' models
:returns: index of 3d model
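Example (a sketch; ``my_models`` is assumed to be a StructuralModels
instance and 1 an existing rand_init number)::

    idx = my_models.fetch_model_by_rand_init(1)
    model = my_models[idx]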
"""
for i, model in enumerate(self):
if model['rand_init'] == rand_init:
return i
if all_models:
for m in self._bad_models:
if self._bad_models[m]['rand_init'] == rand_init:
return m
raise IndexError('Model with initial random number: %s, not found\n' % (
    rand_init))
def centroid_model(self, models=None, cluster=None, verbose=False):
"""
:param None models: if None (default) the contact map will be computed
using all the models. A list of numbers corresponding to a given set
of models can be passed
:param None cluster: compute the contact map only for the models in the
cluster number 'cluster'
:param False verbose: prints the distance of each model to average model
(in stderr)
:returns: the centroid model of a given group of models (the most
representative model)
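Example (a sketch; cluster 1 is assumed to exist after a previous call
to cluster_models)::

    cent = my_models.centroid_model(cluster=1)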
"""
if models:
models = models
elif cluster > -1:
models = [str(m) for m in self.clusters[cluster]]
else:
models = self.__models
idx = centroid_wrapper([self[x]['x'] for x in models],
[self[x]['y'] for x in models],
[self[x]['z'] for x in models],
self.nloci, len(models), int(verbose), 0)
return models[idx]
def average_model(self, models=None, cluster=None, verbose=False):
"""
:param None models: if None (default) the contact map will be computed
using all the models. A list of numbers corresponding to a given set
of models can be passed
:param None cluster: compute the contact map only for the models in the
cluster number 'cluster'
:param False verbose: prints the distance of each model to average model
(in stderr)
:returns: the average model of a given group of models (a new and
ARTIFICIAL model)
"""
if models:
models = models
elif cluster > -1:
models = [str(m) for m in self.clusters[cluster]]
else:
models = self.__models
idx = centroid_wrapper([self[x]['x'] for x in models],
[self[x]['y'] for x in models],
[self[x]['z'] for x in models],
self.nloci, len(models), int(verbose), 1)
avgmodel = IMPmodel((('x', idx[0]), ('y', idx[1]), ('z', idx[2]),
('rand_init', 'avg'), ('objfun', None),
('radius', self['radius'])))
return avgmodel
def cluster_models(self, fact=0.75, dcutoff=200, var='score', method='mcl',
mcl_bin='mcl', tmp_file=None, verbose=True):
"""
This function performs a clustering analysis of the generated models
based on structural comparison. The result will be stored in
StructuralModels.clusters
:param 0.75 fact: factor to define the percentage of equivalent
positions to be considered in the clustering
:param 200 dcutoff: distance threshold (nm) to determine if two
particles are in contact
:param 'score' var: value to return, which can be either (i) 'drmsd'
(symmetry independent: mirrors will show no differences), or (ii)
'score', defined as:
::
score[i] = eqvs[i] * (dRMSD[i] / max(dRMSD)) / (RMSD[i] / max(RMSD))
where eqvs[i] is the number of equivalent positions for the ith
pairwise model comparison
:param 'mcl' method: clustering method to use, which can be either
'mcl' or 'ward'. The latter uses the scipy implementation, and is
NOT RECOMMENDED.
:param 'mcl' mcl_bin: path to the mcl executable file, in case of the
'mcl is not in the PATH' warning message
:param None tmp_file: path to a temporary file created during
the clustering computation. Default will be created in /tmp/ folder
:param True verbose: same as print StructuralModels.clusters
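Example (a sketch; the dcutoff value is illustrative and should reflect
the expected contact distance in nm)::

    my_models.cluster_models(fact=0.75, dcutoff=200)
    print my_models.clusters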
"""
if not tmp_file:
    tmp_file = '/tmp/tadbit_tmp_%s.txt' % (
        ''.join([(uc + lc)[int(random() * 52)] for _ in xrange(4)]))
scores = calc_eqv_rmsd(self.__models, self.nloci, dcutoff, var)
from distutils.spawn import find_executable
if not find_executable(mcl_bin):
print('\nWARNING: MCL not found in path, using WARD clustering\n')
method = 'ward'
# Initialize cluster definition of models:
for model in self:
model['cluster'] = 'Singleton'
if method == 'ward':
matrix = [[0.0 for _ in xrange(len(self))]
for _ in xrange(len(self))]
for (i, j), score in scores.iteritems():
matrix[i][j] = score if score > fact * self.nloci else 0.0
clust = linkage(matrix, method='ward')
solutions = {}
# x = []
# y = []
for k in clust[:,2]:
clusters = ClusterOfModels()
_ = [clusters.setdefault(j, []).append(i) for i, j in
enumerate(fcluster(clust, k, criterion='distance'))]
solutions[k] = {'out': clusters}
solutions[k]['score'] = calinski_harabasz(scores, clusters)
# print len(a), k, solutions[k]['score']
# x.append(solutions[k]['score'])
# y.append(len(a))
# plt.plot(y,x)
# plt.show()
clusters = [solutions[s] for s in sorted(
solutions, key=lambda x: solutions[x]['score'])
if solutions[s]['score']>0][-1]['out']
for cluster in clusters:
for model in clusters[cluster]:
self[model]['cluster'] = cluster
self.clusters = clusters
else:
out_f = open(tmp_file, 'w')
uniqs = list(set([tuple(sorted((m1, m2))) for m1, m2 in scores]))
cut = fact * self.nloci
for md1, md2 in uniqs:
score = scores[(md1, md2)]
if score >= cut:
out_f.write('model_%s\tmodel_%s\t%s\n' % (md1, md2, score))
out_f.close()
Popen('%s %s --abc -V all -o %s.mcl' % (
mcl_bin, tmp_file, tmp_file), stdout=PIPE, stderr=PIPE,
shell=True).communicate()
self.clusters = ClusterOfModels()
for cluster, line in enumerate(open(tmp_file + '.mcl')):
self.clusters[cluster + 1] = []
for model in line.split():
model = int(model.split('_')[1])
self[model]['cluster'] = cluster + 1
self.clusters[cluster + 1].append(self[model]['rand_init'])
self.clusters[cluster + 1].sort(
key=lambda x: self[str(x)]['objfun'])
# sort clusters according to their lowest energy
# for clt in clusters:
# clusters[clt].sort()
# for i, clt in enumerate(sorted(
# clusters, key=lambda x: self[clusters[x][0]]['energy'])):
# self.clusters[i] = clusters[clt]
# for model in self.clusters[i]:
# self.__models[model]['cluster'] = i
if verbose:
print 'Number of Singletons excluded from clustering: %s' % (
len([1 for model in self if model['cluster'] == 'Singleton']))
print self.clusters
def _build_distance_matrix(self, n_best_clusters):
"""
"""
clusters = sorted(self.clusters.keys())[:n_best_clusters]
matrix = [[0.0 for _ in clusters] for _ in clusters]
clust_count = dict([(c, len([m for m in self if m['cluster']==c]))
for c in clusters])
objfun = dict([(c, [m for m in self if m['cluster']==c][0]['objfun'])
for c in clusters])
for i, cl1 in enumerate(clusters):
md1 = md2 = None
# find model with lowest energy for each cluster
for md1 in self:
if md1['cluster'] == cl1:
# the first one found is the best :)
break
for j, cl2 in enumerate(clusters[i+1:]):
# find model with lowest energy for each cluster
for md2 in self:
if md2['cluster'] == cl2:
# the first one found is the best :)
break
matrix[i][j+i+1] = calc_eqv_rmsd({0: md1, 1: md2}, self.nloci,
var='drmsd', one=True)
return clust_count, objfun, matrix
def cluster_analysis_dendrogram(self, n_best_clusters=None, color=False,
axe=None, savefig=None):
"""
Representation of the clustering results. The length of the leaves is
proportional to the final objective function value of each model. The
branch widths are proportional to the number of models in a given
cluster (or group of clusters, if it is an internal branch).
:param None n_best_clusters: number of clusters to represent (by
default all clusters will be shown)
:param False color: color the dendrogram based on the significance of
the clustering (basically it depends on the internal branch lengths)
"""
if not self.clusters:
self.cluster_models()
if not n_best_clusters:
n_best_clusters = len(self.clusters)
if n_best_clusters <= 1:
warn("Need at least 2 clusters to display...")
return None
clust_count, objfun, matrix = self._build_distance_matrix(n_best_clusters)
z = linkage(matrix)
minnrj = min(objfun.values()) - 1
maxnrj = max(objfun.values()) - 1
val = (maxnrj-minnrj)
maxz = max([i[2] for i in z])
for i in z:
i[2] = i[2]/maxz * val
dads = {}
i = max(clust_count)
for a, b, _, _ in z:
i += 1
clust_count[i] = clust_count[a + 1] + clust_count[b + 1]
dads[a + 1] = i
dads[b + 1] = i
d = augmented_dendrogram(clust_count, dads, objfun, color,
axe, savefig, z)
return d
def density_plot(self, models=None, cluster=None, steps=(1, 2, 3, 4, 5),
error=False, axe=None, savefig=None, savedata=None):
"""
Plots the number of nucleotides per nm of chromatin vs the modeled
region bins.
:param None models: if None (default) the contact map will be computed
using all the models. A list of numbers corresponding to a given set
of models can be passed
:param None cluster: compute the contact map only for the models in the
cluster number 'cluster'
:param (1, 2, 3, 4, 5) steps: how many particles to group for the
estimation. By default 5 curves are drawn
:param False error: represent the error of the estimates
:param None savedata: path to a file where to save the density data
generated (1 column per step + 1 for particle number).
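Example (a sketch; output paths are illustrative)::

    my_models.density_plot(steps=(1, 5), error=True,
                           savefig='density.png', savedata='density.txt')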
"""
if type(steps) == int:
steps = (steps, )
if len(steps) > 6:
raise Exception('Sorry not enough colors to do this.\n')
colors = ['grey', 'darkgreen', 'darkblue', 'purple', 'darkorange',
'darkred'][-len(steps):]
dists = []
for part1, part2 in zip(range(self.nloci - 1), range(1, self.nloci)):
dists.append(self.median_3d_dist(part1 + 1, part2 + 1, models,
cluster, plot=False, median=False))
lmodels = len(dists[0])
distsk = {1: dists}
for k in (steps[1:] if steps[0]==1 else steps):
distsk[k] = [None for _ in range(k/2)]
for i in range(self.nloci - k):
distsk[k].append(reduce(lambda x, y: x + y,
[dists[i+j] for j in range(k)]))
if k == 1:
continue
# calculate the mean for steps larger than 1
distsk[k][-1] = [float(sum([distsk[k][-1][i+lmodels*j]
for j in xrange(k)])) / k
for i in xrange(lmodels)]
new_distsk = {}
errorp = {}
errorn = {}
for k, dists in distsk.iteritems():
new_distsk[k] = []
errorp[k] = []
errorn[k] = []
for part in dists:
if not part:
new_distsk[k].append(None)
errorp[k].append(None)
errorn[k].append(None)
continue
part = [self.resolution / p for p in part]
new_distsk[k].append(np_median(part))
try:
errorn[k].append(new_distsk[k][-1] - 2 * np_std(part))
errorn[k][-1] = errorn[k][-1] if errorn[k][-1] > 0 else 0.0
errorp[k].append(new_distsk[k][-1] + 2 * np_std(part))
errorp[k][-1] = errorp[k][-1] if errorp[k][-1] > 0 else 0.0
except TypeError:
errorn[k].append(None)
errorp[k].append(None)
distsk = new_distsk
# write density data to file
if savedata:
out = open(savedata, 'w')
out.write('#Particle\t%s\n' % ('\t'.join([str(c) + '\t' +
'2*stddev(%d)' % c for c in steps])))
for part in xrange(self.nloci):
out.write('%s\t%s\n' % (part + 1, '\t'.join(
['None\tNone' if part >= len(distsk[c]) else
(str(round(distsk[c][part], 3)) + '\t' +
str(errorp[c][part]))
if distsk[c][part] else 'None\tNone'
for c in steps])))
out.close()
# plot
if axe:
ax = axe
fig = ax.get_figure()
else:
fig = plt.figure(figsize=(11, 5))
ax = fig.add_subplot(111)
ax.patch.set_facecolor('lightgrey')
ax.patch.set_alpha(0.4)
ax.grid(ls='-', color='w', lw=1.5, alpha=0.6, which='major')
ax.grid(ls='-', color='w', lw=1, alpha=0.3, which='minor')
ax.set_axisbelow(True)
ax.minorticks_on() # always on, not only for log
# remove tick marks
ax.tick_params(axis='both', direction='out', top=False, right=False,
left=False, bottom=False)
ax.tick_params(axis='both', direction='out', top=False, right=False,
left=False, bottom=False, which='minor')
plots = []
for k in steps:
plots += ax.plot(range(1, len(distsk[k]) + 1), distsk[k],
color=colors[steps.index(k)],
lw=steps.index(k) + 1, alpha=0.5)
if error:
for k in steps:
plots += ax.plot(range(1, len(errorp[k]) + 1), errorp[k],
color=colors[steps.index(k)], ls='--')
ax.plot(range(1, len(errorp[k]) + 1), errorn[k],
color=colors[steps.index(k)], ls='--')
ax.set_ylabel('Density (bp / nm)')
ax.set_xlabel('Particle number')
ax.legend(plots, ['Average for %s particle%s' % (k, 's' if k > 1 else '')
for k in steps] + (
['+/- 2 standard deviations'
for k in steps] if error else []), fontsize='small',
bbox_to_anchor=(1, 0.5), loc='center left')
ax.set_xlim((1, self.nloci))
ax.set_title('Chromatin density')
plt.subplots_adjust(left=0.1, right=0.78)
if savefig:
fig.savefig(savefig)
elif not axe:
plt.show()
def define_best_models(self, nbest):
"""
Defines the number of top models (based on the objective function) to
keep. If keep_all is set to True in
:func:`pytadbit.imp.imp_model.generate_3d_models` or in
:func:`pytadbit.experiment.Experiment.model_region`, then the full set
of models (n_models parameter) will be used, otherwise only the n_keep
models will be available.
:param nbest: number of top models to keep (usually 20% of the
generated models).
"""
tmp_models = self.__models
tmp_models.update(self._bad_models)
self.__models = dict([(i, tmp_models[i]) for i in xrange(nbest)])
self._bad_models = dict([(i, tmp_models[i]) for i in
xrange(nbest, len(tmp_models))])
def zscore_plot(self, axe=None, savefig=None):
"""
Generates 3 plots: two heatmaps of the Z-scores used for modeling (one
of which is a binary representation showing in red the Z-scores higher
than the upper cut-off and in blue the Z-scores lower than the lower
cut-off), and a histogram of the distribution of Z-scores, showing the
selected regions.
:param None axe: a matplotlib.axes.Axes object to define the plot
appearance
:param None savefig: path to a file where to save the image generated;
if None, the image will be shown using matplotlib GUI
"""
zsc_mtrx = reduce(lambda x, y: x + y, [[k] + self._zscores[k].keys()
for k in self._zscores.keys()])
max_bin = max([int(i) for i in zsc_mtrx])
zsc_mtrx = [[float('nan') for _ in xrange(max_bin)] for _ in xrange(max_bin)]
for i in xrange(max_bin):
for j in xrange(max_bin):
try:
zsc_mtrx[i][j] = self._zscores[str(i)][str(j)]
except KeyError:
continue
for i in xrange(max_bin):
for j in xrange(max_bin):
if zsc_mtrx[i][j]:
zsc_mtrx[j][i] = zsc_mtrx[i][j]
for i in xrange(max_bin):
for j in xrange(max_bin):
if self._config['lowfreq'] < zsc_mtrx[i][j] < self._config['upfreq']:
zsc_mtrx[j][i] = float('nan')
zsc_mtrx[i][j] = float('nan')
masked_array = ma.array (zsc_mtrx, mask=isnan(zsc_mtrx))
cmap = jet
cmap.set_bad('w', 1.)
if not axe:
fig = plt.figure(figsize=(25, 5.5))
else:
fig = axe.get_figure()
ax = fig.add_subplot(131)
ims = ax.imshow(masked_array, origin='lower',
interpolation="nearest", cmap=cmap)
ax.set_ylabel('Particles')
ax.set_xlabel('Particles')
ax.set_title('Z-scores of the observed Hi-C count')
cbar = ax.figure.colorbar(ims, cmap=cmap)
cbar.ax.set_ylabel('Z-score value')
#
ax = fig.add_subplot(132)
_, _, patches = ax.hist(
reduce(lambda x, y: x+y, [self._zscores[v].values()
for v in self._zscores.keys()]),
bins=50)
for thispatch in patches:
color = ('grey' if self._config['lowfreq'] < thispatch.get_x() + thispatch.get_width() < self._config['upfreq']
else 'green')
thispatch.set_facecolor(color)
thispatch.set_alpha(0.7)
ax.set_title('Histogram of Z-scores')
ax.vlines(self._config['lowfreq'], 1, ax.get_ylim()[1], color='red',
linestyle='--')
ax.vlines(self._config['upfreq'] , 1, ax.get_ylim()[1], color='red',
linestyle='--')
#
ax = fig.add_subplot(133)
masked_array = ma.array (zsc_mtrx, mask=isnan(zsc_mtrx))
for i in masked_array:
for j in xrange(len(i)):
try:
i[j].mask
continue
except AttributeError:
pass
if i[j] > self._config['upfreq']:
i[j] = 1
elif i[j] < self._config['lowfreq']:
i[j] = -1
ims = ax.imshow(masked_array, origin='lower', vmin=-1.2, vmax=1.2,
interpolation="nearest", cmap=cmap)
ax.set_ylabel('Particles')
ax.set_xlabel('Particles')
ax.set_title('Binary representation of Z-scores\nred: > ' +
'%.2f; blue: < %.2f' % (self._config['upfreq'],
self._config['lowfreq']))
if savefig:
fig.savefig(savefig)
elif not axe:
plt.show()
def correlate_with_real_data(self, models=None, cluster=None, cutoff=200,
plot=False, axe=None, savefig=None):
"""
:param None models: if None (default) the contact map will be computed
using all the models. A list of numbers corresponding to a given set
of models can be passed
:param None cluster: compute the contact map only for the models in the
cluster number 'cluster'
:param 200 cutoff: distance cutoff (nm) to define whether two particles
are in contact or not
:param None savefig: path to a file where to save the image generated;
if None, the image will be shown using matplotlib GUI
:param False plot: to display the plot
:param None axe: a matplotlib.axes.Axes object to define the plot
appearance
:returns: correlation coefficient rho, between the two
matrices. A rho value greater than 0.7 indicates a very good
correlation
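Example (a sketch; the cutoff value is illustrative)::

    corr = my_models.correlate_with_real_data(cutoff=200, plot=True,
                                              savefig='correlation.png')
    print 'Spearman rho: %s' % (corr[0])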
"""
model_matrix = self.get_contact_matrix(models=models, cluster=cluster,
cutoff=cutoff)
oridata = []
moddata = []
for i in xrange(len(self._original_data)):
for j in xrange(i + 1, len(self._original_data)):
if not self._original_data[i][j] > 0:
continue
oridata.append(self._original_data[i][j])
moddata.append(model_matrix[i][j])
# corr = spearmanr(model_matrix, self._original_data, axis=None)
corr = spearmanr(moddata, oridata)
# corr = corrcoef(moddata, oridata)[1]
if not plot and not savefig:
return corr
if not axe:
fig = plt.figure(figsize=(15, 5.5))
else:
fig = axe.get_figure()
fig.suptitle('Correlation between normalized-real and modeled '
+ 'contact maps (correlation=%.4f)' % (corr[0]),
size='x-large')
ax = fig.add_subplot(121)
self.contact_map(models, cluster, cutoff, axe=ax)
ax = fig.add_subplot(122)
ims = ax.imshow(log2(self._original_data), origin='lower',
interpolation="nearest",
extent=(0.5, self.nloci + 0.5, 0.5, self.nloci + 0.5))
ax.set_ylabel('Particles')
ax.set_xlabel('Particles')
ax.set_title('Normalized Hi-C count')
cbar = ax.figure.colorbar(ims)
cbar.ax.set_ylabel('Log2 (normalized Hi-C data)')
if savefig:
fig.savefig(savefig)
elif not axe:
plt.show()
return corr
def model_consistency(self, cutoffs=(50, 100, 150, 200), models=None,
cluster=None, axe=None, savefig=None, savedata=None):
"""
Plots the particle consistency, over a given set of models, vs the
modeled region bins. The consistency is a measure of the variability
(or stability) of the modeled region (the higher the consistency value,
the higher stability).
:param (50,100,150,200) cutoffs: list of distance cutoffs (nm) used to
compute the consistency. Two particles are considered consistent if
their distance is less than the given cutoff
:param None models: if None (default) the contact map will be computed
using all the models. A list of numbers corresponding to a given set
of models can be passed
:param None cluster: compute the contact map only for the models in the
cluster number 'cluster'
:param None savedata: path to a file where to save the consistency data
generated (1 column per cutoff + 1 for particle number).
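Example (a sketch; cutoffs are in nm and output paths are illustrative)::

    my_models.model_consistency(cutoffs=(50, 100, 150, 200),
                                savefig='consistency.png',
                                savedata='consistency.txt')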
"""
if models:
models = dict([(i, self[m]) for i, m in enumerate(models)])
elif cluster > -1:
models = dict([(i, self[str(m)]) for i, m in
enumerate(self.clusters[cluster])])
else:
models = self.__models
consistencies = {}
for cut in cutoffs:
consistencies[cut] = calc_consistency(models, self.nloci, cut)
# write consistencies to file
if savedata:
out = open(savedata, 'w')
out.write('#Particle\t%s\n' % ('\t'.join([str(c) for c in cutoffs])))
for part in xrange(self.nloci):
out.write('%s\t%s\n' % (str(part + 1), '\t'.join(
[str(round(consistencies[c][part], 3)) for c in cutoffs])))
out.close()
# plot
show = False
if not axe:
fig = plt.figure(figsize=(11, 5))
axe = fig.add_subplot(111)
show=True
axe.patch.set_facecolor('lightgrey')
axe.patch.set_alpha(0.4)
axe.grid(ls='-', color='w', lw=1.5, alpha=0.6, which='major')
axe.grid(ls='-', color='w', lw=1, alpha=0.3, which='minor')
axe.set_axisbelow(True)
axe.minorticks_on() # always on, not only for log
# remove tick marks
axe.tick_params(axis='both', direction='out', top=False,
right=False, left=False, bottom=False)
axe.tick_params(axis='both', direction='out', top=False,
right=False, left=False, bottom=False,
which='minor')
else:
fig = axe.get_figure()
# plot!
# colors = ['grey', 'darkgreen', 'darkblue', 'purple', 'darkorange', 'darkred'][-len(cutoffs):]
plots = []
for i, cut in enumerate(cutoffs[::-1]):
plots += axe.plot(range(1, self.nloci + 1),
consistencies[cut], color='darkred',
alpha= 1 - i / float(len(cutoffs)))
axe.legend(plots, ['%s nm' % (k) for k in cutoffs[::-1]],
fontsize='small', loc='center left', bbox_to_anchor=(1, 0.5))
axe.set_xlim((1, self.nloci))
axe.set_ylim((0, 100))
axe.set_xlabel('Particle')
axe.set_ylabel('Consistency (%)')
if cluster:
axe.set_title('Cluster %s' % (cluster))
elif len(models) == len(self):
axe.set_title('All clusters')
else:
axe.set_title('Selected models')
if savefig:
fig.savefig(savefig)
elif show:
plt.show()
def view_model(self, models=None, cluster=None, tool='chimera',
savefig=None, cmd=None):
"""
Visualize selected models in three dimensions.
:param None models: if None (default) the contact map will be computed
using all the models. A list of numbers corresponding to a given set
of models can be passed
:param None cluster: compute the contact map only for the models in the
cluster number 'cluster'
:param 'chimera' tool: path to the external tool used to visualize the
model
:param None savefig: path to a file where to save the image OR movie
generated (depending on the extension; accepted formats are png, mov
and webm). if set to None, the image or movie will be shown using
the default GUI.
:param None cmd: list of commands to be passed to the viewer. The chimera list is:
::
focus
set bg_color white
windowsize 800 600
bonddisplay never #0
represent wire
shape tube #0 radius 5 bandLength 100 segmentSubdivisions 1 followBonds on
clip yon -500
~label
set subdivision 1
set depth_cue
set dc_color black
set dc_start 0.5
set dc_end 1
scale 0.8
Followed by the movie command to record movies:
::
movie record supersample 1
turn y 3 120
wait 120
movie stop
movie encode output SAVEFIG
Or the copy command for images:
::
copy file SAVEFIG png
Passing as the following list as 'cmd' parameter:
::
cmd = ['focus', 'set bg_color white', 'windowsize 800 600', 'bonddisplay never #0', 'shape tube #0 radius 10 bandLength 200 segmentSubdivisions 100 followBonds on', 'clip yon -500', '~label', 'set subdivision 1', 'set depth_cue', 'set dc_color black', 'set dc_start 0.5', 'set dc_end 1', 'scale 0.8']
will return the default image (other commands can be passed to
modify the final image/movie).
"""
if models:
models = models
elif cluster > -1:
models = [str(m) for m in self.clusters[cluster]]
else:
models = self.__models
models = [m['rand_init'] if isinstance(m, IMPmodel) else m for m in models]
for model_num in models:
self.write_cmm('/tmp/', model_num=model_num)
chimera_view(['/tmp/model.%s.cmm' % (self[m]['rand_init'])
for m in models],
savefig=savefig, chimera_bin=tool, chimera_cmd=cmd)
def measure_angle_3_particles(self, parta, partb, partc,
models=None, cluster=None,
radian=False, all_angles=False):
"""
Given three particles A, B and C, the angle g (angle ABC, shown below):
::
              A
             /|
            /i|
          c/  |
          /   |
         /    |
        B )g  |b
         \    |
          \   |
          a\  |
            \h|
             \|
              C
is given by the theorem of Al-Kashi:
.. math::
b^2 = a^2 + c^2 - 2ac\cos(g)
:param parta: A particle number
:param partb: A particle number
:param partc: A particle number
:param None models: if None (default) the contact map will be computed
using all the models. A list of numbers corresponding to a given set
of models can be passed
:param None cluster: compute the contact map only for the models in the
cluster number 'cluster'
:param False radian: if True, return value in radians (in degrees
otherwise)
:returns: an angle, either in degrees or radians. If all_angles is True,
returns the three angles g, h and i (see picture above)
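A quick numeric check of the formula, independent of any model (for a
3-4-5 right triangle the angle at B is 90 degrees)::

    from math import acos, degrees
    a, b, c = 3., 5., 4.
    g = acos((a**2 - b**2 + c**2) / (2 * a * c))
    degrees(g)   # -> 90.0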
"""
# WARNING: here particle numbers are +1, they will be reduced
# inside median_3d_dist
a = self.median_3d_dist(partb, partc, models=models,
cluster=cluster, plot=False)
c = self.median_3d_dist(parta, partb, models=models,
cluster=cluster, plot=False)
b = self.median_3d_dist(parta, partc, models=models,
cluster=cluster, plot=False)
try:
g = acos((a**2 - b**2 + c**2) / (2 * a * c))
except ValueError:
g = 0.
if not all_angles:
return g if radian else degrees(g)
try:
h = acos((a**2 + b**2 - c**2) / (2 * a * b))
except ValueError:
h = 0.
i = pi - g - h
return (g, h, i)
def particle_coordinates(self, part, models=None, cluster=None):
"""
"""
if models:
models=models
elif cluster > -1:
models = [str(m) for m in self.clusters[cluster]]
else:
models = self.__models
part -= 1
xis = 0.
yis = 0.
zis = 0.
for mod in models:
xis += self[mod]['x'][part]
yis += self[mod]['y'][part]
zis += self[mod]['z'][part]
xis /= len(models)
yis /= len(models)
zis /= len(models)
return [xis, yis, zis]
def dihedral_angle(self, parta, partb, partc, partd, models=None,
cluster=None):
"""
"""
parta = array(self.particle_coordinates(parta, models, cluster))
partb = array(self.particle_coordinates(partb, models, cluster))
partc = array(self.particle_coordinates(partc, models, cluster))
partd = array(self.particle_coordinates(partd, models, cluster))
return dihedral(parta, partb, partc, partd)
def walking_dihedral(self, models=None, cluster=None, steps=(1,3),
plot=True, savefig=None, axe=None):
"""
Plots the dihedral angle between successive planes. A plane is formed by 3
successive loci.
:param None models: if None (default) the contact map will be computed
using all the models. A list of numbers corresponding to a given set
of models can be passed
:param None cluster: compute the contact map only for the models in the
cluster number 'cluster'
:param (1, 3) steps: how many particles to group for the estimation.
By default 2 curves are drawn
:param True signed: whether to compute the sign of the angle according
to a normal plane, or not.
:param None savefig: path to a file where to save the image generated;
if None, the image will be shown using matplotlib GUI
:param None savedata: path to a file where to save the angle data
generated (1 column per step + 1 for particle number).
::
C..........D
... ...
... ...
... ...
A..........B .E
.. .
. .
. .
. .
F...............G
"""
# plot
if axe:
ax = axe
fig = ax.get_figure()
else:
fig = plt.figure(figsize=(11, 5))
ax = fig.add_subplot(111)
ax.patch.set_facecolor('lightgrey')
ax.patch.set_alpha(0.4)
ax.grid(ls='-', color='w', lw=1.5, alpha=0.6, which='major')
ax.grid(ls='-', color='w', lw=1, alpha=0.3, which='minor')
ax.set_axisbelow(True)
ax.minorticks_on() # always on, not only for log
# remove tick marks
ax.tick_params(axis='both', direction='out', top=False, right=False,
left=False, bottom=False)
ax.tick_params(axis='both', direction='out', top=False, right=False,
left=False, bottom=False, which='minor')
colors = ['grey', 'darkgreen', 'darkblue', 'purple', 'darkorange',
'darkred'][-len(steps):]
#
rads = {}
rads[1] = []
for res in xrange(self.nloci - 6):
rads[1].append(self.dihedral_angle(res + 1, res + 4,
res + 5, res + 7,
models=models, cluster=cluster))
lmodels = len(rads[1])
for k in (steps[1:] if steps[0]==1 else steps):
rads[k] = [None for _ in range(k/2)]
for i in range(1, self.nloci - k - 5):
rads[k].append(reduce(lambda x, y: x + y,
[rads[1][i+j] for j in range(k)]) / k)
if k == 1:
continue
plots = []
for k in steps:
plots += ax.plot(range(1, len(rads[k]) + 1), rads[k],
color=colors[steps.index(k)],
lw=steps.index(k) + 1, alpha=0.5)
if savefig:
fig.savefig(savefig)
elif not axe:
plt.show()
def walking_angle(self, models=None, cluster=None, steps=(1,3), signed=True,
savefig=None, savedata=None, axe=None):
"""
Plots the angle between successive loci in a given model or set of
models. In order to limit the noise of the measure, the angle is
calculated between 3 loci that are each separated by two other loci
(e.g. in the scheme below, angles are calculated between loci A, D and G).
:param None models: if None (default) the contact map will be computed
using all the models. A list of numbers corresponding to a given set
of models can be passed
:param None cluster: compute the contact map only for the models in the
cluster number 'cluster'
:param (1, 3) steps: how many particles to group for the estimation.
By default 2 curves are drawn
:param True signed: whether to compute the sign of the angle according
to a normal plane, or not.
:param None savefig: path to a file where to save the image generated;
if None, the image will be shown using matplotlib GUI
:param None savedata: path to a file where to save the angle data
generated (1 column per step + 1 for particle number).
::
C..........D
... ...
... ...
... ...
A..........B .E
.. .
. .
. .
. .
F...............G
"""
# plot
if axe:
ax = axe
fig = ax.get_figure()
else:
fig = plt.figure(figsize=(11, 5))
ax = fig.add_subplot(111)
ax.patch.set_facecolor('lightgrey')
ax.patch.set_alpha(0.4)
ax.grid(ls='-', color='w', lw=1.5, alpha=0.6, which='major')
ax.grid(ls='-', color='w', lw=1, alpha=0.3, which='minor')
ax.set_axisbelow(True)
ax.minorticks_on() # always on, not only for log
# remove tick marks
ax.tick_params(axis='both', direction='out', top=False, right=False,
left=False, bottom=False)
ax.tick_params(axis='both', direction='out', top=False, right=False,
left=False, bottom=False, which='minor')
if not type(steps) == tuple:
    steps = (steps, )
colors = ['grey', 'darkgreen', 'darkblue', 'purple', 'darkorange',
          'darkred'][-len(steps):]
#
rads = {}
rads[1] = []
sign = 1
for res in xrange(self.nloci - 6):
rads[1].append(self.measure_angle_3_particles(res + 1, res + 4,
res + 7,
models=models,
cluster=cluster))
if signed:
res1 = self.particle_coordinates(res+1)
res2 = self.particle_coordinates(res+4)
res3 = self.particle_coordinates(res+7)
vec1 = (array(res1) - array(res2)) / norm(array(res1) - array(res2))
vec2 = (array(res1) - array(res3)) / norm(array(res1) - array(res3))
sign = dot(array([1.,1.,1.]), cross(vec1, vec2))
sign = -1 if sign < 0 else 1
rads[1][-1] *= sign
for k in (steps[1:] if steps[0]==1 else steps):
rads[k] = [None for _ in range(k/2)]
for i in range(1, self.nloci - k - 5):
rads[k].append(reduce(lambda x, y: x + y,
[rads[1][i+j] for j in range(k)]) / k)
if k == 1:
continue
plots = []
for k in steps:
plots += ax.plot(range(1, len(rads[k]) + 1), rads[k],
color=colors[steps.index(k)],
lw=steps.index(k) + 1, alpha=0.5)
if savedata:
out = open(savedata, 'w')
out.write('#Particle\t' +
'\t'.join(['angle(step:%s)' % (s) for s in steps]) + '\n')
for p in xrange(len(rads[1])):
out.write(str(p+1))
for s in steps:
try:
out.write('\t%s' % rads[s][p])
except IndexError:
out.write('\tNone')
out.write('\n')
out.close()
ax.set_ylabel('Angle in degrees')
ax.set_xlabel('Particle number')
ax.legend(plots, ['Average for %s angle%s' % (k, 's' if k > 1 else '')
for k in steps], fontsize='small',
bbox_to_anchor=(1, 0.5), loc='center left')
ax.set_xlim((1, self.nloci))
ax.set_title('Angle between consecutive loci')
plt.subplots_adjust(left=0.1, right=0.8)
if savefig:
fig.savefig(savefig)
elif not axe:
plt.show()
def objective_function_model(self, model, log=False, smooth=True, axe=None,
savefig=None):
"""
This function plots the objective function value at each Monte-Carlo
step
:param model: the number of the model to plot
:param False log: log plot
:param True smooth: curve smoothing
"""
self[model].objective_function(log=log, smooth=smooth, axe=axe,
savefig=savefig)
def write_cmm(self, directory, model_num=None, models=None, cluster=None,
color=color_residues, rndname=True):
"""
Save models in the cmm format, readable by Chimera
(http://www.cgl.ucsf.edu/chimera).
**Note:** If none of the model_num, models or cluster parameters is set,
ALL the models will be written.
:param directory: location where the file will be written (note: the
name of the file will be model_1.cmm if model number is 1)
:param None model_num: the number of the model to save
:param None models: a list of numbers corresponding to a given set of
models to save
:param None cluster: save the models in the cluster number 'cluster'
:param True rndname: If True, file names will be formatted as:
model.RND.cmm, where RND is the random number seed used by IMP to
generate the corresponding model. If False, the format will be:
model_NUM_RND.cmm where NUM is the rank of the model in terms of
objective function value
:param color_residues color: either a coloring function like
:func:`pytadbit.imp.imp_model.color_residues` or a list of (r, g, b)
tuples (as long as the number of particles)
"""
if model_num > -1:
models = [model_num]
elif models:
models = models
elif cluster > -1:
models = [str(m) for m in self.clusters[cluster]]
else:
models = self.__models
for model_num in models:
try:
model = self[model_num]
except KeyError:
model = self._bad_models[model_num]
model.write_cmm(directory, color=color, rndname=rndname,
model_num=model_num)
def write_xyz(self, directory, model_num=None, models=None, cluster=None,
get_path=False, rndname=True):
"""
Writes an xyz file containing the 3D coordinates of each particle in the
model.
**Note:** If none of the model_num, models or cluster parameters is set,
ALL the models will be written.
:param directory: location where the file will be written (note: the
file name will be model.1.xyz, if the model number is 1)
:param None model_num: the number of the model to save
:param None models: a list of numbers corresponding to a given set of
models to be written
:param None cluster: save the models in the cluster number 'cluster'
:param True rndname: If True, file names will be formatted as:
model.RND.xyz, where RND is the random number seed used by IMP to
generate the corresponding model. If False, the format will be:
model_NUM_RND.xyz where NUM is the rank of the model in terms of
objective function value
:param False get_path: whether to return, or not, the full path where
the file has been written
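Example (a sketch; the directory and cluster number are illustrative)::

    my_models.write_xyz('/tmp/', cluster=1)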
"""
if model_num > -1:
models = [model_num]
elif models:
models = models
elif cluster > -1:
models = [str(m) for m in self.clusters[cluster]]
else:
models = self.__models
for model_num in models:
try:
model = self[model_num]
except KeyError:
model = self._bad_models[model_num]
path_f = model.write_xyz(directory, model_num=model_num,
get_path=get_path, rndname=rndname)
if get_path:
return path_f
def save_models(self, path_f):
"""
Saves all the models in pickle format (python object written to disk).
:param path_f: path where to save the pickle file
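Example (a sketch; the file path is illustrative)::

    my_models.save_models('models.pick')
    # reload later with:
    # my_models = load_structuralmodels('models.pick')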
"""
to_save = {}
to_save['models'] = self.__models
to_save['bad_models'] = self._bad_models
to_save['nloci'] = self.nloci
to_save['clusters'] = self.clusters
to_save['resolution'] = self.resolution
to_save['original_data'] = self._original_data
to_save['config'] = self._config
to_save['zscore'] = self._zscores
out = open(path_f, 'w')
dump(to_save, out)
out.close()
class ClusterOfModels(dict):
def __str__(self):
out1 = ' Cluster #%s has %s models [top model: %s]\n'
out = 'Total number of clusters: %s\n%s' % (
len(self),
''.join([out1 % (k, len(self[k]), self[k][0]) for k in self]))
return out