Source code for ngs_plumbing.dna

# This file is part of ngs_plumbing.

# ngs_plumbing is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# ngs_plumbing is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with ngs_plumbing.  If not, see <http://www.gnu.org/licenses/>.

# Copyright 2012 Laurent Gautier

"""
Tools to handle DNA.

When representing DNA as a string the use of storage is not optimal
as 8 bits (one byte) are used to store each base while there are
traditionnally only 4 possibilities for bases, 5 when considering
that 'N' can be used to signify that the base is not know / not defined
at a given position.

4 bases can be encoded with 2 bits, and 5 bases can be encoded with
3 bits.

The module contains the following lookup structure.
2-bit encoding:
_bit2_to_dna
_dna_to_bit2

3-bit encoding:
_bit3_to_dna
_dna_to_bit3

"""

from struct import pack, unpack
from array import array
from operator import or_
from io import IOBase, BytesIO
from ngs_plumbing._libdna import byte_frombit2bytearray, byte_frombit2bytes
from ngs_plumbing._libdna import slice_frombit2bytes
from ngs_plumbing._libdna import bit2_frombytes, bit2_frombytearray, bit2_frombuffer
from ngs_plumbing._libdna import bit2reverse_frombit2bytes, bit2complement_frombit2bytes, bit2reversecomplement_frombit2bytes
from ngs_plumbing.ngsp_string import sequentialfragments, sequentialfragments_iter, randomstring

# Compatibility with Python 3
import sys
if sys.version_info[0] > 2:
    xrange = range

# A: 0b00
# C: 0b01
# G: 0b10
# T: 0b11
_alphabet = b'ACGT'
_bit2_to_dna = _alphabet
# arbitrary: put an A when out of the alphabet
# error checking is assumed to take place upstream
_dna_to_bit2 = array('i', (0b00 for x in range(255)))

for val, b in enumerate(_bit2_to_dna):
    if sys.version_info[0] < 3:
        i = ord(b)
        _dna_to_bit2[i] = val
    else:
        _dna_to_bit2[b] = val
del(b, val)


# also store pairs (masking + lookups to be done divided by 2)
_bit2_to_dna2 = tuple(l1+l2 for i2,l2 in enumerate(_alphabet) \
                          for i1,l1 in enumerate(_alphabet))


_alphabet_n = b'ATGCN'
_alphabet_to_bits = {b'A': (0b000, ),
                     b'T': (0b001, ),
                     b'G': (0b110, ),
                     b'C': (0b111, ),
                     b'N': (0b010, 0b011, 0b100, 0b101)}

# setup byte<->integer mappings
_bit3_to_dna = list(b'N' * 8)
for k,v in _alphabet_to_bits.items():
    for i in v:
        _bit3_to_dna[i] = k
_bit3_to_dna = b''.join(_bit3_to_dna)
del(k, v, i)

_dna_to_bit3 = array('i', (_alphabet_to_bits[b'N'][0] for x in range(255)))
for val, b in enumerate(_bit3_to_dna):
    if sys.version_info[0] < 3:
        i = ord(b)
        _dna_to_bit3[i] = val
    else:
        _dna_to_bit3[b] = val
del(b, val)
# ---

def randomgenome(size):
    """ Make a random genome """
    return randomstring(size, _alphabet_n)


