Source code for twobitreader

"""
twobitreader

Licensed under Perl Artistic License 2.0
No warranty is provided, express or implied
"""
from array import array
from bisect import bisect_right
from errno import ENOENT, EACCES
from os import R_OK, access
try:
    from os import strerror
except ImportError:
    strerror = lambda x: 'strerror not supported'
from os.path import exists
from itertools import izip
import logging
import textwrap
import sys

[docs]def true_long_type(): """ OS X uses an 8-byte long, so make sure L (long) is the right size and switch to I (int) if needed """ for type_ in ['L', 'I']: test_array = array(type_, [0]) long_size = test_array.itemsize if long_size == 4: return type_ raise ImportError("Couldn't determine a valid 4-byte long type to use \ as equivalent to LONG")
LONG = true_long_type()
[docs]def byte_to_bases(x): """convert one byte to the four bases it encodes""" c = (x >> 4) & 0xf f = x & 0xf cc = (c >> 2) & 0x3 cf = c & 0x3 fc = (f >> 2) & 0x3 ff = f & 0x3 return map(bits_to_base, (cc, cf, fc, ff))
[docs]def bits_to_base(x): """convert integer representation of two bits to correct base""" if x is 0: return 'T' if x is 1: return 'C' if x is 2: return 'A' if x is 3: return 'G'
[docs]def base_to_bin(x): """ provided for user convenience convert a nucleotide to its bit representation """ if x == 'T': return '00' if x == 'C': return '01' if x == 'A': return '10' if x == 'G': return '11'
[docs]def create_byte_table(): """create BYTE_TABLE""" d = {} for x in xrange(2**8): d[x] = byte_to_bases(x) return d
[docs]def split16(x): """ split a 16-bit number into integer representation of its course and fine parts in binary representation """ c = (x >> 8) & 0xff f = x & 0xff return c, f
[docs]def create_twobyte_table(): """create TWOBYTE_TABLE""" d = {} for x in xrange(2**16): c, f = split16(x) d[x] = byte_to_bases(c) + byte_to_bases(f) return d
BYTE_TABLE = create_byte_table() TWOBYTE_TABLE = create_twobyte_table()
[docs]def longs_to_char_array(longs, first_base_offset, last_base_offset, array_size): """ takes in a iterable of longs and converts them to bases in a char array returns a ctypes string buffer """ longs_len = len(longs) # dna = ctypes.create_string_buffer(array_size) dna = array('c', 'N' * longs_len) # translate from 32-bit blocks to bytes # this method ensures correct endianess (byteswap as neeed) bytes = array('B') bytes.fromstring(longs.tostring()) # first block first_block = ''.join([''.join(BYTE_TABLE[bytes[x]]) for x in range(4)]) i = 16 - first_base_offset if array_size < i: i = array_size dna[0:i] = array('c', first_block[first_base_offset:first_base_offset + i]) if longs_len == 1: return dna # middle blocks (implicitly skipped if they don't exist) for byte in bytes[4:-4]: dna[i:i + 4] = array('c', BYTE_TABLE[byte]) i += 4 # last block last_block = array('c', ''.join([''.join(BYTE_TABLE[bytes[x]]) for x in range(-4,0)])) dna[i:i + last_base_offset] = last_block[0:last_base_offset] return dna
[docs]class TwoBitFile(dict): """ python-level reader for .2bit files (i.e., from UCSC genome browser) (note: no writing support) TwoBitFile inherits from dict You may access sequences by name, e.g. >>> genome = TwoBitFile('hg18.2bit') >>> chr20 = genome['chr20'] Sequences are returned as TwoBitSequence objects You may access intervals by slicing or using str() to dump the entire entry e.g. >>> chr20[100100:100200] 'ttttcctctaagataatttttgccttaaatactattttgttcaatactaagaagtaagataacttccttttgttggtat ttgcatgttaagtttttttcc' >>> whole_chr20 = str(chr20) Fair warning: dumping the entire chromosome requires a lot of memory See TwoBitSequence for more info """ def __init__(self, foo): super(TwoBitFile, self).__init__() if not exists(foo): raise IOError(ENOENT, strerror(ENOENT), foo) if not access(foo, R_OK): raise IOError(EACCES, strerror(EACCES), foo) self._filename = foo self._file_handle = open(foo, 'rb') self._load_header() self._load_index() for name, offset in self._offset_dict.iteritems(): self[name] = TwoBitSequence(self._file_handle, offset, self._byteswapped) return def _load_header(self): file_handle = self._file_handle header = array(LONG) header.fromfile(file_handle, 4) # check signature -- must be 0x1A412743 # if not, swap bytes byteswapped = False (signature, version, sequence_count, reserved) = header if not signature == 0x1A412743: byteswapped = True header.byteswap() (signature2, version, sequence_count, reserved) = header if not signature2 == 0x1A412743: raise TwoBitFileError('Signature in header should be 0x1A412743' + ', instead found 0x%X' % signature) if not version == 0: raise TwoBitFileError('File version in header should be 0.') if not reserved == 0: raise TwoBitFileError('Reserved field in header should be 0.') self._byteswapped = byteswapped self._sequence_count = sequence_count def _load_index(self): file_handle = self._file_handle byteswapped = self._byteswapped remaining = self._sequence_count sequence_offsets = [] file_handle.seek(16) while True: if remaining == 0: break name_size = array('B') name_size.fromfile(file_handle, 1) if byteswapped: name_size.byteswap() name = array('c') if byteswapped: name.byteswap() name.fromfile(file_handle, name_size[0]) offset = array(LONG) offset.fromfile(file_handle, 1) if byteswapped: offset.byteswap() sequence_offsets.append((name.tostring(), offset[0])) remaining -= 1 self._sequence_offsets = sequence_offsets self._offset_dict = dict(sequence_offsets)
[docs] def sequence_sizes(self): """returns a dictionary with the sizes of each sequence""" d = {} file_handle = self._file_handle byteswapped = self._byteswapped for name, offset in self._offset_dict.iteritems(): file_handle.seek(offset) dna_size = array(LONG) dna_size.fromfile(file_handle, 1) if byteswapped: dna_size.byteswap() d[name] = dna_size[0] return d
[docs]class TwoBitSequence(object): """ A TwoBitSequence object refers to an entry in a TwoBitFile You may access intervals by slicing or using str() to dump the entire entry e.g. >>> genome = TwoBitFile('hg18.2bit') >>> chr20 = genome['chr20'] >>> chr20[100100:100200] # slicing returns a string 'ttttcctctaagataatttttgccttaaatactattttgttcaatactaagaagtaagataacttccttttgttggtat ttgcatgttaagtttttttcc' >>> whole_chr20 = str(chr20) # get whole chr as string Fair warning: dumping the entire chromosome requires a lot of memory Note that we follow python/UCSC conventions: Coordinates are 0-based, end-open (Note: The UCSC web-based genome browser uses 1-based closed coordinates) If you attempt to access a slice past the end of the sequence, it will be truncated at the end. Your computer probably doesn't have enough memory to load a whole genome but if you want to string-ize your TwoBitFile, here's a recipe: x = TwoBitFile('my.2bit') d = x.dict() for k,v in d.iteritems(): d[k] = str(v) """ def __init__(self, file_handle, offset, byteswapped=False): self._file_handle = file_handle self._original_offset = offset self._byteswapped = byteswapped file_handle.seek(offset) header = array(LONG) header.fromfile(file_handle, 2) if byteswapped: header.byteswap() dna_size, n_block_count = header self._dna_size = dna_size self._packed_dna_size = (dna_size + 15) / 16 # this is 32-bit fragments n_block_starts = array(LONG) n_block_sizes = array(LONG) n_block_starts.fromfile(file_handle, n_block_count) if byteswapped: n_block_starts.byteswap() n_block_sizes.fromfile(file_handle, n_block_count) if byteswapped: n_block_sizes.byteswap() self._n_block_starts = n_block_starts self._n_block_sizes= n_block_sizes mask_rawc = array(LONG) mask_rawc.fromfile(file_handle, 1) if byteswapped: mask_rawc.byteswap() mask_block_count = mask_rawc[0] mask_block_starts = array(LONG) mask_block_starts.fromfile(file_handle, mask_block_count) if byteswapped: mask_block_starts.byteswap() mask_block_sizes = array(LONG) mask_block_sizes.fromfile(file_handle, mask_block_count) if byteswapped: mask_block_sizes.byteswap() self._mask_block_starts = mask_block_starts self._mask_block_sizes = mask_block_sizes file_handle.read(4) self._offset = file_handle.tell() def __len__(self): return self._dna_size def __getslice__(self, min_, max_=None): return self.get_slice(min_, max_)
[docs] def get_slice(self, min_, max_=None): """ get_slice returns only a sub-sequence """ # handle negative coordinates dna_size = self._dna_size if max_ < 0: if max_ < -dna_size: raise IndexError('index out of range') max_ = dna_size + 1 + max_ if min_ < 0: if max_ < -dna_size: raise IndexError('index out of range') min_ = dna_size + 1 + min_ # make sure there's a proper range if min_ > max_ and max_ is not None: return '' if max_ == 0: return '' # load all the data if max_ > dna_size: max_ = dna_size file_handle = self._file_handle byteswapped = self._byteswapped n_block_starts = self._n_block_starts n_block_sizes = self._n_block_sizes mask_block_starts = self._mask_block_starts mask_block_sizes = self._mask_block_sizes offset = self._offset packed_dna_size = self._packed_dna_size # region_size is how many bases the region is if max_ is None: region_size = dna_size - min_ else: region_size = max_ - min_ # start_block, end_block are the first/last 32-bit blocks we need # note: end_block is not read # blocks start at 0 start_block = min_ / 16 end_block = max_ / 16 # don't read past seq end if end_block >= packed_dna_size: end_block = packed_dna_size - 1 # +1 we still need to read block blocks_to_read = end_block - start_block + 1 # jump directly to desired file location local_offset = offset + start_block * 4 file_handle.seek(local_offset) # note we won't actually read the last base # this is a python slice first_base_offset:16*blocks+last_base_offset first_base_offset = min_ % 16 last_base_offset = max_ % 16 fourbyte_dna = array(LONG) fourbyte_dna.fromfile(file_handle, blocks_to_read) if byteswapped: fourbyte_dna.byteswap() string_as_array = longs_to_char_array(fourbyte_dna, first_base_offset, last_base_offset, region_size) for start, size in izip(n_block_starts, n_block_sizes): end = start + size if end <= min_: continue if start > max_: break if start < min_: start = min_ if end > max_: end = max_ start -= min_ end -= min_ string_as_array[start:end] = array('c', 'N'*(end-start)) lower = str.lower first_masked_region = max(0, bisect_right(mask_block_starts, min_) - 1) last_masked_region = min(len(mask_block_starts), 1 + bisect_right(mask_block_starts, max_, lo=first_masked_region)) for start, size in izip(mask_block_starts[first_masked_region:last_masked_region], mask_block_sizes[first_masked_region:last_masked_region]): end = start + size if end <= min_: continue if start > max_: break if start < min_: start = min_ if end > max_: end = max_ start -= min_ end -= min_ string_as_array[start:end] = array('c', lower(string_as_array[start:end].tostring())) if not len(string_as_array) == max_ - min_: raise RuntimeError, "Sequence was longer than it should be" return string_as_array.tostring()
def __str__(self): """ returns the entire chromosome """ return self.__getslice__(0, None)
[docs]class TwoBitFileError(StandardError): """ Base exception for TwoBit module """ def __init__(self, msg): errtext = 'Invalid 2-bit file. ' + msg return super(TwoBitFileError, self).__init__(errtext)
[docs]def cmdline_reader(): """ cmdline_reader allows twobitreader module to be executed as a script accepts only one argument -- the .2bit filename reads input (BED format) from stdin writes output (FASTA format) to stdout writes errors/warning to stderr Regions should be given in BED format on stdin chrom start(0-based) end(0-based, not included) To use a BED file of regions, do python -m twobitreader example.2bit < example.bed Non-regions will be skipped and warnings will be issued to logging (logging output to stderr by default) """ # if no argument provided, print docstring argv = sys.argv if len(argv) == 1: print argv[0] + ":" print cmdline_reader.__doc__ sys.exit() return # if user is trying to get help, print docstring elif len(argv) == 2 and argv[1] in ['--help', '-h', '-help']: print argv[0] + ":" print cmdline_reader.__doc__ sys.exit() return # if user specified multiple files, exit with error elif len(argv) > 2: sys.exit("Too many files specified") return # otherwise proceed with opening the .2bit file twobit_file = TwoBitFile(argv[1]) # print error/warning messages as we go warning_msg = 'Invalid %s at line %d\n\t"%s"' twobit_reader(twobit_file, input_stream=sys.stdin)
[docs]def twobit_reader(twobit_file, input_stream=None, write=None): """ twobit_reader takes a twobit_file (of class TwoBitFile) and an "input_stream" which can be any iterable (incl. file-like objects) writes output (FASTA format) using write (default is to print, if write=None) logs errors/warning to stderr Regions should be given in BED format on stdin chrom start(0-based) end(0-based, not included) To use a BED file of regions, do python -m twobitreader example.2bit < example.bed Non-regions will be skipped and warnings will be issued to logging (logging output to stderr by default) """ if input_stream is None: return for i, line in enumerate((line.rstrip('\n\r') for line in input_stream)): fields = line.split() if not len(fields) >= 3: logging.warn(warning_msg, 'start', i, line) continue chrom = fields[0] if not twobit_file.has_key(chrom): logging.warn(warning_msg, 'chrom', i, line) continue try: start = long(fields[1]) except ValueError: logging.warn(warning_msg, 'start', i, line) if start < 0: logging.warn(warning_msg, 'start', i, line) logging.warn('Using 0 as start instead for line %d', i) start = 0 try: end = long(fields[2]) except ValueError: logging.warn(warning_msg, 'end', i, line) continue chrom_len = len(twobit_file[chrom]) if end > len(twobit_file[chrom]): logging.warn('At line %d, end is greater than chrom length %d\n%s', i, chrom_len, line) logging.warn('Sequence will be truncated at chrom' + 'length for line %d', i) end = chrom_length seq = twobit_file[chrom][start:end] if write is not None: write(">%s:%d-%d" % (chrom, start, end)) write(textwrap.fill(seq, 60)) else: print ">%s:%d-%d" % (chrom, start, end) print textwrap.fill(seq, 60) return
if __name__ == '__main__': cmdline_reader()