# Source code for bripipetools.monitoring.workflowbatches

"""
Monitor the outputs of a workflow processing batch.
"""
import logging
import os
import re

from .. import util
from .. import io

# Module-level logger, named after this module's import path.
logger = logging.getLogger(name=__name__)


class WorkflowBatchMonitor(object):
    """
    Check the output files produced by a workflow batch processing job.
    """
    def __init__(self, workflowbatch_file, pipeline_root):
        """
        Controls operations (identification, annotation, etc.) for the set
        of outputs generated by a batch processing job in Globus Galaxy.

        :type workflowbatch_file: str
        :param workflowbatch_file: File path of the submitted workflow
            batch file.
        :type pipeline_root: str
        :param pipeline_root: Path to the root directory for processing.
        """
        logger.debug("creating `WorkflowBatchMonitor` instance for '%s'",
                     workflowbatch_file)
        self.workflowbatch_file = workflowbatch_file
        # Parsed batch file; `['samples']` is read below as one list of
        # parameter dicts per sample.
        self.workflowbatch_data = io.WorkflowBatchFile(
            self.workflowbatch_file, state='submit'
        ).parse()
        self.pipeline_root = pipeline_root

    def _get_outputs(self):
        """
        Collect all output files for the workflow batch, grouped by sample.

        Only parameters whose ``type`` is ``'output'`` and whose ``name`` is
        ``'to_path'`` are treated as output files.

        :rtype: list
        :return: A list of dicts, one for each sample in the workflow batch,
            mapping each output tag/label to the path of that output file.
        """
        return [{param['tag']: param['value']
                 for param in sample_params
                 if param['type'] == 'output' and param['name'] == 'to_path'}
                for sample_params in self.workflowbatch_data['samples']]

    def _clean_output_paths(self, outputs):
        """
        Replace ambiguous file path roots with the current system root.

        Every path is passed through ``util.swap_root`` with the
        ``'pipeline'`` anchor and ``self.pipeline_root``.

        :type outputs: list
        :param outputs: A list of dicts, one for each sample in the workflow
            batch, mapping each output tag/label to an output file path.

        :rtype: list
        :return: A list of dicts with the same tags, whose paths have been
            re-rooted at ``self.pipeline_root``.
        """
        return [{out_tag: util.swap_root(out_path, 'pipeline',
                                         self.pipeline_root)
                 for out_tag, out_path in sample_outputs.items()}
                for sample_outputs in outputs]

    @staticmethod
    def _get_path_status(path):
        """
        Describe the on-disk state of a single output path.

        A single ``os.stat`` call is used instead of ``os.path.exists``
        followed by ``os.stat``. This avoids a race where a file removed
        between the two calls would raise instead of being reported as
        missing. ``(OSError, ValueError)`` are the same errors
        ``os.path.exists`` treats as "does not exist".

        :type path: str
        :param path: File path to inspect.

        :rtype: dict
        :return: A dict with keys 'exists' (bool), 'size' (int, from
            ``st_size``), and 'status' ('ok', 'empty', or 'missing').
        """
        try:
            size = os.stat(path).st_size
        except (OSError, ValueError):
            return {'exists': False, 'size': 0, 'status': 'missing'}
        return {'exists': True,
                'size': size,
                'status': 'ok' if size else 'empty'}

    def _check_output_paths(self, outputs):
        """
        Build the status report for a set of per-sample output dicts.

        :type outputs: list
        :param outputs: A list of dicts mapping output tag/label to path.

        :rtype: dict
        :return: A dict keyed by output file path, where each value is the
            status dict produced by ``_get_path_status``. If two outputs
            share a path, the later one overwrites the earlier entry.
        """
        output_status = {}
        for sample_outputs in outputs:
            for out_tag, out_path in sample_outputs.items():
                logger.debug("checking status for output '%s' at path '%s'",
                             out_tag, out_path)
                output_status[out_path] = self._get_path_status(out_path)
                logger.debug("status info for output at path '%s' is %s",
                             out_path, output_status[out_path])
        return output_status

    def check_outputs(self):
        """
        Check whether all expected output files are present for each sample
        in the batch.

        :rtype: dict
        :return: A dict keyed by output file path. Each value holds 'exists',
            'size', and 'status', where status is 'ok', 'missing', or 'empty'.
        """
        outputs = self._clean_output_paths(self._get_outputs())
        logger.debug("checking status for the following outputs: %s",
                     outputs)
        return self._check_output_paths(outputs)

    def check_project_outputs(self, project_id):
        """
        Check whether all expected output files are present for each sample
        in the batch that is part of the indicated project.

        :type project_id: str
        :param project_id: Project identifier. An output is included when
            this string occurs anywhere in its (re-rooted) file path.

        :rtype: dict
        :return: A dict keyed by output file path. Each value holds 'exists',
            'size', and 'status', where status is 'ok', 'missing', or 'empty'.
        """
        all_outputs = self._clean_output_paths(self._get_outputs())
        logger.debug("checking status for the following outputs: %s",
                     all_outputs)
        # NOTE(review): this is a plain substring test on the full path, so
        # e.g. 'P1' also matches paths containing 'P10'. Confirm whether
        # project IDs can be prefixes of one another.
        # Samples with no matching outputs yield empty dicts, which add
        # nothing to the report.
        project_outputs = [
            {out_tag: out_path
             for out_tag, out_path in sample_outputs.items()
             if project_id in out_path}
            for sample_outputs in all_outputs
        ]
        return self._check_output_paths(project_outputs)