Module metaMS.gcmsWorkflow

Expand source code
from dataclasses import dataclass
from multiprocessing import Pool
from pathlib import Path
import json

from metaMS.metadata_factory import NMDC_Metadata
from corems.mass_spectra.input.andiNetCDF import ReadAndiNetCDF
from corems.encapsulation.input import parameter_from_json
from corems.mass_spectra.calc.GC_RI_Calibration import get_rt_ri_pairs
from corems.molecular_id.search.compoundSearch import LowResMassSpectralMatch

import cProfile

@dataclass
class WorkflowParameters:
    """Settings for one GC-MS workflow run.

    The defaults are placeholder values; real settings are supplied either
    as keyword arguments (for example unpacked from a JSON file) or by
    assigning the attributes after construction.
    """

    # input data files to process; one worker task is built per entry
    file_paths: tuple = ("data/...", "data/...")
    # reference file from which the RT/RI calibration pairs are derived
    calibration_file_path: str = "data/..."
    # directory the results are written to
    output_directory: str = "data/..."
    # name of the result file inside output_directory
    output_filename: str = "data/..."
    # suffix of the export method called on each result ("to_<output_type>")
    output_type: str = "csv"
    # path of the CoreMS parameters JSON file
    corems_json_path: str = "data/corems.json"

def worker(args):
    """Profile a single ``workflow_worker(args)`` call with cProfile.

    The profiling statistics are written to the relative path ``gc-ms.prof``.
    The value produced by ``workflow_worker`` is not returned; this function
    returns ``None``.

    Args:
        args: argument tuple forwarded unchanged to ``workflow_worker``.
    """
    statement = 'workflow_worker(args)'
    # the statement is executed against the module globals plus a local
    # namespace that holds only ``args``
    cProfile.runctx(statement, globals(), {'args': args}, 'gc-ms.prof')

def run_gcms_metabolomics_workflow_wdl(file_paths, calibration_file_path, output_directory, output_filename, output_type, corems_json_path, jobs, db_path=None):
    """Run the GC-MS workflow from plain argument values instead of a JSON file.

    Every input file is processed by ``workflow_worker`` in a process pool and
    each result is exported to the same output path through the result's
    ``to_<output_type>`` method, called with ``highest_score=False``.

    Args:
        file_paths: comma-separated string of input file paths.
        calibration_file_path: reference file used for the RT/RI calibration.
        output_directory: directory for the results; created when missing.
        output_filename: file name of the result inside ``output_directory``.
        output_type: suffix of the export method, e.g. ``'csv'``.
        corems_json_path: path of the CoreMS parameters JSON file.
        jobs: number of worker processes; converted with ``int()``.
        db_path: accepted for interface compatibility; not used here.

    Raises:
        AttributeError: if a result object has no ``to_<output_type>`` method.
    """
    workflow_params = WorkflowParameters()
    workflow_params.file_paths = file_paths.split(",")
    workflow_params.calibration_file_path = calibration_file_path
    workflow_params.output_directory = output_directory
    workflow_params.output_filename = output_filename
    workflow_params.output_type = output_type
    workflow_params.corems_json_path = corems_json_path

    dirloc = Path(workflow_params.output_directory)
    # parents=True so that a nested output directory can be created in one go
    dirloc.mkdir(parents=True, exist_ok=True)
    output_path = dirloc / workflow_params.output_filename

    rt_ri_pairs = get_calibration_rtri_pairs(workflow_params.calibration_file_path, workflow_params.corems_json_path)

    worker_args = [(file_path, rt_ri_pairs, workflow_params.corems_json_path, workflow_params.calibration_file_path) for file_path in workflow_params.file_paths]

    # export method is looked up by name; unlike eval() this cannot execute
    # arbitrary code smuggled in through output_type
    export_name = 'to_' + workflow_params.output_type

    # the context manager terminates the workers if an export raises
    with Pool(int(jobs)) as pool:
        for gcms in pool.imap_unordered(workflow_worker, worker_args):
            getattr(gcms, export_name)(output_path, highest_score=False)

        pool.close()
        pool.join()

