Source code for ngs_plumbing.xsq

# This file is part of ngs_plumbing.

# ngs_plumbing is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# ngs_plumbing is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with ngs_plumbing.  If not, see <http://www.gnu.org/licenses/>.

# Copyright 2012 Laurent Gautier

from collections import namedtuple, OrderedDict

import io, array, math
import os, re
import sys
import h5py
import numpy
logical_and = numpy.logical_and
right_shift = numpy.right_shift
bitwise_and = numpy.bitwise_and

from ngs_plumbing.dnaqual import DnaQual
from ngs_plumbing._libxsq import colourqual_frombuffer, basequal_frombuffer, bytearray_phredtoascii, bytearray_addint

if sys.version_info[0] < 3:
    range = xrange

ColourQual = namedtuple('ColourQual', 'colour qual')

_ACGT = 'ACGT'
FRAGMENT_NAMES = ('F3', 'F5-DNA', 'F5-RNA', 'F5-BC', 'F5-2', 'R3', 'R5')
# F3: Forward strand. starts with a T
# F5/R3: Used to prime both F5-BC read and R3 or barcode. starts with a G
#
FRAGMENT_START = {'F3': 'T',
                  'F5-DNA': 'G',
                  'F5-RNA': 'G', # to be verified
                  'F5-BC': None,
                  'F5-2': None,
                  'R3': 'G',
                  'R5': None # I don't know
}

_pack_installdir = os.path.abspath(os.path.dirname(__file__))
testdata_dir = os.path.join(_pack_installdir, 'data')

inc = int(1)

