Module enviroMS.diWorkflow

Expand source code
import cProfile
from dataclasses import dataclass, asdict
import json
from multiprocessing import Pool
import os
from pathlib import Path

import click
from tqdm import tqdm

from matplotlib import pyplot as plt
from corems.mass_spectrum.calc.Calibration import MzDomainCalibration
from corems.molecular_id.factory.classification import HeteroatomsClassification
from corems.mass_spectrum.input.massList import ReadMassList

from corems.molecular_id.search.priorityAssignment import OxygenPriorityAssignment
from corems.molecular_id.search.molecularFormulaSearch import SearchMolecularFormulas
from corems.transient.input.brukerSolarix import ReadBrukerSolarix

from corems.molecular_id.factory.MolecularLookupTable import MolecularCombinations
from corems.encapsulation.factory.processingSetting import MolecularFormulaSearchSettings
from corems.encapsulation.input.parameter_from_json import load_and_set_parameters_class

@dataclass
class DiWorkflowParameters:

    # input type: masslist, bruker_transient, thermo_reduced_profile
    
    # scans to sum for thermo raw data, reduce profile
    raw_data_start_scan: int = 1
    raw_data_final_scan: int = 7

    # input output paths
    file_paths: tuple = ('data/...', 'data/...')
    output_directory: str = 'data/...'
    output_group_name: str = '...'
    output_type: str = 'csv'

    # polarity for masslist input
    polarity: int = -1
    is_centroid:bool = False
    # corems settings
    corems_toml_path: str = 'data/CoremsFile.json'

    # calibration
    calibrate: bool = True
    calibration_ref_file_path: str = 'data/SRFA.ref'

    # plots
    plot_mz_error: bool = True
    plot_ms_assigned_unassigned: bool = True

    plot_c_dbe: bool = True
    plot_van_krevelen: bool = True
    plot_ms_classes: bool = True
    plot_mz_error_classes: bool = True

    def to_json(self):
        return json.dumps(asdict(self))

def run_thermo_reduce_profile(file_location, corems_params_path, first_scan, last_scan):

    from corems.mass_spectra.input import rawFileReader
    parser =  rawFileReader.ImportMassSpectraThermoMSFileReader(file_location)
    
    mass_spectrum = parser.get_average_mass_spectrum_in_scan_range(first_scan=1, last_scan=5)
    return mass_spectrum

def run_bruker_transient(file_location, corems_params_path):

    with ReadBrukerSolarix(file_location) as transient:

        transient.set_parameter_from_json(corems_params_path) 
        mass_spectrum = transient.get_mass_spectrum(plot_result=False, auto_process=True)
    
    return mass_spectrum

def get_masslist(file_location, corems_params_path, polarity, is_centroid):

    if is_centroid:
        reader = ReadMassList(file_location)
    else:
       reader = ReadMassList(file_location, header_lines=7, isCentroid=False, isThermoProfile=True)
    
    reader.set_parameter_from_json(parameters_path=corems_params_path)

    return(reader.get_mass_spectrum(polarity=polarity))

def run_assignment(file_location, workflow_params):

    file_path = Path(file_location)
    
    if file_path.suffix == '.raw':
    
        first_scan, last_scan = workflow_params.raw_data_start_scan, workflow_params.raw_data_final_scan    
        mass_spectrum = run_thermo_reduce_profile(file_location, workflow_params, first_scan, last_scan)

    elif file_path.suffix == '.d':

        mass_spectrum = run_bruker_transient(file_location, workflow_params.corems_toml_path)

    elif file_path.suffix == '.txt' or file_path.suffix == 'csv':
        
        mass_spectrum = get_masslist(file_location, workflow_params.corems_toml_path, 
                        polarity=workflow_params.polarity, is_centroid=workflow_params.is_centroid)


    mass_spectrum.set_parameter_from_json(workflow_params.corems_toml_path)

    if workflow_params.calibrate:

        ref_file_location = Path(workflow_params.calibration_ref_file_path)

        MzDomainCalibration(mass_spectrum, ref_file_location).run()

    # force it to one job. daemon child can not have child process 
    mass_spectrum.molecular_search_settings.db_jobs = 1

    SearchMolecularFormulas(mass_spectrum, first_hit=False).run_worker_mass_spectrum()

    print(mass_spectrum.percentile_assigned())

    return mass_spectrum