def run_nmdc_metabolomics_workflow(workflow_params_file, jobs):
    """Run the GC-MS workflow and create NMDC metadata for every input file.

    Each input file is processed by ``workflow_worker`` in a process pool.
    The result is exported to ``<output_directory>/<input file name>`` through
    its ``to_<output_type>`` method (called with ``write_metadata=False``) and
    then handed to ``NMDC_Metadata.create_nmdc_metadata``.

    Args:
        workflow_params_file: path of the workflow parameters JSON file.
        jobs: number of worker processes passed to ``Pool``.

    Raises:
        AttributeError: if a result object has no ``to_<output_type>`` method.
    """
    import click
    dms_file_path = 'db/GC-MS Metabolomics Experiments to Process Final.xlsx'

    click.echo('Loading Searching Settings from %s' % workflow_params_file)
    workflow_params = read_workflow_parameter(workflow_params_file)

    dirloc = Path(workflow_params.output_directory)
    # parents=True so that a nested output directory can be created in one go
    dirloc.mkdir(parents=True, exist_ok=True)

    rt_ri_pairs = get_calibration_rtri_pairs(workflow_params.calibration_file_path, workflow_params.corems_json_path)

    input_paths = list(workflow_params.file_paths)
    worker_args = [(file_path, rt_ri_pairs, workflow_params.corems_json_path, workflow_params.calibration_file_path) for file_path in input_paths]

    # export method is looked up by name; unlike eval() this cannot execute
    # arbitrary code smuggled in through output_type
    export_name = 'to_' + workflow_params.output_type

    # the context manager terminates the workers if a step below raises
    with Pool(jobs) as pool:
        # Ordered imap() is required here: results are paired with their
        # input file by position. The previous 1-based index into file_paths
        # combined with imap_unordered() skipped the first file, raised
        # IndexError on the last result and could attach a result to the
        # wrong input file.
        for file_path, gcms in zip(input_paths, pool.imap(workflow_worker, worker_args)):

            in_file_path = Path(file_path)
            output_path = dirloc / in_file_path.name

            getattr(gcms, export_name)(output_path, write_metadata=False)

            nmdc = NMDC_Metadata(in_file_path, workflow_params.calibration_file_path, output_path, dms_file_path)
            nmdc.create_nmdc_metadata(gcms)

        pool.close()
        pool.join()
    

def run_gcms_metabolomics_workflow(workflow_params_file, jobs):
    """Run the GC-MS workflow described by a workflow parameters JSON file.

    Every input file is processed by ``workflow_worker`` in a process pool and
    each result is exported to ``<output_directory>/<output_filename>`` through
    the result's ``to_<output_type>`` method.

    Args:
        workflow_params_file: path of the workflow parameters JSON file.
        jobs: number of worker processes passed to ``Pool``.

    Raises:
        AttributeError: if a result object has no ``to_<output_type>`` method.
    """
    import click
    click.echo('Loading Searching Settings from %s' % workflow_params_file)

    workflow_params = read_workflow_parameter(workflow_params_file)

    dirloc = Path(workflow_params.output_directory)
    # parents=True so that a nested output directory can be created in one go
    dirloc.mkdir(parents=True, exist_ok=True)
    output_path = dirloc / workflow_params.output_filename

    rt_ri_pairs = get_calibration_rtri_pairs(workflow_params.calibration_file_path, workflow_params.corems_json_path)

    worker_args = [(file_path, rt_ri_pairs, workflow_params.corems_json_path, workflow_params.calibration_file_path) for file_path in workflow_params.file_paths]

    # export method is looked up by name; unlike eval() this cannot execute
    # arbitrary code smuggled in through output_type
    export_name = 'to_' + workflow_params.output_type

    # the context manager terminates the workers if an export raises
    with Pool(jobs) as pool:
        for gcms in pool.imap_unordered(workflow_worker, worker_args):
            getattr(gcms, export_name)(output_path)

        pool.close()
        pool.join()
    