[docs]def list_xsq(path): """ list XSQ (.xsq) files in a given directory """ for path,dirs,files in os.walk(path): for f in files: if f.endswith('.xsq'): yield os.path.join(path,f)
[docs]class FragmentError(KeyError): pass
[docs]class DataError(KeyError): pass
[docs]class InvalidXSQError(ValueError): pass
[docs]class XSQFile(h5py.File): """ Sequencing data from a run """ @staticmethod def _valid_library_name(name, unclassified = False, unassigned = False): invalid_names = ('RunMetadata', 'Indexing') if name in invalid_names: return False if not unclassified and name == 'Unclassified': return False if not unassigned and name.startswith('Unassigned'): return False return True def _library_names_get(self): """ Names of the libraries in the file """ return tuple(x for x in self.keys() if XSQFile._valid_library_name(x)) library_names = property(_library_names_get, None, None, """ Names of the libraries in the file (Ignoring the unclassifed and unassigned groups)""")
[docs] def library(self, name, unclassified = False, unassigned = False): if not XSQFile._valid_library_name(name, unclassified = unclassified, unassigned = unassigned): raise ValueError("Invalid library name.") return XSQLibrary(self[name].id)
[docs] def iter_lib(self): for lb_n in self.library_names: yield self.library(lb_n)
def _run_metadata_get(self): """ Metadata about the run """ if 'RunMetadata' not in self: raise KeyError("The file does not have meta-data.") res = self['RunMetadata'] return XSQRunMetadata(res.id) run_metadata = property(_run_metadata_get, None, None, """ Metadata about the run """)
[docs]class XSQLibrary(h5py._hl.group.Group): """ Sequenced library in an XSQ file. """ def _name_get(self): return self.attrs[u'LibraryName'][0] name = property(_name_get, None, None, 'Library Name')
[docs] def fragments(self): """ Fragments sequenced in this library """ frags = set() for tile_n in self.keys(): tile = super(XSQLibrary, self).__getitem__(tile_n) for k in tile.keys(): if k in FRAGMENT_NAMES: frags.add(k) return frags
[docs] def readcount(self): """ Return the number of reads for the library """ count = int(0) for number in self.keys(): tmp = super(XSQLibrary, self).__getitem__(number) #FIXME: hack ! for k in tmp.attrs.keys(): if k.endswith('Count'): ct = tmp.attrs[k] if len(ct) != 1: raise ValueError("Length of field expected to be 1.") count += ct[0] break return count
def _readvalues(self, fragment, what): return _read_values(self, fragment, what)
[docs] def iter_reads(self, fragment, what = 'BaseCallQV'): """ Iterator over the fragment 'fragment' for the data 'what' """ if fragment not in FRAGMENT_NAMES: raise ValueError('Fragment "%s" not in %s' %(fragment, FRAGMENT_NAMES)) for number in self.keys(): num = super(XSQLibrary, self).__getitem__(number) if fragment not in num: raise FragmentError('No fragment "%s" in "%s".' %(fragment, number)) frg = num[fragment] if what not in frg: raise FragmentError('No "%s" for fragment "%s" in "%s" (possible value(s): "%s").' %(what, fragment, number, '", "'.join(frg.keys()))) ds = frg[what].value a = numpy.empty(shape = ds.shape, dtype=ds.dtype) a[:] = ds[:] for read in a: yield read
[docs] def iter_colourqual(self, fragment): """ Iterator over the sequence reads for the fragment 'fragment'. Each iteration returns a named tuple of length 2: - colour: the sequence of colours - qual: the quality for the sequence For a naive colour-to-sequence translation, the first base for the fragment should be used. It is in the dict FRAGMENT_START. """ for read in self.iter_reads(fragment, what='ColorCallQV'): colour, qual = colourqual_frombuffer(read) yield ColourQual(colour, qual)
[docs] def iter_dnaqual(self, fragment, numbase): """ Iterator over the sequence reads for the fragment 'fragment'. Each iteration returns a tuple of length 2: - the sequence - the quality """ #FIXME: automagic "numbase" return iter_fastq_reads(self, fragment, numbase)
def _read_values(group, fragment, what): """ Iterator over the fragment 'fragment' for the data 'what' """ assert(fragment in FRAGMENT_NAMES) for number in group.keys(): num = group[number] if fragment not in num: raise FragmentError('No such fragment "%s"' %fragment) ds = num[fragment][what].value yield ds
[docs]def tile_valuearray(tile, fragment, what): """ Get the array of values for a tile. """ if fragment not in tile: raise FragmentError('No such fragment "%s"' %fragment) elif what not in tile[fragment]: raise DataError('No such data "%s"' %what) ds = tile[fragment][what].value a = numpy.empty(shape = ds.shape, dtype = ds.dtype) #FIXME: force copy into memory, presumably. Is this is always wise, or even true ? a[:] = ds[:] return a
[docs]def iter_valuearrays(group, fragment, what='BaseCallQV'): """ Iterator over the fragment 'fragment' for the data 'what' """ assert(fragment in FRAGMENT_NAMES) if sys.version_info[0] > 2: for tile_n in group.keys(): tile = group[tile_n] a = tile_valuearray(tile, fragment, what) tile_n = bytes(tile_n, 'ASCII') yield (tile_n, a) else: for tile_n in group.keys(): tile = group[tile_n] a = tile_valuearray(tile, fragment, what) yield (tile_n, a)
[docs]def iter_reads(group, fragment, what='BaseCallQV'): """ Iterator over the fragment 'fragment' for the data 'what' """ assert(fragment in FRAGMENT_NAMES) for number in group.keys(): num = group[number] if fragment not in num: raise FragmentError('No such fragment "%s"' %fragment) ds = num[fragment][what].value a = numpy.empty(shape = ds.shape, dtype = ds.dtype) a[:] = ds[:] for read in a: yield read
[docs]def iter_fastq_reads(group, fragment, numbase): """ Iterator over reads in base space - group: HDF5 group - fragment: fragment name - numbase: number of bases (read length) """ for tile_n, a in iter_valuearrays(group, fragment, what='BaseCallQV'): for row_i in range(a.shape[0]): dna, qual = basequal_frombuffer(a[row_i]) qual = bytearray_phredtoascii(qual) yield DnaQual(dna, qual)
[docs]def iter_csfasta_reads(group, fragment, numbase): """ Iterator over reads in base space - group: HDF5 group - fragment: fragment name - numbase: number of bases (read length) """ for tile_n, a in iter_valuearrays(group, fragment, what='ColorCallQV'): for row_i in range(a.shape[0]): colour, qual = colourqual_frombuffer(a[row_i]) yield ColourQual(colour, qual)
[docs]class XSQRunMetadata(h5py._hl.group.Group): """ Details about a sequencing run in an XSQ file. A run can have several libraries sequenced at once. """ def _getattr(self, attr): try: return self.attrs[attr] except KeyError: return [None, ] def _tagdetails_get(self): return tagdetails = property(lambda self: XSQTagDetailsList(self[u'TagDetails']).tags, None, None, 'Tag details') lanenumber = property(lambda self: self._getattr(u'LaneNumber')[0], None, None, 'Lane number') librarytype = property(lambda self: self._getattr(u'LibraryType')[0], None, None, 'Library type') hdfversion = property(lambda self: self._getattr(u'HDFVersion')[0], None, None, 'Version of the HDF5 file') fileversion = property(lambda self: self._getattr(u'FileVersion')[0], None, None, 'Version of the XSQ file') sequencingsamplename = property(lambda self: self._getattr(u'SequencingSampleName')[0], None, None, 'Sequencing sample name') sequencingsampledescription = property(lambda self: self._getattr(u'SequencingSampleDescription')[0], None, None, 'Sequencing sample description') runname = property(lambda self: self._getattr(u'RunName')[0], None, None, 'Name for the run')
[docs]class XSQInfo(OrderedDict): __ats = (('tagdetails', lambda x: ', '.join(x.tags)), ('lanenumber', lambda x: x), ('hdfversion', lambda x: x), ('fileversion',lambda x: x), ('sequencingsamplename', lambda x: x), ('sequencingsampledescription', lambda x: x), ('runname',lambda x: x)) def __init__(self): raise Error('') @classmethod
[docs] def fromxsqfile_toordict(cls, xsqfile): assert(isinstance(xsqfile, XSQFile)) res = OrderedDict() try: mtd = xsqfile.run_metadata except KeyError as ke: mtd = None for at, fun in cls.__ats: field = getattr(XSQRunMetadata, at).__doc__ if mtd is None: res[field] = 'Missing' continue try: tmp = getattr(mtd, at) value = str(fun(tmp)) except AttributeError: value = 'Error' res[field] = value return res
[docs]class XSQTagDetailsList(h5py._hl.group.Group): """ """ tags = property(lambda x: x.keys(), None, None, 'Sequencing tags. F3 is typically the forward strand for single read sequencing.')
[docs]class XSQTagDetails(h5py._hl.group.Group): def _numbasecalls_get(self): res = self.attrs['NumBaseCalls'] if len(res) != 1: raise InvalidXSQError("Invalid XSQ file. NumBaseCalls should be of length 1.") return res numbasecalls = property(_numbasecalls_get, None, None, 'Number of bases called for the tag ("read length" for the tag)') def _isbasepresent_get(self): res = self.attrs['IsBasePresent'] if len(res) != 1: raise InvalidXSQError("Invalid XSQ file. IsBasePresent should be of length 1.") if res == 1: return True elif res == 0: return False else: raise InvalidXSQError("Invalid XSQ file. IsBasePresent should 0 or 1 to represent a boolean.") isbasepresent = property(_isbasepresent_get, None, None, 'Is base information present ? When running a SOLiD, True likely means that ECC was run.')
[docs]def seqid_template(f, flowcell, tile): try: serial = f.run_metadata.attrs['InstrumentSerial'][0] except (AttributeError, KeyError) as ke: serial = b'UnknownSerial' seq_id = '@%s:%s:%s' %(serial, flowcell, tile) + ':%i:%i#0' return seq_id
[docs]def seqid_template_csfasta(flowcell, fragment): seq_id = '> %s' %(flowcell) + '_%i_%i_' + '%s' %fragment return seq_id
def _progress_bar_iter(iterator, size = 20, out = sys.stdout, suffix = ''): n = len(iterator) ms = int(math.ceil(1.0 * n / size)) if sys.version_info[0] == 2: block = '\xe2\x96\x88' else: block = u"\u2588" for j, i in enumerate(iterator): if j % ms == 0: if os.isatty(out.fileno()): bar = '\r|%s%s|%s' %(block * min(size, (j / ms)), ' ' * (size - (j / ms)), suffix) out.write(bar) out.flush() yield i def _xsqvalues_to_fastq(xsqvalues, xyloc, buf, seq_id, prefix, n_count, n_perfect): if sys.version_info[0] > 2: for row_i in range(xsqvalues.shape[0]): dna, qual = basequal_frombuffer(xsqvalues[row_i]) qual = bytearray_phredtoascii(qual) #yield DnaQual(dna, qual) n_count_read = dna.count(b'N') n_count += n_count_read if n_count_read == 0: n_perfect += inc x,y = xyloc[row_i] buf.writelines((seq_id %(x,y), '\n', dna.decode('ASCII'), '\n+\n', qual.decode('ASCII'), '\n')) else: for row_i in range(xsqvalues.shape[0]): dna, qual = basequal_frombuffer(xsqvalues[row_i]) qual = bytearray_phredtoascii(qual) #yield DnaQual(dna, qual) n_count_read = dna.count(b'N') n_count += n_count_read if n_count_read == 0: n_perfect += inc x,y = xyloc[row_i] buf.writelines((seq_id %(x,y), '\n', dna, '\n+\n', qual, '\n')) return (n_count, n_perfect, len(dna))
[docs]def make_fastq(lib, fragment, buf, f, flowcell, numbase = None, progress_mark = 50000, out=sys.stdout): n_count = int(0) n_perfect = int(0) read_count = int(0) if sys.version_info[0] == 2: block = '\xe2\x96\x88' else: block = u"\u2588" it = enumerate(iter_valuearrays(lib, fragment, what='BaseCallQV')) for tile_i, (tile_n, a) in it: if numbase is None: numbase = a.shape[1] prefix = FRAGMENT_START[fragment] seq_id = seqid_template(f, flowcell, tile_n) xyloc = lib[tile_n]['Fragments'][u'yxLocation'][:] read_count += xyloc.shape[0] ms = int(math.ceil((tile_i+1) * 1.0 * 20 / len(lib))) out.write('\r|%s%s|tile %i' %(block * min(20, ms), ' ' * (20 - ms), tile_i+1)) out.flush() n_count, n_perfect, read_length = _xsqvalues_to_fastq(a, xyloc, buf, seq_id, prefix, n_count, n_perfect) #numbase = None if os.isatty(out.fileno()): out.write('\n') out.write("%i reads. ((%.3fGb, %.3f%% perfect , %.5f%% Ns)\n" %(read_count, read_count*numbase / 1E9, 100.0*n_perfect/read_count, n_count * 100.0 / (read_count * read_length))) out.flush()
def _xsqvalues_to_csfasta(xsqvalues, xyloc, buf_cs, buf_qual, seq_id, prefix): if xsqvalues.shape[1] == 0: sys.stderr.write('Reads of length 0 (!?)\n') sys.stderr.flush() return bases_i = range(xsqvalues.shape[1]-1) if sys.version_info[0] > 2: for row_i in range(xsqvalues.shape[0]): seq, qual = colourqual_frombuffer(xsqvalues[row_i]) x,y = xyloc[row_i] this_seq_id = seq_id % (x,y) buf_cs.writelines((this_seq_id, '\n', prefix, seq.decode('ASCII'), '\n')) buf_qual.writelines((this_seq_id, '\n')) for q_i in bases_i: buf_qual.write('%i ' % qual[q_i]) buf_qual.writelines((str(qual[len(bases_i)]), '\n')) else: for row_i in range(xsqvalues.shape[0]): seq, qual = colourqual_frombuffer(xsqvalues[row_i]) x,y = xyloc[row_i] this_seq_id = seq_id % (x,y) buf_cs.writelines((this_seq_id, '\n', prefix, seq, '\n')) buf_qual.writelines((this_seq_id, '\n')) for q_i in bases_i: buf_qual.write('%i ' % qual[q_i]) buf_qual.writelines((str(qual[len(bases_i)]), '\n'))
[docs]def make_csfasta(lib, fragment, buf_cs, buf_qual, f, flowcell, numbase = None, progress_mark = 50000, out=sys.stdout): read_count = int(0) if sys.version_info[0] == 2: block = '\xe2\x96\x88' else: block = u"\u2588" for tile_i, (tile_n, a) in enumerate(iter_valuearrays(lib, fragment, what='ColorCallQV')): if numbase is None: numbase = a.shape[1] seq_id = seqid_template_csfasta(flowcell, fragment) xyloc = lib[tile_n]['Fragments'][u'yxLocation'][:] read_count += xyloc.shape[0] prefix = FRAGMENT_START[fragment] ms = int(math.ceil((tile_i + 1) * 1.0 * 20 / len(lib))) out.write('\r|%s%s|tile %i' %(block * min(20, ms), ' ' * (20 - ms), tile_i + 1)) out.flush() _xsqvalues_to_csfasta(a, xyloc, buf_cs, buf_qual, seq_id, prefix) if os.isatty(out.fileno()): out.write('\n') out.write("%i reads (%.3fGb).\n" %(read_count, read_count*numbase / 1E9)) out.flush()
def _xsqvalues_to_csfastq(xsqvalues, xyloc, buf, seq_id, prefix): if sys.version_info[0] > 2: for row_i in range(xsqvalues.shape[0]): seq, qual = colourqual_frombuffer(xsqvalues[row_i]) seq = bytearray_addint(seq, 48) qual = bytearray_phredtoascii(qual) x,y = xyloc[row_i] buf.writelines((seq_id %(x,y), '\n', prefix, seq.decode('ASCII'), '\n+\n', qual.decode('ASCII'), '\n')) else: seq_id = seq_id.encode('ASCII') for row_i in range(xsqvalues.shape[0]): seq, qual = colourqual_frombuffer(xsqvalues[row_i]) seq = bytearray_addint(seq, 48) qual = bytearray_phredtoascii(qual) x,y = xyloc[row_i] buf.writelines((seq_id %(x,y), '\n', prefix, seq, '\n+\n', qual, '\n'))
[docs]def make_csfastq(lib, fragment, buf, f, flowcell, numbase = None, progress_mark = 50000, out=sys.stdout): read_count = int(0) if sys.version_info[0] == 2: block = '\xe2\x96\x88' else: block = u"\u2588" for tile_i, (tile_n, a) in enumerate(iter_valuearrays(lib, fragment, what='ColorCallQV')): if numbase is None: numbase = a.shape[1] seq_id = seqid_template(f, flowcell, tile_n) xyloc = lib[tile_n]['Fragments'][u'yxLocation'][:] read_count += xyloc.shape[0] prefix = FRAGMENT_START[fragment] ms = int(math.ceil((tile_i+1) * 1.0 * 20 / len(lib))) out.write('\r|%s%s|tile %i' %(block * min(20, ms), ' ' * (20 - ms), tile_i + 1)) out.flush() _xsqvalues_to_csfastq(a, xyloc, buf, seq_id, prefix) if os.isatty(out.fileno()): out.write('\n') out.write("%i reads (%.3fGb).\n" %(read_count, read_count*numbase / 1E9)) out.flush()
[docs]def automagic_fastq(filename, path_out = '.', fragment = None, buf_size = int(2E6), extension = 'fq', out = sys.stdout, cs = False, want_unassigned = False): """ Extract sequence-space data into a FASTAQ file """ if cs: maker = make_csfastq else: maker = make_fastq f = XSQFile(filename, 'r') p = re.compile('.+_([0-9]+)\.xsq') if want_unassigned: library_names = tuple(x for x in f.keys() if f._valid_library_name(x, unassigned = want_unassigned)) else: library_names = f.library_names for i, lib_name in enumerate(library_names): out.write('%s (%i / %i)\n' %(lib_name, i+1, len(library_names))) out.flush() lib = f.library(lib_name, unassigned = want_unassigned) if fragment is None: fragment_list = tuple(lib.fragments()) else: fragment_list = (fragment, ) m = p.match(filename) flowcell = m.groups()[0] for frg in fragment_list: out.write('Fragment %s\n' %frg) out.flush() out_f = io.FileIO(os.path.join(path_out, "%s_%s.%s" %(lib_name, frg, extension)), "w") buf = io.BufferedWriter(out_f, buf_size) maker(lib, frg, buf, f, flowcell) buf.flush() out_f.close() f.close()
[docs]def automagic_csfasta(filename, path_out = '.', fragment = None, buf_size = int(2E6), out = sys.stdout, want_unassigned = False): """ Extract sequence-space data into a FASTAQ file """ assert((fragment is None) or (fragment in FRAGMENT_NAMES)) f = XSQFile(filename, 'r') p = re.compile('.+_([0-9]+)\.xsq') if want_unassigned: library_names = tuple(x for x in f.keys() if f._valid_library_name(x, unassigned = want_unassigned)) else: library_names = f.library_names for i, lib_name in enumerate(library_names): out.write('%s (%i / %i)\n' %(lib_name, i+1, len(library_names))) out.flush() lib = f.library(lib_name, unassigned = want_unassigned) if fragment is None: fragment_list = tuple(lib.fragments()) else: fragment_list = (fragment, ) m = p.match(filename) flowcell = m.groups()[0] for frg in fragment_list: out.write('Fragment %s\n' %frg) out.flush() out_cs = io.FileIO(os.path.join(path_out, "%s_%s.%s" %(lib_name, frg, 'csfasta')), "w") buf_cs = io.BufferedWriter(out_cs, buf_size) out_qual = io.FileIO(os.path.join(path_out, "%s_%s.%s" %(lib_name, frg, 'qual')), "w") buf_qual = io.BufferedWriter(out_qual, buf_size) #tagdetails = XSQTagDetails(lib.tagdetails.attrs[frg].id) #make_fastq(lib, fragment, tagdetails.numbasecalls, buf, seq_id) make_csfasta(lib, frg, buf_cs, buf_qual, f, flowcell) buf_cs.flush() out_cs.close() buf_qual.flush() out_qual.close() f.close()
import random
[docs]def fastqual(xsqlib, fragment, what, sample = 0.05): """ Perform a FASTQUAL-type QC. - xsqlibn: the XSQ library - fragment: name of the frament - what: colour-space or base-space data - sample: proportion to sample in order to perform the QC; 1 is 100% """ if sample <= 0 or sample > 1: raise ValueError("sample must be a number > 0 and <= 1") # determine the indices for the reads sampled nfragments = xsqlib.readcount() nsample = int(sample * nfragments) reads_idx = [None, ] * nsample for i in range(nsample): reads_idx[i] = random.randint(0, nfragments-1) reads_idx.sort() #reads_idx.reverse() # fetch the reads #FIXME: read length hardcoded - fetch it from the XSQ readlength = 75 maxval = 64 vals = [ [None, ] * nsample for bp in range(readlength) ] s_i = reads_idx.pop() read_i_offset = 0 read_m_iter = xsqlib._readvalues(fragment, what) read_m = read_m_iter.next() for read_i, read_idx in enumerate(reads_idx): while read_idx >= read_m.shape[0] + read_i_offset: read_i_offset += read_m.shape[0] read_m = read_m_iter.next() read = read_m[read_idx - read_i_offset, :] colour, qual = colourqual_frombuffer(read) for q_i, q in enumerate(qual): vals[q_i][read_i] = q for x in vals: x.sort() BoxplotInfo = namedtuple("BoxplotInfo", "p10 p25 p50 p75 p90") return tuple((BoxplotInfo(*(x[int(len(x) * p)] \ for p in (.1, .25, .5, .75, .9))) \ for x in vals))
from ngs_plumbing import report import csv, json
[docs]def make_htmlreport(xsqfile, directory = '.', verbose = True, sample_percent = 0.05, data_type = 'json'): import jinja2 assert(isinstance(xsqfile, XSQFile)) assert(data_type in ('json', 'csv')) pl = jinja2.PackageLoader('ngs_plumbing', package_path = os.path.join('data', 'html', 'templates')); j_env = jinja2.Environment(loader = pl) template = j_env.get_template('xsqlibreport.html', parent = os.path.join(_pack_installdir, 'data', 'html', 'templates')) # get information about the XSQ file xsqinfo = XSQInfo.fromxsqfile_toordict(xsqfile) LibReport = namedtuple("LibReport", "name lib_i readcount fragments") FragmentReport = namedtuple("FragmentReport", "name varname data_fn") libs = list() for lib_i, lib in enumerate(xsqfile.iter_lib()): if verbose: sys.stdout.write('Processing library: %s.\n' % lib.name) sys.stdout.flush() frags = list() for fragment in lib.fragments(): frg_varname = fragment.replace('-', '') #FIXME: only consider colour space for now fq = fastqual(lib, fragment, 'ColorCallQV', sample = sample_percent) # build the CSV file with quality data_fn = os.path.join(directory, 'fastqual_%s_%s_%i.%s' % (lib.name, frg_varname, lib_i, data_type)) if verbose: sys.stdout.write('Creating file %s...' % data_fn) f = file(data_fn, mode = 'w') if data_type == 'csv': csv_w = csv.writer(f) for row in report.fastqual_tocsv_iter(fq): csv_w.writerow(row) else: # json tmp = report.fastqual_tojson(fq) f.write('dataf_%i_%s = ' % (lib_i, frg_varname) ) f.write(tmp) f.write(';') f.close() if verbose: sys.stdout.write('done.\n') frags.append(FragmentReport(fragment, frg_varname, os.path.relpath(data_fn, start = directory))) libs.append(LibReport(lib.name, lib_i, lib.readcount(), frags)) # render the HTML rd = template.render(**{ 'basequal_max': 33, 'data_type': data_type, 'filename': os.path.basename(xsqfile.filename), 'xsqinfo': tuple(xsqinfo.iteritems()), 'libs': libs, 'sample_percent': sample_percent}) html_fn = os.path.join(directory, 'fastqual_%s.html' % (os.path.basename(xsqfile.filename))) if verbose: sys.stdout.write('Creating file %s...' % html_fn) f = file(html_fn, mode = 'w') f.writelines(rd) f.close() if verbose: sys.stdout.write('done.\n') # copy the javascript js_fn = os.path.join(_pack_installdir, 'data', 'html', 'readqual.js') jscp_fn = os.path.join(directory, 'readqual.js') if verbose: sys.stdout.write('Copying file %s...' % jscp_fn) f = file(js_fn, mode = 'r') f_cp = file(jscp_fn, mode = 'w') f_cp.writelines(row for row in f) f.close() f_cp.close() if verbose: sys.stdout.write('done.\n')
if __name__ == '__main__': pass