# Source code for bioplus.motif

import math
import operator
from collections import defaultdict
from numpy.core.multiarray import zeros
from tabfile import TabFile

class CharDict(defaultdict):
    """
    A counting dictionary that inherits defaultdict.

    Any key is accepted; missing keys read as 0.
    """

    def __init__(self, initial=None):
        # int() returns 0, so it serves as the zero-default factory.
        # The original used a mutable default argument ({}) bound to a
        # parameter named 'dict' (shadowing the builtin); both fixed here.
        if initial is None:
            initial = {}
        super(CharDict, self).__init__(int, initial)

    def __add__(self, other):
        """Return a new CharDict with the key-wise sum of self and other.

        The original signature took two extra operands, so ``a + b``
        raised TypeError.  ``get`` is used so that reading a key present
        only on one side does not insert a zero entry into the other.
        """
        total = CharDict()
        for k in set(self.keys()) | set(other.keys()):
            total[k] = self.get(k, 0) + other.get(k, 0)
        return total

    def uncertainty(self):
        """
        Return the Shannon uncertainty H = -sum(p * log2 p) over the values
        in this position, treating 0*log(0) as 0.

        The original overwrote H on every iteration (and reset it to 0 when
        any probability was 0) instead of accumulating the sum.
        """
        H = 0.0
        for pN in self.values():
            if pN != 0:
                H -= pN * math.log(pN, 2)
        return H