def read_workflow_parameter(gcms_workflow_paramaters_json_file):
    """Build a ``WorkflowParameters`` instance from a JSON file.

    Args:
        gcms_workflow_paramaters_json_file: path of a JSON file whose
            top-level keys are passed as keyword arguments to
            ``WorkflowParameters``.

    Returns:
        The populated ``WorkflowParameters`` object.
    """
    with open(gcms_workflow_paramaters_json_file, 'r') as handle:
        settings = json.load(handle)

    return WorkflowParameters(**settings)

def get_calibration_rtri_pairs(ref_file_path, corems_paramaters_json_file):
    """Return the RT/RI pairs obtained from the calibration reference file.

    The reference file is loaded with ``get_gcms`` and the resulting object
    is passed to ``get_rt_ri_pairs``.

    Args:
        ref_file_path: path of the calibration reference data file.
        corems_paramaters_json_file: path of the CoreMS parameters JSON file.

    Returns:
        Whatever ``get_rt_ri_pairs`` returns for the reference object.
    """
    reference_gcms = get_gcms(ref_file_path, corems_paramaters_json_file)

    # NOTE: if db/EMSL_lowres_gcms_test_database.sqlite does not exist, build
    # a library with start_sql_from_file() and call
    # get_rt_ri_pairs(reference_gcms, sql_obj=<that object>) instead.
    return get_rt_ri_pairs(reference_gcms)

def workflow_worker(args):
    """Process one input file: load, RI-calibrate and run the spectral match.

    Args:
        args: 4-tuple ``(file_path, ref_dict, corems_params, cal_file_path)``
            where ``ref_dict`` holds the calibration RT/RI pairs.

    Returns:
        The GC-MS object produced by ``get_gcms`` after ``calibrate_ri`` and
        the ``LowResMassSpectralMatch`` run have been applied to it.
    """
    sample_path, rt_ri_reference, corems_settings, calibration_path = args

    gcms = get_gcms(sample_path, corems_settings)
    gcms.calibrate_ri(rt_ri_reference, calibration_path)

    # NOTE: if db/EMSL_lowres_gcms_test_database.sqlite does not exist, build
    # a library with start_sql_from_file() and construct the search as
    # LowResMassSpectralMatch(gcms, sql_obj=<that object>) instead.
    LowResMassSpectralMatch(gcms).run()

    return gcms

def get_gcms(file_path, corems_params):
    """Load a data file into a GC-MS object and process its chromatogram.

    Args:
        file_path: path of the data file handed to ``ReadAndiNetCDF``.
        corems_params: path of the CoreMS parameters JSON file applied to the
            loaded object.

    Returns:
        The GC-MS object after ``process_chromatogram()`` has been called.
    """
    netcdf_reader = ReadAndiNetCDF(file_path)
    netcdf_reader.run()

    gcms_obj = netcdf_reader.get_gcms_obj()

    # settings have to be in place before the chromatogram is processed
    parameter_from_json.load_and_set_parameters_gcms(gcms_obj, parameters_path=corems_params)
    gcms_obj.process_chromatogram()

    return gcms_obj

def start_sql_from_file():
    """Build the SQLite object for the reference library, if the file exists.

    Looks for ``data/PNNLMetV20191015.MSL`` relative to the current working
    directory.

    Returns:
        The object returned by ``ReadNistMSI(...).get_sqlLite_obj()``, or
        ``None`` when the library file does not exist.
    """
    from pathlib import Path

    ref_lib_path = Path("data/PNNLMetV20191015.MSL")

    # exists must be *called*: the bare bound method is always truthy, which
    # made the previous guard a no-op and sent missing files to the reader
    if not ref_lib_path.exists():
        return None

    # imported only once the library file is known to be present
    from corems.molecular_id.input.nistMSI import ReadNistMSI

    return ReadNistMSI(ref_lib_path).get_sqlLite_obj()