def generate_database(corems_parameters_file, jobs):

    '''corems_parameters_file: Path for CoreMS JSON Parameters file
       --jobs: Number of processes to run   
    '''
    click.echo('Loading Searching Settings from %s' % corems_parameters_file)

    molecular_search_settings = load_and_set_parameters_class('MolecularSearch', MolecularFormulaSearchSettings(), parameters_path=corems_parameters_file)
    molecular_search_settings.db_jobs = jobs
    molecular_search_settings.url_database = None
    MolecularCombinations().runworker(molecular_search_settings)

def read_workflow_parameter(di_workflow_paramaters_json_file):

    with open(di_workflow_paramaters_json_file, 'r') as infile:
        return DiWorkflowParameters(**json.load(infile))

def create_plots(mass_spectrum, workflow_params, dirloc):

    ms_by_classes = HeteroatomsClassification(mass_spectrum, choose_molecular_formula=False)

    if workflow_params.plot_ms_assigned_unassigned:
        print("Plotting assigned vs. unassigned mass spectrum")
        ax_ms = ms_by_classes.plot_ms_assigned_unassigned()
        plt.savefig(dirloc / "assigned_unassigned.png", bbox_inches='tight')
        plt.clf()

    if workflow_params.plot_mz_error:
        print("Plotting mz_error")
        ax_ms = ms_by_classes.plot_mz_error()
        plt.savefig(dirloc / "mz_error.png", bbox_inches='tight')
        plt.clf()

    if workflow_params.plot_van_krevelen:
        van_krevelen_dirloc = dirloc / "van_krevelen"
        van_krevelen_dirloc.mkdir(exist_ok=True, parents=True)  

    if workflow_params.plot_c_dbe:
        c_dbe_dirloc = dirloc / "dbe_vs_c"
        c_dbe_dirloc.mkdir(exist_ok=True, parents=True)      

    if workflow_params.plot_ms_classes:
        ms_class_dirloc = dirloc / "ms_class"
        ms_class_dirloc.mkdir(exist_ok=True, parents=True)  

    if workflow_params.plot_mz_error_classes:
        mz_error_class_dirloc = dirloc / "mz_error_class"
        mz_error_class_dirloc.mkdir(exist_ok=True, parents=True)  

    pbar = tqdm(ms_by_classes.get_classes())

    for classe in pbar:

        pbar.set_description_str(desc="Plotting results for class {}".format(classe), refresh=True)

        if workflow_params.plot_van_krevelen:
            ax_c = ms_by_classes.plot_van_krevelen(classe)
            plt.savefig(van_krevelen_dirloc / "{}.png".format(classe), bbox_inches='tight')
            plt.clf()

        if workflow_params.plot_mz_error_classes:
            ax_c = ms_by_classes.plot_mz_error_class(classe)
            plt.savefig(mz_error_class_dirloc / "{}.png".format(classe), bbox_inches='tight')
            plt.clf()

        if workflow_params.plot_ms_classes:
            ax_c = ms_by_classes.plot_ms_class(classe)
            plt.savefig(ms_class_dirloc / "{}.png".format(classe), bbox_inches='tight')
            plt.clf()

        if workflow_params.plot_c_dbe:
            ax_c = ms_by_classes.plot_dbe_vs_carbon_number(classe)
            plt.savefig(c_dbe_dirloc / "{}.png".format(classe), bbox_inches='tight')
            plt.clf()

def workflow_worker(args):

    file_location, workflow_params_json_str = args

    workflow_params = DiWorkflowParameters(**json.loads(workflow_params_json_str))

    mass_spec = run_assignment(file_location, workflow_params)

    dirloc = Path(workflow_params.output_directory) / mass_spec.sample_name

    dirloc.mkdir(exist_ok=True, parents=True)

    output_path = dirloc / mass_spec.sample_name

    eval('mass_spec.to_{OUT_TYPE}(output_path)'.format(OUT_TYPE=workflow_params.output_type))

    create_plots(mass_spec, workflow_params, dirloc)

    return 'Success' + str(os.getpid())

def cprofile_worker(file_location, workflow_params_json_str):

    cProfile.runctx('run_assignment(file_location, workflow_params)', globals(), locals(), 'di-fticr-di.prof')
    # stats = pstats.Stats("topics.prof")
    # stats.strip_dirs().sort_stats("time").print_stats() 

