Source code for riboplot.riboplot

# -*- coding: utf-8 -*-
import sys

# check dependencies
module_errors = {}

try:
    import matplotlib
except ImportError as e:
    module_errors['matplotlib'] = str(e)
else:
    matplotlib.use('Agg')
    from matplotlib import pyplot as plt
    from matplotlib import gridspec
    from matplotlib.font_manager import FontProperties

if len(module_errors):
    msg = 'Could not import all required modules:\n\n'
    for module, error in module_errors.items():
        msg += 'Importing "{0}" failed with \n{1}\n\n'.format(module, error)
    sys.exit(msg)

import os
import re
import config
import shutil
import logging
import tempfile
import argparse
import subprocess

import ribocore
# Default is production
CONFIG = config.ProductionConfig()

# create logger
log = logging.getLogger('riboplot')


[docs]class ErrorLogFormatter(logging.Formatter): """Custom error log format for the HTML file""" def format(self, record): return '<h2>RiboPlot Error</h2><p>{}</p>'.format(record.msg)
[docs]def get_start_stops(transcript_sequence, start_codons=None, stop_codons=None): """Return start and stop positions for all frames in the given transcript. """ transcript_sequence = transcript_sequence.upper() # for comparison with codons below if not start_codons: start_codons = ['ATG'] if not stop_codons: stop_codons = ['TAA', 'TAG', 'TGA'] seq_frames = {1: {'starts': [], 'stops': []}, 2: {'starts': [], 'stops': []}, 3: {'starts': [], 'stops': []}} for codons, positions in ((start_codons, 'starts'), (stop_codons, 'stops')): if len(codons) > 1: pat = re.compile('|'.join(codons)) else: pat = re.compile(codons[0]) for m in re.finditer(pat, transcript_sequence): # Increment position by 1, Frame 1 starts at position 1 not 0 start = m.start() + 1 rem = start % 3 if rem == 1: # frame 1 seq_frames[1][positions].append(start) elif rem == 2: # frame 2 seq_frames[2][positions].append(start) elif rem == 0: # frame 3 seq_frames[3][positions].append(start) return seq_frames
[docs]def get_rna_counts(rna_file, transcript_name): """Get coverage for a given RNA BAM file, return read counts. """ # check if the RNA file exists if not os.path.exists(rna_file): msg = 'RNA-Seq BAM file "{}" does not exist'.format(rna_file) logging.error(msg) raise OSError(msg) rna_counts = {} cov_file = tempfile.NamedTemporaryFile(delete=False) try: subprocess.check_call( ['bedtools', 'genomecov', '-ibam', rna_file, '-bg'], stdout=cov_file) except subprocess.CalledProcessError as e: # needs testing raise ribocore.RNACountsError('Could not generate coverage for RNA BAM file. \n{}\n'.format(e)) for line in open(cov_file.name): line = line.split() if line[0] == transcript_name: position, count = int(line[1]) + 1, int(line[3]) rna_counts[position] = count cov_file.close() os.unlink(cov_file.name) return rna_counts
[docs]def set_axis_color(axis, color, alpha=None): """Sets the spine color of all sides of an axis (top, right, bottom, left).""" for side in ('top', 'right', 'bottom', 'left'): spine = axis.spines[side] spine.set_color(color) if alpha is not None: spine.set_alpha(alpha)
[docs]def get_color_palette(scheme): """Return colors for a given scheme. Default colors are returned for an item if undefined in scheme. """ color_schemes = { 'default': { 'frames': ['tomato', 'limegreen', 'deepskyblue'], 'background': '#ffffff', 'color': '#616161', 'ticks': '#757575', 'start': '#ffffff', 'stop': '#909090', 'rna': '#eaeaea', 'axis': '#e0e0e0', 'grey': '#bdbdbd' }, 'colorbrewer': { 'frames': ['#fc8d62', '#66c2a5', '#8da0cb'] }, 'rgb': { 'frames': ['red', 'green', 'blue'] }, 'greyorfs': {} } colors = {} for k, v in color_schemes['default'].items(): try: vals = color_schemes[scheme][k] except KeyError: vals = v colors[k] = vals return colors
[docs]def plot_profile(ribo_counts, transcript_name, transcript_length, start_stops, read_lengths=None, read_offsets=None, rna_counts=None, color_scheme='default', html_file='index.html', output_path='output'): """Plot read counts (in all 3 frames) and RNA coverage if provided for a single transcript. """ colors = get_color_palette(scheme=color_scheme) gs = gridspec.GridSpec(3, 1, height_ratios=[6, 1.3, 0.5], hspace=0.35) font_axis = {'family': 'sans-serif', 'color': colors['color'], 'weight': 'bold', 'size': 7} # riboseq bar plots gs2 = gridspec.GridSpecFromSubplotSpec(1, 1, subplot_spec=gs[0]) ax2 = plt.subplot(gs2[0]) label = 'Ribo-Seq count' if read_lengths: if len(read_lengths) > 1: label = 'Ribo-Seq count ({}-mers)'.format(', '.join('{}'.format(item) for item in read_lengths)) else: label = 'Ribo-Seq count ({}-mer)'.format('{}'.format(read_lengths[0])) ax2.set_ylabel(label, fontdict=font_axis, labelpad=10) # rna coverage if available ax_rna = None if rna_counts: ax_rna = ax2.twinx() ax_rna.set_ylabel('RNA-Seq count', fontdict=font_axis, labelpad=10) ax_rna.bar(rna_counts.keys(), rna_counts.values(), facecolor=colors['rna'], edgecolor=colors['rna'], label='RNA') ax_rna.set_zorder(1) frame_counts = {1: {}, 2: {}, 3: {}} for k, v in ribo_counts.iteritems(): for fr in (1, 2, 3): if v[fr] > 0: frame_counts[fr][k] = v[fr] break cnts = [] [cnts.extend(item.values()) for item in frame_counts.values()] y_max = float(max(cnts) * 1.25) ax2.set_ylim(0.0, y_max) ax2.set_zorder(2) ax2.patch.set_facecolor('none') for frame in (1, 2, 3): color = colors['frames'][frame - 1] x_vals = frame_counts[frame].keys() ax2.bar(x_vals, frame_counts[frame].values(), color=color, facecolor=color, edgecolor=color) # ORF architecture gs3 = gridspec.GridSpecFromSubplotSpec(3, 1, subplot_spec=gs[1], hspace=0.1) if color_scheme == 'greyorfs': axisbg = [colors['grey'] for i in range(3)] else: axisbg = colors['frames'] ax4 = plt.subplot(gs3[0], sharex=ax2, axisbg=axisbg[0]) ax5 = plt.subplot(gs3[1], sharex=ax2, axisbg=axisbg[1]) ax6 = plt.subplot(gs3[2], sharex=ax2, axisbg=axisbg[2]) ax6.set_xlabel('Transcript length ({} nt)'.format(transcript_length), fontdict=font_axis, labelpad=6) # Legend gs4 = gridspec.GridSpecFromSubplotSpec(1, 1, subplot_spec=gs[2], hspace=0.1) ax7 = plt.subplot(gs4[0], axisbg=colors['background']) set_axis_color(ax7, colors['background']) ax7.text(0.02, 0.1, "AUG", size=5, ha="center", va="center", color=colors['color'], bbox=dict(boxstyle="square", facecolor=colors['start'], edgecolor=colors['color'], linewidth=0.3)) ax7.text(0.06, 0.1, "STOP", size=5, ha="center", va="center", color='white', bbox=dict(boxstyle="square", color=colors['stop'])) ax7.text(0.13, 0.1, "Frames", size=5, ha='center', va='center', color=colors['color'], fontdict={'weight': 'bold'}) ax7.text(0.17, 0.1, "1", size=5, ha="center", va="center", color='white', bbox=dict(boxstyle="square", color=colors['frames'][0])) ax7.text(0.19, 0.1, "2", size=5, ha="center", va="center", color='white', bbox=dict(boxstyle="square", color=colors['frames'][1])) ax7.text(0.21, 0.1, "3", size=5, ha="center", va="center", color='white', bbox=dict(boxstyle="square", color=colors['frames'][2])) # No ticks or labels for ORF 1, 2 and Legend for axis in (ax4, ax5, ax7): axis.tick_params(top=False, left=False, right=False, bottom=False, labeltop=False, labelleft=False, labelright=False, labelbottom=False) axes = [ax2] if ax_rna: axes.append(ax_rna) fp = FontProperties(size='5') for axis in axes: set_axis_color(axis, colors['axis']) axis.tick_params(colors=colors['ticks']) for item in (axis.get_xticklabels() + axis.get_yticklabels()): item.set_fontproperties(fp) item.set_color(colors['color']) for axis, frame in ((ax4, 1), (ax5, 2), (ax6, 3)): if color_scheme == 'greyorfs': color = colors['grey'] else: color = colors['frames'][frame - 1] set_axis_color(axis, color, alpha=0.05) axis.patch.set_alpha(0.3) # opacity of ORF architecture for item in (axis.get_xticklabels()): item.set_fontproperties(fp) item.set_color(colors['color']) axis.set_ylim(0, 0.2) axis.set_xlim(0, transcript_length) starts = [(item, 1) for item in start_stops[frame]['starts']] stops = [(item, 1) for item in start_stops[frame]['stops']] start_colors = [colors['start'] for item in starts] axis.broken_barh(starts, (0.11, 0.2), facecolors=start_colors, edgecolors=start_colors, label='start', zorder=5) stop_colors = [colors['stop'] for item in stops] axis.broken_barh(stops, (0, 0.2), facecolors=stop_colors, edgecolors=stop_colors, label='stop', zorder=5) axis.set_ylabel('{}'.format(frame), fontdict={'family': 'sans-serif', 'color': colors['color'], 'weight': 'normal', 'size': '6'}, rotation='horizontal', labelpad=10, verticalalignment='center') axis.tick_params(top=False, left=False, right=False, labeltop=False, labelleft=False, labelright=False, direction='out', colors=colors['ticks']) plt.title('{}'.format(transcript_name), fontdict={'family': 'sans-serif', 'color': colors['color'], 'weight': 'bold', 'size': 8, 'y': 20}) if not os.path.exists(output_path): os.mkdir(output_path) plt.savefig(os.path.join(output_path, 'riboplot.svg'), facecolor=colors['background']) plt.savefig(os.path.join(output_path, 'riboplot.png'), dpi=600, facecolor=colors['background']) with open(os.path.join(CONFIG.PKG_DATA_DIR, 'riboplot.html')) as g, open(os.path.join(output_path, html_file), 'w') as h: h.write(g.read().format(transcript_name=transcript_name)) css_dir = os.path.join(output_path, 'css') if not os.path.exists(css_dir): os.mkdir(css_dir) css_data_dir = os.path.join(CONFIG.PKG_DATA_DIR, 'css') for fname in os.listdir(css_data_dir): shutil.copy(os.path.join(css_data_dir, fname), os.path.join(output_path, 'css', fname))
[docs]def create_parser(): """Argument parser. """ parser = argparse.ArgumentParser( prog='riboplot.py', description='Plot and output read counts for a single transcript') required = parser.add_argument_group('required arguments') required.add_argument('-b', '--ribo_file', help='Ribo-Seq alignment file in BAM format', required=True) required.add_argument('-f', '--transcriptome_fasta', help='FASTA format file of the transcriptome', required=True) required.add_argument('-t', '--transcript_name', help='Transcript name', metavar='TEXT', required=True) # plot function - optional arguments parser.add_argument('-n', '--rna_file', help='RNA-Seq alignment file (BAM)') parser.add_argument('-l', '--read_lengths', help='Read lengths to consider (default: %(default)s). ' 'Multiple read lengths should be separated by commas. If multiple read lengths ' 'are specified, corresponding read offsets should also be specified. If you do ' 'not wish to apply an offset, please input 0 for the corresponding read length', default='0', type=ribocore.lengths_offsets) parser.add_argument('-s', '--read_offsets', help='Read offsets (default: %(default)s). ' 'Multiple read offsets should be separated by commas', default='0', type=ribocore.lengths_offsets) parser.add_argument('-c', '--color_scheme', help='Color scheme to use (default: %(default)s)', choices=['default', 'colorbrewer', 'rgb', 'greyorfs'], default='default') parser.add_argument('-m', '--html_file', help='Output file for results (HTML)', default='riboplot.html') parser.add_argument('-o', '--output_path', help='Files are saved in this directory', default='output') parser.add_argument('-d', '--debug', help='Flag. Produce debug output', action='store_true') return parser
[docs]def main(args): """Main program""" (ribo_file, rna_file, transcript_name, transcriptome_fasta, read_lengths, read_offsets, output_path, html_file) = ( args.ribo_file, args.rna_file, args.transcript_name, args.transcriptome_fasta, args.read_lengths, args.read_offsets, args.output_path, args.html_file) # error messages (simple format) are written to html file fh = logging.FileHandler(html_file) fh.setLevel(logging.ERROR) fh.setFormatter(ErrorLogFormatter('%(message)s')) log.addHandler(fh) log.debug('Supplied arguments\n{}'.format( '\n'.join(['{:<20}: {}'.format(k, v) for k, v in vars(args).items()]))) log.debug('Testing debugggg') log.info('Checking if required arguments are valid...') ribocore.check_required_arguments( ribo_file=ribo_file, transcriptome_fasta=transcriptome_fasta, transcript_name=transcript_name) log.info('Done') if rna_file: log.info('Checking if RNA-Seq file is valid...') ribocore.check_rna_file(rna_file=rna_file) log.info('Done') log.info('Checking read lengths...') ribocore.check_read_lengths(ribo_file=ribo_file, read_lengths=read_lengths) log.info('Done') log.info('Checking read offsets...') ribocore.check_read_offsets(read_offsets=read_offsets) log.info('Done') log.info('Checking if each read length has a corresponding offset') ribocore.check_read_lengths_offsets(read_lengths=read_lengths, read_offsets=read_offsets) log.info('Done') log.info('Get sequence and length of the given transcript from FASTA file...') record = ribocore.get_fasta_record(transcriptome_fasta, transcript_name) transcript_sequence = record[transcript_name] transcript_length = len(transcript_sequence) log.info('Get ribo-seq read counts and total reads in Ribo-Seq...') with ribocore.open_pysam_file(fname=ribo_file, ftype='bam') as bam_fileobj: ribo_counts, total_reads = ribocore.get_ribo_counts( ribo_fileobj=bam_fileobj, transcript_name=transcript_name, read_lengths=read_lengths, read_offsets=read_offsets) if not ribo_counts: msg = ('No RiboSeq read counts for transcript {}. No plot will be ' 'generated!'.format(transcript_name)) log.error(msg) raise ribocore.RiboPlotError(msg) else: log.info('Get RNA counts for the given transcript...') mrna_counts = {} if rna_file: try: mrna_counts = get_rna_counts(rna_file, transcript_name) except OSError as e: log.error(e) raise if not mrna_counts: log.warn('No RNA counts for this transcript from the given RNA Seq file. ' 'RNA-Seq coverage will not be generated') else: log.debug('No RNA-Seq data provided. Not generating coverage') log.info('Get start/stop positions in transcript sequence (3 frames)...') codon_positions = get_start_stops(transcript_sequence) if not os.path.exists(output_path): os.mkdir(output_path) log.info('Writing RiboSeq read counts for {}'.format(transcript_name)) with open(os.path.join(output_path, 'RiboCounts.csv'), 'w') as f: f.write('"Position","Nucleotide","Frame 1","Frame 2","Frame 3"\n') for pos in range(1, transcript_length + 1): if pos in ribo_counts: f.write('{0},{1},{2},{3},{4}\n'.format( pos, transcript_sequence[pos - 1], ribo_counts[pos][1], ribo_counts[pos][2], ribo_counts[pos][3])) else: f.write('{0},{1},{2},{3},{4}\n'.format(pos, transcript_sequence[pos - 1], 0, 0, 0)) log.info('Generating RiboPlot...') plot_profile(ribo_counts, transcript_name, transcript_length, codon_positions, read_lengths, read_offsets, mrna_counts, color_scheme=args.color_scheme, html_file=args.html_file, output_path=args.output_path) log.info('Finished!')
[docs]def run(): """Run program""" parsed = create_parser() args = parsed.parse_args() main(args)
if __name__ == '__main__': run()