def run_gcms_mpi(workflow_params_file, replicas, rt_ri_pairs):
    """Process one input file per MPI rank.

    Rank ``r`` runs ``workflow_worker`` on the ``r``-th input file; ranks at
    or beyond the number of input files do nothing.

    Args:
        workflow_params_file: path of the workflow parameters JSON file.
        replicas: accepted but not used by this function.
        rt_ri_pairs: accepted, but the value passed in is replaced by pairs
            recomputed from the calibration file on every rank.

    NOTE(review): the object returned by ``workflow_worker`` is discarded and
    nothing is exported here - confirm that this is intended.
    """
    import os
    import sys
    sys.path.append(os.getcwd())
    from mpi4py import MPI

    params = read_workflow_parameter(workflow_params_file)
    rt_ri_pairs = get_calibration_rtri_pairs(params.calibration_file_path, params.corems_json_path)
    tasks = [
        (sample_path, rt_ri_pairs, params.corems_json_path, params.calibration_file_path)
        for sample_path in params.file_paths
    ]

    communicator = MPI.COMM_WORLD
    rank = communicator.Get_rank()
    world_size = communicator.Get_size()  # queried but not used below

    # only as many ranks as there are input files get a task
    if rank >= len(tasks):
        return

    workflow_worker(tasks[rank])

Functions

def get_calibration_rtri_pairs(ref_file_path, corems_paramaters_json_file)
Expand source code
def get_calibration_rtri_pairs(ref_file_path, corems_paramaters_json_file):
    """Return the RT/RI pairs obtained from the calibration reference file.

    The reference file is loaded with ``get_gcms`` and the resulting object
    is passed to ``get_rt_ri_pairs``.

    Args:
        ref_file_path: path of the calibration reference data file.
        corems_paramaters_json_file: path of the CoreMS parameters JSON file.

    Returns:
        Whatever ``get_rt_ri_pairs`` returns for the reference object.
    """
    reference_gcms = get_gcms(ref_file_path, corems_paramaters_json_file)

    # NOTE: if db/EMSL_lowres_gcms_test_database.sqlite does not exist, build
    # a library with start_sql_from_file() and call
    # get_rt_ri_pairs(reference_gcms, sql_obj=<that object>) instead.
    return get_rt_ri_pairs(reference_gcms)
def get_gcms(file_path, corems_params)
Expand source code
def get_gcms(file_path, corems_params):
    """Load a data file into a GC-MS object and process its chromatogram.

    Args:
        file_path: path of the data file handed to ``ReadAndiNetCDF``.
        corems_params: path of the CoreMS parameters JSON file applied to the
            loaded object.

    Returns:
        The GC-MS object after ``process_chromatogram()`` has been called.
    """
    netcdf_reader = ReadAndiNetCDF(file_path)
    netcdf_reader.run()

    gcms_obj = netcdf_reader.get_gcms_obj()

    # settings have to be in place before the chromatogram is processed
    parameter_from_json.load_and_set_parameters_gcms(gcms_obj, parameters_path=corems_params)
    gcms_obj.process_chromatogram()

    return gcms_obj
def read_workflow_parameter(gcms_workflow_paramaters_json_file)
Expand source code
def read_workflow_parameter(gcms_workflow_paramaters_json_file):
    """Build a ``WorkflowParameters`` instance from a JSON file.

    Args:
        gcms_workflow_paramaters_json_file: path of a JSON file whose
            top-level keys are passed as keyword arguments to
            ``WorkflowParameters``.

    Returns:
        The populated ``WorkflowParameters`` object.
    """
    with open(gcms_workflow_paramaters_json_file, 'r') as handle:
        settings = json.load(handle)

    return WorkflowParameters(**settings)