def run_wdl_direct_infusion_workflow(*args, **kwargs):
    
    cores = kwargs.get('jobs')
    del kwargs['jobs']
    kwargs["polarity"] = -1 if kwargs.get('polarity') == 'negative' else 1
    
    workflow_params = DiWorkflowParameters(**kwargs)
    workflow_params.file_paths = workflow_params.file_paths.split(",")
    # workflow_params.output_directory = kwargs.get['output_directory']
    # workflow_params.output_type = kwargs.get['output_type']
    # workflow_params.corems_toml_path = kwargs.get['corems_toml_path']
    # workflow_params.polarity = -1 if kwargs.get['polarity'] == 'negative' else 1
    # workflow_params.raw_data_start_scan = kwargs.get['raw_data_start_scan']
    # workflow_params.raw_data_final_scan = kwargs.get['raw_data_final_scan']
    # workflow_params.is_centroid = kwargs.get['is_centroid']
    # workflow_params.calibration_ref_file_path = kwargs.get['calibration_ref_file_path']

    # workflow_params.calibrate = kwargs.get['calibrate']
    # workflow_params.calibrate = kwargs.get['plot_mz_error']
    # workflow_params.calibrate = kwargs.get['plot_ms_assigned_unassigned']
    # workflow_params.calibrate = kwargs.get['plot_c_dbe']
    # workflow_params.calibrate = kwargs.get['plot_van_krevelen']
    # workflow_params.calibrate = kwargs.get['plot_ms_classes']
    # workflow_params.calibrate = kwargs.get['plot_mz_error_classes']
    # workflow_params.calibrate = kwargs.get['calibrate']

    dirloc = Path(workflow_params.output_directory)
    dirloc.mkdir(exist_ok=True)

    pool = Pool(cores)

    worker_args = [(file_path, workflow_params.to_json()) for file_path in workflow_params.file_paths]
    file_path = Path(worker_args[0][0])
    #for worker_arg in worker_args:
    #    workflow_worker(worker_arg)
    
    for i, results in enumerate(pool.imap_unordered(workflow_worker, worker_args), 1):
        pass
    pool.close()
    pool.join()

def run_direct_infusion_workflow(workflow_params_file, jobs, replicas):

    click.echo('Loading Searching Settings from %s' % workflow_params_file)

    workflow_params = read_workflow_parameter(workflow_params_file)

    dirloc = Path(workflow_params.output_directory)
    dirloc.mkdir(exist_ok=True)

    worker_args = replicas * [(file_path, workflow_params.to_json()) for file_path in workflow_params.file_paths]

    cores = jobs
    pool = Pool(cores)

    for worker_arg in worker_args:
        workflow_worker(worker_arg)
    # for i, results in enumerate(pool.imap_unordered(workflow_worker, worker_args), 1):

    #    pass

    pool.close()
    pool.join()

def run_di_mpi(workflow_params_file, tasks, replicas):

    import os, sys
    from mpi4py import MPI
    # from mpi4py.futures import MPIPoolExecutor
    sys.path.append(os.getcwd())

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    workflow_params = read_workflow_parameter(workflow_params_file)
    all_worker_args = replicas * [(file_path, workflow_params.to_json()) for file_path in workflow_params.file_paths]

    # worker_args = comm.scatter(all_worker_args, root=0)

    # will only run tasks up to the number of files paths selected in the EnviroMS File
    if len(all_worker_args) <= size:

        workflow_worker(all_worker_args[0])

    else:

        print("Tasks needs to be the same size of the input data count, , until you find time to come and help to code this section :D")

Functions

def cprofile_worker(file_location, workflow_params_json_str)
Expand source code
def cprofile_worker(file_location, workflow_params_json_str):

    cProfile.runctx('run_assignment(file_location, workflow_params)', globals(), locals(), 'di-fticr-di.prof')