class PositionWeightMatrix(object):
    """
    Stores counts of nucleotide bases at each position of a motif.

    Sequences may be added to the counts, but positions are otherwise
    treated as immutable.  Counts can be normalized to probabilities
    (make_probs) or converted to information content in bits (make_Ri).

    Deprecated in favour of Bio.Motif: instantiating emits a
    DeprecationWarning.  (The original *raised* the warning, which made
    every method unreachable.)
    """

    def __init__(self, n=None):
        import warnings  # local import keeps this fix self-contained
        warnings.warn("Use Bio.Motif. PositionWeightMatrix will be "
                      "removed soon", DeprecationWarning)
        self._errN = 'n must be a positive integer'
        if n is not None:
            self._initialize_positions(n)
        self._is_probs = False
        self._is_Ri = False

    def _initialize_positions(self, n):
        # Create one CharDict of base counts per position.
        self._L = []
        if type(n) == int:
            if n > 0:
                self._n = n
                for _ in range(self._n):
                    self._L.append(CharDict())
            else:
                raise ValueError(self._errN)
        else:
            raise TypeError(self._errN)

    def __add__(self, other):
        """Return the position-wise sum of two equal-length matrices.

        The original signature took two extra operands and assigned through
        a nonexistent __setitem__; both made ``a + b`` raise TypeError.
        """
        n = self._n
        if n != other._n:
            raise ValueError('PositionWeightMatrix objects are not the same '
                             'length (number of positions)')
        result = PositionWeightMatrix(n)
        for i in range(n):
            result._L[i] = self._L[i] + other._L[i]
        return result

    def __getitem__(self, y):
        return self._L[y]

    def __len__(self):
        return len(self._L)

    def count_file(self, seqsFile, n=0):
        """Add counts from a TabFile of sequences, read from column n
        (by default n=0, the first column).

        Raises UserWarning if counts were already normalized, and
        ValueError when a sequence's length differs from the first one's.
        """
        if self._is_probs:
            raise UserWarning('Already converted to probabilities')
        seqsFile.open()
        rows = (row for row in seqsFile)
        # the first sequence fixes the site length
        site = next(rows)[n]  # next() replaces Python 2's rows.next()
        siteLength = len(site)
        self._initialize_positions(siteLength)
        for i in range(self._n):
            self._L[i][site[i].upper()] += 1
        # read the remaining sequences
        for row in rows:
            site = row[n]
            if len(site) != siteLength:
                # clean up partially-built state before failing
                del self._L
                del self._n
                seqsFile.close()
                # the original formatted this message with self._n *after*
                # deleting it, raising AttributeError instead of ValueError
                raise ValueError('One of the sequences you are trying to '
                                 'add is not the correct length (' +
                                 str(siteLength) + '): ' + site)
            for i in range(self._n):
                self._L[i][site[i].upper()] += 1
        self._n = siteLength

    def count_seqs(self, L, debug=False):
        """Add a list of sequences to the counts.

        With debug=True, prints a progress line every 10**6 sequences.
        Raises ValueError when a sequence's length differs from the first.
        """
        if self._is_probs:
            raise UserWarning('Already converted to probabilities')
        siteLength = None
        n = 0
        for site in L:
            n += 1
            if debug and n % (10 ** 6) == 0:
                print(str(n))
            if siteLength is None:
                # first sequence fixes the site length
                siteLength = len(site)
                self._initialize_positions(siteLength)
            if len(site) != siteLength:
                del self._L
                del self._n
                # original used self._n here after deleting it
                raise ValueError('One of the sequences you are trying to '
                                 'add is not the correct length (' +
                                 str(siteLength) + '): ' + site)
            for i in range(self._n):
                self._L[i][site[i].upper()] += 1

    def import_from_MEME(self, filename, n=1, mode='biotools'):
        """Import a motif from the output of MEME (meme.txt).

        If there are multiple motifs in the output, motif n is used
        (the first is n=1, which is also the default).
        mode='biopython' returns the Bio.Motif object; mode='biotools'
        loads the counts into this object.
        """
        import Bio.Motif.Parsers.MEME as MEME
        with open(filename) as f:  # the original leaked this file handle
            MEME_object = MEME.read(f)
        motif_name = 'Motif ' + str(n)
        biopython_motif = MEME_object.get_motif_by_name(motif_name)
        if mode == 'biopython':
            return biopython_motif
        if mode == 'biotools':
            internal_n = len(biopython_motif)
            # this replaces _initialize_positions
            self._L = [CharDict(biopython_motif[i])
                       for i in range(internal_n)]
            self._n = internal_n
        else:
            raise UserWarning('Not a valid mode.')

    def rc(self):
        """Return the reverse complement of this matrix as a new object."""
        # hoisted out of the loop; the original rebuilt it per position
        complement = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'}
        new = PositionWeightMatrix(self._n)
        for i in range(self._n):
            for base, count in self._L[i].items():
                # bases without a complement (e.g. N) carry over unchanged
                new[i][complement.get(base, base)] = count
        new._L.reverse()  # reverse the object
        return new

    def __repr__(self):
        return repr(self._L)

    def make_probs(self, trueProbs=False):
        """Normalize counts in place so each position sums to 1.

        If a position contains zero counts and trueProbs is False, each
        zero cell is faked as one occurrence (and the total incremented by
        one) so that no probability is exactly 0.
        NOTE(review): with several zero cells the total is incremented only
        once, so such positions sum to slightly more than 1 — confirm intent.

        If the matrix currently holds Ri values, they are converted back
        to probabilities via p = 2**(Ri - 2).
        """
        if self._is_Ri:
            for x in self._L:
                for k in x.keys():
                    x[k] = 2 ** (x[k] - 2)
            self._is_Ri = False
        else:
            for x in self._L:
                total = sum(x.values())
                # renamed from 'zeros', which shadowed the numpy import
                zero_cells = list(x.values()).count(0)
                if trueProbs or zero_cells == 0:
                    for k in x.keys():
                        x[k] = float(x[k]) / total
                else:
                    total += 1  # fake one occurrence
                    for k in x.keys():
                        if x[k] == 0:
                            x[k] = 1
                        x[k] = float(x[k]) / total
        self._is_probs = True

    def make_Ri(self):
        """Convert counts or probabilities in place to Ri, the information
        content in bits: Ri = 2 + log2(p)."""
        if self._is_Ri:
            print('Already Ri')
            return
        if not self._is_probs:
            self.make_probs()
        for p in self._L:
            for k in p.keys():
                p[k] = 2 + math.log(p[k], 2)
        self._is_probs = False
        self._is_Ri = True

    def seq_Ri(self, s):
        """Return the information content Ri (bits) of sequence s, as
        measured with this position matrix.

        Raises UserWarning if len(s) differs from the matrix length.
        """
        if not self._is_Ri:
            self.make_Ri()  # the original called self.makeRi(): AttributeError
        if len(s) != self._n:
            raise UserWarning('Cannot evaluate a sequence which is not the '
                              'exact length of the position matrix')
        Ri = 0
        for x in range(self._n):
            Ri += self[x][s[x].upper()]
        return Ri

    def uncertainty(self):
        """Return the uncertainty H(l) of the matrix as a list; use sum()
        for the total uncertainty.

        Delegates to CharDict.uncertainty(), which treats 0*log(0) as 0,
        so it can be overridden implicitly.
        """
        if not self._is_probs:
            self.make_probs()
        return [position.uncertainty() for position in self]

    def Rs(self):
        """Return the Schneider Rs value: the expectation of Ri over all
        possible sequences, computed as the sum of (2 - uncertainty) over
        positions."""
        if not self._is_probs:
            self.make_probs()
        return sum([2 - position.uncertainty() for position in self])