def run_gcms_metabolomics_workflow(workflow_params_file, jobs)
Expand source code
def run_gcms_metabolomics_workflow(workflow_params_file, jobs):
    """Run the GC-MS workflow described by a workflow parameters JSON file.

    Every input file is processed by ``workflow_worker`` in a process pool and
    each result is exported to ``<output_directory>/<output_filename>`` through
    the result's ``to_<output_type>`` method.

    Args:
        workflow_params_file: path of the workflow parameters JSON file.
        jobs: number of worker processes passed to ``Pool``.

    Raises:
        AttributeError: if a result object has no ``to_<output_type>`` method.
    """
    import click
    click.echo('Loading Searching Settings from %s' % workflow_params_file)

    workflow_params = read_workflow_parameter(workflow_params_file)

    dirloc = Path(workflow_params.output_directory)
    # parents=True so that a nested output directory can be created in one go
    dirloc.mkdir(parents=True, exist_ok=True)
    output_path = dirloc / workflow_params.output_filename

    rt_ri_pairs = get_calibration_rtri_pairs(workflow_params.calibration_file_path, workflow_params.corems_json_path)

    worker_args = [(file_path, rt_ri_pairs, workflow_params.corems_json_path, workflow_params.calibration_file_path) for file_path in workflow_params.file_paths]

    # export method is looked up by name; unlike eval() this cannot execute
    # arbitrary code smuggled in through output_type
    export_name = 'to_' + workflow_params.output_type

    # the context manager terminates the workers if an export raises
    with Pool(jobs) as pool:
        for gcms in pool.imap_unordered(workflow_worker, worker_args):
            getattr(gcms, export_name)(output_path)

        pool.close()
        pool.join()
def run_gcms_metabolomics_workflow_wdl(file_paths, calibration_file_path, output_directory, output_filename, output_type, corems_json_path, jobs, db_path=None)
Expand source code
def run_gcms_metabolomics_workflow_wdl(file_paths, calibration_file_path, output_directory, output_filename, output_type, corems_json_path, jobs, db_path=None):
    """Run the GC-MS workflow from plain argument values instead of a JSON file.

    Every input file is processed by ``workflow_worker`` in a process pool and
    each result is exported to the same output path through the result's
    ``to_<output_type>`` method, called with ``highest_score=False``.

    Args:
        file_paths: comma-separated string of input file paths.
        calibration_file_path: reference file used for the RT/RI calibration.
        output_directory: directory for the results; created when missing.
        output_filename: file name of the result inside ``output_directory``.
        output_type: suffix of the export method, e.g. ``'csv'``.
        corems_json_path: path of the CoreMS parameters JSON file.
        jobs: number of worker processes; converted with ``int()``.
        db_path: accepted for interface compatibility; not used here.

    Raises:
        AttributeError: if a result object has no ``to_<output_type>`` method.
    """
    workflow_params = WorkflowParameters()
    workflow_params.file_paths = file_paths.split(",")
    workflow_params.calibration_file_path = calibration_file_path
    workflow_params.output_directory = output_directory
    workflow_params.output_filename = output_filename
    workflow_params.output_type = output_type
    workflow_params.corems_json_path = corems_json_path

    dirloc = Path(workflow_params.output_directory)
    # parents=True so that a nested output directory can be created in one go
    dirloc.mkdir(parents=True, exist_ok=True)
    output_path = dirloc / workflow_params.output_filename

    rt_ri_pairs = get_calibration_rtri_pairs(workflow_params.calibration_file_path, workflow_params.corems_json_path)

    worker_args = [(file_path, rt_ri_pairs, workflow_params.corems_json_path, workflow_params.calibration_file_path) for file_path in workflow_params.file_paths]

    # export method is looked up by name; unlike eval() this cannot execute
    # arbitrary code smuggled in through output_type
    export_name = 'to_' + workflow_params.output_type

    # the context manager terminates the workers if an export raises
    with Pool(int(jobs)) as pool:
        for gcms in pool.imap_unordered(workflow_worker, worker_args):
            getattr(gcms, export_name)(output_path, highest_score=False)

        pool.close()
        pool.join()
