# -*- coding: utf-8 -*-
"""ribocount"""
import os
import shutil
import zipfile
import logging
import argparse
import ribocore
import config
# Default is production
CONFIG = config.ProductionConfig()
log = logging.getLogger('riboplot')
[docs]def create_parser():
"""Argument parser. """
parser = argparse.ArgumentParser(
prog='ribocount.py', description='Output read counts for all transcripts')
# required arguments
required = parser.add_argument_group('required arguments')
required.add_argument('-b', '--ribo_file', help='Ribo-Seq alignment file in BAM format', required=True)
required.add_argument('-f', '--transcriptome_fasta', help='FASTA format file of the transcriptome', required=True)
# optional arguments
parser.add_argument('-l', '--read_lengths', help='Read lengths to consider (default: %(default)s). '
'Multiple read lengths should be separated by commas. If multiple read lengths '
'are specified, corresponding read offsets should also be specified. If you do '
'not wish to apply an offset, please input 0 for the corresponding read length',
default='0', type=ribocore.lengths_offsets)
parser.add_argument('-s', '--read_offsets', help='Read offsets (default: %(default)s). '
'Multiple read offsets should be separated by commas',
default='0', type=ribocore.lengths_offsets)
count_group = parser.add_mutually_exclusive_group()
count_group.add_argument('-v', '--count_five', help='Flag. Output reads in 5\' region', action='store_true')
count_group.add_argument('-r', '--count_three', help='Flag. Output reads in 3\' region', action='store_true')
parser.add_argument('-m', '--html_file', help='Output file for results (HTML)', default='ribocount.html')
parser.add_argument('-o', '--output_path', help='Files are saved in this directory', default='output')
parser.add_argument('-d', '--debug', help='Flag. Produce debug output', action='store_true')
return parser
[docs]def main(args):
"""Main program"""
(ribo_file, transcriptome_fasta, read_lengths, read_offsets, count_five, count_three,
output_path, html_file) = \
(args.ribo_file, args.transcriptome_fasta, args.read_lengths, args.read_offsets,
args.count_five, args.count_three, args.output_path, args.html_file)
log.debug('Supplied arguments\n{}'.format(
'\n'.join(['{:<20}: {}'.format(k, v) for k, v in vars(args).items()])))
# error messages (simple format) are written to html file
fh = logging.FileHandler(html_file)
fh.setLevel(logging.ERROR)
fh.setFormatter(ErrorLogFormatter('%(message)s'))
log.addHandler(fh)
log.info('Checking if required arguments are valid...')
ribocore.check_required_arguments(ribo_file=ribo_file, transcriptome_fasta=transcriptome_fasta)
log.info('Checking read lengths...')
ribocore.check_read_lengths(ribo_file=ribo_file, read_lengths=read_lengths)
log.info('Done')
log.info('Checking read offsets...')
ribocore.check_read_offsets(read_offsets=read_offsets)
log.info('Done')
log.info('Checking if each read length has a corresponding offset...')
ribocore.check_read_lengths_offsets(read_lengths=read_lengths, read_offsets=read_offsets)
log.info('Done')
with ribocore.open_pysam_file(fname=ribo_file, ftype='bam') as b, ribocore.open_pysam_file(fname=transcriptome_fasta, ftype='fasta') as f:
# Total valid transcript count (ones with reads)
count = 0
prime = None
table_body = '' # HTML table body content
if count_five:
log.info('Only 5\' read counts requested')
prime = '5'
elif count_three:
log.info('Only 3\' read counts requested')
prime = '3'
# create output directories
if not os.path.exists(output_path):
os.mkdir(output_path)
# zip_dir contents will be written here and a zip archive will be created
# from this directory
zip_dir = os.path.join(output_path, 'ribocount_output')
if not os.path.exists(zip_dir):
os.mkdir(zip_dir)
csv_dir = os.path.join(zip_dir, 'csv')
if not os.path.exists(csv_dir):
os.mkdir(csv_dir)
log.info('Get RiboSeq read counts for all transcripts in FASTA')
for transcript in f.references:
ribo_counts, ribo_reads = ribocore.get_ribo_counts(ribo_fileobj=b, transcript_name=transcript,
read_lengths=read_lengths, read_offsets=read_offsets)
if not ribo_reads: # no reads for this transcript. skip.
continue
transcript_sequence = f[transcript]
# By default, all counts will be written (ribo_counts)
# If 5' or 3' counts requested, filter and use
# those counts for printing instead
write_counts = ribo_counts
log.debug('Total read counts {}'.format(ribo_reads))
# find longest ORF and filter counts based on whether 5' or 3' is
# requested
longest_orf = {}
if count_five or count_three:
# use default start and stop codons and find ORFs in all 3
# frames (+)
orfs = ribocore.get_three_frame_orfs(sequence=transcript_sequence)
if not len(orfs):
log.debug('No ORFs for transcript {0}'.format(transcript))
continue
longest_orf = ribocore.get_longest_orf(orfs=orfs)
orf_start, orf_stop = longest_orf['start'], longest_orf['stop']
log.info('Transcript: {0} Longest ORF Start: {1}, Stop: {2}'.format(transcript, orf_start, orf_stop))
if count_five:
write_counts, five_reads = ribocore.filter_ribo_counts(counts=ribo_counts, orf_start=orf_start)
log.debug('5\' region read counts: {}'.format(five_reads))
elif count_three:
write_counts, three_reads = ribocore.filter_ribo_counts(counts=ribo_counts, orf_stop=orf_stop)
log.debug('3\' region read counts: {}'.format(three_reads))
if not len(write_counts):
# no counts for transcript
continue
log.debug('Writing counts to CSV file for transcript {}'.format(transcript))
count += 1
csv_file = 'RiboCounts{}.csv'.format(count)
with open(os.path.join(csv_dir, csv_file), 'w') as cw:
cw.write('"Position","Nucleotide","Frame 1","Frame 2","Frame 3"\n')
for pos in range(1, len(transcript_sequence) + 1):
nucleotide = transcript_sequence[pos - 1]
if pos in write_counts:
cw.write('{0},{1},{2},{3},{4}\n'.format(
pos, nucleotide, write_counts[pos][1], write_counts[pos][2], write_counts[pos][3]))
else:
cw.write('{0},{1},{2},{3},{4}\n'.format(pos, nucleotide, 0, 0, 0))
# HTML table
table_body += '<tr><td>{0}</td><td>{1}</td>'.format(transcript, ribo_reads)
if count_five:
table_body += '<td>{0}</td>'.format(five_reads)
elif count_three:
table_body += '<td>{0}</td>'.format(three_reads)
table_body += '<td><a href="csv/{0}">{0}</a></td></tr>'.format(csv_file)
table_body += '</tbody>'
# only for display in HTML
valid_lengths = ['{}'.format(item) for item in read_lengths]
if len(valid_lengths) == 1 and valid_lengths[0] == '0':
valid_lengths = ['All']
if not count:
if len(valid_lengths) >= 1:
log.info('No transcripts found for read lengths: {}'.format(', '.join(valid_lengths)))
else:
log.info('No transcripts found')
else:
if prime:
template = 'ribocount_prime.html'
else:
template = 'ribocount.html'
with open(os.path.join(CONFIG.PKG_DATA_DIR, template)) as g,\
open(os.path.join(zip_dir, 'index.html'), 'w') as h:
h.write(g.read().format(count=count, length='{}'.format(', '.join(valid_lengths)),
prime=prime, table_body=table_body))
for asset in ('css', 'js'):
asset_dir = os.path.join(zip_dir, asset)
if not os.path.exists(asset_dir):
os.mkdir(asset_dir)
asset_data_dir = os.path.join(CONFIG.PKG_DATA_DIR, asset)
for fname in os.listdir(asset_data_dir):
shutil.copy(os.path.join(asset_data_dir, fname),
os.path.join(zip_dir, asset, fname))
log.info('Creating zip file')
os.chdir(output_path)
with zipfile.ZipFile('ribocount_output.zip', 'w', allowZip64=True) as zipf:
for root, d, f in os.walk('ribocount_output'):
for name in f:
zipf.write(os.path.join(root, name))
shutil.rmtree('ribocount_output')
os.chdir('../')
log.debug('Writing HTML report')
with open(os.path.join(CONFIG.PKG_DATA_DIR, 'ribocount_index.html')) as j, open(args.html_file, 'w') as k:
k.write(j.read().format(count=count, read_length=', '.join(valid_lengths)))
log.info('Finished')
[docs]def run():
"""Run program"""
parsed = create_parser()
args = parsed.parse_args()
main(args)
if __name__ == '__main__':
run()