corems.molecular_id.factory.MolecularLookupTable

  1__author__ = "Yuri E. Corilo"
  2__date__ = "Jul 02, 2019"
  3
  4import contextlib
  5import cProfile
  6import io
  7import itertools
  8import json
  9import multiprocessing
 10import pstats
 11from copy import deepcopy
 12from typing import Dict
 13
 14from sqlalchemy import create_engine, func
 15from sqlalchemy.orm import sessionmaker
 16from tqdm import tqdm
 17
 18from corems import chunks, timeit
 19from corems.encapsulation.constant import Atoms
 20from corems.encapsulation.factory.parameters import MSParameters
 21from corems.encapsulation.factory.processingSetting import MolecularLookupDictSettings
 22from corems.molecular_id.factory.molecularSQL import (
 23    CarbonHydrogen,
 24    HeteroAtoms,
 25    MolecularFormulaLink,
 26    MolForm_SQL,
 27)
 28
 29
 30@contextlib.contextmanager
 31def profiled():
 32    """A context manager for profiling."""
 33    pr = cProfile.Profile()
 34    pr.enable()
 35    yield
 36    pr.disable()
 37    s = io.StringIO()
 38    ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
 39    ps.print_stats()
 40    # uncomment this to see who's calling what
 41    # ps.print_callers()
 42    print(s.getvalue())
 43
 44
 45def insert_database_worker(args):
 46    """Inserts data into the database."""
 47    results, url = args
 48
 49    if not url:
 50        url = "sqlite:///db/molformulas.sqlite"
 51
 52    if url[0:6] == "sqlite":
 53        engine = create_engine(url, echo=False)
 54    else:
 55        engine = create_engine(url, echo=False, isolation_level="AUTOCOMMIT")
 56
 57    session_factory = sessionmaker(bind=engine)
 58    session = session_factory()
 59    insert_query = MolecularFormulaLink.__table__.insert().values(results)
 60    session.execute(insert_query)
 61    session.commit()
 62    session.close()
 63    engine.dispose()
 64
 65
 66class MolecularCombinations:
 67    """A class for generating molecular formula combinations.
 68
 69    Parameters
 70    ----------
 71    molecular_search_settings : object
 72        An object containing user-defined settings.
 73
 74    Attributes
 75    ----------
 76    sql_db : MolForm_SQL
 77        The SQLite database object.
 78    len_existing_classes : int
 79        The number of existing classes in the SQLite database.
 80    odd_ch_id : list
 81        A list of odd carbon and hydrogen atom IDs.
 82    odd_ch_dict : list
 83        A list of odd carbon and hydrogen atom dictionaries.
 84    odd_ch_mass : list
 85        A list of odd carbon and hydrogen atom masses.
 86    odd_ch_dbe : list
 87        A list of odd carbon and hydrogen atom double bond equivalents.
 88    even_ch_id : list
 89        A list of even carbon and hydrogen atom IDs.
 90    even_ch_dict : list
 91        A list of even carbon and hydrogen atom dictionaries.
 92    even_ch_mass : list
 93        A list of even carbon and hydrogen atom masses.
 94    even_ch_dbe : list
 95        A list of even carbon and hydrogen atom double bond equivalents.
 96
 97    Methods
 98    -------
 99    * cProfile_worker(args)
100        A cProfile worker for the get_mol_formulas function.
101    * check_database_get_class_list(molecular_search_settings)
102        Checks if the database has all the classes, if not create the missing classes.
103    * get_carbonsHydrogens(settings, odd_even)
104        Retrieves carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
105    * add_carbonsHydrogens(settings, existing_classes_objs)
106        Adds carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
107    * runworker(molecular_search_settings)
108        Runs the molecular formula lookup table worker.
109    * get_classes_in_order(molecular_search_settings)
110        Gets the classes in order.
111    * sort_classes(atoms_in_order, combination_dict)
112        Sorts the classes in order.
113    * get_fixed_initial_number_of_hydrogen(min_h, odd_even)
114        Gets the fixed initial number of hydrogen atoms.
115    * calc_mz(datadict, class_mass=0)
116        Calculates the mass-to-charge ratio (m/z) of a molecular formula.
117    * calc_dbe_class(datadict)
118        Calculates the double bond equivalent (DBE) of a molecular formula.
119    * populate_combinations(classe_tuple, settings)
120        Populates the combinations.
121    * get_or_add(SomeClass, kw)
122        Gets or adds a class.
123    * get_mol_formulas(odd_even_tag, classe_tuple, settings)
124        Gets the molecular formulas.
125    * get_h_odd_or_even(class_dict)
126        Gets the hydrogen odd or even.
127    * get_total_halogen_atoms(class_dict)
128        Gets the total number of halogen atoms.
129    * get_total_hetero_valence(class_dict)
130        Gets the total valence of heteroatoms other than N, F, Cl, and Br
131    """
132
133    def __init__(self, sql_db=None):
134        if not sql_db:
135            self.sql_db = MolForm_SQL()
136        else:
137            self.sql_db = sql_db
138
139    def cProfile_worker(self, args):
140        """cProfile worker for the get_mol_formulas function"""
141        cProfile.runctx(
142            "self.get_mol_formulas(*args)",
143            globals(),
144            locals(),
145            "mf_database_cprofile.prof",
146        )
147
148    def check_database_get_class_list(self, molecular_search_settings):
149        """check if the database has all the classes, if not create the missing classes
150
151        Parameters
152        ----------
153        molecular_search_settings : object
154            An object containing user-defined settings.
155
156        Returns
157        -------
158        list
159            list of tuples with the class name and the class dictionary
160        """
161        all_class_to_create = []
162
163        classes_dict = self.get_classes_in_order(molecular_search_settings)
164
165        class_str_set = set(classes_dict.keys())
166
167        existing_classes_objs = self.sql_db.session.query(HeteroAtoms).distinct().all()
168
169        existing_classes_str = set([classe.name for classe in existing_classes_objs])
170
171        self.len_existing_classes = len(existing_classes_str)
172
173        class_to_create = class_str_set - existing_classes_str
174
175        class_count = len(existing_classes_objs)
176
177        data_classes = list()
178        for index, class_str in enumerate(class_to_create):
179            class_dict = classes_dict.get(class_str)
180            halogen_count = self.get_total_halogen_atoms(class_dict)
181            data_classes.append(
182                {
183                    "name": class_str,
184                    "id": class_count + index + 1,
185                    "halogensCount": halogen_count,
186                }
187            )
188
189        # data_classes = [{"name":class_str, "id":class_count+ index + 1} for index, class_str in enumerate(class_to_create)]
190
191        if data_classes:
192            list_insert_chunks = chunks(data_classes, self.sql_db.chunks_count)
193            for insert_chunk in list_insert_chunks:
194                insert_query = HeteroAtoms.__table__.insert().values(insert_chunk)
195                self.sql_db.session.execute(insert_query)
196
197        for index, class_str in enumerate(class_to_create):
198            class_tuple = (
199                class_str,
200                classes_dict.get(class_str),
201                class_count + index + 1,
202            )
203
204            all_class_to_create.append(class_tuple)
205
206        return (
207            [(c_s, c_d) for c_s, c_d in classes_dict.items()],
208            all_class_to_create,
209            existing_classes_objs,
210        )
211
212    def get_carbonsHydrogens(self, settings, odd_even):
213        """Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
214
215        Parameters
216        ----------
217        settings : object
218             An object containing user-defined settings.
219        odd_even : str
220            A string indicating whether to retrieve even or odd hydrogen atoms.
221
222        Returns
223        -------
224        list
225            A list of CarbonHydrogen objects that satisfy the specified conditions.
226        """
227        operator = "==" if odd_even == "even" else "!="
228        usedAtoms = settings.usedAtoms
229        user_min_c, user_max_c = usedAtoms.get("C")
230        user_min_h, user_max_h = usedAtoms.get("H")
231
232        return eval(
233            "self.sql_db.session.query(CarbonHydrogen).filter("
234            "CarbonHydrogen.C >= user_min_c,"
235            "CarbonHydrogen.H >= user_min_h,"
236            "CarbonHydrogen.C <= user_max_c,"
237            "CarbonHydrogen.H <= user_max_h,"
238            "CarbonHydrogen.H % 2" + operator + "0).all()"
239        )
240
241    def add_carbonsHydrogens(self, settings, existing_classes_objs):
242        """Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
243
244        Parameters
245        ----------
246        settings : object
247            An object containing user-defined settings.
248        existing_classes_objs : list
249            A list of HeteroAtoms objects.
250        """
251        usedAtoms = settings.usedAtoms
252
253        user_min_c, user_max_c = usedAtoms.get("C")
254        user_min_h, user_max_h = usedAtoms.get("H")
255
256        query_obj = self.sql_db.session.query(
257            func.max(CarbonHydrogen.C).label("max_c"),
258            func.min(CarbonHydrogen.C).label("min_c"),
259            func.max(CarbonHydrogen.H).label("max_h"),
260            func.min(CarbonHydrogen.H).label("min_h"),
261        )
262
263        database = query_obj.first()
264        if (
265            database.max_c == user_max_c
266            and database.min_c == user_min_c
267            and database.max_h == user_max_h
268            and database.min_h == user_min_h
269        ):
270            # all data is already available at the database
271            pass
272
273        else:
274            current_count = self.sql_db.session.query(CarbonHydrogen.C).count()
275
276            databaseCarbonHydrogen = self.sql_db.session.query(CarbonHydrogen).all()
277
278            userCarbon = set(range(user_min_c, user_max_c + 1))
279            userHydrogen = set(range(user_min_h, user_max_h + 1))
280
281            carbon_hydrogen_objs_database = {}
282            for obj in databaseCarbonHydrogen:
283                str_data = "C:{},H:{}".format(obj.C, obj.H)
284                carbon_hydrogen_objs_database[str_data] = str_data
285
286            carbon_hydrogen_objs_to_create = {"even": {}, "odd": {}}
287
288            list_ch_obj_to_add = list()
289            i = 0
290            for comb in itertools.product(userCarbon, userHydrogen):
291                C = comb[0]
292                H = comb[1]
293                data = {
294                    "C": C,
295                    "H": H,
296                }
297
298                data_insert = {
299                    "C": C,
300                    "H": H,
301                }
302
303                str_data = "C:{},H:{}".format(C, H)
304
305                if not str_data in carbon_hydrogen_objs_database.keys():
306                    label = "even" if comb[1] % 2 == 0 else "odd"
307                    data["mass"] = (C * Atoms.atomic_masses.get("C")) + (
308                        H * Atoms.atomic_masses.get("H")
309                    )
310                    data["dbe"] = C - (H / 2) + 1
311                    data["id"] = i + current_count + 1
312                    data_insert["id"] = i + current_count + 1
313                    i = i + 1
314                    carbon_hydrogen_objs_to_create[label][str_data] = data
315
316                    list_ch_obj_to_add.append(data_insert)
317
318            if list_ch_obj_to_add:
319                # insert carbon hydrogen objs
320                list_insert_chunks = chunks(
321                    list_ch_obj_to_add, self.sql_db.chunks_count
322                )
323                for insert_chunk in list_insert_chunks:
324                    insert_query = CarbonHydrogen.__table__.insert().values(
325                        insert_chunk
326                    )
327                    self.sql_db.session.execute(insert_query)
328                self.sql_db.session.commit()
329
330                list_molecular_form = list()
331                for classe_obj in existing_classes_objs:
332                    classe_dict = classe_obj.to_dict()
333                    classe_mass = self.calc_mz(classe_dict)
334                    classe_dbe = self.calc_dbe_class(classe_dict)
335
336                    odd_even_label = self.get_h_odd_or_even(classe_dict)
337
338                    ch_datalist = carbon_hydrogen_objs_to_create.get(
339                        odd_even_label
340                    ).values()
341
342                    for ch_dict in ch_datalist:
343                        mass = ch_dict.get("mass") + classe_mass
344                        dbe = ch_dict.get("dbe") + classe_dbe
345
346                        if settings.min_mz <= mass <= settings.max_mz:
347                            if settings.min_dbe <= dbe <= settings.max_dbe:
348                                list_molecular_form.append(
349                                    {
350                                        "heteroAtoms_id": classe_obj.id,
351                                        "carbonHydrogen_id": ch_dict.get("id"),
352                                        "mass": mass,
353                                        "DBE": dbe,
354                                    }
355                                )
356
357                list_insert_chunks = chunks(
358                    list_molecular_form, self.sql_db.chunks_count
359                )
360                for insert_chunk in list_insert_chunks:
361                    insert_query = MolecularFormulaLink.__table__.insert().values(
362                        insert_chunk
363                    )
364                    self.sql_db.session.execute(insert_query)
365                self.sql_db.session.commit()
366
367    @timeit(print_time=True)
368    def runworker(self, molecular_search_settings, **kwargs):
369        """Run the molecular formula lookup table worker.
370
371        Parameters
372        ----------
373        molecular_search_settings : object
374            An object containing user-defined settings.
375        kwargs : dict
376            A dictionary of keyword arguments.
377            Most notably, the print_time argument which is passed to the timeit decorator.
378
379        Returns
380        -------
381        list
382            A list of tuples with the class name and the class dictionary.
383
384
385        """
386        verbose = molecular_search_settings.verbose_processing
387
388        classes_list, class_to_create, existing_classes_objs = (
389            self.check_database_get_class_list(molecular_search_settings)
390        )
391
392        settings = MolecularLookupDictSettings()
393        settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
394        settings.url_database = molecular_search_settings.url_database
395        settings.db_jobs = molecular_search_settings.db_jobs
396
397        self.add_carbonsHydrogens(settings, existing_classes_objs)
398
399        if class_to_create:
400            settings = MolecularLookupDictSettings()
401            settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
402            settings.url_database = molecular_search_settings.url_database
403            settings.db_jobs = molecular_search_settings.db_jobs
404
405            self.sql_db.session.commit()
406            odd_ch_obj = self.get_carbonsHydrogens(settings, "odd")
407            self.odd_ch_id = [obj.id for obj in odd_ch_obj]
408            self.odd_ch_dict = [{"C": obj.C, "H": obj.H} for obj in odd_ch_obj]
409            self.odd_ch_mass = [obj.mass for obj in odd_ch_obj]
410            self.odd_ch_dbe = [obj.dbe for obj in odd_ch_obj]
411
412            even_ch_obj = self.get_carbonsHydrogens(settings, "even")
413            self.even_ch_id = [obj.id for obj in even_ch_obj]
414            self.even_ch_dict = [{"C": obj.C, "H": obj.H} for obj in even_ch_obj]
415            self.even_ch_mass = [obj.mass for obj in even_ch_obj]
416            self.even_ch_dbe = [obj.dbe for obj in even_ch_obj]
417
418            all_results = list()
419            for class_tuple in tqdm(class_to_create, disable = not verbose):
420                results = self.populate_combinations(class_tuple, settings)
421                all_results.extend(results)
422                if settings.db_jobs == 1:
423                    # if len(all_results) >= self.sql_db.chunks_count:
424                    list_insert_chunks = list(chunks(results, self.sql_db.chunks_count))
425                    for chunk in list_insert_chunks:
426                        insert_query = MolecularFormulaLink.__table__.insert().values(
427                            chunk
428                        )
429                        self.sql_db.session.execute(insert_query)
430                    # all_results = list()
431            self.sql_db.session.commit()
432            # each chunk takes ~600Mb of memory, so if using 8 processes the total free memory needs to be 5GB
433            if settings.db_jobs > 1:
434                list_insert_chunks = list(chunks(all_results, self.sql_db.chunks_count))
435                if verbose:
436                    print(
437                        "Started database insert using {} iterations for a total of {} rows".format(
438                            len(list_insert_chunks), len(all_results)
439                        )
440                    )
441                worker_args = [
442                    (chunk, settings.url_database) for chunk in list_insert_chunks
443                ]
444                p = multiprocessing.Pool(settings.db_jobs)
445                for class_list in tqdm(
446                        p.imap_unordered(insert_database_worker, worker_args), disable= not verbose
447                        ):
448                    pass
449                p.close()
450                p.join()
451
452        return classes_list
453
454    def get_classes_in_order(self, molecular_search_settings):
455        """Get the classes in order
456
457        Parameters
458        ----------
459        molecular_search_settings : object
460            An object containing user-defined settings.
461
462        Returns
463        -------
464        dict
465            A dictionary of classes in order.
466            structure is  ('HC', {'HC': 1})
467        """
468
469        usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
470
471        usedAtoms.pop("C")
472        usedAtoms.pop("H")
473
474        min_n, max_n = usedAtoms.get("N") if usedAtoms.get("N") else (0, 0)
475        min_o, max_o = usedAtoms.get("O") if usedAtoms.get("O") else (0, 0)
476        min_s, max_s = usedAtoms.get("S") if usedAtoms.get("S") else (0, 0)
477        min_p, max_p = usedAtoms.get("P") if usedAtoms.get("P") else (0, 0)
478
479        possible_n = [n for n in range(min_n, max_n + 1)]
480        possible_o = [o for o in range(min_o, max_o + 1)]
481        possible_s = [s for s in range(min_s, max_s + 1)]
482        possible_p = [p for p in range(min_p, max_p + 1)]
483
484        atoms_in_order = ["N", "O", "S", "P"]
485
486        classe_in_order = {}
487
488        all_atoms_tuples = itertools.product(
489            possible_n, possible_o, possible_s, possible_p
490        )
491
492        for atom in atoms_in_order:
493            usedAtoms.pop(atom, None)
494
495        for selected_atom, min_max_tuple in usedAtoms.items():
496            min_x = min_max_tuple[0]
497            max_x = min_max_tuple[1]
498
499            possible_x = [x for x in range(min_x, max_x + 1)]
500
501            all_atoms_tuples = itertools.product(all_atoms_tuples, possible_x)
502            all_atoms_tuples = [
503                all_atoms_combined[0] + (all_atoms_combined[1],)
504                for all_atoms_combined in all_atoms_tuples
505            ]
506            atoms_in_order.append(selected_atom)
507
508        for all_atoms_tuple in all_atoms_tuples:
509            classe_str = ""
510            classe_dict = {}
511
512            for each_atoms_index, atom_number in enumerate(all_atoms_tuple):
513                if atom_number != 0:
514                    classe_dict[atoms_in_order[each_atoms_index]] = atom_number
515
516            if not classe_dict:
517                classe_in_order["HC"] = {"HC": ""}
518                continue
519
520            classe_str = json.dumps(classe_dict)
521
522            if len(classe_str) > 0:
523                classe_in_order[classe_str] = classe_dict
524
525        classe_in_order_dict = self.sort_classes(atoms_in_order, classe_in_order)
526
527        return classe_in_order_dict
528
529    @staticmethod
530    def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]:
531        """Sort the classes in order
532
533        Parameters
534        ----------
535        atoms_in_order : list
536            A list of atoms in order.
537        combination_dict : dict
538            A dictionary of classes.
539
540        Returns
541        -------
542        dict
543            A dictionary of classes in order.
544        """
545        # ensures atoms are always in the order defined at atoms_in_order list
546        join_dict_classes = dict()
547        atoms_in_order = ["N", "S", "P", "O"] + atoms_in_order[4:] + ["HC"]
548
549        sort_method = lambda atoms_keys: [atoms_in_order.index(atoms_keys)]
550        for class_str, class_dict in combination_dict.items():
551            sorted_dict_keys = sorted(class_dict, key=sort_method)
552            class_dict = {atom: class_dict[atom] for atom in sorted_dict_keys}
553            class_str = json.dumps(class_dict)
554            # using json for the new database, class
555            # class_str = ' '.join([atom + str(class_dict[atom]) for atom in sorted_dict_keys])
556            join_dict_classes[class_str] = class_dict
557
558        return join_dict_classes
559
560    @staticmethod
561    def get_fixed_initial_number_of_hydrogen(min_h, odd_even):
562        """Get the fixed initial number of hydrogen atoms
563
564        Parameters
565        ----------
566        min_h : int
567            The minimum number of hydrogen atoms.
568        odd_even : str
569            A string indicating whether to retrieve even or odd hydrogen atoms.
570        """
571        remaining_h = min_h % 2
572
573        if odd_even == "even":
574            if remaining_h == 0:
575                return remaining_h
576
577            else:
578                return remaining_h + 1
579
580        else:
581            if remaining_h == 0:
582                return remaining_h + 1
583
584            else:
585                return remaining_h
586
587    def calc_mz(self, datadict, class_mass=0):
588        """Calculate the mass-to-charge ratio (m/z) of a molecular formula.
589
590        Parameters
591        ----------
592        datadict : dict
593            A dictionary of classes.
594        class_mass : int
595            The mass of the class.
596
597        Returns
598        -------
599        float
600            The mass-to-charge ratio (m/z) of a molecular formula.
601        """
602        mass = class_mass
603
604        for atom in datadict.keys():
605            if atom != "HC":
606                mass = mass + Atoms.atomic_masses[atom] * datadict.get(atom)
607
608        return mass
609
610    def calc_dbe_class(self, datadict):
611        """Calculate the double bond equivalent (DBE) of a molecular formula.
612
613        Parameters
614        ----------
615        datadict : dict
616            A dictionary of classes.
617
618        Returns
619        -------
620        float
621            The double bond equivalent (DBE) of a molecular formula.
622        """
623        init_dbe = 0
624        for atom in datadict.keys():
625            if atom == "HC":
626                continue
627
628            n_atom = int(datadict.get(atom))
629
630            clean_atom = "".join([i for i in atom if not i.isdigit()])
631
632            valencia = MSParameters.molecular_search.used_atom_valences.get(clean_atom)
633
634            if type(valencia) is tuple:
635                valencia = valencia[0]
636            if valencia > 0:
637                # print atom, valencia, n_atom, init_dbe
638                init_dbe = init_dbe + (n_atom * (valencia - 2))
639            else:
640                continue
641
642        return 0.5 * init_dbe
643
644    def populate_combinations(self, classe_tuple, settings):
645        """Populate the combinations
646
647        Parameters
648        ----------
649        classe_tuple : tuple
650            A tuple containing the class name, the class dictionary, and the class ID.
651        settings : object
652            An object containing user-defined settings.
653
654        Returns
655        -------
656        list
657            A list of molecular formula data dictionaries.
658        """
659        ion_charge = 0
660
661        class_dict = classe_tuple[1]
662        odd_or_even = self.get_h_odd_or_even(class_dict)
663
664        return self.get_mol_formulas(odd_or_even, classe_tuple, settings)
665
666    def get_or_add(self, SomeClass, kw):
667        """Get or add a class
668
669        Parameters
670        ----------
671        SomeClass : object
672            A class object.
673        kw : dict
674            A dictionary of classes.
675
676        Returns
677        -------
678        object
679            A class object.
680        """
681        obj = self.sql_db.session.query(SomeClass).filter_by(**kw).first()
682        if not obj:
683            obj = SomeClass(**kw)
684        return obj
685
686    def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
687        """Get the molecular formulas
688
689        Parameters
690        ----------
691        odd_even_tag : str
692            A string indicating whether to retrieve even or odd hydrogen atoms.
693        classe_tuple : tuple
694
695        settings : object
696            An object containing user-defined settings.
697
698        Returns
699        -------
700        list
701            A list of molecular formula data dictionaries.
702
703        """
704        class_str = classe_tuple[0]
705        class_dict = classe_tuple[1]
706        classe_id = classe_tuple[2]
707
708        results = list()
709
710        if "HC" in class_dict:
711            del class_dict["HC"]
712
713        class_dbe = self.calc_dbe_class(class_dict)
714        class_mass = self.calc_mz(class_dict)
715
716        carbonHydrogen_mass = (
717            self.odd_ch_mass if odd_even_tag == "odd" else self.even_ch_mass
718        )
719        carbonHydrogen_dbe = (
720            self.odd_ch_dbe if odd_even_tag == "odd" else self.even_ch_dbe
721        )
722        carbonHydrogen_id = self.odd_ch_id if odd_even_tag == "odd" else self.even_ch_id
723
724        for index, carbonHydrogen_obj in enumerate(carbonHydrogen_id):
725            mass = carbonHydrogen_mass[index] + class_mass
726            dbe = carbonHydrogen_dbe[index] + class_dbe
727
728            if settings.min_mz <= mass <= settings.max_mz:
729                if settings.min_dbe <= dbe <= settings.max_dbe:
730                    molecularFormula = {
731                        "heteroAtoms_id": classe_id,
732                        "carbonHydrogen_id": carbonHydrogen_id[index],
733                        "mass": mass,
734                        "DBE": dbe,
735                    }
736
737                    results.append(molecularFormula)
738
739        return results
740
741    def get_h_odd_or_even(self, class_dict):
742        """Get the hydrogen odd or even
743
744        Parameters
745        ----------
746        class_dict : dict
747            A dictionary of classes.
748
749        Returns
750        -------
751        str
752            A string indicating whether to retrieve even or odd hydrogen atoms.
753        """
754
755        HAS_NITROGEN = "N" in class_dict.keys()
756
757        number_of_halogen = self.get_total_halogen_atoms(class_dict)
758        number_of_hetero = self.get_total_heteroatoms(class_dict)
759
760        if number_of_halogen > 0:
761            HAS_HALOGEN = True
762
763        else:
764            HAS_HALOGEN = False
765
766        if HAS_HALOGEN:
767            remaining_halogen = number_of_halogen % 2
768
769        else:
770            remaining_halogen = 0
771
772        if number_of_hetero > 0:
773            HAS_OTHER_HETERO = True
774
775            total_hetero_valence = self.get_total_hetero_valence(class_dict)
776
777        else:
778            HAS_OTHER_HETERO = False
779
780            total_hetero_valence = 0
781
782        if HAS_OTHER_HETERO:
783            remaining_hetero_valence = total_hetero_valence % 2
784
785        else:
786            remaining_hetero_valence = 0
787
788        if HAS_NITROGEN and not HAS_OTHER_HETERO:
789            number_of_n = class_dict.get("N")
790            remaining_n = number_of_n % 2
791
792        elif HAS_NITROGEN and HAS_OTHER_HETERO:
793            number_of_n = class_dict.get("N")
794            remaining_n = (number_of_n + remaining_hetero_valence) % 2
795
796        elif HAS_OTHER_HETERO and not HAS_NITROGEN:
797            remaining_n = remaining_hetero_valence
798
799        else:
800            remaining_n = -1
801
802        if remaining_n > 0.0:
803            if HAS_NITROGEN or HAS_OTHER_HETERO:
804                if HAS_HALOGEN:
805                    if remaining_halogen == 0:
806                        return "odd"
807                    else:
808                        return "even"
809
810                else:
811                    return "odd"
812
813        elif remaining_n == 0.0:
814            if HAS_NITROGEN or HAS_OTHER_HETERO:
815                if HAS_HALOGEN:
816                    if remaining_halogen == 0:
817                        return "even"
818                    else:
819                        return "odd"
820
821                else:
822                    return "even"
823
824        else:
825            if HAS_HALOGEN:
826                if remaining_halogen == 0:
827                    return "even"
828                else:
829                    return "odd"
830
831            else:
832                return "even"
833
834    @staticmethod
835    def get_total_heteroatoms(class_dict):
836        """Get the total number of heteroatoms other than N, F, Cl, Br
837
838        Parameters
839        ----------
840        class_dict : dict
841            A dictionary of classes.
842
843        Returns
844        -------
845        int
846            The total number of heteroatoms.
847        """
848
849        total_number = 0
850
851        for atom in class_dict.keys():
852            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
853                total_number = total_number + class_dict.get(atom)
854
855        return total_number
856
857    @staticmethod
858    def get_total_hetero_valence(class_dict):
859        """Get the total valence of heteroatoms other than N, F, Cl, Br
860
861        Parameters
862        ----------
863        class_dict : dict
864            A dictionary of classes.
865
866        Returns
867        -------
868        int
869            The total heteroatom valence.
870        """
871        total_valence = 0
872
873        for atom in class_dict.keys():
874            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
875                clean_atom = "".join([i for i in atom if not i.isdigit()])
876
877                atom_valence = MSParameters.molecular_search.used_atom_valences.get(
878                    clean_atom
879                )
880
881                if type(atom_valence) is tuple:
882                    atom_valence = atom_valence[0]
883
884                n_atom = int(class_dict.get(atom))
885
886                n_atom_valence = atom_valence * n_atom
887
888                total_valence = total_valence + n_atom_valence
889
890        return total_valence
891
892    @staticmethod
893    def get_total_halogen_atoms(class_dict):
894        """Get the total number of halogen atoms
895
896        Parameters
897        ----------
898        class_dict : dict
899            A dictionary of classes.
900
901        Returns
902        -------
903        int
904            The total number of halogen atoms.
905        """
906        atoms = ["F", "Cl", "Br"]
907
908        total_number = 0
909
910        for atom in atoms:
911            if atom in class_dict.keys():
912                total_number = total_number + class_dict.get(atom)
913
914        return total_number
@contextlib.contextmanager
def profiled():
@contextlib.contextmanager
def profiled():
    """A context manager for profiling.

    A cProfile profiler is enabled on entry. On exit the profiler is disabled
    and its statistics, sorted by cumulative time, are printed to standard
    output. The report is also produced when the enclosed block raises; the
    exception then propagates unchanged.
    """
    pr = cProfile.Profile()
    pr.enable()
    try:
        yield
    finally:
        # always stop the profiler, otherwise an exception raised inside the
        # block would leave it enabled and the report would be lost
        pr.disable()
        s = io.StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
        ps.print_stats()
        # uncomment this to see who's calling what
        # ps.print_callers()
        print(s.getvalue())

A context manager for profiling.

def insert_database_worker(args):
def insert_database_worker(args):
    """Inserts data into the database.

    Parameters
    ----------
    args : tuple
        A ``(results, url)`` pair. ``results`` is handed to
        ``MolecularFormulaLink.__table__.insert().values``; ``url`` is the
        database URL. A falsy ``url`` selects the default
        ``sqlite:///db/molformulas.sqlite`` database.

    Notes
    -----
    The session is closed and the engine disposed even when the insert or
    the commit raises; the exception is propagated to the caller.
    """
    results, url = args

    if not url:
        url = "sqlite:///db/molformulas.sqlite"

    if url.startswith("sqlite"):
        engine = create_engine(url, echo=False)
    else:
        # non-sqlite engines are created with AUTOCOMMIT isolation
        engine = create_engine(url, echo=False, isolation_level="AUTOCOMMIT")

    try:
        session_factory = sessionmaker(bind=engine)
        session = session_factory()
        try:
            insert_query = MolecularFormulaLink.__table__.insert().values(results)
            session.execute(insert_query)
            session.commit()
        finally:
            session.close()
    finally:
        engine.dispose()

Inserts data into the database.

class MolecularCombinations:
 67class MolecularCombinations:
 68    """A class for generating molecular formula combinations.
 69
 70    Parameters
 71    ----------
 72    molecular_search_settings : object
 73        An object containing user-defined settings.
 74
 75    Attributes
 76    ----------
 77    sql_db : MolForm_SQL
 78        The SQLite database object.
 79    len_existing_classes : int
 80        The number of existing classes in the SQLite database.
 81    odd_ch_id : list
 82        A list of odd carbon and hydrogen atom IDs.
 83    odd_ch_dict : list
 84        A list of odd carbon and hydrogen atom dictionaries.
 85    odd_ch_mass : list
 86        A list of odd carbon and hydrogen atom masses.
 87    odd_ch_dbe : list
 88        A list of odd carbon and hydrogen atom double bond equivalents.
 89    even_ch_id : list
 90        A list of even carbon and hydrogen atom IDs.
 91    even_ch_dict : list
 92        A list of even carbon and hydrogen atom dictionaries.
 93    even_ch_mass : list
 94        A list of even carbon and hydrogen atom masses.
 95    even_ch_dbe : list
 96        A list of even carbon and hydrogen atom double bond equivalents.
 97
 98    Methods
 99    -------
100    * cProfile_worker(args)
101        A cProfile worker for the get_mol_formulas function.
102    * check_database_get_class_list(molecular_search_settings)
103        Checks if the database has all the classes, if not create the missing classes.
104    * get_carbonsHydrogens(settings, odd_even)
105        Retrieves carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
106    * add_carbonsHydrogens(settings, existing_classes_objs)
107        Adds carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
108    * runworker(molecular_search_settings)
109        Runs the molecular formula lookup table worker.
110    * get_classes_in_order(molecular_search_settings)
111        Gets the classes in order.
112    * sort_classes(atoms_in_order, combination_dict)
113        Sorts the classes in order.
114    * get_fixed_initial_number_of_hydrogen(min_h, odd_even)
115        Gets the fixed initial number of hydrogen atoms.
116    * calc_mz(datadict, class_mass=0)
117        Calculates the mass-to-charge ratio (m/z) of a molecular formula.
118    * calc_dbe_class(datadict)
119        Calculates the double bond equivalent (DBE) of a molecular formula.
120    * populate_combinations(classe_tuple, settings)
121        Populates the combinations.
122    * get_or_add(SomeClass, kw)
123        Gets or adds a class.
124    * get_mol_formulas(odd_even_tag, classe_tuple, settings)
125        Gets the molecular formulas.
126    * get_h_odd_or_even(class_dict)
127        Gets the hydrogen odd or even.
128    * get_total_halogen_atoms(class_dict)
129        Gets the total number of halogen atoms.
130    * get_total_hetero_valence(class_dict)
131        Gets the total valence of heteroatoms other than N, F, Cl, and Br
132    """
133
134    def __init__(self, sql_db=None):
135        if not sql_db:
136            self.sql_db = MolForm_SQL()
137        else:
138            self.sql_db = sql_db
139
140    def cProfile_worker(self, args):
141        """cProfile worker for the get_mol_formulas function"""
142        cProfile.runctx(
143            "self.get_mol_formulas(*args)",
144            globals(),
145            locals(),
146            "mf_database_cprofile.prof",
147        )
148
149    def check_database_get_class_list(self, molecular_search_settings):
150        """check if the database has all the classes, if not create the missing classes
151
152        Parameters
153        ----------
154        molecular_search_settings : object
155            An object containing user-defined settings.
156
157        Returns
158        -------
159        list
160            list of tuples with the class name and the class dictionary
161        """
162        all_class_to_create = []
163
164        classes_dict = self.get_classes_in_order(molecular_search_settings)
165
166        class_str_set = set(classes_dict.keys())
167
168        existing_classes_objs = self.sql_db.session.query(HeteroAtoms).distinct().all()
169
170        existing_classes_str = set([classe.name for classe in existing_classes_objs])
171
172        self.len_existing_classes = len(existing_classes_str)
173
174        class_to_create = class_str_set - existing_classes_str
175
176        class_count = len(existing_classes_objs)
177
178        data_classes = list()
179        for index, class_str in enumerate(class_to_create):
180            class_dict = classes_dict.get(class_str)
181            halogen_count = self.get_total_halogen_atoms(class_dict)
182            data_classes.append(
183                {
184                    "name": class_str,
185                    "id": class_count + index + 1,
186                    "halogensCount": halogen_count,
187                }
188            )
189
190        # data_classes = [{"name":class_str, "id":class_count+ index + 1} for index, class_str in enumerate(class_to_create)]
191
192        if data_classes:
193            list_insert_chunks = chunks(data_classes, self.sql_db.chunks_count)
194            for insert_chunk in list_insert_chunks:
195                insert_query = HeteroAtoms.__table__.insert().values(insert_chunk)
196                self.sql_db.session.execute(insert_query)
197
198        for index, class_str in enumerate(class_to_create):
199            class_tuple = (
200                class_str,
201                classes_dict.get(class_str),
202                class_count + index + 1,
203            )
204
205            all_class_to_create.append(class_tuple)
206
207        return (
208            [(c_s, c_d) for c_s, c_d in classes_dict.items()],
209            all_class_to_create,
210            existing_classes_objs,
211        )
212
213    def get_carbonsHydrogens(self, settings, odd_even):
214        """Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
215
216        Parameters
217        ----------
218        settings : object
219             An object containing user-defined settings.
220        odd_even : str
221            A string indicating whether to retrieve even or odd hydrogen atoms.
222
223        Returns
224        -------
225        list
226            A list of CarbonHydrogen objects that satisfy the specified conditions.
227        """
228        operator = "==" if odd_even == "even" else "!="
229        usedAtoms = settings.usedAtoms
230        user_min_c, user_max_c = usedAtoms.get("C")
231        user_min_h, user_max_h = usedAtoms.get("H")
232
233        return eval(
234            "self.sql_db.session.query(CarbonHydrogen).filter("
235            "CarbonHydrogen.C >= user_min_c,"
236            "CarbonHydrogen.H >= user_min_h,"
237            "CarbonHydrogen.C <= user_max_c,"
238            "CarbonHydrogen.H <= user_max_h,"
239            "CarbonHydrogen.H % 2" + operator + "0).all()"
240        )
241
242    def add_carbonsHydrogens(self, settings, existing_classes_objs):
243        """Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
244
245        Parameters
246        ----------
247        settings : object
248            An object containing user-defined settings.
249        existing_classes_objs : list
250            A list of HeteroAtoms objects.
251        """
252        usedAtoms = settings.usedAtoms
253
254        user_min_c, user_max_c = usedAtoms.get("C")
255        user_min_h, user_max_h = usedAtoms.get("H")
256
257        query_obj = self.sql_db.session.query(
258            func.max(CarbonHydrogen.C).label("max_c"),
259            func.min(CarbonHydrogen.C).label("min_c"),
260            func.max(CarbonHydrogen.H).label("max_h"),
261            func.min(CarbonHydrogen.H).label("min_h"),
262        )
263
264        database = query_obj.first()
265        if (
266            database.max_c == user_max_c
267            and database.min_c == user_min_c
268            and database.max_h == user_max_h
269            and database.min_h == user_min_h
270        ):
271            # all data is already available at the database
272            pass
273
274        else:
275            current_count = self.sql_db.session.query(CarbonHydrogen.C).count()
276
277            databaseCarbonHydrogen = self.sql_db.session.query(CarbonHydrogen).all()
278
279            userCarbon = set(range(user_min_c, user_max_c + 1))
280            userHydrogen = set(range(user_min_h, user_max_h + 1))
281
282            carbon_hydrogen_objs_database = {}
283            for obj in databaseCarbonHydrogen:
284                str_data = "C:{},H:{}".format(obj.C, obj.H)
285                carbon_hydrogen_objs_database[str_data] = str_data
286
287            carbon_hydrogen_objs_to_create = {"even": {}, "odd": {}}
288
289            list_ch_obj_to_add = list()
290            i = 0
291            for comb in itertools.product(userCarbon, userHydrogen):
292                C = comb[0]
293                H = comb[1]
294                data = {
295                    "C": C,
296                    "H": H,
297                }
298
299                data_insert = {
300                    "C": C,
301                    "H": H,
302                }
303
304                str_data = "C:{},H:{}".format(C, H)
305
306                if not str_data in carbon_hydrogen_objs_database.keys():
307                    label = "even" if comb[1] % 2 == 0 else "odd"
308                    data["mass"] = (C * Atoms.atomic_masses.get("C")) + (
309                        H * Atoms.atomic_masses.get("H")
310                    )
311                    data["dbe"] = C - (H / 2) + 1
312                    data["id"] = i + current_count + 1
313                    data_insert["id"] = i + current_count + 1
314                    i = i + 1
315                    carbon_hydrogen_objs_to_create[label][str_data] = data
316
317                    list_ch_obj_to_add.append(data_insert)
318
319            if list_ch_obj_to_add:
320                # insert carbon hydrogen objs
321                list_insert_chunks = chunks(
322                    list_ch_obj_to_add, self.sql_db.chunks_count
323                )
324                for insert_chunk in list_insert_chunks:
325                    insert_query = CarbonHydrogen.__table__.insert().values(
326                        insert_chunk
327                    )
328                    self.sql_db.session.execute(insert_query)
329                self.sql_db.session.commit()
330
331                list_molecular_form = list()
332                for classe_obj in existing_classes_objs:
333                    classe_dict = classe_obj.to_dict()
334                    classe_mass = self.calc_mz(classe_dict)
335                    classe_dbe = self.calc_dbe_class(classe_dict)
336
337                    odd_even_label = self.get_h_odd_or_even(classe_dict)
338
339                    ch_datalist = carbon_hydrogen_objs_to_create.get(
340                        odd_even_label
341                    ).values()
342
343                    for ch_dict in ch_datalist:
344                        mass = ch_dict.get("mass") + classe_mass
345                        dbe = ch_dict.get("dbe") + classe_dbe
346
347                        if settings.min_mz <= mass <= settings.max_mz:
348                            if settings.min_dbe <= dbe <= settings.max_dbe:
349                                list_molecular_form.append(
350                                    {
351                                        "heteroAtoms_id": classe_obj.id,
352                                        "carbonHydrogen_id": ch_dict.get("id"),
353                                        "mass": mass,
354                                        "DBE": dbe,
355                                    }
356                                )
357
358                list_insert_chunks = chunks(
359                    list_molecular_form, self.sql_db.chunks_count
360                )
361                for insert_chunk in list_insert_chunks:
362                    insert_query = MolecularFormulaLink.__table__.insert().values(
363                        insert_chunk
364                    )
365                    self.sql_db.session.execute(insert_query)
366                self.sql_db.session.commit()
367
368    @timeit(print_time=True)
369    def runworker(self, molecular_search_settings, **kwargs):
370        """Run the molecular formula lookup table worker.
371
372        Parameters
373        ----------
374        molecular_search_settings : object
375            An object containing user-defined settings.
376        kwargs : dict
377            A dictionary of keyword arguments.
378            Most notably, the print_time argument which is passed to the timeit decorator.
379
380        Returns
381        -------
382        list
383            A list of tuples with the class name and the class dictionary.
384
385
386        """
387        verbose = molecular_search_settings.verbose_processing
388
389        classes_list, class_to_create, existing_classes_objs = (
390            self.check_database_get_class_list(molecular_search_settings)
391        )
392
393        settings = MolecularLookupDictSettings()
394        settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
395        settings.url_database = molecular_search_settings.url_database
396        settings.db_jobs = molecular_search_settings.db_jobs
397
398        self.add_carbonsHydrogens(settings, existing_classes_objs)
399
400        if class_to_create:
401            settings = MolecularLookupDictSettings()
402            settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
403            settings.url_database = molecular_search_settings.url_database
404            settings.db_jobs = molecular_search_settings.db_jobs
405
406            self.sql_db.session.commit()
407            odd_ch_obj = self.get_carbonsHydrogens(settings, "odd")
408            self.odd_ch_id = [obj.id for obj in odd_ch_obj]
409            self.odd_ch_dict = [{"C": obj.C, "H": obj.H} for obj in odd_ch_obj]
410            self.odd_ch_mass = [obj.mass for obj in odd_ch_obj]
411            self.odd_ch_dbe = [obj.dbe for obj in odd_ch_obj]
412
413            even_ch_obj = self.get_carbonsHydrogens(settings, "even")
414            self.even_ch_id = [obj.id for obj in even_ch_obj]
415            self.even_ch_dict = [{"C": obj.C, "H": obj.H} for obj in even_ch_obj]
416            self.even_ch_mass = [obj.mass for obj in even_ch_obj]
417            self.even_ch_dbe = [obj.dbe for obj in even_ch_obj]
418
419            all_results = list()
420            for class_tuple in tqdm(class_to_create, disable = not verbose):
421                results = self.populate_combinations(class_tuple, settings)
422                all_results.extend(results)
423                if settings.db_jobs == 1:
424                    # if len(all_results) >= self.sql_db.chunks_count:
425                    list_insert_chunks = list(chunks(results, self.sql_db.chunks_count))
426                    for chunk in list_insert_chunks:
427                        insert_query = MolecularFormulaLink.__table__.insert().values(
428                            chunk
429                        )
430                        self.sql_db.session.execute(insert_query)
431                    # all_results = list()
432            self.sql_db.session.commit()
433            # each chunk takes ~600Mb of memory, so if using 8 processes the total free memory needs to be 5GB
434            if settings.db_jobs > 1:
435                list_insert_chunks = list(chunks(all_results, self.sql_db.chunks_count))
436                if verbose:
437                    print(
438                        "Started database insert using {} iterations for a total of {} rows".format(
439                            len(list_insert_chunks), len(all_results)
440                        )
441                    )
442                worker_args = [
443                    (chunk, settings.url_database) for chunk in list_insert_chunks
444                ]
445                p = multiprocessing.Pool(settings.db_jobs)
446                for class_list in tqdm(
447                        p.imap_unordered(insert_database_worker, worker_args), disable= not verbose
448                        ):
449                    pass
450                p.close()
451                p.join()
452
453        return classes_list
454
455    def get_classes_in_order(self, molecular_search_settings):
456        """Get the classes in order
457
458        Parameters
459        ----------
460        molecular_search_settings : object
461            An object containing user-defined settings.
462
463        Returns
464        -------
465        dict
466            A dictionary of classes in order.
467            structure is  ('HC', {'HC': 1})
468        """
469
470        usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
471
472        usedAtoms.pop("C")
473        usedAtoms.pop("H")
474
475        min_n, max_n = usedAtoms.get("N") if usedAtoms.get("N") else (0, 0)
476        min_o, max_o = usedAtoms.get("O") if usedAtoms.get("O") else (0, 0)
477        min_s, max_s = usedAtoms.get("S") if usedAtoms.get("S") else (0, 0)
478        min_p, max_p = usedAtoms.get("P") if usedAtoms.get("P") else (0, 0)
479
480        possible_n = [n for n in range(min_n, max_n + 1)]
481        possible_o = [o for o in range(min_o, max_o + 1)]
482        possible_s = [s for s in range(min_s, max_s + 1)]
483        possible_p = [p for p in range(min_p, max_p + 1)]
484
485        atoms_in_order = ["N", "O", "S", "P"]
486
487        classe_in_order = {}
488
489        all_atoms_tuples = itertools.product(
490            possible_n, possible_o, possible_s, possible_p
491        )
492
493        for atom in atoms_in_order:
494            usedAtoms.pop(atom, None)
495
496        for selected_atom, min_max_tuple in usedAtoms.items():
497            min_x = min_max_tuple[0]
498            max_x = min_max_tuple[1]
499
500            possible_x = [x for x in range(min_x, max_x + 1)]
501
502            all_atoms_tuples = itertools.product(all_atoms_tuples, possible_x)
503            all_atoms_tuples = [
504                all_atoms_combined[0] + (all_atoms_combined[1],)
505                for all_atoms_combined in all_atoms_tuples
506            ]
507            atoms_in_order.append(selected_atom)
508
509        for all_atoms_tuple in all_atoms_tuples:
510            classe_str = ""
511            classe_dict = {}
512
513            for each_atoms_index, atom_number in enumerate(all_atoms_tuple):
514                if atom_number != 0:
515                    classe_dict[atoms_in_order[each_atoms_index]] = atom_number
516
517            if not classe_dict:
518                classe_in_order["HC"] = {"HC": ""}
519                continue
520
521            classe_str = json.dumps(classe_dict)
522
523            if len(classe_str) > 0:
524                classe_in_order[classe_str] = classe_dict
525
526        classe_in_order_dict = self.sort_classes(atoms_in_order, classe_in_order)
527
528        return classe_in_order_dict
529
530    @staticmethod
531    def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]:
532        """Sort the classes in order
533
534        Parameters
535        ----------
536        atoms_in_order : list
537            A list of atoms in order.
538        combination_dict : dict
539            A dictionary of classes.
540
541        Returns
542        -------
543        dict
544            A dictionary of classes in order.
545        """
546        # ensures atoms are always in the order defined at atoms_in_order list
547        join_dict_classes = dict()
548        atoms_in_order = ["N", "S", "P", "O"] + atoms_in_order[4:] + ["HC"]
549
550        sort_method = lambda atoms_keys: [atoms_in_order.index(atoms_keys)]
551        for class_str, class_dict in combination_dict.items():
552            sorted_dict_keys = sorted(class_dict, key=sort_method)
553            class_dict = {atom: class_dict[atom] for atom in sorted_dict_keys}
554            class_str = json.dumps(class_dict)
555            # using json for the new database, class
556            # class_str = ' '.join([atom + str(class_dict[atom]) for atom in sorted_dict_keys])
557            join_dict_classes[class_str] = class_dict
558
559        return join_dict_classes
560
561    @staticmethod
562    def get_fixed_initial_number_of_hydrogen(min_h, odd_even):
563        """Get the fixed initial number of hydrogen atoms
564
565        Parameters
566        ----------
567        min_h : int
568            The minimum number of hydrogen atoms.
569        odd_even : str
570            A string indicating whether to retrieve even or odd hydrogen atoms.
571        """
572        remaining_h = min_h % 2
573
574        if odd_even == "even":
575            if remaining_h == 0:
576                return remaining_h
577
578            else:
579                return remaining_h + 1
580
581        else:
582            if remaining_h == 0:
583                return remaining_h + 1
584
585            else:
586                return remaining_h
587
588    def calc_mz(self, datadict, class_mass=0):
589        """Calculate the mass-to-charge ratio (m/z) of a molecular formula.
590
591        Parameters
592        ----------
593        datadict : dict
594            A dictionary of classes.
595        class_mass : int
596            The mass of the class.
597
598        Returns
599        -------
600        float
601            The mass-to-charge ratio (m/z) of a molecular formula.
602        """
603        mass = class_mass
604
605        for atom in datadict.keys():
606            if atom != "HC":
607                mass = mass + Atoms.atomic_masses[atom] * datadict.get(atom)
608
609        return mass
610
611    def calc_dbe_class(self, datadict):
612        """Calculate the double bond equivalent (DBE) of a molecular formula.
613
614        Parameters
615        ----------
616        datadict : dict
617            A dictionary of classes.
618
619        Returns
620        -------
621        float
622            The double bond equivalent (DBE) of a molecular formula.
623        """
624        init_dbe = 0
625        for atom in datadict.keys():
626            if atom == "HC":
627                continue
628
629            n_atom = int(datadict.get(atom))
630
631            clean_atom = "".join([i for i in atom if not i.isdigit()])
632
633            valencia = MSParameters.molecular_search.used_atom_valences.get(clean_atom)
634
635            if type(valencia) is tuple:
636                valencia = valencia[0]
637            if valencia > 0:
638                # print atom, valencia, n_atom, init_dbe
639                init_dbe = init_dbe + (n_atom * (valencia - 2))
640            else:
641                continue
642
643        return 0.5 * init_dbe
644
645    def populate_combinations(self, classe_tuple, settings):
646        """Populate the combinations
647
648        Parameters
649        ----------
650        classe_tuple : tuple
651            A tuple containing the class name, the class dictionary, and the class ID.
652        settings : object
653            An object containing user-defined settings.
654
655        Returns
656        -------
657        list
658            A list of molecular formula data dictionaries.
659        """
660        ion_charge = 0
661
662        class_dict = classe_tuple[1]
663        odd_or_even = self.get_h_odd_or_even(class_dict)
664
665        return self.get_mol_formulas(odd_or_even, classe_tuple, settings)
666
667    def get_or_add(self, SomeClass, kw):
668        """Get or add a class
669
670        Parameters
671        ----------
672        SomeClass : object
673            A class object.
674        kw : dict
675            A dictionary of classes.
676
677        Returns
678        -------
679        object
680            A class object.
681        """
682        obj = self.sql_db.session.query(SomeClass).filter_by(**kw).first()
683        if not obj:
684            obj = SomeClass(**kw)
685        return obj
686
687    def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
688        """Get the molecular formulas
689
690        Parameters
691        ----------
692        odd_even_tag : str
693            A string indicating whether to retrieve even or odd hydrogen atoms.
694        classe_tuple : tuple
695
696        settings : object
697            An object containing user-defined settings.
698
699        Returns
700        -------
701        list
702            A list of molecular formula data dictionaries.
703
704        """
705        class_str = classe_tuple[0]
706        class_dict = classe_tuple[1]
707        classe_id = classe_tuple[2]
708
709        results = list()
710
711        if "HC" in class_dict:
712            del class_dict["HC"]
713
714        class_dbe = self.calc_dbe_class(class_dict)
715        class_mass = self.calc_mz(class_dict)
716
717        carbonHydrogen_mass = (
718            self.odd_ch_mass if odd_even_tag == "odd" else self.even_ch_mass
719        )
720        carbonHydrogen_dbe = (
721            self.odd_ch_dbe if odd_even_tag == "odd" else self.even_ch_dbe
722        )
723        carbonHydrogen_id = self.odd_ch_id if odd_even_tag == "odd" else self.even_ch_id
724
725        for index, carbonHydrogen_obj in enumerate(carbonHydrogen_id):
726            mass = carbonHydrogen_mass[index] + class_mass
727            dbe = carbonHydrogen_dbe[index] + class_dbe
728
729            if settings.min_mz <= mass <= settings.max_mz:
730                if settings.min_dbe <= dbe <= settings.max_dbe:
731                    molecularFormula = {
732                        "heteroAtoms_id": classe_id,
733                        "carbonHydrogen_id": carbonHydrogen_id[index],
734                        "mass": mass,
735                        "DBE": dbe,
736                    }
737
738                    results.append(molecularFormula)
739
740        return results
741
742    def get_h_odd_or_even(self, class_dict):
743        """Get the hydrogen odd or even
744
745        Parameters
746        ----------
747        class_dict : dict
748            A dictionary of classes.
749
750        Returns
751        -------
752        str
753            A string indicating whether to retrieve even or odd hydrogen atoms.
754        """
755
756        HAS_NITROGEN = "N" in class_dict.keys()
757
758        number_of_halogen = self.get_total_halogen_atoms(class_dict)
759        number_of_hetero = self.get_total_heteroatoms(class_dict)
760
761        if number_of_halogen > 0:
762            HAS_HALOGEN = True
763
764        else:
765            HAS_HALOGEN = False
766
767        if HAS_HALOGEN:
768            remaining_halogen = number_of_halogen % 2
769
770        else:
771            remaining_halogen = 0
772
773        if number_of_hetero > 0:
774            HAS_OTHER_HETERO = True
775
776            total_hetero_valence = self.get_total_hetero_valence(class_dict)
777
778        else:
779            HAS_OTHER_HETERO = False
780
781            total_hetero_valence = 0
782
783        if HAS_OTHER_HETERO:
784            remaining_hetero_valence = total_hetero_valence % 2
785
786        else:
787            remaining_hetero_valence = 0
788
789        if HAS_NITROGEN and not HAS_OTHER_HETERO:
790            number_of_n = class_dict.get("N")
791            remaining_n = number_of_n % 2
792
793        elif HAS_NITROGEN and HAS_OTHER_HETERO:
794            number_of_n = class_dict.get("N")
795            remaining_n = (number_of_n + remaining_hetero_valence) % 2
796
797        elif HAS_OTHER_HETERO and not HAS_NITROGEN:
798            remaining_n = remaining_hetero_valence
799
800        else:
801            remaining_n = -1
802
803        if remaining_n > 0.0:
804            if HAS_NITROGEN or HAS_OTHER_HETERO:
805                if HAS_HALOGEN:
806                    if remaining_halogen == 0:
807                        return "odd"
808                    else:
809                        return "even"
810
811                else:
812                    return "odd"
813
814        elif remaining_n == 0.0:
815            if HAS_NITROGEN or HAS_OTHER_HETERO:
816                if HAS_HALOGEN:
817                    if remaining_halogen == 0:
818                        return "even"
819                    else:
820                        return "odd"
821
822                else:
823                    return "even"
824
825        else:
826            if HAS_HALOGEN:
827                if remaining_halogen == 0:
828                    return "even"
829                else:
830                    return "odd"
831
832            else:
833                return "even"
834
835    @staticmethod
836    def get_total_heteroatoms(class_dict):
837        """Get the total number of heteroatoms other than N, F, Cl, Br
838
839        Parameters
840        ----------
841        class_dict : dict
842            A dictionary of classes.
843
844        Returns
845        -------
846        int
847            The total number of heteroatoms.
848        """
849
850        total_number = 0
851
852        for atom in class_dict.keys():
853            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
854                total_number = total_number + class_dict.get(atom)
855
856        return total_number
857
858    @staticmethod
859    def get_total_hetero_valence(class_dict):
860        """Get the total valence of heteroatoms other than N, F, Cl, Br
861
862        Parameters
863        ----------
864        class_dict : dict
865            A dictionary of classes.
866
867        Returns
868        -------
869        int
870            The total heteroatom valence.
871        """
872        total_valence = 0
873
874        for atom in class_dict.keys():
875            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
876                clean_atom = "".join([i for i in atom if not i.isdigit()])
877
878                atom_valence = MSParameters.molecular_search.used_atom_valences.get(
879                    clean_atom
880                )
881
882                if type(atom_valence) is tuple:
883                    atom_valence = atom_valence[0]
884
885                n_atom = int(class_dict.get(atom))
886
887                n_atom_valence = atom_valence * n_atom
888
889                total_valence = total_valence + n_atom_valence
890
891        return total_valence
892
893    @staticmethod
894    def get_total_halogen_atoms(class_dict):
895        """Get the total number of halogen atoms
896
897        Parameters
898        ----------
899        class_dict : dict
900            A dictionary of classes.
901
902        Returns
903        -------
904        int
905            The total number of halogen atoms.
906        """
907        atoms = ["F", "Cl", "Br"]
908
909        total_number = 0
910
911        for atom in atoms:
912            if atom in class_dict.keys():
913                total_number = total_number + class_dict.get(atom)
914
915        return total_number

A class for generating molecular formula combinations.

Parameters
  • sql_db (MolForm_SQL, optional): The database object to use; when it is omitted, a default MolForm_SQL() is created.
Attributes
  • sql_db (MolForm_SQL): The SQLite database object.
  • len_existing_classes (int): The number of existing classes in the SQLite database.
  • odd_ch_id (list): A list of odd carbon and hydrogen atom IDs.
  • odd_ch_dict (list): A list of odd carbon and hydrogen atom dictionaries.
  • odd_ch_mass (list): A list of odd carbon and hydrogen atom masses.
  • odd_ch_dbe (list): A list of odd carbon and hydrogen atom double bond equivalents.
  • even_ch_id (list): A list of even carbon and hydrogen atom IDs.
  • even_ch_dict (list): A list of even carbon and hydrogen atom dictionaries.
  • even_ch_mass (list): A list of even carbon and hydrogen atom masses.
  • even_ch_dbe (list): A list of even carbon and hydrogen atom double bond equivalents.
Methods
  • cProfile_worker(args) A cProfile worker for the get_mol_formulas function.
  • check_database_get_class_list(molecular_search_settings) Checks if the database has all the classes, if not create the missing classes.
  • get_carbonsHydrogens(settings, odd_even) Retrieves carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
  • add_carbonsHydrogens(settings, existing_classes_objs) Adds carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
  • runworker(molecular_search_settings) Runs the molecular formula lookup table worker.
  • get_classes_in_order(molecular_search_settings) Gets the classes in order.
  • sort_classes(atoms_in_order, combination_dict) Sorts the classes in order.
  • get_fixed_initial_number_of_hydrogen(min_h, odd_even) Gets the fixed initial number of hydrogen atoms.
  • calc_mz(datadict, class_mass=0) Calculates the mass-to-charge ratio (m/z) of a molecular formula.
  • calc_dbe_class(datadict) Calculates the double bond equivalent (DBE) of a molecular formula.
  • populate_combinations(classe_tuple, settings) Populates the combinations.
  • get_or_add(SomeClass, kw) Gets or adds a class.
  • get_mol_formulas(odd_even_tag, classe_tuple, settings) Gets the molecular formulas.
  • get_h_odd_or_even(class_dict) Gets the hydrogen odd or even.
  • get_total_halogen_atoms(class_dict) Gets the total number of halogen atoms.
  • get_total_hetero_valence(class_dict) Gets the total valence of heteroatoms other than N, F, Cl, and Br
MolecularCombinations(sql_db=None)
134    def __init__(self, sql_db=None):
135        if not sql_db:
136            self.sql_db = MolForm_SQL()
137        else:
138            self.sql_db = sql_db
def cProfile_worker(self, args):
140    def cProfile_worker(self, args):
141        """cProfile worker for the get_mol_formulas function"""
142        cProfile.runctx(
143            "self.get_mol_formulas(*args)",
144            globals(),
145            locals(),
146            "mf_database_cprofile.prof",
147        )

cProfile worker for the get_mol_formulas function

def check_database_get_class_list(self, molecular_search_settings):
149    def check_database_get_class_list(self, molecular_search_settings):
150        """check if the database has all the classes, if not create the missing classes
151
152        Parameters
153        ----------
154        molecular_search_settings : object
155            An object containing user-defined settings.
156
157        Returns
158        -------
159        list
160            list of tuples with the class name and the class dictionary
161        """
162        all_class_to_create = []
163
164        classes_dict = self.get_classes_in_order(molecular_search_settings)
165
166        class_str_set = set(classes_dict.keys())
167
168        existing_classes_objs = self.sql_db.session.query(HeteroAtoms).distinct().all()
169
170        existing_classes_str = set([classe.name for classe in existing_classes_objs])
171
172        self.len_existing_classes = len(existing_classes_str)
173
174        class_to_create = class_str_set - existing_classes_str
175
176        class_count = len(existing_classes_objs)
177
178        data_classes = list()
179        for index, class_str in enumerate(class_to_create):
180            class_dict = classes_dict.get(class_str)
181            halogen_count = self.get_total_halogen_atoms(class_dict)
182            data_classes.append(
183                {
184                    "name": class_str,
185                    "id": class_count + index + 1,
186                    "halogensCount": halogen_count,
187                }
188            )
189
190        # data_classes = [{"name":class_str, "id":class_count+ index + 1} for index, class_str in enumerate(class_to_create)]
191
192        if data_classes:
193            list_insert_chunks = chunks(data_classes, self.sql_db.chunks_count)
194            for insert_chunk in list_insert_chunks:
195                insert_query = HeteroAtoms.__table__.insert().values(insert_chunk)
196                self.sql_db.session.execute(insert_query)
197
198        for index, class_str in enumerate(class_to_create):
199            class_tuple = (
200                class_str,
201                classes_dict.get(class_str),
202                class_count + index + 1,
203            )
204
205            all_class_to_create.append(class_tuple)
206
207        return (
208            [(c_s, c_d) for c_s, c_d in classes_dict.items()],
209            all_class_to_create,
210            existing_classes_objs,
211        )

check if the database has all the classes, if not create the missing classes

Parameters
  • molecular_search_settings (object): An object containing user-defined settings.
Returns
  • list: list of tuples with the class name and the class dictionary
def get_carbonsHydrogens(self, settings, odd_even):
213    def get_carbonsHydrogens(self, settings, odd_even):
214        """Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
215
216        Parameters
217        ----------
218        settings : object
219             An object containing user-defined settings.
220        odd_even : str
221            A string indicating whether to retrieve even or odd hydrogen atoms.
222
223        Returns
224        -------
225        list
226            A list of CarbonHydrogen objects that satisfy the specified conditions.
227        """
228        operator = "==" if odd_even == "even" else "!="
229        usedAtoms = settings.usedAtoms
230        user_min_c, user_max_c = usedAtoms.get("C")
231        user_min_h, user_max_h = usedAtoms.get("H")
232
233        return eval(
234            "self.sql_db.session.query(CarbonHydrogen).filter("
235            "CarbonHydrogen.C >= user_min_c,"
236            "CarbonHydrogen.H >= user_min_h,"
237            "CarbonHydrogen.C <= user_max_c,"
238            "CarbonHydrogen.H <= user_max_h,"
239            "CarbonHydrogen.H % 2" + operator + "0).all()"
240        )

Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.

Parameters
  • settings (object): An object containing user-defined settings.
  • odd_even (str): A string indicating whether to retrieve even or odd hydrogen atoms.
Returns
  • list: A list of CarbonHydrogen objects that satisfy the specified conditions.
def add_carbonsHydrogens(self, settings, existing_classes_objs):
242    def add_carbonsHydrogens(self, settings, existing_classes_objs):
243        """Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
244
245        Parameters
246        ----------
247        settings : object
248            An object containing user-defined settings.
249        existing_classes_objs : list
250            A list of HeteroAtoms objects.
251        """
252        usedAtoms = settings.usedAtoms
253
254        user_min_c, user_max_c = usedAtoms.get("C")
255        user_min_h, user_max_h = usedAtoms.get("H")
256
257        query_obj = self.sql_db.session.query(
258            func.max(CarbonHydrogen.C).label("max_c"),
259            func.min(CarbonHydrogen.C).label("min_c"),
260            func.max(CarbonHydrogen.H).label("max_h"),
261            func.min(CarbonHydrogen.H).label("min_h"),
262        )
263
264        database = query_obj.first()
265        if (
266            database.max_c == user_max_c
267            and database.min_c == user_min_c
268            and database.max_h == user_max_h
269            and database.min_h == user_min_h
270        ):
271            # all data is already available at the database
272            pass
273
274        else:
275            current_count = self.sql_db.session.query(CarbonHydrogen.C).count()
276
277            databaseCarbonHydrogen = self.sql_db.session.query(CarbonHydrogen).all()
278
279            userCarbon = set(range(user_min_c, user_max_c + 1))
280            userHydrogen = set(range(user_min_h, user_max_h + 1))
281
282            carbon_hydrogen_objs_database = {}
283            for obj in databaseCarbonHydrogen:
284                str_data = "C:{},H:{}".format(obj.C, obj.H)
285                carbon_hydrogen_objs_database[str_data] = str_data
286
287            carbon_hydrogen_objs_to_create = {"even": {}, "odd": {}}
288
289            list_ch_obj_to_add = list()
290            i = 0
291            for comb in itertools.product(userCarbon, userHydrogen):
292                C = comb[0]
293                H = comb[1]
294                data = {
295                    "C": C,
296                    "H": H,
297                }
298
299                data_insert = {
300                    "C": C,
301                    "H": H,
302                }
303
304                str_data = "C:{},H:{}".format(C, H)
305
306                if not str_data in carbon_hydrogen_objs_database.keys():
307                    label = "even" if comb[1] % 2 == 0 else "odd"
308                    data["mass"] = (C * Atoms.atomic_masses.get("C")) + (
309                        H * Atoms.atomic_masses.get("H")
310                    )
311                    data["dbe"] = C - (H / 2) + 1
312                    data["id"] = i + current_count + 1
313                    data_insert["id"] = i + current_count + 1
314                    i = i + 1
315                    carbon_hydrogen_objs_to_create[label][str_data] = data
316
317                    list_ch_obj_to_add.append(data_insert)
318
319            if list_ch_obj_to_add:
320                # insert carbon hydrogen objs
321                list_insert_chunks = chunks(
322                    list_ch_obj_to_add, self.sql_db.chunks_count
323                )
324                for insert_chunk in list_insert_chunks:
325                    insert_query = CarbonHydrogen.__table__.insert().values(
326                        insert_chunk
327                    )
328                    self.sql_db.session.execute(insert_query)
329                self.sql_db.session.commit()
330
331                list_molecular_form = list()
332                for classe_obj in existing_classes_objs:
333                    classe_dict = classe_obj.to_dict()
334                    classe_mass = self.calc_mz(classe_dict)
335                    classe_dbe = self.calc_dbe_class(classe_dict)
336
337                    odd_even_label = self.get_h_odd_or_even(classe_dict)
338
339                    ch_datalist = carbon_hydrogen_objs_to_create.get(
340                        odd_even_label
341                    ).values()
342
343                    for ch_dict in ch_datalist:
344                        mass = ch_dict.get("mass") + classe_mass
345                        dbe = ch_dict.get("dbe") + classe_dbe
346
347                        if settings.min_mz <= mass <= settings.max_mz:
348                            if settings.min_dbe <= dbe <= settings.max_dbe:
349                                list_molecular_form.append(
350                                    {
351                                        "heteroAtoms_id": classe_obj.id,
352                                        "carbonHydrogen_id": ch_dict.get("id"),
353                                        "mass": mass,
354                                        "DBE": dbe,
355                                    }
356                                )
357
358                list_insert_chunks = chunks(
359                    list_molecular_form, self.sql_db.chunks_count
360                )
361                for insert_chunk in list_insert_chunks:
362                    insert_query = MolecularFormulaLink.__table__.insert().values(
363                        insert_chunk
364                    )
365                    self.sql_db.session.execute(insert_query)
366                self.sql_db.session.commit()

Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.

Parameters
  • settings (object): An object containing user-defined settings.
  • existing_classes_objs (list): A list of HeteroAtoms objects.
def runworker(*args, **kw):
        def timed(*args, **kw):
            # Wrapper that runs ``method`` and reports how long the call took.
            # NOTE(review): ``method``, ``print_time`` and ``time`` are not defined
            # in this excerpt; presumably they come from the enclosing decorator
            # and its module - confirm there.
            # Extract print_time from kwargs if provided
            # (it is popped, so it is not forwarded to ``method``)
            local_print_time = kw.pop('print_time', print_time)
            ts = time.time()
            result = method(*args, **kw)
            te = time.time()
            if "log_time" in kw:
                # store the elapsed time, in whole milliseconds, in the caller's
                # dict under "log_name" or the upper-cased method name
                name = kw.get("log_name", method.__name__.upper())
                kw["log_time"][name] = int((te - ts) * 1000)
            elif local_print_time:
                # no log dict supplied: print the elapsed milliseconds instead
                print("%r  %2.2f ms" % (method.__name__, (te - ts) * 1000))
            return result

Run the molecular formula lookup table worker.

Parameters
  • molecular_search_settings (object): An object containing user-defined settings.
  • kwargs (dict): A dictionary of keyword arguments. Most notably, the print_time argument which is passed to the timeit decorator.
Returns
  • list: A list of tuples with the class name and the class dictionary.
def get_classes_in_order(self, molecular_search_settings):
455    def get_classes_in_order(self, molecular_search_settings):
456        """Get the classes in order
457
458        Parameters
459        ----------
460        molecular_search_settings : object
461            An object containing user-defined settings.
462
463        Returns
464        -------
465        dict
466            A dictionary of classes in order.
467            structure is  ('HC', {'HC': 1})
468        """
469
470        usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
471
472        usedAtoms.pop("C")
473        usedAtoms.pop("H")
474
475        min_n, max_n = usedAtoms.get("N") if usedAtoms.get("N") else (0, 0)
476        min_o, max_o = usedAtoms.get("O") if usedAtoms.get("O") else (0, 0)
477        min_s, max_s = usedAtoms.get("S") if usedAtoms.get("S") else (0, 0)
478        min_p, max_p = usedAtoms.get("P") if usedAtoms.get("P") else (0, 0)
479
480        possible_n = [n for n in range(min_n, max_n + 1)]
481        possible_o = [o for o in range(min_o, max_o + 1)]
482        possible_s = [s for s in range(min_s, max_s + 1)]
483        possible_p = [p for p in range(min_p, max_p + 1)]
484
485        atoms_in_order = ["N", "O", "S", "P"]
486
487        classe_in_order = {}
488
489        all_atoms_tuples = itertools.product(
490            possible_n, possible_o, possible_s, possible_p
491        )
492
493        for atom in atoms_in_order:
494            usedAtoms.pop(atom, None)
495
496        for selected_atom, min_max_tuple in usedAtoms.items():
497            min_x = min_max_tuple[0]
498            max_x = min_max_tuple[1]
499
500            possible_x = [x for x in range(min_x, max_x + 1)]
501
502            all_atoms_tuples = itertools.product(all_atoms_tuples, possible_x)
503            all_atoms_tuples = [
504                all_atoms_combined[0] + (all_atoms_combined[1],)
505                for all_atoms_combined in all_atoms_tuples
506            ]
507            atoms_in_order.append(selected_atom)
508
509        for all_atoms_tuple in all_atoms_tuples:
510            classe_str = ""
511            classe_dict = {}
512
513            for each_atoms_index, atom_number in enumerate(all_atoms_tuple):
514                if atom_number != 0:
515                    classe_dict[atoms_in_order[each_atoms_index]] = atom_number
516
517            if not classe_dict:
518                classe_in_order["HC"] = {"HC": ""}
519                continue
520
521            classe_str = json.dumps(classe_dict)
522
523            if len(classe_str) > 0:
524                classe_in_order[classe_str] = classe_dict
525
526        classe_in_order_dict = self.sort_classes(atoms_in_order, classe_in_order)
527
528        return classe_in_order_dict

Get the classes in order

Parameters
  • molecular_search_settings (object): An object containing user-defined settings.
Returns
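  • dict: A dictionary of classes in order, mapping each class label (the JSON dump of the class dictionary) to that dictionary, e.g. '{"N": 1}' -> {'N': 1}; the class without heteroatoms is stored as {'HC': ''}.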
@staticmethod
def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]:
530    @staticmethod
531    def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]:
532        """Sort the classes in order
533
534        Parameters
535        ----------
536        atoms_in_order : list
537            A list of atoms in order.
538        combination_dict : dict
539            A dictionary of classes.
540
541        Returns
542        -------
543        dict
544            A dictionary of classes in order.
545        """
546        # ensures atoms are always in the order defined at atoms_in_order list
547        join_dict_classes = dict()
548        atoms_in_order = ["N", "S", "P", "O"] + atoms_in_order[4:] + ["HC"]
549
550        sort_method = lambda atoms_keys: [atoms_in_order.index(atoms_keys)]
551        for class_str, class_dict in combination_dict.items():
552            sorted_dict_keys = sorted(class_dict, key=sort_method)
553            class_dict = {atom: class_dict[atom] for atom in sorted_dict_keys}
554            class_str = json.dumps(class_dict)
555            # using json for the new database, class
556            # class_str = ' '.join([atom + str(class_dict[atom]) for atom in sorted_dict_keys])
557            join_dict_classes[class_str] = class_dict
558
559        return join_dict_classes

Sort the classes in order

Parameters
  • atoms_in_order (list): A list of atoms in order.
  • combination_dict (dict): A dictionary of classes.
Returns
  • dict: A dictionary of classes in order.
@staticmethod
def get_fixed_initial_number_of_hydrogen(min_h, odd_even):
561    @staticmethod
562    def get_fixed_initial_number_of_hydrogen(min_h, odd_even):
563        """Get the fixed initial number of hydrogen atoms
564
565        Parameters
566        ----------
567        min_h : int
568            The minimum number of hydrogen atoms.
569        odd_even : str
570            A string indicating whether to retrieve even or odd hydrogen atoms.
571        """
572        remaining_h = min_h % 2
573
574        if odd_even == "even":
575            if remaining_h == 0:
576                return remaining_h
577
578            else:
579                return remaining_h + 1
580
581        else:
582            if remaining_h == 0:
583                return remaining_h + 1
584
585            else:
586                return remaining_h

Get the fixed initial number of hydrogen atoms

Parameters
  • min_h (int): The minimum number of hydrogen atoms.
  • odd_even (str): A string indicating whether to retrieve even or odd hydrogen atoms.
def calc_mz(self, datadict, class_mass=0):
588    def calc_mz(self, datadict, class_mass=0):
589        """Calculate the mass-to-charge ratio (m/z) of a molecular formula.
590
591        Parameters
592        ----------
593        datadict : dict
594            A dictionary of classes.
595        class_mass : int
596            The mass of the class.
597
598        Returns
599        -------
600        float
601            The mass-to-charge ratio (m/z) of a molecular formula.
602        """
603        mass = class_mass
604
605        for atom in datadict.keys():
606            if atom != "HC":
607                mass = mass + Atoms.atomic_masses[atom] * datadict.get(atom)
608
609        return mass

Calculate the mass-to-charge ratio (m/z) of a molecular formula.

Parameters
  • datadict (dict): A dictionary of classes.
  • class_mass (int): The mass of the class.
Returns
  • float: The mass-to-charge ratio (m/z) of a molecular formula.
def calc_dbe_class(self, datadict):
611    def calc_dbe_class(self, datadict):
612        """Calculate the double bond equivalent (DBE) of a molecular formula.
613
614        Parameters
615        ----------
616        datadict : dict
617            A dictionary of classes.
618
619        Returns
620        -------
621        float
622            The double bond equivalent (DBE) of a molecular formula.
623        """
624        init_dbe = 0
625        for atom in datadict.keys():
626            if atom == "HC":
627                continue
628
629            n_atom = int(datadict.get(atom))
630
631            clean_atom = "".join([i for i in atom if not i.isdigit()])
632
633            valencia = MSParameters.molecular_search.used_atom_valences.get(clean_atom)
634
635            if type(valencia) is tuple:
636                valencia = valencia[0]
637            if valencia > 0:
638                # print atom, valencia, n_atom, init_dbe
639                init_dbe = init_dbe + (n_atom * (valencia - 2))
640            else:
641                continue
642
643        return 0.5 * init_dbe

Calculate the double bond equivalent (DBE) of a molecular formula.

Parameters
  • datadict (dict): A dictionary of classes.
Returns
  • float: The double bond equivalent (DBE) of a molecular formula.
def populate_combinations(self, classe_tuple, settings):
645    def populate_combinations(self, classe_tuple, settings):
646        """Populate the combinations
647
648        Parameters
649        ----------
650        classe_tuple : tuple
651            A tuple containing the class name, the class dictionary, and the class ID.
652        settings : object
653            An object containing user-defined settings.
654
655        Returns
656        -------
657        list
658            A list of molecular formula data dictionaries.
659        """
660        ion_charge = 0
661
662        class_dict = classe_tuple[1]
663        odd_or_even = self.get_h_odd_or_even(class_dict)
664
665        return self.get_mol_formulas(odd_or_even, classe_tuple, settings)

Populate the combinations

Parameters
  • classe_tuple (tuple): A tuple containing the class name, the class dictionary, and the class ID.
  • settings (object): An object containing user-defined settings.
Returns
  • list: A list of molecular formula data dictionaries.
def get_or_add(self, SomeClass, kw):
667    def get_or_add(self, SomeClass, kw):
668        """Get or add a class
669
670        Parameters
671        ----------
672        SomeClass : object
673            A class object.
674        kw : dict
675            A dictionary of classes.
676
677        Returns
678        -------
679        object
680            A class object.
681        """
682        obj = self.sql_db.session.query(SomeClass).filter_by(**kw).first()
683        if not obj:
684            obj = SomeClass(**kw)
685        return obj

Get or add a class

Parameters
  • SomeClass (object): A class object.
  • kw (dict): A dictionary of classes.
Returns
  • object: A class object.
def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
687    def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
688        """Get the molecular formulas
689
690        Parameters
691        ----------
692        odd_even_tag : str
693            A string indicating whether to retrieve even or odd hydrogen atoms.
694        classe_tuple : tuple
695
696        settings : object
697            An object containing user-defined settings.
698
699        Returns
700        -------
701        list
702            A list of molecular formula data dictionaries.
703
704        """
705        class_str = classe_tuple[0]
706        class_dict = classe_tuple[1]
707        classe_id = classe_tuple[2]
708
709        results = list()
710
711        if "HC" in class_dict:
712            del class_dict["HC"]
713
714        class_dbe = self.calc_dbe_class(class_dict)
715        class_mass = self.calc_mz(class_dict)
716
717        carbonHydrogen_mass = (
718            self.odd_ch_mass if odd_even_tag == "odd" else self.even_ch_mass
719        )
720        carbonHydrogen_dbe = (
721            self.odd_ch_dbe if odd_even_tag == "odd" else self.even_ch_dbe
722        )
723        carbonHydrogen_id = self.odd_ch_id if odd_even_tag == "odd" else self.even_ch_id
724
725        for index, carbonHydrogen_obj in enumerate(carbonHydrogen_id):
726            mass = carbonHydrogen_mass[index] + class_mass
727            dbe = carbonHydrogen_dbe[index] + class_dbe
728
729            if settings.min_mz <= mass <= settings.max_mz:
730                if settings.min_dbe <= dbe <= settings.max_dbe:
731                    molecularFormula = {
732                        "heteroAtoms_id": classe_id,
733                        "carbonHydrogen_id": carbonHydrogen_id[index],
734                        "mass": mass,
735                        "DBE": dbe,
736                    }
737
738                    results.append(molecularFormula)
739
740        return results

Get the molecular formulas

Parameters
  • odd_even_tag (str): A string indicating whether to retrieve even or odd hydrogen atoms.
  • classe_tuple (tuple): A tuple containing the class name, the class dictionary, and the class ID.

  • settings (object): An object containing user-defined settings.

Returns
  • list: A list of molecular formula data dictionaries.
def get_h_odd_or_even(self, class_dict):
742    def get_h_odd_or_even(self, class_dict):
743        """Get the hydrogen odd or even
744
745        Parameters
746        ----------
747        class_dict : dict
748            A dictionary of classes.
749
750        Returns
751        -------
752        str
753            A string indicating whether to retrieve even or odd hydrogen atoms.
754        """
755
756        HAS_NITROGEN = "N" in class_dict.keys()
757
758        number_of_halogen = self.get_total_halogen_atoms(class_dict)
759        number_of_hetero = self.get_total_heteroatoms(class_dict)
760
761        if number_of_halogen > 0:
762            HAS_HALOGEN = True
763
764        else:
765            HAS_HALOGEN = False
766
767        if HAS_HALOGEN:
768            remaining_halogen = number_of_halogen % 2
769
770        else:
771            remaining_halogen = 0
772
773        if number_of_hetero > 0:
774            HAS_OTHER_HETERO = True
775
776            total_hetero_valence = self.get_total_hetero_valence(class_dict)
777
778        else:
779            HAS_OTHER_HETERO = False
780
781            total_hetero_valence = 0
782
783        if HAS_OTHER_HETERO:
784            remaining_hetero_valence = total_hetero_valence % 2
785
786        else:
787            remaining_hetero_valence = 0
788
789        if HAS_NITROGEN and not HAS_OTHER_HETERO:
790            number_of_n = class_dict.get("N")
791            remaining_n = number_of_n % 2
792
793        elif HAS_NITROGEN and HAS_OTHER_HETERO:
794            number_of_n = class_dict.get("N")
795            remaining_n = (number_of_n + remaining_hetero_valence) % 2
796
797        elif HAS_OTHER_HETERO and not HAS_NITROGEN:
798            remaining_n = remaining_hetero_valence
799
800        else:
801            remaining_n = -1
802
803        if remaining_n > 0.0:
804            if HAS_NITROGEN or HAS_OTHER_HETERO:
805                if HAS_HALOGEN:
806                    if remaining_halogen == 0:
807                        return "odd"
808                    else:
809                        return "even"
810
811                else:
812                    return "odd"
813
814        elif remaining_n == 0.0:
815            if HAS_NITROGEN or HAS_OTHER_HETERO:
816                if HAS_HALOGEN:
817                    if remaining_halogen == 0:
818                        return "even"
819                    else:
820                        return "odd"
821
822                else:
823                    return "even"
824
825        else:
826            if HAS_HALOGEN:
827                if remaining_halogen == 0:
828                    return "even"
829                else:
830                    return "odd"
831
832            else:
833                return "even"

Get the hydrogen odd or even

Parameters
  • class_dict (dict): A dictionary of classes.
Returns
  • str: A string indicating whether to retrieve even or odd hydrogen atoms.
@staticmethod
def get_total_heteroatoms(class_dict):
835    @staticmethod
836    def get_total_heteroatoms(class_dict):
837        """Get the total number of heteroatoms other than N, F, Cl, Br
838
839        Parameters
840        ----------
841        class_dict : dict
842            A dictionary of classes.
843
844        Returns
845        -------
846        int
847            The total number of heteroatoms.
848        """
849
850        total_number = 0
851
852        for atom in class_dict.keys():
853            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
854                total_number = total_number + class_dict.get(atom)
855
856        return total_number

Get the total number of heteroatoms other than O, N, F, Cl, Br

Parameters
  • class_dict (dict): A dictionary of classes.
Returns
  • int: The total number of heteroatoms.
@staticmethod
def get_total_hetero_valence(class_dict):
858    @staticmethod
859    def get_total_hetero_valence(class_dict):
860        """Get the total valence of heteroatoms other than N, F, Cl, Br
861
862        Parameters
863        ----------
864        class_dict : dict
865            A dictionary of classes.
866
867        Returns
868        -------
869        int
870            The total heteroatom valence.
871        """
872        total_valence = 0
873
874        for atom in class_dict.keys():
875            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
876                clean_atom = "".join([i for i in atom if not i.isdigit()])
877
878                atom_valence = MSParameters.molecular_search.used_atom_valences.get(
879                    clean_atom
880                )
881
882                if type(atom_valence) is tuple:
883                    atom_valence = atom_valence[0]
884
885                n_atom = int(class_dict.get(atom))
886
887                n_atom_valence = atom_valence * n_atom
888
889                total_valence = total_valence + n_atom_valence
890
891        return total_valence

Get the total valence of heteroatoms other than O, N, F, Cl, Br

Parameters
  • class_dict (dict): A dictionary of classes.
Returns
  • int: The total heteroatom valence.
@staticmethod
def get_total_halogen_atoms(class_dict):
893    @staticmethod
894    def get_total_halogen_atoms(class_dict):
895        """Get the total number of halogen atoms
896
897        Parameters
898        ----------
899        class_dict : dict
900            A dictionary of classes.
901
902        Returns
903        -------
904        int
905            The total number of halogen atoms.
906        """
907        atoms = ["F", "Cl", "Br"]
908
909        total_number = 0
910
911        for atom in atoms:
912            if atom in class_dict.keys():
913                total_number = total_number + class_dict.get(atom)
914
915        return total_number

Get the total number of halogen atoms

Parameters
  • class_dict (dict): A dictionary of classes.
Returns
  • int: The total number of halogen atoms.