def run_gcms_mpi(workflow_params_file, replicas, rt_ri_pairs)
Expand source code
def run_gcms_mpi(workflow_params_file, replicas, rt_ri_pairs):
    """Process one input file per MPI rank.

    Rank ``r`` runs ``workflow_worker`` on the ``r``-th input file; ranks at
    or beyond the number of input files do nothing.

    Args:
        workflow_params_file: path of the workflow parameters JSON file.
        replicas: accepted but not used by this function.
        rt_ri_pairs: accepted, but the value passed in is replaced by pairs
            recomputed from the calibration file on every rank.

    NOTE(review): the object returned by ``workflow_worker`` is discarded and
    nothing is exported here - confirm that this is intended.
    """
    import os
    import sys
    sys.path.append(os.getcwd())
    from mpi4py import MPI

    params = read_workflow_parameter(workflow_params_file)
    rt_ri_pairs = get_calibration_rtri_pairs(params.calibration_file_path, params.corems_json_path)
    tasks = [
        (sample_path, rt_ri_pairs, params.corems_json_path, params.calibration_file_path)
        for sample_path in params.file_paths
    ]

    communicator = MPI.COMM_WORLD
    rank = communicator.Get_rank()
    world_size = communicator.Get_size()  # queried but not used below

    # only as many ranks as there are input files get a task
    if rank >= len(tasks):
        return

    workflow_worker(tasks[rank])
def run_nmdc_metabolomics_workflow(workflow_params_file, jobs)
Expand source code
def run_nmdc_metabolomics_workflow(workflow_params_file, jobs):
    """Run the GC-MS workflow and create NMDC metadata for every input file.

    Each input file is processed by ``workflow_worker`` in a process pool.
    The result is exported to ``<output_directory>/<input file name>`` through
    its ``to_<output_type>`` method (called with ``write_metadata=False``) and
    then handed to ``NMDC_Metadata.create_nmdc_metadata``.

    Args:
        workflow_params_file: path of the workflow parameters JSON file.
        jobs: number of worker processes passed to ``Pool``.

    Raises:
        AttributeError: if a result object has no ``to_<output_type>`` method.
    """
    import click
    dms_file_path = 'db/GC-MS Metabolomics Experiments to Process Final.xlsx'

    click.echo('Loading Searching Settings from %s' % workflow_params_file)
    workflow_params = read_workflow_parameter(workflow_params_file)

    dirloc = Path(workflow_params.output_directory)
    # parents=True so that a nested output directory can be created in one go
    dirloc.mkdir(parents=True, exist_ok=True)

    rt_ri_pairs = get_calibration_rtri_pairs(workflow_params.calibration_file_path, workflow_params.corems_json_path)

    input_paths = list(workflow_params.file_paths)
    worker_args = [(file_path, rt_ri_pairs, workflow_params.corems_json_path, workflow_params.calibration_file_path) for file_path in input_paths]

    # export method is looked up by name; unlike eval() this cannot execute
    # arbitrary code smuggled in through output_type
    export_name = 'to_' + workflow_params.output_type

    # the context manager terminates the workers if a step below raises
    with Pool(jobs) as pool:
        # Ordered imap() is required here: results are paired with their
        # input file by position. The previous 1-based index into file_paths
        # combined with imap_unordered() skipped the first file, raised
        # IndexError on the last result and could attach a result to the
        # wrong input file.
        for file_path, gcms in zip(input_paths, pool.imap(workflow_worker, worker_args)):

            in_file_path = Path(file_path)
            output_path = dirloc / in_file_path.name

            getattr(gcms, export_name)(output_path, write_metadata=False)

            nmdc = NMDC_Metadata(in_file_path, workflow_params.calibration_file_path, output_path, dms_file_path)
            nmdc.create_nmdc_metadata(gcms)

        pool.close()
        pool.join()