def KL(p, q):
    """Return a list of the KL divergence (relative entropy), in bits, at
    each position from positionmatrix p to positionmatrix q.  Use sum() on
    the result for the total.

    Raises SyntaxError (exception type kept from the original) when the
    lengths differ.
    """
    if len(p) != len(q):
        # the original concatenated ints into the message, raising
        # TypeError instead of the intended SyntaxError
        raise SyntaxError('Length of p and q must be the same, instead '
                          'length of p is ' + str(len(p)) +
                          ' and length of q is ' + str(len(q)))
    divergences = []
    for i in range(len(p)):
        KLi = 0.0
        for base in ['A', 'G', 'T', 'C']:
            KLi += p[i][base] * math.log(p[i][base] / q[i][base], 2)
        # the original did KL.append(KLi), appending to the *function*
        # object (AttributeError) and never building a result list
        divergences.append(KLi)
    return divergences

# requires numpy, maybe relax requirement?
# needs to return an mmMatrix object
# needs to be able to save to file
# needs to be able to make and save image
def joint_matrix(sites):
    """Return the joint rate matrix for a list of equal-length sequences.

    Joint rates R(X;Y) are defined as
        R(X;Y) = - sum over X,Y of p(x,y) * I(X;Y)
        I(X;Y) =   sum over X,Y of p(x,y) * log2[p(x,y)/(p(x)p(y))]

    Returns (di_counts, di_probs, mm_matrix) where di_counts[i, j] is the
    4x4 dinucleotide count matrix for positions (i, j), di_probs is the
    same normalized to probabilities, and mm_matrix[i, j] = R(i;j).

    The original built di_counts/di_probs with the '(4,4)int' subarray
    dtype string, which is deprecated/removed in modern NumPy; plain 4-D
    arrays index identically (m[i, j] is still a 4x4 view).
    """
    bases = ['A', 'C', 'G', 'T']
    # map each dinucleotide string, e.g. 'AC', to its (row, col) index
    index_of = {}
    for i in range(4):
        for j in range(4):
            index_of[bases[i] + bases[j]] = (i, j)

    site_length = len(sites[0])
    di_counts = zeros([site_length, site_length, 4, 4], dtype=int)

    def add_seq(m, s, n, b):
        """Add the dinucleotide counts of one sequence to m in place."""
        for i in range(n):
            for j in range(n):
                m[i, j][b[s[i] + s[j]]] += 1

    # count pairs over every sequence
    for site in sites:
        add_seq(di_counts, site.upper(), site_length, index_of)

    # convert counts to probabilities (vectorized; the original looped
    # over all site_length**2 * 16 cells)
    total_seqs = di_counts[0, 0].sum()
    di_probs = di_counts / float(total_seqs)

    mm_matrix = zeros([site_length, site_length], dtype=float)
    for i in range(site_length):
        for j in range(site_length):
            pM = di_probs[i, j]
            # mutual information I(i;j) over the 16 dinucleotides,
            # treating 0*log(0) as 0
            Iij = 0.0
            for x in range(4):
                for y in range(4):
                    px = pM[x, :].sum()
                    py = pM[:, y].sum()
                    pxy = pM[x, y]
                    if pxy == 0 or px == 0 or py == 0:
                        continue
                    Iij += pxy * math.log(pxy / px / py, 2)
            # R(i;j) = -sum_xy p(x,y) * I(i;j); since the p(x,y) sum to 1
            # this is simply -I(i;j)
            mm_matrix[i][j] = -pM.sum() * Iij
    return (di_counts, di_probs, mm_matrix)