def create_plots(mass_spectrum, workflow_params, dirloc)
Expand source code
def create_plots(mass_spectrum, workflow_params, dirloc):

    ms_by_classes = HeteroatomsClassification(mass_spectrum, choose_molecular_formula=False)

    if workflow_params.plot_ms_assigned_unassigned:
        print("Plotting assigned vs. unassigned mass spectrum")
        ax_ms = ms_by_classes.plot_ms_assigned_unassigned()
        plt.savefig(dirloc / "assigned_unassigned.png", bbox_inches='tight')
        plt.clf()

    if workflow_params.plot_mz_error:
        print("Plotting mz_error")
        ax_ms = ms_by_classes.plot_mz_error()
        plt.savefig(dirloc / "mz_error.png", bbox_inches='tight')
        plt.clf()

    if workflow_params.plot_van_krevelen:
        van_krevelen_dirloc = dirloc / "van_krevelen"
        van_krevelen_dirloc.mkdir(exist_ok=True, parents=True)  

    if workflow_params.plot_c_dbe:
        c_dbe_dirloc = dirloc / "dbe_vs_c"
        c_dbe_dirloc.mkdir(exist_ok=True, parents=True)      

    if workflow_params.plot_ms_classes:
        ms_class_dirloc = dirloc / "ms_class"
        ms_class_dirloc.mkdir(exist_ok=True, parents=True)  

    if workflow_params.plot_mz_error_classes:
        mz_error_class_dirloc = dirloc / "mz_error_class"
        mz_error_class_dirloc.mkdir(exist_ok=True, parents=True)  

    pbar = tqdm(ms_by_classes.get_classes())

    for classe in pbar:

        pbar.set_description_str(desc="Plotting results for class {}".format(classe), refresh=True)

        if workflow_params.plot_van_krevelen:
            ax_c = ms_by_classes.plot_van_krevelen(classe)
            plt.savefig(van_krevelen_dirloc / "{}.png".format(classe), bbox_inches='tight')
            plt.clf()

        if workflow_params.plot_mz_error_classes:
            ax_c = ms_by_classes.plot_mz_error_class(classe)
            plt.savefig(mz_error_class_dirloc / "{}.png".format(classe), bbox_inches='tight')
            plt.clf()

        if workflow_params.plot_ms_classes:
            ax_c = ms_by_classes.plot_ms_class(classe)
            plt.savefig(ms_class_dirloc / "{}.png".format(classe), bbox_inches='tight')
            plt.clf()

        if workflow_params.plot_c_dbe:
            ax_c = ms_by_classes.plot_dbe_vs_carbon_number(classe)
            plt.savefig(c_dbe_dirloc / "{}.png".format(classe), bbox_inches='tight')
            plt.clf()
def generate_database(corems_parameters_file, jobs)

corems_parameters_file: Path for CoreMS JSON Parameters file –jobs: Number of processes to run

Expand source code
def generate_database(corems_parameters_file, jobs):

    '''corems_parameters_file: Path for CoreMS JSON Parameters file
       --jobs: Number of processes to run   
    '''
    click.echo('Loading Searching Settings from %s' % corems_parameters_file)

    molecular_search_settings = load_and_set_parameters_class('MolecularSearch', MolecularFormulaSearchSettings(), parameters_path=corems_parameters_file)
    molecular_search_settings.db_jobs = jobs
    molecular_search_settings.url_database = None
    MolecularCombinations().runworker(molecular_search_settings)
def get_masslist(file_location, corems_params_path, polarity, is_centroid)
Expand source code
def get_masslist(file_location, corems_params_path, polarity, is_centroid):

    if is_centroid:
        reader = ReadMassList(file_location)
    else:
       reader = ReadMassList(file_location, header_lines=7, isCentroid=False, isThermoProfile=True)
    
    reader.set_parameter_from_json(parameters_path=corems_params_path)

    return(reader.get_mass_spectrum(polarity=polarity))
def read_workflow_parameter(di_workflow_paramaters_json_file)
Expand source code
def read_workflow_parameter(di_workflow_paramaters_json_file):

    with open(di_workflow_paramaters_json_file, 'r') as infile:
        return DiWorkflowParameters(**json.load(infile))
def run_assignment(file_location, workflow_params)
Expand source code
def run_assignment(file_location, workflow_params):

    file_path = Path(file_location)
    
    if file_path.suffix == '.raw':
    
        first_scan, last_scan = workflow_params.raw_data_start_scan, workflow_params.raw_data_final_scan    
        mass_spectrum = run_thermo_reduce_profile(file_location, workflow_params, first_scan, last_scan)

    elif file_path.suffix == '.d':

        mass_spectrum = run_bruker_transient(file_location, workflow_params.corems_toml_path)

    elif file_path.suffix == '.txt' or file_path.suffix == 'csv':
        
        mass_spectrum = get_masslist(file_location, workflow_params.corems_toml_path, 
                        polarity=workflow_params.polarity, is_centroid=workflow_params.is_centroid)


    mass_spectrum.set_parameter_from_json(workflow_params.corems_toml_path)

    if workflow_params.calibrate:

        ref_file_location = Path(workflow_params.calibration_ref_file_path)

        MzDomainCalibration(mass_spectrum, ref_file_location).run()

    # force it to one job. daemon child can not have child process 
    mass_spectrum.molecular_search_settings.db_jobs = 1

    SearchMolecularFormulas(mass_spectrum, first_hit=False).run_worker_mass_spectrum()

    print(mass_spectrum.percentile_assigned())

    return mass_spectrum
