Source code for bripipetools.postprocessing.cleanup

"""
Clean up & organize outputs from processing workflow batch.
"""
import logging
import os
import re
import zipfile
import shutil

logger = logging.getLogger(__name__)


[docs]class OutputCleaner(object): """ Moves, renames, and deletes individual output files from a workflow processing batch for a selected project. """ def __init__(self, path): logger.debug("creating `OutputCleaner` instance for '{}'".format(path)) self.path = path self.output_types = self._get_output_types()
[docs] def _get_output_types(self): """ Identify the types of outputs included for the project. """ out_types = ['qc', 'metrics', 'counts', 'alignments', 'logs'] logging.debug("subfolders in project folder: {}" .format(os.listdir(self.path))) return [f for f in os.listdir(self.path) if f.lower() in out_types]
[docs] def _get_output_paths(self, output_type): """ Return full path for individual output files. """ logging.debug("locating output files of type '{}'".format(output_type)) output_root = os.path.join(self.path, output_type) return [os.path.join(self.path, root, f) for root, dirs, files in os.walk(output_root) for f in files if not re.search('(DS_Store|_old)', f)]
[docs] def _unzip_output(self, path): """ Unzip the contents of a compressed output file. """ logging.debug("extracting contents of '{}' to '{}'" .format(path, os.path.dirname(path))) paths = [] with zipfile.ZipFile(path) as zf: logger.debug("zip folder contents: {}".format(zf.namelist())) for f in zf.namelist(): if f != './': paths.append(zf.extract(f, os.path.dirname(path))) logging.debug("unzipped the following files: {}".format(paths)) return paths
[docs] def _unnest_output(self, path): """ Unnest files in a subfolder by concatenating filenames and moving up one level. """ logging.debug("unnesting output '{}' from subfolder '{}'" .format(path, os.path.dirname(path))) prefix = os.path.dirname(path) if re.search('.zip$', path): logging.debug("unzipping contents of '{}' before unnesting" .format(path)) for p in self._unzip_output(path): shutil.move(p, '{}_{}'.format(prefix, os.path.basename(p))) try: shutil.rmtree(os.path.splitext(path)[0]) except OSError: pass else: shutil.move(path, '{}_{}'.format(prefix, os.path.basename(path)))
[docs] def _recode_output(self, path, output_type): """ Rename file according to template. """ filename_map = {'QC': ('fastqc_data.txt', 'fastqc_qc.txt')} swap = filename_map[output_type] newpath = re.sub(swap[0], swap[1], path) logging.debug("renaming '{}' to '{}'".format(path, newpath)) shutil.move(path, newpath) return newpath
[docs] def clean_outputs(self): """ Walk through output types to unzip, unnest, and rename files. """ for output_type in self.output_types: if output_type == 'QC': outputs = self._get_output_paths(output_type) for o in outputs: outregex = re.compile(output_type + '$') if not outregex.search(os.path.dirname(o)): self._unnest_output(o) for o in os.listdir(os.path.join(self.path, output_type)): self._recode_output( os.path.join(self.path, output_type, o), output_type )