[docs]def spacerGC(L, spacerOffset=6, spacerLength=3): """ spacerGC takes as input a list of [15 bp GBSs (strings)] and returns the number of sequences that have 0,1,2,3 G/Cs in the 3 bp spacer as an array in that order """ # gc counts of 0, 1, 2, 3 GCcounts = [0, 0, 0, 0] for s in L: spacer = s[0][spacerOffset:spacerOffset+spacerLength].upper() GCs = spacer.count('C') + spacer.count('G') GCcounts[GCs] += 1 return GCcounts
def center_region(f, max_dist=75, motif_length=17):
    """Return a predicate that tells whether a row's motif lies within
    +/- max_dist bp of the peak center.

    Requires the TabFile object f to determine the column indices: its
    column_dict() must contain 'peak_summit' and one of 'motif_offset',
    'offset' or 'max_offset' (checked in that order).

    Raises ValueError when no offset column is found — the original left
    the index variable unbound and failed later with NameError when the
    returned predicate was first called.
    """
    column_dict = f.column_dict()
    peak_summit = column_dict['peak_summit']
    site_offset = None
    for offset_name in ('motif_offset', 'offset', 'max_offset'):
        if offset_name in column_dict:  # has_key() was removed in Python 3
            site_offset = column_dict[offset_name]
            break
    if site_offset is None:
        raise ValueError('no motif offset column found in file')

    def in_region(x):
        summit = int(x[peak_summit])
        offset = int(x[site_offset])
        return offset - max_dist < summit < offset + max_dist - motif_length

    return in_region
def count_spacers_from_info(foo, cutoff=None, region_rule=None,
                            region_width=None, spacer_offset=8,
                            spacer_length=3, output_file=None):
    """
    Count spacers from a .sites.info or .peaks.info file.

    Column 7 must hold the site sequence and column 4 the score.
    Optionally you may supply
        cutoff, a minimum score cutoff (float or int)
        region_rule, either the string 'center_region' (with optional
            region_width) or a predicate taking a row
        output_file, the destination table (prompted for when None).
    Returns the CharDict of spacer counts.
    """
    input_file = TabFile(foo, colNames=True)
    rows = (x for x in input_file)
    # NOTE: the original compared strings with 'is not' (object identity,
    # always True for file-read strings) and never *called* isalpha, so
    # these filters silently passed everything; both are fixed here.
    conditions = [lambda x: x[7].isalpha(),       # col 7 is a sequence
                  lambda x: x[7] != '-',          # col 7 is not -
                  lambda x: x[7] != 'NA',         # col 7 is not NA
                  lambda x: x[7].strip() != '-']  # col 7 is not missing
    if cutoff is not None:
        conditions.append(lambda x: float(x[4]) > cutoff)
    if region_rule == 'center_region':
        if region_width is not None:
            conditions.append(center_region(input_file,
                                            max_dist=region_width / 2))
        else:
            conditions.append(center_region(input_file, max_dist=75))
    elif region_rule is not None:
        conditions.append(region_rule)
    selected_rows = (x[7].upper() for x in rows
                     if all(f(x) for f in conditions))
    # the original passed the *type* dict as the initial mapping, which
    # raised TypeError in CharDict.__init__
    spacers = CharDict()
    for s in selected_rows:
        if s != '-' and s != 'NA':
            spacer = s[spacer_offset:spacer_offset + spacer_length].upper()
            spacers[spacer] += 1
    if output_file is None:
        output_file = raw_input('Output file name: ')
    with TabFile(output_file, 'w') as f:
        f.write_table(sorted(spacers.items(),
                             key=operator.itemgetter(1), reverse=True))
    return spacers
def count_letters(L):
    """Return one CharDict per position counting the letters appearing at
    that position across all strings in L.

    Positions are taken from the first string; all strings are assumed to
    share its length.  (xrange was replaced by range, which behaves the
    same here and also exists in Python 3.)
    """
    positions = range(len(L[0]))
    counts = [CharDict() for _ in positions]
    for s in L:
        for j in positions:
            counts[j][s[j]] += 1
    return counts