Source code for bripipetools.postprocessing.stitching

"""
Combine parsed data from a set of batch processing output files and write to a
single CSV file.
"""
import logging
import os
import re
import csv

import pandas as pd

from .. import io
from .. import parsing
from .. import util

logger = logging.getLogger(__name__)


[docs]class OutputStitcher(object): """ Given a path to an output folder or list of files, combine parsed data from files and write CSV. """ def __init__(self, path, output_type=None, outputs=None): logger.debug("creating `OutputStitcher` for path '{}'".format(path)) self.path = path if output_type is None: self.type = self._sniff_output_type()
[docs] def _sniff_output_type(self): """ Return predicted output type based on specified path. """ output_types = ['metrics', 'QC', 'counts', 'validation'] output_match = [t.lower() for t in output_types if re.search(t, self.path)] if len(output_match): return output_match[0]
[docs] def _get_outputs(self, output_type): """ Return list of outputs of specified type. """ output_filetypes = {'metrics': 'txt|html', 'qc': 'txt', 'counts': 'txt', 'validation': 'csv'} return [os.path.join(self.path, f) for f in os.listdir(self.path) if re.search(output_type, f) and not re.search('combined', f) and re.search(output_filetypes[output_type], os.path.splitext(f)[-1])]
[docs] def _get_parser(self, output_type, output_source): """ Return the appropriate parser for the current output file. """ parsers = { 'metrics': {'htseq': getattr(io, 'HtseqMetricsFile'), 'picard-rnaseq': getattr(io, 'PicardMetricsFile'), 'picard-markdups': getattr(io, 'PicardMetricsFile'), 'picard-align': getattr(io, 'PicardMetricsFile'), 'picard-alignment': getattr(io, 'PicardMetricsFile'), 'tophat-stats': getattr(io, 'TophatStatsFile')}, 'qc': {'fastqc': getattr(io, 'FastQCFile'), 'fastqc-R1': getattr(io, 'FastQCFile'), 'fastqc-R2': getattr(io, 'FastQCFile')}, 'counts': {'htseq': getattr(io, 'HtseqCountsFile')}, 'validation': {'sexcheck': getattr(io, 'SexcheckFile')} } logger.debug("matched parser '{}' for output type '{}' and source '{}'" .format(parsers[output_type][output_source], output_type, output_source)) return parsers[output_type][output_source]
[docs] def _read_data(self): """ Parse and store data for each output file. """ outputs = self._get_outputs(self.type) outputs.sort() self.data = {} for o in outputs: logger.debug("parsing output file '{}'".format(o)) out_items = parsing.parse_output_filename(o) proclib_id = out_items['sample_id'] out_type = out_items['type'] out_source = out_items['source'] logger.debug("storing data from '{}' in '{}' '{}'".format( out_source, proclib_id, out_type)) out_parser = self._get_parser(out_type, out_source)(path=o) self.data.setdefault( out_type, {}).setdefault(proclib_id, []).append( {out_source: out_parser.parse()} )
[docs] def _build_table(self): """ Combine parsed data into table for writing. """ output_data = self.data[self.type] if self.type == 'counts': logger.info("combining counts data") for idx, (sample_id, sample_data) in enumerate(output_data.items()): data = sample_data[0]['htseq'] data = data.rename(index=str, columns={'count': sample_id}) if idx == 0: table_data = data else: table_data = pd.merge(table_data, data, on='geneName') else: logger.info("combining non-counts data") table_data = [] for sample_id, sample_data in list(output_data.items()): header = [field for source in sample_data for name, data in list(source.items()) for field in list(data.keys())] logger.debug("header row: {}".format(header)) values = [value for source in sample_data for name, data in list(source.items()) for value in list(data.values())] logger.debug("values: {}".format(values)) if not len(table_data): table_data.append(['libId'] + sorted(header)) logger.debug("added header row: {}".format(table_data[-1])) table_data.append( [sample_id] + [v for h, v in sorted(zip(header, values))] ) logger.debug("added values row: {}".format(table_data[-1])) return table_data
[docs] def _add_mapped_reads_column(self, data): """ Add mapped_reads_w_dups column to metrics table data. """ try: for idx, row in enumerate(data[1:]): metrics = dict(list(zip(data[0], row))) # handle pct mapped reads calculation differently for PE reads if(metrics['category'] == 'unpaired'): mapped_reads = (float(metrics['unpaired_reads_examined']) / float(metrics['fastq_total_reads'])) else: mapped_reads = ((float(metrics['unpaired_reads_examined']) + float(metrics['read_pairs_examined'])) / float(metrics['fastq_total_reads'])) data[idx + 1].append(mapped_reads) data[0].append('mapped_reads_w_dups') except KeyError: logger.warning("required fields missing; skipping calculation " "of 'mapped_reads_w_dups") return data
[docs] def _build_combined_filename(self): """ Parse input path to create filename for combined CSV file. """ path_parts = re.sub('.*Illumina/|.*ICAC/', '', self.path).split('/') logger.debug("building combined filename from path parts {}" .format(path_parts)) run_items = parsing.parse_flowcell_run_id(path_parts[0]) project_label = parsing.get_project_label(path_parts[1]) date = util.matchdefault('[0-9]{6}', path_parts[1]) if project_label != '' and run_items['flowcell_id'] != '': filename_base = '{}_{}_{}'.format(project_label, run_items['flowcell_id'], date) else: filename_base = path_parts[0] return '{}_combined_{}.csv'.format(filename_base.rstrip('_'), self.type)
[docs] def _build_overrepresented_seq_table(self): """ Parse and combine overrepresented sequences tables from FastQC files. """ outputs = self._get_outputs(self.type) overrep_seq_table = pd.DataFrame([]) if self.type == 'qc': for o in outputs: logger.debug("parsing overrepresented sequences " "from output file '{}'".format(o)) out_items = parsing.parse_output_filename(o) proclib_id = out_items['sample_id'] out_type = out_items['type'] out_source = out_items['source'] logger.debug("storing data from {} in {} {}".format( out_source, proclib_id, out_type)) out_parser = io.FastQCFile(path=o) o_table = (pd.DataFrame(out_parser .parse_overrepresented_seqs()) .assign(libId=proclib_id)) logger.debug("found {} overrepresented sequence(s) " "for sample '{}'".format(len(o_table), proclib_id)) overrep_seq_table = overrep_seq_table.append(o_table) return overrep_seq_table
[docs] def write_overrepresented_seq_table(self): """ Write combined overrepresented sequences table to CSV file. """ table_path = os.path.join(self.path, self._build_combined_filename()) table_path = re.sub('_qc', '_overrep_seqs', table_path) table_data = self._build_overrepresented_seq_table() logger.debug("writing overrepresented seqs to file '{}'" .format(table_path)) table_data.to_csv(table_path, index=False)
[docs] def write_table(self): """ Write the combined table to a CSV file. """ self._read_data() table_data = self._build_table() if self.type == 'metrics': table_data = self._add_mapped_reads_column(table_data) table_path = os.path.join(self.path, self._build_combined_filename()) logger.debug("writing to file {}".format(table_path)) if self.type == 'counts': table_data.to_csv(table_path, index=False) else: with open(table_path, 'w') as f: writer = csv.writer(f, lineterminator='\n') for row in table_data: writer.writerow(row) return table_path