Source code for bripipetools.database.operations

"""
Basic operations for BRI Mongo databases.
"""
import logging
import re
from functools import wraps
import datetime

from .. import util

logger = logging.getLogger(__name__)


def find_objects(collection):
    """
    Build a decorator that turns a ``(db, query)``-returning function into
    one that runs the query against a fixed collection.

    The decorated function is called with the caller's positional
    arguments and must return a ``(db, query)`` pair; the wrapper then
    looks up ``db[collection]``, runs ``find(query)`` and returns all
    matching documents as a list.

    :type collection: str
    :param collection: name of the collection to search

    :return: decorator to apply to a function returning ``(db, query)``
    """
    def decorator(f):
        @wraps(f)
        def run_query(*args):
            # The wrapped function only supplies the connection and query;
            # the actual database access happens here.
            db, query = f(*args)
            message = "searching '{}' collection with query '{}'".format(
                collection, query)
            logger.debug(message)
            cursor = db[collection].find(query)
            # Materialize the cursor so callers get a plain list.
            return list(cursor)
        return run_query
    return decorator
def insert_objects(collection):
    """
    Return a decorator that inserts one or more objects into the specified
    collection; if an object already exists, only the fields that are not
    ``None`` in the input object are updated.

    The decorated function is called with the caller's positional
    arguments and must return a ``(db, objects)`` pair, where ``objects``
    is either a single document (dict) or a list of documents. Each
    document is upserted, matched on its ``'_id'`` value. The wrapper
    itself returns ``None``.

    :type collection: str
    :param collection: string indicating the name of the collection

    :return: decorator to apply to a function returning ``(db, objects)``
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args):
            db, objects = f(*args)
            objects = [objects] if not isinstance(objects, list) else objects
            logger.debug("inserting list of objects: {}".format(objects))
            for o in objects:
                logger.debug("inserting '{}' into '{}' collection"
                             .format(o, collection))
                # Fields set to None are treated as "not provided" and must
                # not overwrite values already stored in the database.
                fields = {k: v for k, v in o.items() if v is not None}
                if not fields:
                    # Nothing to write for this object.
                    continue
                logger.debug("updating fields {}".format(list(fields)))
                # Write all fields in one upsert rather than one round trip
                # per field, so a document is never left half-updated if a
                # later call fails.
                db[collection].update_one({'_id': o['_id']},
                                          {'$set': fields},
                                          upsert=True)
        return wrapper
    return decorator
@find_objects('genomicsWorkflowbatches')
def get_genomicsWorkflowbatches(db, query):
    """
    Fetch documents from the 'genomicsWorkflowbatches' collection.

    The function body only hands the connection and query through; the
    ``find_objects`` decorator performs the search and returns the list
    of matching documents.

    :param db: database object for the current MongoDB connection
    :param query: query passed to the collection's ``find``

    :rtype: list
    :return: documents matching the query
    """
    return (db, query)
@find_objects('genomicsSamples')
def get_genomicsSamples(db, query):
    """
    Fetch documents from the 'genomicsSamples' collection.

    The function body only hands the connection and query through; the
    ``find_objects`` decorator performs the search and returns the list
    of matching documents.

    :param db: database object for the current MongoDB connection
    :param query: query passed to the collection's ``find``

    :rtype: list
    :return: documents matching the query
    """
    return (db, query)
@find_objects('genomicsCounts')
def get_genomicsCounts(db, query):
    """
    Fetch documents from the 'genomicsCounts' collection.

    The function body only hands the connection and query through; the
    ``find_objects`` decorator performs the search and returns the list
    of matching documents.

    :param db: database object for the current MongoDB connection
    :param query: query passed to the collection's ``find``

    :rtype: list
    :return: documents matching the query
    """
    return (db, query)
@find_objects('genomicsMetrics')
def get_genomicsMetrics(db, query):
    """
    Fetch documents from the 'genomicsMetrics' collection.

    The function body only hands the connection and query through; the
    ``find_objects`` decorator performs the search and returns the list
    of matching documents.

    :param db: database object for the current MongoDB connection
    :param query: query passed to the collection's ``find``

    :rtype: list
    :return: documents matching the query
    """
    return (db, query)
@find_objects('genomicsRuns')
def get_genomicsRuns(db, query):
    """
    Fetch documents from the 'genomicsRuns' collection.

    The function body only hands the connection and query through; the
    ``find_objects`` decorator performs the search and returns the list
    of matching documents.

    :param db: database object for the current MongoDB connection
    :param query: query passed to the collection's ``find``

    :rtype: list
    :return: documents matching the query
    """
    return (db, query)
@insert_objects('genomicsWorkflowbatches')
def put_genomicsWorkflowbatches(db, workflowbatches):
    """
    Upsert documents into the 'genomicsWorkflowbatches' collection.

    The function body only hands its arguments through; the
    ``insert_objects`` decorator performs the writes, matching each
    document on its '_id' and setting only fields that are not None.

    :param db: database object for the current MongoDB connection
    :param workflowbatches: a single document or a list of documents
    """
    return (db, workflowbatches)
@insert_objects('genomicsSamples')
def put_genomicsSamples(db, samples):
    """
    Upsert documents into the 'genomicsSamples' collection.

    The function body only hands its arguments through; the
    ``insert_objects`` decorator performs the writes, matching each
    document on its '_id' and setting only fields that are not None.

    :param db: database object for the current MongoDB connection
    :param samples: a single document or a list of documents
    """
    return (db, samples)
@insert_objects('genomicsCounts')
def put_genomicsCounts(db, counts):
    """
    Upsert documents into the 'genomicsCounts' collection.

    The function body only hands its arguments through; the
    ``insert_objects`` decorator performs the writes, matching each
    document on its '_id' and setting only fields that are not None.

    :param db: database object for the current MongoDB connection
    :param counts: a single document or a list of documents
    """
    return (db, counts)
@insert_objects('genomicsMetrics')
def put_genomicsMetrics(db, metrics):
    """
    Upsert documents into the 'genomicsMetrics' collection.

    The function body only hands its arguments through; the
    ``insert_objects`` decorator performs the writes, matching each
    document on its '_id' and setting only fields that are not None.

    :param db: database object for the current MongoDB connection
    :param metrics: a single document or a list of documents
    """
    return (db, metrics)
@insert_objects('genomicsRuns')
def put_genomicsRuns(db, runs):
    """
    Upsert documents into the 'genomicsRuns' collection.

    The function body only hands its arguments through; the
    ``insert_objects`` decorator performs the writes, matching each
    document on its '_id' and setting only fields that are not None.

    :param db: database object for the current MongoDB connection
    :param runs: a single document or a list of documents
    """
    return (db, runs)
def create_workflowbatch_id(db, prefix, date):
    """
    Check the 'genomicsWorkflowbatches' collection and construct a new,
    unused workflow batch ID of the form '<prefix>_<date>_<number>'.

    The number is one greater than the highest batch number already used
    for the prefix and date combination, or 1 if no matching workflow
    batch exists yet.

    :type db: type[pymongo.database.Database]
    :param db: database object for current MongoDB connection

    :type prefix: str
    :param prefix: base string for workflow batch ID, based on workflow
        batch type (e.g., 'globusgalaxy' for Globus Galaxy workflow)

    :type date: type[datetime.datetime]
    :param date: date on which workflow batch was run

    :rtype: str
    :return: a unique ID for the workflow batch, with the prefix and date
        combination appended with the next unused integer
    """
    # Calling date.isoformat on the class keeps only the 'YYYY-MM-DD' part,
    # even when a full datetime object is passed in.
    isodate = datetime.date.isoformat(date)
    query = {'_id': {'$regex': '{}_{}_.+'.format(prefix, isodate)}}
    logger.debug("searching 'genomicsWorkflowbatches' collection with query '{}'"
                 .format(query))
    workflowbatches = get_genomicsWorkflowbatches(db, query)
    logger.debug("matched workflow batches: '{}'".format(workflowbatches))

    # Capture the *entire* trailing number. Matching only the final digit
    # would misread '_10' as 0 and could hand back an ID already in use.
    suffix_regex = re.compile(r'_(\d+)$')
    suffix_matches = (suffix_regex.search(wb['_id'])
                      for wb in workflowbatches)
    # IDs without a numeric suffix cannot collide with the generated ID,
    # so they are simply ignored.
    used_nums = [int(m.group(1)) for m in suffix_matches if m]
    logger.debug("batch numbers already in use: '{}'".format(used_nums))

    num = max(used_nums) + 1 if used_nums else 1
    return '{}_{}_{}'.format(prefix, isodate, num)
def search_ancestors(db, sample_id, field):
    """
    Given an object in the 'samples' collection, specified by the input
    ID, iteratively walk through ancestors based on 'parentId' until a
    value is found for the requested field.

    The walk stops and returns ``None`` when a sample is not found in the
    database, when a sample has no 'parentId', or when the chain of
    'parentId' references loops back on a sample already visited.

    :type db: type[pymongo.database.Database]
    :param db: database object for current MongoDB connection

    :type sample_id: str
    :param sample_id: a unique ID for a sample in GenLIMS

    :type field: str
    :param field: the field for which to search among ancestor samples

    :return: value for field, if found; otherwise ``None``
    """
    # Walk the chain with a loop instead of recursion so that long ancestor
    # chains cannot exhaust the interpreter's recursion limit.
    visited = set()
    current_id = sample_id
    while True:
        if current_id in visited:
            # A cyclic 'parentId' chain would otherwise never terminate.
            logger.debug("sample '{}' already visited; 'parentId' chain "
                         "is cyclic".format(current_id))
            return None
        visited.add(current_id)

        sample = db.samples.find_one({'_id': current_id})
        if sample is None:
            logger.debug("input sample '{}' not found in db"
                         .format(current_id))
            return None

        if field in sample:
            return sample[field]

        # Only the 'parentId' lookup is expected to raise KeyError, so the
        # try block is kept to that single line.
        try:
            current_id = sample['parentId']
        except KeyError:
            logger.debug("input sample '{}' has no mapped parent sample"
                         .format(current_id), exc_info=True)
            return None