"""
"""
import logging
import re
from .. import util
from .. import database
from .. import model as docs
logger = logging.getLogger(__name__)
[docs]class ProcessedLibraryAnnotator(object):
"""
Identifies, stores, and updates information about a processed library.
"""
def __init__(self, workflowbatch_id, params, db):
logger.debug("creating `ProcessedLibraryAnnotator` instance")
self.workflowbatch_id = workflowbatch_id
logger.debug("workflowbatch_id set to '{}'".format(workflowbatch_id))
self.db = db
self.params = params
self.seqlib_id = self._get_seqlib_id()
self.proclib_id = '{}_processed'.format(self.seqlib_id)
logger.debug("processed library is '{}'".format(self.proclib_id))
self.processedlibrary = self._init_processedlibrary()
# TODO: move to parsing module
[docs] def _get_seqlib_id(self):
"""
Return the ID of the parent sequenced library.
"""
return [p['value'] for p in self.params
if p['name'] == 'SampleName'][0]
[docs] def _init_processedlibrary(self):
"""
Try to retrieve data for the processed library from GenLIMS;
if unsuccessful, create new ``ProcessedLibrary`` object.
"""
logger.debug("initializing `ProcessedLibrary` instance")
try:
logger.debug("getting `ProcessedLibrary` from GenLIMS")
return database.map_to_object(
database.get_genomicsSamples(self.db, {'_id': self.proclib_id})[0])
except IndexError:
logger.debug("creating new ProcessedLibrary object")
return docs.ProcessedLibrary(_id=self.proclib_id)
[docs] def _get_outputs(self):
"""
Return the list of outputs from the processing workflow batch.
"""
return {p['tag']: p['value'] for p in self.params
if p['type'] == 'output' and p['name'] == 'to_path'}
# TODO: move to parsing module
[docs] def _parse_output_name(self, output_name):
"""
Parse output name indicated by parameter tag in workflow batch
submit file and return individual components indicating name,
source, and type.
"""
name = re.sub('_out$', '', output_name)
name_parts = name.split('_')
if len(name_parts) == 3:
name = '_'.join(name_parts)
output_type = name_parts[1]
source = name_parts[0]
else:
# to handle old style tags
output_type = name_parts.pop(-1)
source = '_'.join(name_parts)
return {'name': name, 'type': output_type, 'source': source}
[docs] def _group_outputs(self):
"""
Organize outputs according to type and source.
"""
outputs = self._get_outputs()
grouped_outputs = {}
for k, v in list(outputs.items()):
if 'fastq_' not in k:
output_items = self._parse_output_name(k)
grouped_outputs.setdefault(
output_items['type'], []
).append(
{'source': output_items['source'],
'file': util.swap_root(v, 'pipeline', '/'),
'name': output_items['name']})
return grouped_outputs
[docs] def _append_processed_data(self):
"""
Add details and outputs for current workflow batch to processed
data array field for processed library.
"""
processed_data = self.processedlibrary.processed_data
if (not len(processed_data)
or not any(d['workflowbatch_id'] == self.workflowbatch_id
for d in processed_data)):
logger.debug("inserting outputs from new workflow batch '{}' "
"for processed library '{}'"
.format(self.workflowbatch_id, self.proclib_id))
self.processedlibrary.processed_data.append(
{'workflowbatch_id': self.workflowbatch_id,
'outputs': self._group_outputs()}
)
else:
logger.debug("updating outputs from workflow batch '{}' "
"for processed library '{}'"
.format(self.workflowbatch_id, self.proclib_id))
batch_data = [d for d in processed_data
if d['workflowbatch_id'] == self.workflowbatch_id][0]
batch_data['outputs'] = self._group_outputs()
[docs] def _update_processedlibrary(self):
"""
Add or update any missing fields in ProcessedLibrary object.
"""
self._append_processed_data()
update_fields = {'parent_id': self.seqlib_id}
self.processedlibrary.is_mapped = False
self.processedlibrary.update_attrs(update_fields, force=True)
[docs] def get_processed_library(self):
"""
Return updated ProcessedLibrary object.
"""
self._update_processedlibrary()
logger.debug("returning processed library object info: {}"
.format(self.processedlibrary.__dict__))
return self.processedlibrary