def start_sql_from_file()
Expand source code
def start_sql_from_file():
    """Build the SQLite object for the reference library, if the file exists.

    Looks for ``data/PNNLMetV20191015.MSL`` relative to the current working
    directory.

    Returns:
        The object returned by ``ReadNistMSI(...).get_sqlLite_obj()``, or
        ``None`` when the library file does not exist.
    """
    from pathlib import Path

    ref_lib_path = Path("data/PNNLMetV20191015.MSL")

    # exists must be *called*: the bare bound method is always truthy, which
    # made the previous guard a no-op and sent missing files to the reader
    if not ref_lib_path.exists():
        return None

    # imported only once the library file is known to be present
    from corems.molecular_id.input.nistMSI import ReadNistMSI

    return ReadNistMSI(ref_lib_path).get_sqlLite_obj()
def worker(args)
Expand source code
def worker(args):
    """Profile a single ``workflow_worker(args)`` call with cProfile.

    The profiling statistics are written to the relative path ``gc-ms.prof``.
    The value produced by ``workflow_worker`` is not returned; this function
    returns ``None``.

    Args:
        args: argument tuple forwarded unchanged to ``workflow_worker``.
    """
    statement = 'workflow_worker(args)'
    # the statement is executed against the module globals plus a local
    # namespace that holds only ``args``
    cProfile.runctx(statement, globals(), {'args': args}, 'gc-ms.prof')
def workflow_worker(args)
Expand source code
def workflow_worker(args):
    """Process one input file: load, RI-calibrate and run the spectral match.

    Args:
        args: 4-tuple ``(file_path, ref_dict, corems_params, cal_file_path)``
            where ``ref_dict`` holds the calibration RT/RI pairs.

    Returns:
        The GC-MS object produced by ``get_gcms`` after ``calibrate_ri`` and
        the ``LowResMassSpectralMatch`` run have been applied to it.
    """
    sample_path, rt_ri_reference, corems_settings, calibration_path = args

    gcms = get_gcms(sample_path, corems_settings)
    gcms.calibrate_ri(rt_ri_reference, calibration_path)

    # NOTE: if db/EMSL_lowres_gcms_test_database.sqlite does not exist, build
    # a library with start_sql_from_file() and construct the search as
    # LowResMassSpectralMatch(gcms, sql_obj=<that object>) instead.
    LowResMassSpectralMatch(gcms).run()

    return gcms

Classes

class WorkflowParameters (file_paths: tuple = ('data/...', 'data/...'), calibration_file_path: str = 'data/...', output_directory: str = 'data/...', output_filename: str = 'data/...', output_type: str = 'csv', corems_json_path: str = 'data/corems.json')

WorkflowParameters(file_paths: tuple = ('data/…', 'data/…'), calibration_file_path: str = 'data/…', output_directory: str = 'data/…', output_filename: str = 'data/…', output_type: str = 'csv', corems_json_path: str = 'data/corems.json')

Expand source code
class WorkflowParameters:
    """Settings for one GC-MS workflow run.

    The defaults are placeholder values; real settings are supplied by the
    caller before the workflow is started.
    """

    # input data files to process; one worker task is built per entry
    file_paths: tuple = ("data/...", "data/...")
    # reference file from which the RT/RI calibration pairs are derived
    calibration_file_path: str = "data/..."
    # directory the results are written to
    output_directory: str = "data/..."
    # name of the result file inside output_directory
    output_filename: str = "data/..."
    # suffix of the export method called on each result ("to_<output_type>")
    output_type: str = "csv"
    # path of the CoreMS parameters JSON file
    corems_json_path: str = "data/corems.json"

Class variables

var calibration_file_path : str
var corems_json_path : str
var file_paths : tuple
var output_directory : str
var output_filename : str
var output_type : str