Source code for pytadbit.alignment

"""
14 May 2013


"""

from pytadbit.utils.extraviews import colorize
from random import random, shuffle
from sys import stdout
from pytadbit.boundary_aligner.aligner import align
from numpy import linspace, sin, pi
from warnings import warn

try:
    from scipy.interpolate import interp1d
except ImportError:
    from pytadbit.utils.tadmaths import Interpolate as interp1d


[docs]class Alignment(object): def __init__(self, name, alignment, experiments, score=None): self.name = name self.__experiments = experiments self.__values = [] self.__keys = [] self.__len = None for seq, exp in zip(alignment, experiments): self.add_aligned_exp(exp.name, seq) self.score = score def __len__(self): return self.__len def __str__(self): """ calls Alignment.print_alignment """ return self.write_alignment(string=True) def __repr__(self): """ Alignment description """ return ('Alignment of boundaries (length: %s, ' + 'number of experiments: %s)') % (len(self), len(self.__keys)) def __getitem__(self, i): try: return self.__values[i] except TypeError: try: i = self.__keys.index(i) return self[i] except ValueError: raise ValueError('ERROR: %s not in alignment' % (i)) def __setitem__(self, i, j): if i in self.__keys: self.__values[self.__keys[i]] = j else: self.__values.append(j) self.__keys.append(i) def __delitem__(self, i): try: self.__values.pop(i) self.__keys.pop(i) except TypeError: try: i = self.__keys.index(i) self.__values.pop(i) self.__keys.pop(i) except ValueError: raise ValueError('ERROR: %s not in alignment' % (i)) def __iter__(self): for key in self.__keys: yield key
[docs] def iteritems(self): """ Iterate over experiment names and aligned boundaries """ for i in xrange(len(self)): yield (self.__keys[i], self.__values[i])
[docs] def itervalues(self): """ Iterate over experiment names and aligned boundaries """ for i in xrange(len(self)): yield self.__values[i]
[docs] def itercolumns(self): """ Iterate over columns in the alignment """ for pos in xrange(len(self)): col = [] for exp in xrange(len(self.__keys)): col.append(self[exp][pos]) yield col
[docs] def write_alignment(self, names=None, xpers=None, string=False, title='', ftype='ansi'): """ Print alignment of TAD boundaries between different experiments. Alignment are displayed with colors according to the tadbit confidence score for each boundary. :param None names: if None print all experiments :param None xpers: if None print all experiments :param False string: return string instead of printing :param ansi ftype: display colors in 'ansi' or 'html' format """ if xpers: xpers = [self.__experiments[n.name] for n in xpers] else: xpers = self.__experiments if not names: names = [n.name for n in xpers] length = max([len(n) for n in names]) if ftype == 'html': out = '''<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <!-- This file was created with the aha Ansi HTML Adapter. http://ziz.delphigl.com/tool_aha.php --> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="application/xml+xhtml; charset=UTF-8" /> <title>stdin</title> </head> <h1>%s</h1> <body> <pre>''' % (title) elif ftype == 'ansi': out = title + '\n' if title else '' else: raise NotImplementedError('Only ansi and html ftype implemented.\n') out += 'Alignment shown in %s Kb (%s experiments) (' % ( int(xpers[0].resolution / 1000), len(xpers)) out += 'scores: %s)\n' % (' '.join( [colorize(x, x, ftype) for x in range(11)])) for i, xpr in enumerate(xpers): if not xpr.name in self.__keys: continue # res = xpr.resolution / 1000 out += ('%' + str(length) + 's') % (names[i]) out += ':' for x in self[xpr.name]: if x['end'] == 0.0: out += '| ' + '-' * 4 + ' ' continue cell = str(int(x['end']) + 1) # * res # TODO: +1 out += ('|' + ' ' * (6 - len(cell)) + colorize(cell, x['score'], ftype)) out += '\n' if ftype == 'html': out += '</pre></body></html>' if string: return out print out
[docs] def get_column(self, cond1, cond2=None, min_num=None): """ Get a list of column responding to a given characteristic. :param cond1: can be either a column number or a function to search for a given condition that may fulfill boundaries. :param None cond2: a second function to search for a given condition that may fulfill boundaries. :param None min_num: Number of boundaries in a given column that have to pass the defined conditions (all boundaries may pass them by default). :returns: a list of column, each column represented by a tuple which first element is the column number in the alignment and second element is the list of boundaries in this column. **Example** :: ali = Alignment('ALL', crm.get_alignment('ALL'), crm.experiments, score) ali.get_column(3) # [(3, [>55<, ,>56<, >55<, >58<])] # now we want boundaries with high scores cond1 = lambda x: x['score'] > 5 # and we want boundaries to be detected in Experiments exp1 and exp2 cond2=lambda x: x['exp'] in ['exp1', 'exp2'] ali.get_column(cond1=cond1, cond2=cond2, min_num=2) # [(33, [>268<, >192<, >-<, >-<]), # (46, [>324<, >323<, >335<, >329<]), # (51, [>348<, >335<, >357<, >347<]), # (56, [>374<, >358<, >383<, >-<]), # (64, [>397<, >396<, >407<, >-<]), # (77, [>444<, >442<, >456<, >-<])] """ if type(cond1) is int: val = cond1 - 1 cond1 = lambda x: x['pos'] == val elif type(cond1) is list: val = [v - 1 for v in cond1] cond1 = lambda x: x['pos'] in val if not cond2: cond2 = lambda x: True cond = lambda x: cond1(x) and cond2(x) min_num = min_num or len(self.__keys) column = [] for pos in xrange(len(self)): col = [] cnt = 0 for exp in xrange(len(self.__keys)): col.append(self.__values[exp][pos]) if cond(self.__values[exp][pos]): cnt += 1 if cnt >= min_num: column.append((pos, col)) return column
def add_aligned_exp(self, name, seq): p = 1 scores = [] exp = self.__experiments[name] for i, pos in enumerate(seq): if pos == '-': scores.append(TAD((('brk', None), ('end', 0.0), ('score', 0.0), ('start', 0.0)), i, self.__experiments[name])) continue try: while exp.tads[p]['brk'] < 0: p += 1 except KeyError: continue scr = abs(exp.tads[p]['score']) scores.append(TAD(exp.tads[p], i, self.__experiments[name])) scores[-1]['score'] = scr p += 1 # print name, len(scores) if not self.__len: self.__len = len(scores) elif self.__len != len(scores): raise AssertionError('ERROR: alignments of different lengths\n') self.__values.append(scores) self.__keys.append(name)
[docs] def draw(self, focus=None, extras=None, savefig=None): """ Draw alignments as a plot. :param None focus: can pass a tuple (bin_start, bin_stop) to display the alignment between these genomic bins :param None extras: list of coordinates (genomic bin) where to draw a red cross """ from matplotlib.cm import jet from matplotlib import pyplot as plt experiments = self.__experiments maxres = max([e.resolution for e in experiments]) facts = [maxres/e.resolution for e in experiments] siz = experiments[0].size if focus: figsiz = 4 + (focus[1] - focus[0])/30 else: figsiz = 4 + (siz)/30 fig, axes = plt.subplots(nrows=len(experiments), sharex=True, sharey=True, figsize=(figsiz, 1 + len(experiments) * 1.8)) fig.subplots_adjust(hspace=0) zsin = sin(linspace(0, pi)) ellipse = lambda h : h * zsin maxys = [] for iex, xpr in enumerate(experiments): if not xpr.name in self: continue zeros = xpr._zeros try: norms = xpr.norm[0] except TypeError: warn("WARNING: weights not available, TAD's height fixed to 1") norms = None diags = [] sp1 = siz + 1 if norms: for k in xrange(1, siz): s_k = siz * k diags.append(sum([norms[i * sp1 + s_k] if not (i in zeros or (i + k) in zeros) else 0. for i in xrange(siz - k)]) / (siz - k)) for tad in xpr.tads: start, end = (int(xpr.tads[tad]['start']) + 1, int(xpr.tads[tad]['end']) + 1) if norms: matrix = sum([norms[i + siz * j] if not (i in zeros or j in zeros) else 0. for i in xrange(start - 1, end - 1) for j in xrange(i + 1, end - 1)]) try: if norms: height = float(matrix) / sum( [diags[i-1] * (end - start - i) for i in xrange(1, end - start)]) else: height = 1. except ZeroDivisionError: height = 0. maxys.append(height) start = float(start) / facts[iex] end = float(end) / facts[iex] axes[iex].fill(linspace(start, end), ellipse(height), alpha=.8 if height > 1 else 0.4, facecolor='grey', edgecolor='grey') if extras: axes[iex].plot(extras, [.5 for _ in xrange(len(extras))], 'rx') axes[iex].grid() axes[iex].patch.set_visible(False) maxy = max(maxys) + 0.4 maxxs = [] for iex in range(len(experiments)): starting = focus[0] if focus else 1 ending = focus[1] if focus else experiments[iex].tads.values()[-1]['end'] axes[iex].hlines(1, 1, end, 'k', lw=1.5) axes[iex].set_ylim((0, maxy)) maxxs.append(ending / facts[iex]) axes[iex].set_ylabel('Relative\nHi-C count') axes[iex].text(starting + 1, float(maxy) / 20, experiments[iex].name, {'ha':'left', 'va':'bottom'}) axes[iex].set_yticks([float(i) / 2 for i in range(1, int(maxy + .5) * 2)]) axes[iex].set_xlim((starting, max(maxxs))) pos = {'ha':'center', 'va':'bottom'} for i, col in enumerate(self.itercolumns()): ends = sorted([(t['end'], j) for j, t in enumerate(col) if t['end']]) beg = (ends[0 ][0] + 0.9) / facts[ends[0 ][1]] end = (ends[-1][0] + 1.1) / facts[ends[-1][1]] if focus: if beg < focus[0] or end > focus[1]: continue axes[0].text(beg + float(end - beg) / 2, maxy + float(maxy) / 20, str(i + 1), pos, rotation=90, size='small') for iex, tad in enumerate(col): if not tad['end']: continue axes[iex].axvspan(beg-.2, end+.2, alpha=0.2, color='purple' if i%2 else 'darkgreen') axes[iex].plot(((tad['end'] + 1.) / facts[iex], ), (0, ), color=jet(tad['score'] / 10), mec='none', marker=6, ms=9, alpha=1, clip_on=False) axes[iex].set_xlabel('Genomic bin') tit1 = fig.suptitle("TAD borders' alignment", size='x-large') tit2 = axes[0].set_title("Alignment column number") tit2.set_y(1.3) plt.subplots_adjust(top=0.76) # This was for color bar instead of legend # ax1 = fig.add_axes([0.9 + 0.3/figsiz, 0.05, 0.2/figsiz, 0.9]) # cb1 = colorbar.ColorbarBase(ax1, cmap=jet, # norm=colors.Normalize(vmin=0., vmax=1.)) # cb1.set_label('Border prediction score') # cb1.ax.set_yticklabels([str(i)for i in range(1, 11)]) fig.set_facecolor('white') plots = [] for scr in xrange(1, 11): plots += plt.plot((100,),(100,), marker=6, ms=9, color=jet(float(scr) / 10), mec='none') axes[-1].legend(plots, [str(scr) for scr in xrange(1, 11)], numpoints=1, title='Border scores', fontsize='small', loc='lower left', bbox_to_anchor=(1, 0.5)) if savefig: plt.savefig(savefig) else: plt.show()
[docs]class TAD(dict): """ Specific class of TADs, used only within Alignment objects. It is directly inheriting from python dict. a TAD these keys: - 'start': position of the TAD - 'end': position of the TAD - 'score': of the prediction of boundary - 'brk': same as 'end' - 'pos': in the alignment (column number) - 'exp': Experiment this TAD belongs to - 'index': of this TAD within all TADs in the Experiment """ def __init__(self, thing, i, exp): super(TAD, self).__init__(thing) idx = [t for t in exp.tads if exp.tads[t]['start']==self['start']][0] self.update(dict((('pos', i),('exp', exp), ('index', idx)))) def __repr__(self): return '>' + (str(int(self['end']) * self['exp'].resolution / 1000) \ if int(self['end']) else '-') + '<'
def _interpolation(experiments): """ Calculate the distribution of TAD lengths, and interpolate it using interp1d function from scipy. :param experiments: names of experiments included in the distribution :return: function to interpolate a given TAD length according to a probability value """ # get all TAD lengths and multiply it by bin size of the experiment norm_tads = [] for tad in experiments: for brk in tad.tads.values(): if not brk['brk']: continue norm_tads.append((brk['end'] - brk['start']) * tad.resolution) win = [0.0] cnt = [0.0] max_d = max(norm_tads) # we ask here for a mean number of 20 values per bin bin_s = int(max_d / (float(len(norm_tads)) / 20)) for bee in range(0, int(max_d) + bin_s, bin_s): win.append(len([i for i in norm_tads if bee < i <= bee + bin_s]) + win[-1]) cnt.append(bee + float(bin_s)) win = [float(v) / max(win) for v in win] ## to see the distribution and its interpolation #distr = interp1d(x, y) #from matplotlib import pyplot as plt #plt.plot([distr(float(i)/1000) for i in xrange(1000)], # [float(i)/1000 for i in xrange(1000)]) #plt.hist(norm_tads, normed=True, bins=20, cumulative=True) #plt.show() return interp1d(win, cnt)
[docs]def randomization_test(xpers, score=None, num=1000, verbose=False, max_dist=100000, rnd_method='interpolate', r_size=None, method='reciprocal'): """ Return the probability that original alignment is better than an alignment of randomized boundaries. :param tads: original TADs of each experiment to align :param distr: the function to interpolate TAD lengths from probability :param None score: just to print it when verbose :param 1000 num: number of random alignment to generate for comparison :param False verbose: to print something nice :param interpolate method: how to generate random tads (alternative is 'shuffle'). 'interpolate' will calculate the distribution of TAD lengths, and generate a random set of TADs according to this distribution (see :func:`pytadbit.alignment.generate_rnd_tads`). In contrast, the 'shuffle' method uses directly the set of observed TADs and shuffle them (see :func:`pytadbit.alignment.generate_shuffle_tads`). """ if not rnd_method in ['interpolate', 'shuffle']: raise Exception('method should be either "interpolate" or ' + '"shuffle"\n') if rnd_method == 'interpolate' and not r_size: raise Exception('should provide Chromosome r_size if interpolate\n') tads = [] for xpr in xpers: if not xpr.tads: raise Exception('No TADs defined, use find_tad function.\n') tads.append([(t['end'] - t['start']) * \ xpr.resolution for t in xpr.tads.values()]) rnd_distr = [] # rnd_len = [] distr = _interpolation(xpers) if rnd_method is 'interpolate' else None rnd_exp = lambda : tads[int(random() * len(tads))] for val in xrange(num): if verbose: val = float(val) if not val / num * 100 % 5: stdout.write('\r' + ' ' * 10 + ' randomizing: ' '%.2f completed' % (100 * val/num)) stdout.flush() if rnd_method is 'interpolate': rnd_tads = [generate_rnd_tads(r_size, distr) for _ in xrange(len(tads))] # rnd_len.append(float(sum([len(r) for r in rnd_tads])) # / len(rnd_tads)) else: rnd_tads = [generate_shuffle_tads(rnd_exp()) for _ in xrange(len(tads))] # rnd_len.append(len(tads)) rnd_distr.append(align(rnd_tads, verbose=False, method=method, max_dist=max_dist)[1]) # aligns, sc = align(rnd_tads, verbose=False) # rnd_distr.append(sc) # for xpr in aligns: # print sc, '|'.join(['%5s' % (str(x/1000)[:-2] \ # if x!='-' else '-' * 4)\ # for x in xpr]) # print '' pval = float(len([n for n in rnd_distr if n > score])) / len(rnd_distr) if verbose: stdout.write('\n %s randomizations finished.' % (num)) stdout.flush() print ' Observed alignment score: %s' % (score) # print ' Mean number of boundaries: {}; observed: {}'.format ( # sum(rnd_len)/len(rnd_len), # str([len(self.experiments[e].brks) # for e in self.experiments])) print 'Randomized scores between %s and %s; observed: %s' % ( min(rnd_distr), max(rnd_distr), score) print 'p-value: %s' % (pval if pval else '<%s' % (1./num)) return pval
[docs]def generate_rnd_tads(chromosome_len, distr, start=0): """ Generates random TADs over a chromosome of a given size according to a given distribution of lengths of TADs. :param chromosome_len: length of the chromosome :param distr: function that returns a TAD length depending on a p value :param bin_size: size of the bin of the Hi-C experiment :param 0 start: starting position in the chromosome :returns: list of TADs """ pos = start tads = [] while True: pos += distr(random()) if pos > chromosome_len: break tads.append(float(pos)) return tads
[docs]def generate_shuffle_tads(tads): """ Returns a shuffle version of a given list of TADs :param tads: list of TADs :returns: list of shuffled TADs """ rnd_tads = tads[:] shuffle(rnd_tads) tads = [] for tad in rnd_tads: if tads: tad += tads[-1] tads.append(tad) return tads

Table Of Contents