"""
cruzdb: library for pythonic access to UCSC genome-browser's MySQL database
"""
import soup
import sys
class BigException(Exception): pass
from tests import test
def _open(filelike, mode='r'):
    if hasattr(filelike, 'read'): return filelike
    return open(filelike, mode)
class Genome(soup.Genome):
    """
    Connect to a particular database.

    Returns a new Genome object

    Parameters
    ----------
    db : str
        either a sqlalchemy dburl, or just the database name.
    user : str
        if `db` is a dburl, this is not needed. otherwise it's
        the database user
    host : str
        if `db` is a dburl, this is not needed. otherwise it's
        the database host
    password : str
        if `db` is a dburl, this is not needed. otherwise it's
        the database password
    engine : sqlalchemy.engine
        if specified, all other parameters must be unused; this forces
        use of an existing engine
    """
    url = "mysql://%(user)s%(password)s@%(host)s/%(db)s"
    def __init__(self, db="", user="genome", host="genome-mysql.cse.ucsc.edu",
            password="", engine=None):
        self.create_url(db, user, host, password)
        soup.Genome.__init__(self, self.dburl)
        self.session.autoflush = False
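
    # Illustrative usage (a sketch; assumes network access to the public
    # UCSC server, or a local sqlite mirror -- see mirror() below):
    #   g = Genome("hg19")                  # connect to UCSC's MySQL server
    #   g = Genome("sqlite:///hg19.db")     # or any sqlalchemy dburl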
    def create_url(self, db="", user="genome", host="genome-mysql.cse.ucsc.edu",
            password=""):
        """
        internal: create a dburl from a set of parameters or the defaults on
        this object
        """
        if db.startswith(("sqlite://", "mysql://", "postgresql://")):
            self.db = self.url = db
            self.dburl = db
            self.user = self.host = self.password = ""
        else:
            self.db = db
            # if the host was overridden but the user was not, assume a
            # connection to the caller's own mirror as themselves.
            if user == "genome" and host != "genome-mysql.cse.ucsc.edu":
                import getpass
                user = getpass.getuser()
            self.host = host
            self.user = user
            self.password = (":" + password) if password else ""
            self.dburl = self.url % dict(db=self.db, user=self.user,
                    host=self.host, password=self.password)
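
    # With the defaults above, Genome('hg18') builds the dburl:
    #   "mysql://genome@genome-mysql.cse.ucsc.edu/hg18"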
    def mirror(self, tables, dest_url):
        """
        mirror a set of `tables` to `dest_url`

        Returns a new Genome object

        Parameters
        ----------
        tables : list
            an iterable of tables
        dest_url: str
            a dburl string, e.g. 'sqlite:///local.db'
        """
        from mirror import mirror
        return mirror(self, tables, dest_url)
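
    # Illustrative usage (a sketch; table names are the ones used in this
    # module's __main__ block):
    #   g = Genome("hg18")
    #   local = g.mirror(["refGene", "cpgIslandExt"], "sqlite:///hg18.db")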
    def dataframe(self, table, limit=None, offset=None):
        """
        create a pandas dataframe from a table or query

        Parameters
        ----------
        table : table
            a table in this database or a query
        limit: integer
            an integer limit on the query
        offset: integer
            an offset for the query
        """
        from pandas import DataFrame
        if isinstance(table, basestring):
            table = getattr(self, table)
        records = table._table.select()
        if limit is not None:
            records = records.limit(limit)
        if offset is not None:
            records = records.offset(offset)
        records = list(records.execute())
        cols = [c.name for c in table._table.columns]
        return DataFrame.from_records(records, columns=cols)
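
    # Illustrative usage (a sketch):
    #   df = g.dataframe("refGene", limit=10)   # first 10 rows as a DataFrame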
    @property
    def tables(self):
        return self._cache.keys()
    def load_file(self, fname, table=None, sep="\t", bins=False, indexes=None):
        """
        use some of the machinery in pandas to load a file into a table

        Parameters
        ----------
        fname : str
            filename or filehandle to load
        table : str
            table to load the file to
        sep : str
            CSV separator
        bins : bool
            add a "bin" column for efficient spatial queries.
        indexes : list[str]
            list of columns to index
        """
        # map common column names onto the UCSC-style names used elsewhere.
        convs = {"#chr": "chrom", "start": "txStart", "end": "txEnd", "chr":
                "chrom", "pos": "start", "POS": "start", "chromStart": "txStart",
                "chromEnd": "txEnd"}
        if table is None:
            import os.path as op
            table = op.basename(op.splitext(fname)[0]).replace(".", "_")
            print >>sys.stderr, "writing to:", table

        from pandas.io import sql
        import pandas as pa
        from toolshed import nopen

        needs_name = False
        for i, chunk in enumerate(pa.read_csv(nopen(fname), iterator=True,
                chunksize=100000, sep=sep, encoding="latin-1")):
            chunk.columns = [convs.get(k, k) for k in chunk.columns]
            if "name" not in chunk.columns:
                needs_name = True
                chunk['name'] = chunk.get('chrom', chunk[chunk.columns[0]])
            if bins:
                chunk['bin'] = 1
            if i == 0 and table not in self.tables:
                schema = sql.get_sqlite_schema(chunk, table)
                print schema
                self.engine.execute(schema)
            elif i == 0:
                print >>sys.stderr,\
                    """adding to existing table, you may want to drop first"""

            tbl = getattr(self, table)._table
            cols = chunk.columns
            data = list(dict(zip(cols, x)) for x in chunk.values)

            if needs_name:
                for d in data:
                    d['name'] = "%s:%s" % (d.get("chrom"),
                            d.get("txStart", d.get("chromStart")))

            if bins:
                for d in data:
                    d['bin'] = max(Genome.bins(int(d["txStart"]), int(d["txEnd"])))
            self.engine.execute(tbl.insert(), data)
            self.session.commit()
            if i > 0:
                print >>sys.stderr, "writing row:", i * 100000

        # index the start coordinate (and chrom, when present) for fast lookups.
        if "txStart" in chunk.columns:
            if "chrom" in chunk.columns:
                ssql = """CREATE INDEX "%s.chrom_txStart" ON "%s" (chrom, txStart)""" % (table, table)
            else:
                ssql = """CREATE INDEX "%s.txStart" ON "%s" (txStart)""" % (table, table)
            self.engine.execute(ssql)

        for index in (indexes or []):
            ssql = """CREATE INDEX "%s.%s" ON "%s" (%s)""" % (table,
                    index, table, index)
            self.engine.execute(ssql)

        if bins:
            ssql = """CREATE INDEX "%s.chrom_bin" ON "%s" (chrom, bin)""" % (table, table)
            self.engine.execute(ssql)
        self.session.commit()
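
    # Illustrative usage (a sketch; the file name is hypothetical):
    #   g = Genome("sqlite:///hg18.db")
    #   g.load_file("intervals.bed", table="my_intervals", bins=True,
    #               indexes=["name"])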
    @staticmethod
    def david_go(refseq_list, annot=('SP_PIR_KEYWORDS', 'GOTERM_BP_FAT',
        'GOTERM_CC_FAT', 'GOTERM_MF_FAT')):
        """
        open a web-browser to the DAVID online enrichment tool

        Parameters
        ----------
        refseq_list : list
            list of refseq names to check for enrichment
        annot : list
            iterable of DAVID annotations to check for enrichment
        """
        URL = "http://david.abcc.ncifcrf.gov/api.jsp?type=REFSEQ_MRNA&ids=%s&tool=term2term&annot="
        import webbrowser
        webbrowser.open((URL % ",".join(set(refseq_list))) + ",".join(annot))
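
    # Illustrative usage (a sketch; the refseq accessions are examples):
    #   Genome.david_go(["NM_000014", "NM_000015"])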
    def bin_query(self, table, chrom, start, end):
        """
        perform an efficient spatial query using the bin column if available.
        The possible bins are calculated from the `start` and `end` sent to
        this function.

        Parameters
        ----------
        table : str or table
            table to query
        chrom : str
            chromosome for the query
        start : int
            0-based start position
        end : int
            0-based end position
        """
        if isinstance(table, basestring):
            table = getattr(self, table)

        try:
            tbl = table._table
        except AttributeError:
            tbl = table.column_descriptions[0]['type']._table

        q = table.filter(tbl.c.chrom == chrom)

        if hasattr(tbl.c, "bin"):
            bins = Genome.bins(start, end)
            # only add the bin filter when the IN list stays small.
            if len(bins) < 100:
                q = q.filter(tbl.c.bin.in_(bins))

        if hasattr(tbl.c, "txStart"):
            return q.filter(tbl.c.txStart <= end).filter(tbl.c.txEnd >= start)
        return q.filter(tbl.c.chromStart <= end).filter(tbl.c.chromEnd >= start)
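
    # Illustrative usage (a sketch; coordinates match the __main__ demo):
    #   hits = g.bin_query("refGene", "chr1", 10000, 40000).all()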
    def upstream(self, table, chrom_or_feat, start=None, end=None, k=1):
        """
        Return k-nearest upstream features

        Parameters
        ----------
        table : str or table
            table against which to query
        chrom_or_feat : str or feat
            either a chromosome, e.g. 'chr3' or a feature with .chrom, .start,
            .end attributes
        start : int
            if `chrom_or_feat` is a chrom, then this must be the integer start
        end : int
            if `chrom_or_feat` is a chrom, then this must be the integer end
        k : int
            number of upstream neighbors to return
        """
        res = self.knearest(table, chrom_or_feat, start, end, k, "up")
        end = getattr(chrom_or_feat, "end", end)
        start = getattr(chrom_or_feat, "start", start)
        rev = getattr(chrom_or_feat, "strand", "+") == "-"
        if rev:
            return [x for x in res if x.end > start]
        else:
            return [x for x in res if x.start < end]
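
    # Illustrative usage (a sketch):
    #   feat = g.refGene[19]
    #   nearest_up = g.upstream("refGene", feat, k=3)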
    def downstream(self, table, chrom_or_feat, start=None, end=None, k=1):
        """
        Return k-nearest downstream features

        Parameters
        ----------
        table : str or table
            table against which to query
        chrom_or_feat : str or feat
            either a chromosome, e.g. 'chr3' or a feature with .chrom, .start,
            .end attributes
        start : int
            if `chrom_or_feat` is a chrom, then this must be the integer start
        end : int
            if `chrom_or_feat` is a chrom, then this must be the integer end
        k : int
            number of downstream neighbors to return
        """
        res = self.knearest(table, chrom_or_feat, start, end, k, "down")
        end = getattr(chrom_or_feat, "end", end)
        start = getattr(chrom_or_feat, "start", start)
        rev = getattr(chrom_or_feat, "strand", "+") == "-"
        if rev:
            return [x for x in res if x.start < end]
        else:
            return [x for x in res if x.end > start]
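
    # Illustrative usage (a sketch; symmetric to upstream() above):
    #   nearest_down = g.downstream("refGene", "chr1", start=10000,
    #                               end=40000, k=3)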
    def knearest(self, table, chrom_or_feat, start=None, end=None, k=1,
            _direction=None):
        """
        Return k-nearest features

        Parameters
        ----------
        table : str or table
            table against which to query
        chrom_or_feat : str or feat
            either a chromosome, e.g. 'chr3' or a feature with .chrom, .start,
            .end attributes
        start : int
            if `chrom_or_feat` is a chrom, then this must be the integer start
        end : int
            if `chrom_or_feat` is a chrom, then this must be the integer end
        k : int
            number of neighbors to return
        _direction : (None, "up", "down")
            internal (don't use this)
        """
        assert _direction in (None, "up", "down")

        # they sent in a feature
        if start is None:
            assert end is None
            chrom, start, end = chrom_or_feat.chrom, chrom_or_feat.start, \
                    chrom_or_feat.end
            # if the query is directional and the feature has a strand,
            # flip the direction for negative-strand features.
            if _direction in ("up", "down") and getattr(chrom_or_feat,
                    "strand", None) == "-":
                _direction = "up" if _direction == "down" else "down"
        else:
            chrom = chrom_or_feat

        qstart, qend = long(start), long(end)
        res = self.bin_query(table, chrom, qstart, qend)

        # widen the query window until at least k features are found.
        i, change = 1, 350
        try:
            while res.count() < k:
                if _direction in (None, "up"):
                    if qstart == 0 and _direction == "up": break
                    qstart = max(0, qstart - change)
                if _direction in (None, "down"):
                    qend += change
                i += 1
                change *= (i + 5)
                res = self.bin_query(table, chrom, qstart, qend)
        except BigException:
            return []

        def dist(f):
            # distance is 0 for features that overlap the query interval.
            d = 0
            if start > f.end:
                d = start - f.end
            elif f.start > end:
                d = f.start - end
            return d

        dists = sorted([(dist(f), f) for f in res])
        if len(dists) == 0:
            return []

        dists, res = zip(*dists)
        if len(res) == k:
            return res

        if k > len(res):  # had to break because of end of chrom
            if k == 0: return []
            k = len(res)

        ndist = dists[k - 1]
        # include all features that are the same distance as the kth closest
        # feature (accounts for ties).
        while k < len(res) and dists[k] == ndist:
            k = k + 1
        return res[:k]
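
    # Illustrative usage (a sketch):
    #   nearest = g.knearest("refGene", "chr1", start=10000, end=40000, k=5)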
    def sql(self, query):
        """
        execute a raw SQL `query` against the database and return the
        result
        """
        return self.engine.execute(query)
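
    # Illustrative usage (a sketch; the query string is an example):
    #   rows = g.sql("SELECT chrom, txStart FROM refGene LIMIT 5").fetchall()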
    def annotate(self, fname, tables, feature_strand=False, in_memory=False,
            header=None, out=sys.stdout, parallel=False):
        """
        annotate a file with a number of tables

        Parameters
        ----------
        fname : str or file
            file name or file-handle
        tables : list
            list of tables with which to annotate `fname`
        feature_strand : bool
            if this is True, then the up/downstream designations are based on
            the features in `tables` rather than the features in `fname`
        in_memory : bool
            if True, then tables are read into memory. This usually makes the
            annotation much faster if there are more than 500 features in
            `fname` and the number of features in the table is less than 100K.
        header : str
            header to print out (if True, use existing header)
        out : file
            where to print output
        parallel : bool
            if True, use multiprocessing library to execute the annotation of
            each chromosome in parallel. Uses more memory.
        """
        from .annotate import annotate
        return annotate(self, fname, tables, feature_strand, in_memory,
                header=header, out=out, parallel=parallel)
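
    # Illustrative usage (a sketch; file and table names are hypothetical):
    #   g.annotate("intervals.bed", ("refGene",), feature_strand=True,
    #              out=open("intervals.anno.bed", "w"))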
    @staticmethod
    def bins(start, end):
        """
        Get all the bin numbers for a particular interval defined by
        [start, end)
        """
        if end - start < 536870912:
            offsets = [585, 73, 9, 1]
        else:
            # intervals this large would require the extended binning scheme
            # (offsets [4681, 585, 73, 9, 1]), which is not supported here.
            raise BigException

        binFirstShift = 17
        binNextShift = 3

        start = start >> binFirstShift
        end = (end - 1) >> binFirstShift

        bins = [1]
        for offset in offsets:
            bins.extend(range(offset + start, offset + end + 1))
            start >>= binNextShift
            end >>= binNextShift
        return frozenset(bins)
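
    # For example, a short interval near the chromosome start hits one bin
    # at each level of the standard UCSC binning scheme:
    #   Genome.bins(0, 1) == frozenset([1, 9, 73, 585])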
    def __repr__(self):
        return "%s('%s')" % (self.__class__.__name__,
                self.url % dict(db=self.db, user=self.user, host=self.host,
                    password="?" if self.password else ""))
    @classmethod
    def save_bed(cls, query, filename=sys.stdout):
        """
        write a bed12 file of the query.

        Parameters
        ----------
        query : query
            a table or query to save to file
        filename : file
            string or filehandle to write output
        """
        out = _open(filename, 'w')
        for o in query:
            out.write(o.bed() + '\n')
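
    # Illustrative usage (a sketch; writes one bed line per row):
    #   Genome.save_bed(g.bin_query("refGene", "chr1", 10000, 40000),
    #                   "hits.bed")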
if __name__ == "__main__":
    #g = Genome(db="hg18", host="localhost", user="")
    #g = Genome(db="hg18", host="localhost")
    g = Genome("sqlite:///hg18.db")
    #g.load_file('GSM882245.hg18.bed', bins=True)
    #g.load_file('http://rafalab.jhsph.edu/CGI/model-based-cpg-islands-hg18.txt',
    #            table='cpgRafaLab', bins=True)

    print g.cpgIslandExt[12].bed()
    print g.cpgIslandExt[12].bed('length', 'perCpg')

    print "refGene"
    f = g.refGene[19]
    print f.bed12()
    f = g.refGene[19]
    print repr(f), f.cdsStart, f.cdsEnd
    print "exons", f.exons
    print "coding exons", f.coding_exons
    print "cds", f.cds
    print "introns", f.introns
    print "5'utr", f.utr5
    print "3'utr", f.utr3
    print f.browser_link
    #f.txEnd = f.txStart + 30
    #print list(f.blat())
    #print f.cds_sequence

    import time
    from sqlalchemy import and_

    # time a plain range filter ...
    query = g.refGene.filter(and_(g.refGene.txStart > 10000, g.refGene.txEnd < 40000))
    t = time.time()
    print query
    query.all()
    print time.time() - t

    # ... against the same query through the bin column. the manual
    # equivalent of bin_query would be:
    #   query = query.filter(g.refGene.bin.in_(Genome.bins(10000, 40000)))
    t = time.time()
    query = g.bin_query(g.refGene, "chr1", 10000, 40000)
    query.all()
    print time.time() - t

    g = Genome('hg19')
    t = time.time()
    q = g.snp135Common
    q = q.filter(q.bin.in_(Genome.bins(1000, 2000)))
    print q
    q.first()
    print time.time() - t

    Genome.save_bed(query)

    for transcript in g.refGene:
        print transcript, transcript.sequence()[:100] + "..."
        if transcript.txEnd > 8000: break

    kg = g.refGene._table
    q = kg.select(kg.c.txStart < 5000)
    print list(g.session.execute(q))