Source code for bioplus.genometools

"""
requires genome.db from bioplus, an sqlite3 database w/
the following structure:

tables represent genomes and are named according to genome name
each table has fields VARCHAR 'name' and INT UNSIGNED 'length'
each entry describes a chromosome/scaffold in a genome
"""
from pysam import Samfile
from pkg_resources import resource_filename, cleanup_resources
import sqlite3
_CONNECTION = sqlite3.connect(resource_filename('bioplus', 'data/genomes.db'))
import atexit
atexit.register(cleanup_resources)
from operator import itemgetter
from tempfile import NamedTemporaryFile


[docs]def TemporaryGenomeFile(genome_name): """ returns a file-like object pointing to a temporary file containing the chromsome names and sizes the current file position will be 0 it will be deleted when the object is garbage collected """ f = NamedTemporaryFile() genome_rows = genome(genome_name).iteritems() f.writelines(('%s\t%d\n' for chrom, size in genome_rows)) f.flush() f.seek(0) return f
def _populate_available_genomes(): name_tuples = _CONNECTION.execute("select name from sqlite_master WHERE" + " type='table'").fetchall() return map(itemgetter(0), name_tuples) AVAILABLE_GENOMES = _populate_available_genomes()
[docs]def import_db(db, replace=False, ignore_warnings=False): ''' imports a db like genome.db into the existing database note: this may not be persistent if your egg is zipped ''' connection = sqlite3.connect(db) name_tuples = connection.execute("select name from sqlite_master WHERE \ type='table'").fetchall() available_genomes = map(itemgetter(0), name_tuples) for genome_name in available_genomes: if not ignore_warnings: assert genome_name in AVAILABLE_GENOMES and not replace, \ "Genome %s already in database" % genome_name _CONNECTION.execute("create table %s (name VARCHAR, length \ INT UNSIGNED)" % genome_name) genome_dict = dict(connection.execute("select * from %s" % genome_name)) _CONNECTION.executemany("insert into %s values (?,?)" % genome_name, dict(genome_dict).items()) return
[docs]def add_genome(genome_name, genome_dict, replace=False): """ WARNING: THIS WILL PERMANENTLY ADD A GENOME make a call to pybedtool and tries to add to genome registry then reloads module and repopulates globals note: this may not be persistent if your egg is zipped """ assert genome_name in AVAILABLE_GENOMES and not replace, \ "Genome %s already in database" % genome_name _CONNECTION.execute("create table %s (name VARCHAR, length INT \ UNSIGNED)" % genome_name) _CONNECTION.executemany("insert into %s values (?,?)" % genome_name, dict(genome_dict).items()) return
[docs]def genome(genome_name): """ if available, returns a dict of {'chr_name': length} for all chromosomes in that genome """ if not genome_name in AVAILABLE_GENOMES: raise GenomeNotAvailableError else: try: return dict(_CONNECTION.execute("select * from %s" % genome_name)) except sqlite3.OperationalError: raise GenomeNotAvailableError(genome_name)
[docs]class GenomeNotAvailableError(ValueError): pass
[docs]class NoMatchFoundError(LookupError): pass
[docs]def guess_bam_genome(bam_file_or_filename): return guess_genome(genome_from_bam(bam_file_or_filename))
[docs]def genome_from_bam(bam_file_or_filename): if isinstance(bam_file_or_filename, Samfile): bam_file = bam_file_or_filename else: bam_file = Samfile(bam_file_or_filename) return dict(zip(bam_file.references, bam_file.lengths))
[docs]def guess_genome(genome1): """ expects a dictionary or iterable of ('name', length) checks for a match in the database """ for name in AVAILABLE_GENOMES: genome2 = genome(name) if matches_genome(genome1, genome2): return name raise NoMatchFoundError
[docs]def matches_genome(genome1, genome2, symmetric=False): """ expects two dictionaries or iterables of ('name', length) tells you if genome1 matches genome2 (not all the entries in genome2 need to be matched, but all the entries in genome1 must be) symmetric=True means matches(genome1,genome2) and matches(genome2,genome1) """ g1 = dict(genome1) g2 = dict(genome2) for k in g1.keys(): if not k in g2: return False elif not g2[k] == g1[k]: return False #else: continue if symmetric: return matches_genome(genome2, genome1) else: return True