def run_bruker_transient(file_location, corems_params_path)
Expand source code
def run_bruker_transient(file_location, corems_params_path):

    with ReadBrukerSolarix(file_location) as transient:

        transient.set_parameter_from_json(corems_params_path) 
        mass_spectrum = transient.get_mass_spectrum(plot_result=False, auto_process=True)
    
    return mass_spectrum
def run_di_mpi(workflow_params_file, tasks, replicas)
Expand source code
def run_di_mpi(workflow_params_file, tasks, replicas):

    import os, sys
    from mpi4py import MPI
    # from mpi4py.futures import MPIPoolExecutor
    sys.path.append(os.getcwd())

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    workflow_params = read_workflow_parameter(workflow_params_file)
    all_worker_args = replicas * [(file_path, workflow_params.to_json()) for file_path in workflow_params.file_paths]

    # worker_args = comm.scatter(all_worker_args, root=0)

    # will only run tasks up to the number of files paths selected in the EnviroMS File
    if len(all_worker_args) <= size:

        workflow_worker(all_worker_args[0])

    else:

        print("Tasks needs to be the same size of the input data count, , until you find time to come and help to code this section :D")
def run_direct_infusion_workflow(workflow_params_file, jobs, replicas)
Expand source code
def run_direct_infusion_workflow(workflow_params_file, jobs, replicas):

    click.echo('Loading Searching Settings from %s' % workflow_params_file)

    workflow_params = read_workflow_parameter(workflow_params_file)

    dirloc = Path(workflow_params.output_directory)
    dirloc.mkdir(exist_ok=True)

    worker_args = replicas * [(file_path, workflow_params.to_json()) for file_path in workflow_params.file_paths]

    cores = jobs
    pool = Pool(cores)

    for worker_arg in worker_args:
        workflow_worker(worker_arg)
    # for i, results in enumerate(pool.imap_unordered(workflow_worker, worker_args), 1):

    #    pass

    pool.close()
    pool.join()
def run_thermo_reduce_profile(file_location, corems_params_path, first_scan, last_scan)
Expand source code
def run_thermo_reduce_profile(file_location, corems_params_path, first_scan, last_scan):

    from corems.mass_spectra.input import rawFileReader
    parser =  rawFileReader.ImportMassSpectraThermoMSFileReader(file_location)
    
    mass_spectrum = parser.get_average_mass_spectrum_in_scan_range(first_scan=1, last_scan=5)
    return mass_spectrum
def run_wdl_direct_infusion_workflow(*args, **kwargs)
Expand source code
def run_wdl_direct_infusion_workflow(*args, **kwargs):
    
    cores = kwargs.get('jobs')
    del kwargs['jobs']
    kwargs["polarity"] = -1 if kwargs.get('polarity') == 'negative' else 1
    
    workflow_params = DiWorkflowParameters(**kwargs)
    workflow_params.file_paths = workflow_params.file_paths.split(",")
    # workflow_params.output_directory = kwargs.get['output_directory']
    # workflow_params.output_type = kwargs.get['output_type']
    # workflow_params.corems_toml_path = kwargs.get['corems_toml_path']
    # workflow_params.polarity = -1 if kwargs.get['polarity'] == 'negative' else 1
    # workflow_params.raw_data_start_scan = kwargs.get['raw_data_start_scan']
    # workflow_params.raw_data_final_scan = kwargs.get['raw_data_final_scan']
    # workflow_params.is_centroid = kwargs.get['is_centroid']
    # workflow_params.calibration_ref_file_path = kwargs.get['calibration_ref_file_path']

    # workflow_params.calibrate = kwargs.get['calibrate']
    # workflow_params.calibrate = kwargs.get['plot_mz_error']
    # workflow_params.calibrate = kwargs.get['plot_ms_assigned_unassigned']
    # workflow_params.calibrate = kwargs.get['plot_c_dbe']
    # workflow_params.calibrate = kwargs.get['plot_van_krevelen']
    # workflow_params.calibrate = kwargs.get['plot_ms_classes']
    # workflow_params.calibrate = kwargs.get['plot_mz_error_classes']
    # workflow_params.calibrate = kwargs.get['calibrate']

    dirloc = Path(workflow_params.output_directory)
    dirloc.mkdir(exist_ok=True)

    pool = Pool(cores)

    worker_args = [(file_path, workflow_params.to_json()) for file_path in workflow_params.file_paths]
    file_path = Path(worker_args[0][0])
    #for worker_arg in worker_args:
    #    workflow_worker(worker_arg)
    
    for i, results in enumerate(pool.imap_unordered(workflow_worker, worker_args), 1):
        pass
    pool.close()
    pool.join()