[docs]class PackedDNABytes(bytes): """ Representation of a DNA string as an array of bytes """ #__slots__ = ('__bits', '_get_bit_encoding', 'bit_encoding', # 'unpack') def _get_bit_encoding(self): return self.__bits bit_encoding = property(_get_bit_encoding, None, None, 'Number of bits used for encoding') def __new__(cls, string='', bits = 2, dopack=True): if len(string) == 0: res = array('L') else: if bits == 2: if dopack: res = bit2_frombytes(string) else: res = string else: raise ValueError('Only 2-bit packing for now') obj = bytes.__new__(cls, res) return(obj) def __init__(self, string='', bits=2, dopack=True): self.__bits = bits if bits == 2: self.__unpack = byte_frombit2bytes elif bits == 3: raise ValueError('Only 2-bit packing for now') else: raise ValueError('The parameter "bits" can only take the value 2 or 3')
[docs] def reversed(self): """ Return the reverse, still packed. """ return PackedDNABytes(bit2reverse_frombit2bytes(self), dopack=False)
[docs] def complemented(self): """ Return the complement, still packed. """ return PackedDNABytes(bit2complement_frombit2bytes(self), dopack=False)
[docs] def reversecomplemented(self): """ Return the reverse complement, still packed. """ return PackedDNABytes(bit2reversecomplement_frombit2bytes(self), dopack=False)
def unpack(self): return self.__unpack(self)
[docs] def unpack_slice(self, i=None, j=None): """ Get the unpacked a subsequence as bytes (zero-offset indexing, like other Python sequences) """ res = slice_frombit2bytes(self, i, j) return res # class KmerPosition(dict): # """ # Given a sequence, or streamm, of DNA bases as bytes, build a Python dict # of bit representation for the Kmers. # KmerPosition(stream, K = 17, step = 1) # """ # def __init__(self, stream, K = 17, step = 1): # self.__K = K # self.__step = step # #FIXME: code duplication :/ refactor to avoid it without performance penalty # pos = long(0) # if isinstance(stream, bytes): # for i, chunk_bytes in enumerate(sequentialfragments(stream, K, step = step)): # if chunk_bytes not in self: # self[chunk_bytes] = [pos,] # else: # self[chunk_bytes].append(pos) # pos += step # else: # iterator = sequentialfragments_iter # for i, chunk_bytes in enumerate(sequentialfragments_iter(stream, K, step = step)): # b = chunk_bytes.tobytes() # if b not in self: # self[b] = [pos,] # else: # self[b].append(pos) # pos += step # def _get_K(self): # return self.__K # def _set_K(self, K): # self.__K = K # K = property(_get_K, None, None, 'K for the K-mers') # def _get_step(self): # return self.__step # def _set_step(self, step): # self.__step = step # step = property(_get_step, None, None, # 'Incremental step for the sliding window over the original sequence') # class PackedPosition(KmerPosition): # """ # Given a sequence, or streamm, of DNA bases as bytes, build a Python dict # of bit representation for the Kmers. # KmerPosition(stream, K = 17, step = 1, bits = 2) # """ # def _get_bit_encoding(self): # return self.__bits # bit_encoding = property(_get_bit_encoding, # None, None, # 'Number of bits used for encoding') # def _get_K(self): # return self.__K # def _set_K(self, K): # self.__K = K # K = property(_get_K, None, None, 'K for the K-mers') # def __init__(self, stream, K = 17, step = 1, bits = 2): # assert(isinstance(K, long) or isinstance(K, int)) # assert(isinstance(step, long) or isinstance(step, int)) # self.__K = K # self.__step = step # self.__bits = bits # if bits == 2: # self.__int64_to_dna = bit2int64_to_dna # elif bits == 3: # self.__int64_to_dna = bit3int64_to_dna # else: # raise ValueError('The parameter "bits" can only take the value 2 or 3') # if isinstance(stream, bytes): # if bits == 2: # iterator = dna_to_bit2int64 # elif bits == 3: # iterator = dna_to_bit3int64 # else: # raise ValueError('The parameter "bits" can only take the value 2 or 3') # else: # if bits == 2: # iterator = dna_to_bit2int64_iter # elif bits == 3: # iterator = dna_to_bit3int64_iter # else: # raise ValueError('The parameter "bits" can only take the value 2 or 3') # pos = long(0) # for i, chunk_int in enumerate(iterator(stream, K = K, step = step)): # if chunk_int not in self: # self[chunk_int] = [pos,] # else: # self[chunk_int].append(pos) # pos += step # def keys_bytes(self): # """ Homologous to the method keys(), # except that it returns the keys as DNA bytes """ # k = self.keys() # k = tuple(self.__int64_to_dna(k, self.__K)) # return k # def iter_bytes(self): # return iter(self.__int64_to_dna(x) for x in iter(self))