def workflow_worker(args)
Expand source code
def workflow_worker(args):

    file_location, workflow_params_json_str = args

    workflow_params = DiWorkflowParameters(**json.loads(workflow_params_json_str))

    mass_spec = run_assignment(file_location, workflow_params)

    dirloc = Path(workflow_params.output_directory) / mass_spec.sample_name

    dirloc.mkdir(exist_ok=True, parents=True)

    output_path = dirloc / mass_spec.sample_name

    eval('mass_spec.to_{OUT_TYPE}(output_path)'.format(OUT_TYPE=workflow_params.output_type))

    create_plots(mass_spec, workflow_params, dirloc)

    return 'Success' + str(os.getpid())

Classes

class DiWorkflowParameters (raw_data_start_scan: int = 1, raw_data_final_scan: int = 7, file_paths: tuple = ('data/...', 'data/...'), output_directory: str = 'data/...', output_group_name: str = '...', output_type: str = 'csv', polarity: int = -1, is_centroid: bool = False, corems_toml_path: str = 'data/CoremsFile.json', calibrate: bool = True, calibration_ref_file_path: str = 'data/SRFA.ref', plot_mz_error: bool = True, plot_ms_assigned_unassigned: bool = True, plot_c_dbe: bool = True, plot_van_krevelen: bool = True, plot_ms_classes: bool = True, plot_mz_error_classes: bool = True)

DiWorkflowParameters(raw_data_start_scan: int = 1, raw_data_final_scan: int = 7, file_paths: tuple = ('data/…', 'data/…'), output_directory: str = 'data/…', output_group_name: str = '…', output_type: str = 'csv', polarity: int = -1, is_centroid: bool = False, corems_toml_path: str = 'data/CoremsFile.json', calibrate: bool = True, calibration_ref_file_path: str = 'data/SRFA.ref', plot_mz_error: bool = True, plot_ms_assigned_unassigned: bool = True, plot_c_dbe: bool = True, plot_van_krevelen: bool = True, plot_ms_classes: bool = True, plot_mz_error_classes: bool = True)

Expand source code
class DiWorkflowParameters:

    # input type: masslist, bruker_transient, thermo_reduced_profile
    
    # scans to sum for thermo raw data, reduce profile
    raw_data_start_scan: int = 1
    raw_data_final_scan: int = 7

    # input output paths
    file_paths: tuple = ('data/...', 'data/...')
    output_directory: str = 'data/...'
    output_group_name: str = '...'
    output_type: str = 'csv'

    # polarity for masslist input
    polarity: int = -1
    is_centroid:bool = False
    # corems settings
    corems_toml_path: str = 'data/CoremsFile.json'

    # calibration
    calibrate: bool = True
    calibration_ref_file_path: str = 'data/SRFA.ref'

    # plots
    plot_mz_error: bool = True
    plot_ms_assigned_unassigned: bool = True

    plot_c_dbe: bool = True
    plot_van_krevelen: bool = True
    plot_ms_classes: bool = True
    plot_mz_error_classes: bool = True

    def to_json(self):
        return json.dumps(asdict(self))

Class variables

var calibrate : bool
var calibration_ref_file_path : str
var corems_toml_path : str
var file_paths : tuple
var is_centroid : bool
var output_directory : str
var output_group_name : str
var output_type : str
var plot_c_dbe : bool
var plot_ms_assigned_unassigned : bool
var plot_ms_classes : bool
var plot_mz_error : bool
var plot_mz_error_classes : bool
var plot_van_krevelen : bool
var polarity : int
var raw_data_final_scan : int
var raw_data_start_scan : int

Methods

def to_json(self)
Expand source code
def to_json(self):
    return json.dumps(asdict(self))