corems.molecular_id.factory.MolecularLookupTable

  1__author__ = "Yuri E. Corilo"
  2__date__ = "Jul 02, 2019"
  3
  4import contextlib
  5import cProfile
  6import io
  7import itertools
  8import json
  9import multiprocessing
 10import pstats
 11from copy import deepcopy
 12from typing import Dict
 13
 14from sqlalchemy import create_engine, func
 15from sqlalchemy.orm import sessionmaker
 16from tqdm import tqdm
 17
 18from corems import chunks, timeit
 19from corems.encapsulation.constant import Atoms
 20from corems.encapsulation.factory.parameters import MSParameters
 21from corems.encapsulation.factory.processingSetting import MolecularLookupDictSettings
 22from corems.molecular_id.factory.molecularSQL import (
 23    CarbonHydrogen,
 24    HeteroAtoms,
 25    MolecularFormulaLink,
 26    MolForm_SQL,
 27)
 28
 29
 30@contextlib.contextmanager
 31def profiled():
 32    """A context manager for profiling."""
 33    pr = cProfile.Profile()
 34    pr.enable()
 35    yield
 36    pr.disable()
 37    s = io.StringIO()
 38    ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
 39    ps.print_stats()
 40    # uncomment this to see who's calling what
 41    # ps.print_callers()
 42    print(s.getvalue())
 43
 44
 45def insert_database_worker(args):
 46    """Inserts data into the database."""
 47    results, url = args
 48
 49    if not url:
 50        url = "sqlite:///db/molformulas.sqlite"
 51
 52    if url[0:6] == "sqlite":
 53        engine = create_engine(url, echo=False)
 54    else:
 55        engine = create_engine(url, echo=False, isolation_level="AUTOCOMMIT")
 56
 57    session_factory = sessionmaker(bind=engine)
 58    session = session_factory()
 59    insert_query = MolecularFormulaLink.__table__.insert().values(results)
 60    session.execute(insert_query)
 61    session.commit()
 62    session.close()
 63    engine.dispose()
 64
 65
 66class MolecularCombinations:
 67    """A class for generating molecular formula combinations.
 68
 69    Parameters
 70    ----------
 71    sql_db : MolForm_SQL, optional
 72        The database object to use; a new MolForm_SQL() is created when omitted.
 73
 74    Attributes
 75    ----------
 76    sql_db : MolForm_SQL
 77        The SQLite database object.
 78    len_existing_classes : int
 79        The number of existing classes in the SQLite database.
 80    odd_ch_id : list
 81        A list of odd carbon and hydrogen atom IDs.
 82    odd_ch_dict : list
 83        A list of odd carbon and hydrogen atom dictionaries.
 84    odd_ch_mass : list
 85        A list of odd carbon and hydrogen atom masses.
 86    odd_ch_dbe : list
 87        A list of odd carbon and hydrogen atom double bond equivalents.
 88    even_ch_id : list
 89        A list of even carbon and hydrogen atom IDs.
 90    even_ch_dict : list
 91        A list of even carbon and hydrogen atom dictionaries.
 92    even_ch_mass : list
 93        A list of even carbon and hydrogen atom masses.
 94    even_ch_dbe : list
 95        A list of even carbon and hydrogen atom double bond equivalents.
 96
 97    Methods
 98    -------
 99    * cProfile_worker(args)
100        A cProfile worker for the get_mol_formulas function.
101    * check_database_get_class_list(molecular_search_settings)
102        Checks if the database has all the classes, if not create the missing classes.
103    * get_carbonsHydrogens(settings, odd_even)
104        Retrieves carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
105    * add_carbonsHydrogens(settings, existing_classes_objs)
106        Adds carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
107    * runworker(molecular_search_settings)
108        Runs the molecular formula lookup table worker.
109    * get_classes_in_order(molecular_search_settings)
110        Gets the classes in order.
111    * sort_classes(atoms_in_order, combination_dict)
112        Sorts the classes in order.
113    * get_fixed_initial_number_of_hydrogen(min_h, odd_even)
114        Gets the fixed initial number of hydrogen atoms.
115    * calc_mz(datadict, class_mass=0)
116        Calculates the mass-to-charge ratio (m/z) of a molecular formula.
117    * calc_dbe_class(datadict)
118        Calculates the double bond equivalent (DBE) of a molecular formula.
119    * populate_combinations(classe_tuple, settings)
120        Populates the combinations.
121    * get_or_add(SomeClass, kw)
122        Gets or adds a class.
123    * get_mol_formulas(odd_even_tag, classe_tuple, settings)
124        Gets the molecular formulas.
125    * get_h_odd_or_even(class_dict)
126        Gets the hydrogen odd or even.
127    * get_total_halogen_atoms(class_dict)
128        Gets the total number of halogen atoms.
129    * get_total_hetero_valence(class_dict)
130        Gets the total valence of heteroatoms other than O, N, F, Cl, and Br
131    """
132
133    def __init__(self, sql_db=None):
134        if not sql_db:
135            self.sql_db = MolForm_SQL()
136        else:
137            self.sql_db = sql_db
138
139    def cProfile_worker(self, args):
140        """cProfile worker for the get_mol_formulas function"""
141        cProfile.runctx(
142            "self.get_mol_formulas(*args)",
143            globals(),
144            locals(),
145            "mf_database_cprofile.prof",
146        )
147
148    def check_database_get_class_list(self, molecular_search_settings):
149        """check if the database has all the classes, if not create the missing classes
150
151        Parameters
152        ----------
153        molecular_search_settings : object
154            An object containing user-defined settings.
155
156        Returns
157        -------
158        list
159            list of tuples with the class name and the class dictionary
160        """
161        all_class_to_create = []
162
163        classes_dict = self.get_classes_in_order(molecular_search_settings)
164
165        class_str_set = set(classes_dict.keys())
166
167        existing_classes_objs = self.sql_db.session.query(HeteroAtoms).distinct().all()
168
169        existing_classes_str = set([classe.name for classe in existing_classes_objs])
170
171        self.len_existing_classes = len(existing_classes_str)
172
173        class_to_create = class_str_set - existing_classes_str
174
175        class_count = len(existing_classes_objs)
176
177        data_classes = list()
178        for index, class_str in enumerate(class_to_create):
179            class_dict = classes_dict.get(class_str)
180            halogen_count = self.get_total_halogen_atoms(class_dict)
181            data_classes.append(
182                {
183                    "name": class_str,
184                    "id": class_count + index + 1,
185                    "halogensCount": halogen_count,
186                }
187            )
188
189        # data_classes = [{"name":class_str, "id":class_count+ index + 1} for index, class_str in enumerate(class_to_create)]
190
191        if data_classes:
192            list_insert_chunks = chunks(data_classes, self.sql_db.chunks_count)
193            for insert_chunk in list_insert_chunks:
194                insert_query = HeteroAtoms.__table__.insert().values(insert_chunk)
195                self.sql_db.session.execute(insert_query)
196
197        for index, class_str in enumerate(class_to_create):
198            class_tuple = (
199                class_str,
200                classes_dict.get(class_str),
201                class_count + index + 1,
202            )
203
204            all_class_to_create.append(class_tuple)
205
206        return (
207            [(c_s, c_d) for c_s, c_d in classes_dict.items()],
208            all_class_to_create,
209            existing_classes_objs,
210        )
211
212    def get_carbonsHydrogens(self, settings, odd_even):
213        """Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
214
215        Parameters
216        ----------
217        settings : object
218             An object containing user-defined settings.
219        odd_even : str
220            A string indicating whether to retrieve even or odd hydrogen atoms.
221
222        Returns
223        -------
224        list
225            A list of CarbonHydrogen objects that satisfy the specified conditions.
226        """
227        operator = "==" if odd_even == "even" else "!="
228        usedAtoms = settings.usedAtoms
229        user_min_c, user_max_c = usedAtoms.get("C")
230        user_min_h, user_max_h = usedAtoms.get("H")
231
232        return eval(
233            "self.sql_db.session.query(CarbonHydrogen).filter("
234            "CarbonHydrogen.C >= user_min_c,"
235            "CarbonHydrogen.H >= user_min_h,"
236            "CarbonHydrogen.C <= user_max_c,"
237            "CarbonHydrogen.H <= user_max_h,"
238            "CarbonHydrogen.H % 2" + operator + "0).all()"
239        )
240
241    def add_carbonsHydrogens(self, settings, existing_classes_objs):
242        """Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
243
244        Parameters
245        ----------
246        settings : object
247            An object containing user-defined settings.
248        existing_classes_objs : list
249            A list of HeteroAtoms objects.
250        """
251        usedAtoms = settings.usedAtoms
252
253        user_min_c, user_max_c = usedAtoms.get("C")
254        user_min_h, user_max_h = usedAtoms.get("H")
255
256        query_obj = self.sql_db.session.query(
257            func.max(CarbonHydrogen.C).label("max_c"),
258            func.min(CarbonHydrogen.C).label("min_c"),
259            func.max(CarbonHydrogen.H).label("max_h"),
260            func.min(CarbonHydrogen.H).label("min_h"),
261        )
262
263        database = query_obj.first()
264        if (
265            database.max_c == user_max_c
266            and database.min_c == user_min_c
267            and database.max_h == user_max_h
268            and database.min_h == user_min_h
269        ):
270            # all data is already available at the database
271            pass
272
273        else:
274            current_count = self.sql_db.session.query(CarbonHydrogen.C).count()
275
276            databaseCarbonHydrogen = self.sql_db.session.query(CarbonHydrogen).all()
277
278            userCarbon = set(range(user_min_c, user_max_c + 1))
279            userHydrogen = set(range(user_min_h, user_max_h + 1))
280
281            carbon_hydrogen_objs_database = {}
282            for obj in databaseCarbonHydrogen:
283                str_data = "C:{},H:{}".format(obj.C, obj.H)
284                carbon_hydrogen_objs_database[str_data] = str_data
285
286            carbon_hydrogen_objs_to_create = {"even": {}, "odd": {}}
287
288            list_ch_obj_to_add = list()
289            i = 0
290            for comb in itertools.product(userCarbon, userHydrogen):
291                C = comb[0]
292                H = comb[1]
293                data = {
294                    "C": C,
295                    "H": H,
296                }
297
298                data_insert = {
299                    "C": C,
300                    "H": H,
301                }
302
303                str_data = "C:{},H:{}".format(C, H)
304
305                if not str_data in carbon_hydrogen_objs_database.keys():
306                    label = "even" if comb[1] % 2 == 0 else "odd"
307                    data["mass"] = (C * Atoms.atomic_masses.get("C")) + (
308                        H * Atoms.atomic_masses.get("H")
309                    )
310                    data["dbe"] = C - (H / 2) + 1
311                    data["id"] = i + current_count + 1
312                    data_insert["id"] = i + current_count + 1
313                    i = i + 1
314                    carbon_hydrogen_objs_to_create[label][str_data] = data
315
316                    list_ch_obj_to_add.append(data_insert)
317
318            if list_ch_obj_to_add:
319                # insert carbon hydrogen objs
320                list_insert_chunks = chunks(
321                    list_ch_obj_to_add, self.sql_db.chunks_count
322                )
323                for insert_chunk in list_insert_chunks:
324                    insert_query = CarbonHydrogen.__table__.insert().values(
325                        insert_chunk
326                    )
327                    self.sql_db.session.execute(insert_query)
328                self.sql_db.session.commit()
329
330                list_molecular_form = list()
331                for classe_obj in existing_classes_objs:
332                    classe_dict = classe_obj.to_dict()
333                    classe_mass = self.calc_mz(classe_dict)
334                    classe_dbe = self.calc_dbe_class(classe_dict)
335
336                    odd_even_label = self.get_h_odd_or_even(classe_dict)
337
338                    ch_datalist = carbon_hydrogen_objs_to_create.get(
339                        odd_even_label
340                    ).values()
341
342                    for ch_dict in ch_datalist:
343                        mass = ch_dict.get("mass") + classe_mass
344                        dbe = ch_dict.get("dbe") + classe_dbe
345
346                        if settings.min_mz <= mass <= settings.max_mz:
347                            if settings.min_dbe <= dbe <= settings.max_dbe:
348                                list_molecular_form.append(
349                                    {
350                                        "heteroAtoms_id": classe_obj.id,
351                                        "carbonHydrogen_id": ch_dict.get("id"),
352                                        "mass": mass,
353                                        "DBE": dbe,
354                                    }
355                                )
356
357                list_insert_chunks = chunks(
358                    list_molecular_form, self.sql_db.chunks_count
359                )
360                for insert_chunk in list_insert_chunks:
361                    insert_query = MolecularFormulaLink.__table__.insert().values(
362                        insert_chunk
363                    )
364                    self.sql_db.session.execute(insert_query)
365                self.sql_db.session.commit()
366
367    @timeit(print_time=True)
368    def runworker(self, molecular_search_settings, **kwargs):
369        """Run the molecular formula lookup table worker.
370
371        Parameters
372        ----------
373        molecular_search_settings : object
374            An object containing user-defined settings.
375        kwargs : dict
376            A dictionary of keyword arguments.
377            Most notably, the print_time argument which is passed to the timeit decorator.
378
379        Returns
380        -------
381        list
382            A list of tuples with the class name and the class dictionary.
383
384
385        """
386        verbose = molecular_search_settings.verbose_processing
387
388        classes_list, class_to_create, existing_classes_objs = (
389            self.check_database_get_class_list(molecular_search_settings)
390        )
391
392        settings = MolecularLookupDictSettings()
393        settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
394        settings.url_database = molecular_search_settings.url_database
395        settings.db_jobs = molecular_search_settings.db_jobs
396
397        self.add_carbonsHydrogens(settings, existing_classes_objs)
398
399        if class_to_create:
400            settings = MolecularLookupDictSettings()
401            settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
402            settings.url_database = molecular_search_settings.url_database
403            settings.db_jobs = molecular_search_settings.db_jobs
404
405            self.sql_db.session.commit()
406            odd_ch_obj = self.get_carbonsHydrogens(settings, "odd")
407            self.odd_ch_id = [obj.id for obj in odd_ch_obj]
408            self.odd_ch_dict = [{"C": obj.C, "H": obj.H} for obj in odd_ch_obj]
409            self.odd_ch_mass = [obj.mass for obj in odd_ch_obj]
410            self.odd_ch_dbe = [obj.dbe for obj in odd_ch_obj]
411
412            even_ch_obj = self.get_carbonsHydrogens(settings, "even")
413            self.even_ch_id = [obj.id for obj in even_ch_obj]
414            self.even_ch_dict = [{"C": obj.C, "H": obj.H} for obj in even_ch_obj]
415            self.even_ch_mass = [obj.mass for obj in even_ch_obj]
416            self.even_ch_dbe = [obj.dbe for obj in even_ch_obj]
417
418            all_results = list()
419            for class_tuple in tqdm(class_to_create, disable = not verbose):
420                results = self.populate_combinations(class_tuple, settings)
421                all_results.extend(results)
422                if settings.db_jobs == 1:
423                    # if len(all_results) >= self.sql_db.chunks_count:
424                    list_insert_chunks = list(chunks(results, self.sql_db.chunks_count))
425                    for chunk in list_insert_chunks:
426                        insert_query = MolecularFormulaLink.__table__.insert().values(
427                            chunk
428                        )
429                        self.sql_db.session.execute(insert_query)
430                    # all_results = list()
431            self.sql_db.session.commit()
432            # each chunk takes ~600Mb of memory, so if using 8 processes the total free memory needs to be 5GB
433            if settings.db_jobs > 1:
434                list_insert_chunks = list(chunks(all_results, self.sql_db.chunks_count))
435                print(
436                    "Started database insert using {} iterations for a total of {} rows".format(
437                        len(list_insert_chunks), len(all_results)
438                    )
439                )
440                worker_args = [
441                    (chunk, settings.url_database) for chunk in list_insert_chunks
442                ]
443                p = multiprocessing.Pool(settings.db_jobs)
444                for class_list in tqdm(
445                        p.imap_unordered(insert_database_worker, worker_args), disable= not verbose
446                        ):
447                    pass
448                p.close()
449                p.join()
450
451        return classes_list
452
453    def get_classes_in_order(self, molecular_search_settings):
454        """Get the classes in order
455
456        Parameters
457        ----------
458        molecular_search_settings : object
459            An object containing user-defined settings.
460
461        Returns
462        -------
463        dict
464            A dictionary of classes in order.
465            structure is  ('HC', {'HC': 1})
466        """
467
468        usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
469
470        usedAtoms.pop("C")
471        usedAtoms.pop("H")
472
473        min_n, max_n = usedAtoms.get("N") if usedAtoms.get("N") else (0, 0)
474        min_o, max_o = usedAtoms.get("O") if usedAtoms.get("O") else (0, 0)
475        min_s, max_s = usedAtoms.get("S") if usedAtoms.get("S") else (0, 0)
476        min_p, max_p = usedAtoms.get("P") if usedAtoms.get("P") else (0, 0)
477
478        possible_n = [n for n in range(min_n, max_n + 1)]
479        possible_o = [o for o in range(min_o, max_o + 1)]
480        possible_s = [s for s in range(min_s, max_s + 1)]
481        possible_p = [p for p in range(min_p, max_p + 1)]
482
483        atoms_in_order = ["N", "O", "S", "P"]
484
485        classe_in_order = {}
486
487        all_atoms_tuples = itertools.product(
488            possible_n, possible_o, possible_s, possible_p
489        )
490
491        for atom in atoms_in_order:
492            usedAtoms.pop(atom, None)
493
494        for selected_atom, min_max_tuple in usedAtoms.items():
495            min_x = min_max_tuple[0]
496            max_x = min_max_tuple[1]
497
498            possible_x = [x for x in range(min_x, max_x + 1)]
499
500            all_atoms_tuples = itertools.product(all_atoms_tuples, possible_x)
501            all_atoms_tuples = [
502                all_atoms_combined[0] + (all_atoms_combined[1],)
503                for all_atoms_combined in all_atoms_tuples
504            ]
505            atoms_in_order.append(selected_atom)
506
507        for all_atoms_tuple in all_atoms_tuples:
508            classe_str = ""
509            classe_dict = {}
510
511            for each_atoms_index, atom_number in enumerate(all_atoms_tuple):
512                if atom_number != 0:
513                    classe_dict[atoms_in_order[each_atoms_index]] = atom_number
514
515            if not classe_dict:
516                classe_in_order["HC"] = {"HC": ""}
517                continue
518
519            classe_str = json.dumps(classe_dict)
520
521            if len(classe_str) > 0:
522                classe_in_order[classe_str] = classe_dict
523
524        classe_in_order_dict = self.sort_classes(atoms_in_order, classe_in_order)
525
526        return classe_in_order_dict
527
528    @staticmethod
529    def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]:
530        """Sort the classes in order
531
532        Parameters
533        ----------
534        atoms_in_order : list
535            A list of atoms in order.
536        combination_dict : dict
537            A dictionary of classes.
538
539        Returns
540        -------
541        dict
542            A dictionary of classes in order.
543        """
544        # ensures atoms are always in the order defined at atoms_in_order list
545        join_dict_classes = dict()
546        atoms_in_order = ["N", "S", "P", "O"] + atoms_in_order[4:] + ["HC"]
547
548        sort_method = lambda atoms_keys: [atoms_in_order.index(atoms_keys)]
549        for class_str, class_dict in combination_dict.items():
550            sorted_dict_keys = sorted(class_dict, key=sort_method)
551            class_dict = {atom: class_dict[atom] for atom in sorted_dict_keys}
552            class_str = json.dumps(class_dict)
553            # using json for the new database, class
554            # class_str = ' '.join([atom + str(class_dict[atom]) for atom in sorted_dict_keys])
555            join_dict_classes[class_str] = class_dict
556
557        return join_dict_classes
558
559    @staticmethod
560    def get_fixed_initial_number_of_hydrogen(min_h, odd_even):
561        """Get the fixed initial number of hydrogen atoms
562
563        Parameters
564        ----------
565        min_h : int
566            The minimum number of hydrogen atoms.
567        odd_even : str
568            A string indicating whether to retrieve even or odd hydrogen atoms.
569        """
570        remaining_h = min_h % 2
571
572        if odd_even == "even":
573            if remaining_h == 0:
574                return remaining_h
575
576            else:
577                return remaining_h + 1
578
579        else:
580            if remaining_h == 0:
581                return remaining_h + 1
582
583            else:
584                return remaining_h
585
586    def calc_mz(self, datadict, class_mass=0):
587        """Calculate the mass-to-charge ratio (m/z) of a molecular formula.
588
589        Parameters
590        ----------
591        datadict : dict
592            A dictionary of classes.
593        class_mass : int
594            The mass of the class.
595
596        Returns
597        -------
598        float
599            The mass-to-charge ratio (m/z) of a molecular formula.
600        """
601        mass = class_mass
602
603        for atom in datadict.keys():
604            if atom != "HC":
605                mass = mass + Atoms.atomic_masses[atom] * datadict.get(atom)
606
607        return mass
608
609    def calc_dbe_class(self, datadict):
610        """Calculate the double bond equivalent (DBE) of a molecular formula.
611
612        Parameters
613        ----------
614        datadict : dict
615            A dictionary of classes.
616
617        Returns
618        -------
619        float
620            The double bond equivalent (DBE) of a molecular formula.
621        """
622        init_dbe = 0
623        for atom in datadict.keys():
624            if atom == "HC":
625                continue
626
627            n_atom = int(datadict.get(atom))
628
629            clean_atom = "".join([i for i in atom if not i.isdigit()])
630
631            valencia = MSParameters.molecular_search.used_atom_valences.get(clean_atom)
632
633            if type(valencia) is tuple:
634                valencia = valencia[0]
635            if valencia > 0:
636                # print atom, valencia, n_atom, init_dbe
637                init_dbe = init_dbe + (n_atom * (valencia - 2))
638            else:
639                continue
640
641        return 0.5 * init_dbe
642
643    def populate_combinations(self, classe_tuple, settings):
644        """Populate the combinations
645
646        Parameters
647        ----------
648        classe_tuple : tuple
649            A tuple containing the class name, the class dictionary, and the class ID.
650        settings : object
651            An object containing user-defined settings.
652
653        Returns
654        -------
655        list
656            A list of molecular formula data dictionaries.
657        """
658        ion_charge = 0
659
660        class_dict = classe_tuple[1]
661        odd_or_even = self.get_h_odd_or_even(class_dict)
662
663        return self.get_mol_formulas(odd_or_even, classe_tuple, settings)
664
665    def get_or_add(self, SomeClass, kw):
666        """Get or add a class
667
668        Parameters
669        ----------
670        SomeClass : object
671            A class object.
672        kw : dict
673            A dictionary of classes.
674
675        Returns
676        -------
677        object
678            A class object.
679        """
680        obj = self.sql_db.session.query(SomeClass).filter_by(**kw).first()
681        if not obj:
682            obj = SomeClass(**kw)
683        return obj
684
685    def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
686        """Get the molecular formulas
687
688        Parameters
689        ----------
690        odd_even_tag : str
691            A string indicating whether to retrieve even or odd hydrogen atoms.
692        classe_tuple : tuple
693
694        settings : object
695            An object containing user-defined settings.
696
697        Returns
698        -------
699        list
700            A list of molecular formula data dictionaries.
701
702        """
703        class_str = classe_tuple[0]
704        class_dict = classe_tuple[1]
705        classe_id = classe_tuple[2]
706
707        results = list()
708
709        if "HC" in class_dict:
710            del class_dict["HC"]
711
712        class_dbe = self.calc_dbe_class(class_dict)
713        class_mass = self.calc_mz(class_dict)
714
715        carbonHydrogen_mass = (
716            self.odd_ch_mass if odd_even_tag == "odd" else self.even_ch_mass
717        )
718        carbonHydrogen_dbe = (
719            self.odd_ch_dbe if odd_even_tag == "odd" else self.even_ch_dbe
720        )
721        carbonHydrogen_id = self.odd_ch_id if odd_even_tag == "odd" else self.even_ch_id
722
723        for index, carbonHydrogen_obj in enumerate(carbonHydrogen_id):
724            mass = carbonHydrogen_mass[index] + class_mass
725            dbe = carbonHydrogen_dbe[index] + class_dbe
726
727            if settings.min_mz <= mass <= settings.max_mz:
728                if settings.min_dbe <= dbe <= settings.max_dbe:
729                    molecularFormula = {
730                        "heteroAtoms_id": classe_id,
731                        "carbonHydrogen_id": carbonHydrogen_id[index],
732                        "mass": mass,
733                        "DBE": dbe,
734                    }
735
736                    results.append(molecularFormula)
737
738        return results
739
740    def get_h_odd_or_even(self, class_dict):
741        """Get the hydrogen odd or even
742
743        Parameters
744        ----------
745        class_dict : dict
746            A dictionary of classes.
747
748        Returns
749        -------
750        str
751            A string indicating whether to retrieve even or odd hydrogen atoms.
752        """
753
754        HAS_NITROGEN = "N" in class_dict.keys()
755
756        number_of_halogen = self.get_total_halogen_atoms(class_dict)
757        number_of_hetero = self.get_total_heteroatoms(class_dict)
758
759        if number_of_halogen > 0:
760            HAS_HALOGEN = True
761
762        else:
763            HAS_HALOGEN = False
764
765        if HAS_HALOGEN:
766            remaining_halogen = number_of_halogen % 2
767
768        else:
769            remaining_halogen = 0
770
771        if number_of_hetero > 0:
772            HAS_OTHER_HETERO = True
773
774            total_hetero_valence = self.get_total_hetero_valence(class_dict)
775
776        else:
777            HAS_OTHER_HETERO = False
778
779            total_hetero_valence = 0
780
781        if HAS_OTHER_HETERO:
782            remaining_hetero_valence = total_hetero_valence % 2
783
784        else:
785            remaining_hetero_valence = 0
786
787        if HAS_NITROGEN and not HAS_OTHER_HETERO:
788            number_of_n = class_dict.get("N")
789            remaining_n = number_of_n % 2
790
791        elif HAS_NITROGEN and HAS_OTHER_HETERO:
792            number_of_n = class_dict.get("N")
793            remaining_n = (number_of_n + remaining_hetero_valence) % 2
794
795        elif HAS_OTHER_HETERO and not HAS_NITROGEN:
796            remaining_n = remaining_hetero_valence
797
798        else:
799            remaining_n = -1
800
801        if remaining_n > 0.0:
802            if HAS_NITROGEN or HAS_OTHER_HETERO:
803                if HAS_HALOGEN:
804                    if remaining_halogen == 0:
805                        return "odd"
806                    else:
807                        return "even"
808
809                else:
810                    return "odd"
811
812        elif remaining_n == 0.0:
813            if HAS_NITROGEN or HAS_OTHER_HETERO:
814                if HAS_HALOGEN:
815                    if remaining_halogen == 0:
816                        return "even"
817                    else:
818                        return "odd"
819
820                else:
821                    return "even"
822
823        else:
824            if HAS_HALOGEN:
825                if remaining_halogen == 0:
826                    return "even"
827                else:
828                    return "odd"
829
830            else:
831                return "even"
832
833    @staticmethod
834    def get_total_heteroatoms(class_dict):
835        """Get the total number of heteroatoms other than N, F, Cl, Br
836
837        Parameters
838        ----------
839        class_dict : dict
840            A dictionary of classes.
841
842        Returns
843        -------
844        int
845            The total number of heteroatoms.
846        """
847
848        total_number = 0
849
850        for atom in class_dict.keys():
851            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
852                total_number = total_number + class_dict.get(atom)
853
854        return total_number
855
856    @staticmethod
857    def get_total_hetero_valence(class_dict):
858        """Get the total valence of heteroatoms other than N, F, Cl, Br
859
860        Parameters
861        ----------
862        class_dict : dict
863            A dictionary of classes.
864
865        Returns
866        -------
867        int
868            The total heteroatom valence.
869        """
870        total_valence = 0
871
872        for atom in class_dict.keys():
873            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
874                clean_atom = "".join([i for i in atom if not i.isdigit()])
875
876                atom_valence = MSParameters.molecular_search.used_atom_valences.get(
877                    clean_atom
878                )
879
880                if type(atom_valence) is tuple:
881                    atom_valence = atom_valence[0]
882
883                n_atom = int(class_dict.get(atom))
884
885                n_atom_valence = atom_valence * n_atom
886
887                total_valence = total_valence + n_atom_valence
888
889        return total_valence
890
891    @staticmethod
892    def get_total_halogen_atoms(class_dict):
893        """Get the total number of halogen atoms
894
895        Parameters
896        ----------
897        class_dict : dict
898            A dictionary of classes.
899
900        Returns
901        -------
902        int
903            The total number of halogen atoms.
904        """
905        atoms = ["F", "Cl", "Br"]
906
907        total_number = 0
908
909        for atom in atoms:
910            if atom in class_dict.keys():
911                total_number = total_number + class_dict.get(atom)
912
913        return total_number
@contextlib.contextmanager
def profiled():
@contextlib.contextmanager
def profiled():
    """Profile the body of a ``with`` block and print the cumulative stats.

    The profiler is always disabled and the report always printed, even when
    the body raises; the exception then propagates to the caller unchanged.

    Yields
    ------
    None
    """
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        yield
    finally:
        # runs on both normal exit and exceptions so the profiler never stays enabled
        profiler.disable()
        report = io.StringIO()
        stats = pstats.Stats(profiler, stream=report).sort_stats("cumulative")
        stats.print_stats()
        # uncomment this to see who's calling what
        # stats.print_callers()
        print(report.getvalue())

A context manager for profiling.

def insert_database_worker(args):
def insert_database_worker(args):
    """Insert one batch of rows into the MolecularFormulaLink table.

    Parameters
    ----------
    args : tuple
        ``(results, url)`` where ``results`` is the batch of row values handed
        to the insert statement and ``url`` is the database URL. A falsy
        ``url`` falls back to "sqlite:///db/molformulas.sqlite".

    Notes
    -----
    The session is closed and the engine disposed even when the insert or the
    commit raises; the exception itself is not swallowed.
    """
    results, url = args

    if not url:
        url = "sqlite:///db/molformulas.sqlite"

    if url.startswith("sqlite"):
        engine = create_engine(url, echo=False)
    else:
        # non-sqlite engines are created with AUTOCOMMIT isolation
        engine = create_engine(url, echo=False, isolation_level="AUTOCOMMIT")

    try:
        session = sessionmaker(bind=engine)()
        try:
            insert_query = MolecularFormulaLink.__table__.insert().values(results)
            session.execute(insert_query)
            session.commit()
        finally:
            session.close()
    finally:
        engine.dispose()

Inserts data into the database.

class MolecularCombinations:
 67class MolecularCombinations:
 68    """A class for generating molecular formula combinations.
 69
 70    Parameters
 71    ----------
 72    sql_db : MolForm_SQL, optional
 73        Database handle to use. When omitted (or falsy) a new MolForm_SQL() is created.
 74
 75    Attributes
 76    ----------
 77    sql_db : MolForm_SQL
 78        The SQLite database object.
 79    len_existing_classes : int
 80        The number of existing classes in the SQLite database.
 81    odd_ch_id : list
 82        A list of odd carbon and hydrogen atom IDs.
 83    odd_ch_dict : list
 84        A list of odd carbon and hydrogen atom dictionaries.
 85    odd_ch_mass : list
 86        A list of odd carbon and hydrogen atom masses.
 87    odd_ch_dbe : list
 88        A list of odd carbon and hydrogen atom double bond equivalents.
 89    even_ch_id : list
 90        A list of even carbon and hydrogen atom IDs.
 91    even_ch_dict : list
 92        A list of even carbon and hydrogen atom dictionaries.
 93    even_ch_mass : list
 94        A list of even carbon and hydrogen atom masses.
 95    even_ch_dbe : list
 96        A list of even carbon and hydrogen atom double bond equivalents.
 97
 98    Methods
 99    -------
100    * cProfile_worker(args)
101        A cProfile worker for the get_mol_formulas function.
102    * check_database_get_class_list(molecular_search_settings)
103        Checks if the database has all the classes, if not create the missing classes.
104    * get_carbonsHydrogens(settings, odd_even)
105        Retrieves carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
106    * add_carbonsHydrogens(settings, existing_classes_objs)
107        Adds carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
108    * runworker(molecular_search_settings)
109        Runs the molecular formula lookup table worker.
110    * get_classes_in_order(molecular_search_settings)
111        Gets the classes in order.
112    * sort_classes(atoms_in_order, combination_dict)
113        Sorts the classes in order.
114    * get_fixed_initial_number_of_hydrogen(min_h, odd_even)
115        Gets the fixed initial number of hydrogen atoms.
116    * calc_mz(datadict, class_mass=0)
117        Calculates the mass-to-charge ratio (m/z) of a molecular formula.
118    * calc_dbe_class(datadict)
119        Calculates the double bond equivalent (DBE) of a molecular formula.
120    * populate_combinations(classe_tuple, settings)
121        Populates the combinations.
122    * get_or_add(SomeClass, kw)
123        Gets or adds a class.
124    * get_mol_formulas(odd_even_tag, classe_tuple, settings)
125        Gets the molecular formulas.
126    * get_h_odd_or_even(class_dict)
127        Gets the hydrogen odd or even.
128    * get_total_halogen_atoms(class_dict)
129        Gets the total number of halogen atoms.
130    * get_total_hetero_valence(class_dict)
131        Gets the total valence of heteroatoms other than N, F, Cl, and Br
132    """
133
134    def __init__(self, sql_db=None):
135        if not sql_db:
136            self.sql_db = MolForm_SQL()
137        else:
138            self.sql_db = sql_db
139
140    def cProfile_worker(self, args):
141        """cProfile worker for the get_mol_formulas function"""
142        cProfile.runctx(
143            "self.get_mol_formulas(*args)",
144            globals(),
145            locals(),
146            "mf_database_cprofile.prof",
147        )
148
149    def check_database_get_class_list(self, molecular_search_settings):
150        """check if the database has all the classes, if not create the missing classes
151
152        Parameters
153        ----------
154        molecular_search_settings : object
155            An object containing user-defined settings.
156
157        Returns
158        -------
159        list
160            list of tuples with the class name and the class dictionary
161        """
162        all_class_to_create = []
163
164        classes_dict = self.get_classes_in_order(molecular_search_settings)
165
166        class_str_set = set(classes_dict.keys())
167
168        existing_classes_objs = self.sql_db.session.query(HeteroAtoms).distinct().all()
169
170        existing_classes_str = set([classe.name for classe in existing_classes_objs])
171
172        self.len_existing_classes = len(existing_classes_str)
173
174        class_to_create = class_str_set - existing_classes_str
175
176        class_count = len(existing_classes_objs)
177
178        data_classes = list()
179        for index, class_str in enumerate(class_to_create):
180            class_dict = classes_dict.get(class_str)
181            halogen_count = self.get_total_halogen_atoms(class_dict)
182            data_classes.append(
183                {
184                    "name": class_str,
185                    "id": class_count + index + 1,
186                    "halogensCount": halogen_count,
187                }
188            )
189
190        # data_classes = [{"name":class_str, "id":class_count+ index + 1} for index, class_str in enumerate(class_to_create)]
191
192        if data_classes:
193            list_insert_chunks = chunks(data_classes, self.sql_db.chunks_count)
194            for insert_chunk in list_insert_chunks:
195                insert_query = HeteroAtoms.__table__.insert().values(insert_chunk)
196                self.sql_db.session.execute(insert_query)
197
198        for index, class_str in enumerate(class_to_create):
199            class_tuple = (
200                class_str,
201                classes_dict.get(class_str),
202                class_count + index + 1,
203            )
204
205            all_class_to_create.append(class_tuple)
206
207        return (
208            [(c_s, c_d) for c_s, c_d in classes_dict.items()],
209            all_class_to_create,
210            existing_classes_objs,
211        )
212
213    def get_carbonsHydrogens(self, settings, odd_even):
214        """Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
215
216        Parameters
217        ----------
218        settings : object
219             An object containing user-defined settings.
220        odd_even : str
221            A string indicating whether to retrieve even or odd hydrogen atoms.
222
223        Returns
224        -------
225        list
226            A list of CarbonHydrogen objects that satisfy the specified conditions.
227        """
228        operator = "==" if odd_even == "even" else "!="
229        usedAtoms = settings.usedAtoms
230        user_min_c, user_max_c = usedAtoms.get("C")
231        user_min_h, user_max_h = usedAtoms.get("H")
232
233        return eval(
234            "self.sql_db.session.query(CarbonHydrogen).filter("
235            "CarbonHydrogen.C >= user_min_c,"
236            "CarbonHydrogen.H >= user_min_h,"
237            "CarbonHydrogen.C <= user_max_c,"
238            "CarbonHydrogen.H <= user_max_h,"
239            "CarbonHydrogen.H % 2" + operator + "0).all()"
240        )
241
242    def add_carbonsHydrogens(self, settings, existing_classes_objs):
243        """Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
244
245        Parameters
246        ----------
247        settings : object
248            An object containing user-defined settings.
249        existing_classes_objs : list
250            A list of HeteroAtoms objects.
251        """
252        usedAtoms = settings.usedAtoms
253
254        user_min_c, user_max_c = usedAtoms.get("C")
255        user_min_h, user_max_h = usedAtoms.get("H")
256
257        query_obj = self.sql_db.session.query(
258            func.max(CarbonHydrogen.C).label("max_c"),
259            func.min(CarbonHydrogen.C).label("min_c"),
260            func.max(CarbonHydrogen.H).label("max_h"),
261            func.min(CarbonHydrogen.H).label("min_h"),
262        )
263
264        database = query_obj.first()
265        if (
266            database.max_c == user_max_c
267            and database.min_c == user_min_c
268            and database.max_h == user_max_h
269            and database.min_h == user_min_h
270        ):
271            # all data is already available at the database
272            pass
273
274        else:
275            current_count = self.sql_db.session.query(CarbonHydrogen.C).count()
276
277            databaseCarbonHydrogen = self.sql_db.session.query(CarbonHydrogen).all()
278
279            userCarbon = set(range(user_min_c, user_max_c + 1))
280            userHydrogen = set(range(user_min_h, user_max_h + 1))
281
282            carbon_hydrogen_objs_database = {}
283            for obj in databaseCarbonHydrogen:
284                str_data = "C:{},H:{}".format(obj.C, obj.H)
285                carbon_hydrogen_objs_database[str_data] = str_data
286
287            carbon_hydrogen_objs_to_create = {"even": {}, "odd": {}}
288
289            list_ch_obj_to_add = list()
290            i = 0
291            for comb in itertools.product(userCarbon, userHydrogen):
292                C = comb[0]
293                H = comb[1]
294                data = {
295                    "C": C,
296                    "H": H,
297                }
298
299                data_insert = {
300                    "C": C,
301                    "H": H,
302                }
303
304                str_data = "C:{},H:{}".format(C, H)
305
306                if not str_data in carbon_hydrogen_objs_database.keys():
307                    label = "even" if comb[1] % 2 == 0 else "odd"
308                    data["mass"] = (C * Atoms.atomic_masses.get("C")) + (
309                        H * Atoms.atomic_masses.get("H")
310                    )
311                    data["dbe"] = C - (H / 2) + 1
312                    data["id"] = i + current_count + 1
313                    data_insert["id"] = i + current_count + 1
314                    i = i + 1
315                    carbon_hydrogen_objs_to_create[label][str_data] = data
316
317                    list_ch_obj_to_add.append(data_insert)
318
319            if list_ch_obj_to_add:
320                # insert carbon hydrogen objs
321                list_insert_chunks = chunks(
322                    list_ch_obj_to_add, self.sql_db.chunks_count
323                )
324                for insert_chunk in list_insert_chunks:
325                    insert_query = CarbonHydrogen.__table__.insert().values(
326                        insert_chunk
327                    )
328                    self.sql_db.session.execute(insert_query)
329                self.sql_db.session.commit()
330
331                list_molecular_form = list()
332                for classe_obj in existing_classes_objs:
333                    classe_dict = classe_obj.to_dict()
334                    classe_mass = self.calc_mz(classe_dict)
335                    classe_dbe = self.calc_dbe_class(classe_dict)
336
337                    odd_even_label = self.get_h_odd_or_even(classe_dict)
338
339                    ch_datalist = carbon_hydrogen_objs_to_create.get(
340                        odd_even_label
341                    ).values()
342
343                    for ch_dict in ch_datalist:
344                        mass = ch_dict.get("mass") + classe_mass
345                        dbe = ch_dict.get("dbe") + classe_dbe
346
347                        if settings.min_mz <= mass <= settings.max_mz:
348                            if settings.min_dbe <= dbe <= settings.max_dbe:
349                                list_molecular_form.append(
350                                    {
351                                        "heteroAtoms_id": classe_obj.id,
352                                        "carbonHydrogen_id": ch_dict.get("id"),
353                                        "mass": mass,
354                                        "DBE": dbe,
355                                    }
356                                )
357
358                list_insert_chunks = chunks(
359                    list_molecular_form, self.sql_db.chunks_count
360                )
361                for insert_chunk in list_insert_chunks:
362                    insert_query = MolecularFormulaLink.__table__.insert().values(
363                        insert_chunk
364                    )
365                    self.sql_db.session.execute(insert_query)
366                self.sql_db.session.commit()
367
368    @timeit(print_time=True)
369    def runworker(self, molecular_search_settings, **kwargs):
370        """Run the molecular formula lookup table worker.
371
372        Parameters
373        ----------
374        molecular_search_settings : object
375            An object containing user-defined settings.
376        kwargs : dict
377            A dictionary of keyword arguments.
378            Most notably, the print_time argument which is passed to the timeit decorator.
379
380        Returns
381        -------
382        list
383            A list of tuples with the class name and the class dictionary.
384
385
386        """
387        verbose = molecular_search_settings.verbose_processing
388
389        classes_list, class_to_create, existing_classes_objs = (
390            self.check_database_get_class_list(molecular_search_settings)
391        )
392
393        settings = MolecularLookupDictSettings()
394        settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
395        settings.url_database = molecular_search_settings.url_database
396        settings.db_jobs = molecular_search_settings.db_jobs
397
398        self.add_carbonsHydrogens(settings, existing_classes_objs)
399
400        if class_to_create:
401            settings = MolecularLookupDictSettings()
402            settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
403            settings.url_database = molecular_search_settings.url_database
404            settings.db_jobs = molecular_search_settings.db_jobs
405
406            self.sql_db.session.commit()
407            odd_ch_obj = self.get_carbonsHydrogens(settings, "odd")
408            self.odd_ch_id = [obj.id for obj in odd_ch_obj]
409            self.odd_ch_dict = [{"C": obj.C, "H": obj.H} for obj in odd_ch_obj]
410            self.odd_ch_mass = [obj.mass for obj in odd_ch_obj]
411            self.odd_ch_dbe = [obj.dbe for obj in odd_ch_obj]
412
413            even_ch_obj = self.get_carbonsHydrogens(settings, "even")
414            self.even_ch_id = [obj.id for obj in even_ch_obj]
415            self.even_ch_dict = [{"C": obj.C, "H": obj.H} for obj in even_ch_obj]
416            self.even_ch_mass = [obj.mass for obj in even_ch_obj]
417            self.even_ch_dbe = [obj.dbe for obj in even_ch_obj]
418
419            all_results = list()
420            for class_tuple in tqdm(class_to_create, disable = not verbose):
421                results = self.populate_combinations(class_tuple, settings)
422                all_results.extend(results)
423                if settings.db_jobs == 1:
424                    # if len(all_results) >= self.sql_db.chunks_count:
425                    list_insert_chunks = list(chunks(results, self.sql_db.chunks_count))
426                    for chunk in list_insert_chunks:
427                        insert_query = MolecularFormulaLink.__table__.insert().values(
428                            chunk
429                        )
430                        self.sql_db.session.execute(insert_query)
431                    # all_results = list()
432            self.sql_db.session.commit()
433            # each chunk takes ~600Mb of memory, so if using 8 processes the total free memory needs to be 5GB
434            if settings.db_jobs > 1:
435                list_insert_chunks = list(chunks(all_results, self.sql_db.chunks_count))
436                print(
437                    "Started database insert using {} iterations for a total of {} rows".format(
438                        len(list_insert_chunks), len(all_results)
439                    )
440                )
441                worker_args = [
442                    (chunk, settings.url_database) for chunk in list_insert_chunks
443                ]
444                p = multiprocessing.Pool(settings.db_jobs)
445                for class_list in tqdm(
446                        p.imap_unordered(insert_database_worker, worker_args), disable= not verbose
447                        ):
448                    pass
449                p.close()
450                p.join()
451
452        return classes_list
453
454    def get_classes_in_order(self, molecular_search_settings):
455        """Get the classes in order
456
457        Parameters
458        ----------
459        molecular_search_settings : object
460            An object containing user-defined settings.
461
462        Returns
463        -------
464        dict
465            A dictionary of classes in order.
466            structure is  ('HC', {'HC': 1})
467        """
468
469        usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
470
471        usedAtoms.pop("C")
472        usedAtoms.pop("H")
473
474        min_n, max_n = usedAtoms.get("N") if usedAtoms.get("N") else (0, 0)
475        min_o, max_o = usedAtoms.get("O") if usedAtoms.get("O") else (0, 0)
476        min_s, max_s = usedAtoms.get("S") if usedAtoms.get("S") else (0, 0)
477        min_p, max_p = usedAtoms.get("P") if usedAtoms.get("P") else (0, 0)
478
479        possible_n = [n for n in range(min_n, max_n + 1)]
480        possible_o = [o for o in range(min_o, max_o + 1)]
481        possible_s = [s for s in range(min_s, max_s + 1)]
482        possible_p = [p for p in range(min_p, max_p + 1)]
483
484        atoms_in_order = ["N", "O", "S", "P"]
485
486        classe_in_order = {}
487
488        all_atoms_tuples = itertools.product(
489            possible_n, possible_o, possible_s, possible_p
490        )
491
492        for atom in atoms_in_order:
493            usedAtoms.pop(atom, None)
494
495        for selected_atom, min_max_tuple in usedAtoms.items():
496            min_x = min_max_tuple[0]
497            max_x = min_max_tuple[1]
498
499            possible_x = [x for x in range(min_x, max_x + 1)]
500
501            all_atoms_tuples = itertools.product(all_atoms_tuples, possible_x)
502            all_atoms_tuples = [
503                all_atoms_combined[0] + (all_atoms_combined[1],)
504                for all_atoms_combined in all_atoms_tuples
505            ]
506            atoms_in_order.append(selected_atom)
507
508        for all_atoms_tuple in all_atoms_tuples:
509            classe_str = ""
510            classe_dict = {}
511
512            for each_atoms_index, atom_number in enumerate(all_atoms_tuple):
513                if atom_number != 0:
514                    classe_dict[atoms_in_order[each_atoms_index]] = atom_number
515
516            if not classe_dict:
517                classe_in_order["HC"] = {"HC": ""}
518                continue
519
520            classe_str = json.dumps(classe_dict)
521
522            if len(classe_str) > 0:
523                classe_in_order[classe_str] = classe_dict
524
525        classe_in_order_dict = self.sort_classes(atoms_in_order, classe_in_order)
526
527        return classe_in_order_dict
528
529    @staticmethod
530    def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]:
531        """Sort the classes in order
532
533        Parameters
534        ----------
535        atoms_in_order : list
536            A list of atoms in order.
537        combination_dict : dict
538            A dictionary of classes.
539
540        Returns
541        -------
542        dict
543            A dictionary of classes in order.
544        """
545        # ensures atoms are always in the order defined at atoms_in_order list
546        join_dict_classes = dict()
547        atoms_in_order = ["N", "S", "P", "O"] + atoms_in_order[4:] + ["HC"]
548
549        sort_method = lambda atoms_keys: [atoms_in_order.index(atoms_keys)]
550        for class_str, class_dict in combination_dict.items():
551            sorted_dict_keys = sorted(class_dict, key=sort_method)
552            class_dict = {atom: class_dict[atom] for atom in sorted_dict_keys}
553            class_str = json.dumps(class_dict)
554            # using json for the new database, class
555            # class_str = ' '.join([atom + str(class_dict[atom]) for atom in sorted_dict_keys])
556            join_dict_classes[class_str] = class_dict
557
558        return join_dict_classes
559
560    @staticmethod
561    def get_fixed_initial_number_of_hydrogen(min_h, odd_even):
562        """Get the fixed initial number of hydrogen atoms
563
564        Parameters
565        ----------
566        min_h : int
567            The minimum number of hydrogen atoms.
568        odd_even : str
569            A string indicating whether to retrieve even or odd hydrogen atoms.
570        """
571        remaining_h = min_h % 2
572
573        if odd_even == "even":
574            if remaining_h == 0:
575                return remaining_h
576
577            else:
578                return remaining_h + 1
579
580        else:
581            if remaining_h == 0:
582                return remaining_h + 1
583
584            else:
585                return remaining_h
586
587    def calc_mz(self, datadict, class_mass=0):
588        """Calculate the mass-to-charge ratio (m/z) of a molecular formula.
589
590        Parameters
591        ----------
592        datadict : dict
593            A dictionary of classes.
594        class_mass : int
595            The mass of the class.
596
597        Returns
598        -------
599        float
600            The mass-to-charge ratio (m/z) of a molecular formula.
601        """
602        mass = class_mass
603
604        for atom in datadict.keys():
605            if atom != "HC":
606                mass = mass + Atoms.atomic_masses[atom] * datadict.get(atom)
607
608        return mass
609
610    def calc_dbe_class(self, datadict):
611        """Calculate the double bond equivalent (DBE) of a molecular formula.
612
613        Parameters
614        ----------
615        datadict : dict
616            A dictionary of classes.
617
618        Returns
619        -------
620        float
621            The double bond equivalent (DBE) of a molecular formula.
622        """
623        init_dbe = 0
624        for atom in datadict.keys():
625            if atom == "HC":
626                continue
627
628            n_atom = int(datadict.get(atom))
629
630            clean_atom = "".join([i for i in atom if not i.isdigit()])
631
632            valencia = MSParameters.molecular_search.used_atom_valences.get(clean_atom)
633
634            if type(valencia) is tuple:
635                valencia = valencia[0]
636            if valencia > 0:
637                # print atom, valencia, n_atom, init_dbe
638                init_dbe = init_dbe + (n_atom * (valencia - 2))
639            else:
640                continue
641
642        return 0.5 * init_dbe
643
644    def populate_combinations(self, classe_tuple, settings):
645        """Populate the combinations
646
647        Parameters
648        ----------
649        classe_tuple : tuple
650            A tuple containing the class name, the class dictionary, and the class ID.
651        settings : object
652            An object containing user-defined settings.
653
654        Returns
655        -------
656        list
657            A list of molecular formula data dictionaries.
658        """
659        ion_charge = 0
660
661        class_dict = classe_tuple[1]
662        odd_or_even = self.get_h_odd_or_even(class_dict)
663
664        return self.get_mol_formulas(odd_or_even, classe_tuple, settings)
665
666    def get_or_add(self, SomeClass, kw):
667        """Get or add a class
668
669        Parameters
670        ----------
671        SomeClass : object
672            A class object.
673        kw : dict
674            A dictionary of classes.
675
676        Returns
677        -------
678        object
679            A class object.
680        """
681        obj = self.sql_db.session.query(SomeClass).filter_by(**kw).first()
682        if not obj:
683            obj = SomeClass(**kw)
684        return obj
685
686    def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
687        """Get the molecular formulas
688
689        Parameters
690        ----------
691        odd_even_tag : str
692            A string indicating whether to retrieve even or odd hydrogen atoms.
693        classe_tuple : tuple
694
695        settings : object
696            An object containing user-defined settings.
697
698        Returns
699        -------
700        list
701            A list of molecular formula data dictionaries.
702
703        """
704        class_str = classe_tuple[0]
705        class_dict = classe_tuple[1]
706        classe_id = classe_tuple[2]
707
708        results = list()
709
710        if "HC" in class_dict:
711            del class_dict["HC"]
712
713        class_dbe = self.calc_dbe_class(class_dict)
714        class_mass = self.calc_mz(class_dict)
715
716        carbonHydrogen_mass = (
717            self.odd_ch_mass if odd_even_tag == "odd" else self.even_ch_mass
718        )
719        carbonHydrogen_dbe = (
720            self.odd_ch_dbe if odd_even_tag == "odd" else self.even_ch_dbe
721        )
722        carbonHydrogen_id = self.odd_ch_id if odd_even_tag == "odd" else self.even_ch_id
723
724        for index, carbonHydrogen_obj in enumerate(carbonHydrogen_id):
725            mass = carbonHydrogen_mass[index] + class_mass
726            dbe = carbonHydrogen_dbe[index] + class_dbe
727
728            if settings.min_mz <= mass <= settings.max_mz:
729                if settings.min_dbe <= dbe <= settings.max_dbe:
730                    molecularFormula = {
731                        "heteroAtoms_id": classe_id,
732                        "carbonHydrogen_id": carbonHydrogen_id[index],
733                        "mass": mass,
734                        "DBE": dbe,
735                    }
736
737                    results.append(molecularFormula)
738
739        return results
740
741    def get_h_odd_or_even(self, class_dict):
742        """Get the hydrogen odd or even
743
744        Parameters
745        ----------
746        class_dict : dict
747            A dictionary of classes.
748
749        Returns
750        -------
751        str
752            A string indicating whether to retrieve even or odd hydrogen atoms.
753        """
754
755        HAS_NITROGEN = "N" in class_dict.keys()
756
757        number_of_halogen = self.get_total_halogen_atoms(class_dict)
758        number_of_hetero = self.get_total_heteroatoms(class_dict)
759
760        if number_of_halogen > 0:
761            HAS_HALOGEN = True
762
763        else:
764            HAS_HALOGEN = False
765
766        if HAS_HALOGEN:
767            remaining_halogen = number_of_halogen % 2
768
769        else:
770            remaining_halogen = 0
771
772        if number_of_hetero > 0:
773            HAS_OTHER_HETERO = True
774
775            total_hetero_valence = self.get_total_hetero_valence(class_dict)
776
777        else:
778            HAS_OTHER_HETERO = False
779
780            total_hetero_valence = 0
781
782        if HAS_OTHER_HETERO:
783            remaining_hetero_valence = total_hetero_valence % 2
784
785        else:
786            remaining_hetero_valence = 0
787
788        if HAS_NITROGEN and not HAS_OTHER_HETERO:
789            number_of_n = class_dict.get("N")
790            remaining_n = number_of_n % 2
791
792        elif HAS_NITROGEN and HAS_OTHER_HETERO:
793            number_of_n = class_dict.get("N")
794            remaining_n = (number_of_n + remaining_hetero_valence) % 2
795
796        elif HAS_OTHER_HETERO and not HAS_NITROGEN:
797            remaining_n = remaining_hetero_valence
798
799        else:
800            remaining_n = -1
801
802        if remaining_n > 0.0:
803            if HAS_NITROGEN or HAS_OTHER_HETERO:
804                if HAS_HALOGEN:
805                    if remaining_halogen == 0:
806                        return "odd"
807                    else:
808                        return "even"
809
810                else:
811                    return "odd"
812
813        elif remaining_n == 0.0:
814            if HAS_NITROGEN or HAS_OTHER_HETERO:
815                if HAS_HALOGEN:
816                    if remaining_halogen == 0:
817                        return "even"
818                    else:
819                        return "odd"
820
821                else:
822                    return "even"
823
824        else:
825            if HAS_HALOGEN:
826                if remaining_halogen == 0:
827                    return "even"
828                else:
829                    return "odd"
830
831            else:
832                return "even"
833
834    @staticmethod
835    def get_total_heteroatoms(class_dict):
836        """Get the total number of heteroatoms other than N, F, Cl, Br
837
838        Parameters
839        ----------
840        class_dict : dict
841            A dictionary of classes.
842
843        Returns
844        -------
845        int
846            The total number of heteroatoms.
847        """
848
849        total_number = 0
850
851        for atom in class_dict.keys():
852            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
853                total_number = total_number + class_dict.get(atom)
854
855        return total_number
856
857    @staticmethod
858    def get_total_hetero_valence(class_dict):
859        """Get the total valence of heteroatoms other than N, F, Cl, Br
860
861        Parameters
862        ----------
863        class_dict : dict
864            A dictionary of classes.
865
866        Returns
867        -------
868        int
869            The total heteroatom valence.
870        """
871        total_valence = 0
872
873        for atom in class_dict.keys():
874            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
875                clean_atom = "".join([i for i in atom if not i.isdigit()])
876
877                atom_valence = MSParameters.molecular_search.used_atom_valences.get(
878                    clean_atom
879                )
880
881                if type(atom_valence) is tuple:
882                    atom_valence = atom_valence[0]
883
884                n_atom = int(class_dict.get(atom))
885
886                n_atom_valence = atom_valence * n_atom
887
888                total_valence = total_valence + n_atom_valence
889
890        return total_valence
891
892    @staticmethod
893    def get_total_halogen_atoms(class_dict):
894        """Get the total number of halogen atoms
895
896        Parameters
897        ----------
898        class_dict : dict
899            A dictionary of classes.
900
901        Returns
902        -------
903        int
904            The total number of halogen atoms.
905        """
906        atoms = ["F", "Cl", "Br"]
907
908        total_number = 0
909
910        for atom in atoms:
911            if atom in class_dict.keys():
912                total_number = total_number + class_dict.get(atom)
913
914        return total_number

A class for generating molecular formula combinations.

Parameters
  • sql_db (MolForm_SQL, optional): Database handle to use. When omitted (or falsy) a new MolForm_SQL() is created.
Attributes
  • sql_db (MolForm_SQL): The SQLite database object.
  • len_existing_classes (int): The number of existing classes in the SQLite database.
  • odd_ch_id (list): A list of odd carbon and hydrogen atom IDs.
  • odd_ch_dict (list): A list of odd carbon and hydrogen atom dictionaries.
  • odd_ch_mass (list): A list of odd carbon and hydrogen atom masses.
  • odd_ch_dbe (list): A list of odd carbon and hydrogen atom double bond equivalents.
  • even_ch_id (list): A list of even carbon and hydrogen atom IDs.
  • even_ch_dict (list): A list of even carbon and hydrogen atom dictionaries.
  • even_ch_mass (list): A list of even carbon and hydrogen atom masses.
  • even_ch_dbe (list): A list of even carbon and hydrogen atom double bond equivalents.
Methods
  • cProfile_worker(args) A cProfile worker for the get_mol_formulas function.
  • check_database_get_class_list(molecular_search_settings) Checks if the database has all the classes, if not create the missing classes.
  • get_carbonsHydrogens(settings, odd_even) Retrieves carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
  • add_carbonsHydrogens(settings, existing_classes_objs) Adds carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
  • runworker(molecular_search_settings) Runs the molecular formula lookup table worker.
  • get_classes_in_order(molecular_search_settings) Gets the classes in order.
  • sort_classes(atoms_in_order, combination_dict) Sorts the classes in order.
  • get_fixed_initial_number_of_hydrogen(min_h, odd_even) Gets the fixed initial number of hydrogen atoms.
  • calc_mz(datadict, class_mass=0) Calculates the mass-to-charge ratio (m/z) of a molecular formula.
  • calc_dbe_class(datadict) Calculates the double bond equivalent (DBE) of a molecular formula.
  • populate_combinations(classe_tuple, settings) Populates the combinations.
  • get_or_add(SomeClass, kw) Gets or adds a class.
  • get_mol_formulas(odd_even_tag, classe_tuple, settings) Gets the molecular formulas.
  • get_h_odd_or_even(class_dict) Gets the hydrogen odd or even.
  • get_total_halogen_atoms(class_dict) Gets the total number of halogen atoms.
  • get_total_hetero_valence(class_dict) Gets the total valence of heteroatoms other than N, F, Cl, and Br
MolecularCombinations(sql_db=None)
134    def __init__(self, sql_db=None):
135        if not sql_db:
136            self.sql_db = MolForm_SQL()
137        else:
138            self.sql_db = sql_db
def cProfile_worker(self, args):
def cProfile_worker(self, args):
    """Profile one ``get_mol_formulas`` call with cProfile.

    Parameters
    ----------
    args : sequence
        Positional arguments unpacked into ``self.get_mol_formulas``.

    Notes
    -----
    The profiling statistics are written to ``mf_database_cprofile.prof``.
    The value returned by ``get_mol_formulas`` is discarded and this method
    returns None.
    """
    statement = "self.get_mol_formulas(*args)"
    stats_file = "mf_database_cprofile.prof"
    # runctx evaluates the statement against these namespaces, which is how
    # it resolves ``self`` and ``args``.
    cProfile.runctx(statement, globals(), locals(), stats_file)

cProfile worker for the get_mol_formulas function

def check_database_get_class_list(self, molecular_search_settings):
def check_database_get_class_list(self, molecular_search_settings):
    """Check which heteroatom classes the database lacks and insert them.

    The classes requested by the settings are compared, by name, with the
    HeteroAtoms rows already stored. Every missing class is inserted with an
    id that continues after the number of existing rows.

    Parameters
    ----------
    molecular_search_settings : object
        An object containing user-defined settings.

    Returns
    -------
    tuple
        A three-item tuple:

        - list of ``(class_str, class_dict)`` for every requested class;
        - list of ``(class_str, class_dict, class_id)`` for the classes that
          were missing and have just been inserted;
        - list of the HeteroAtoms objects that already existed.

    Notes
    -----
    The insert statements are executed on the session but no commit is
    issued here. ``self.len_existing_classes`` is updated as a side effect.
    """
    classes_dict = self.get_classes_in_order(molecular_search_settings)

    existing_classes_objs = self.sql_db.session.query(HeteroAtoms).distinct().all()
    existing_classes_str = {classe.name for classe in existing_classes_objs}

    self.len_existing_classes = len(existing_classes_str)

    # Sorting makes the id given to each new class reproducible between runs;
    # plain set iteration order depends on string hashing.
    class_to_create = sorted(set(classes_dict) - existing_classes_str)

    class_count = len(existing_classes_objs)

    # Build the insert rows and the returned tuples in one pass so the id
    # of a class cannot differ between the two.
    data_classes = []
    all_class_to_create = []
    for class_id, class_str in enumerate(class_to_create, start=class_count + 1):
        class_dict = classes_dict[class_str]
        data_classes.append(
            {
                "name": class_str,
                "id": class_id,
                "halogensCount": self.get_total_halogen_atoms(class_dict),
            }
        )
        all_class_to_create.append((class_str, class_dict, class_id))

    if data_classes:
        for insert_chunk in chunks(data_classes, self.sql_db.chunks_count):
            insert_query = HeteroAtoms.__table__.insert().values(insert_chunk)
            self.sql_db.session.execute(insert_query)

    return (
        list(classes_dict.items()),
        all_class_to_create,
        existing_classes_objs,
    )

Check whether the database has all the classes; if not, create the missing classes.

Parameters
  • molecular_search_settings (object): An object containing user-defined settings.
Returns
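  • tuple: a three-item tuple holding (1) a list of (class name, class dictionary) tuples for every requested class, (2) a list of (class name, class dictionary, class id) tuples for the classes that were missing and have been inserted, and (3) the list of HeteroAtoms objects that already existed in the database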
def get_carbonsHydrogens(self, settings, odd_even):
def get_carbonsHydrogens(self, settings, odd_even):
    """Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.

    Parameters
    ----------
    settings : object
        An object containing user-defined settings. ``settings.usedAtoms``
        must provide ``(min, max)`` pairs under the keys "C" and "H".
    odd_even : str
        "even" selects rows with an even hydrogen count; any other value
        selects rows with an odd hydrogen count.

    Returns
    -------
    list
        A list of CarbonHydrogen objects that satisfy the specified conditions.
    """
    usedAtoms = settings.usedAtoms
    user_min_c, user_max_c = usedAtoms.get("C")
    user_min_h, user_max_h = usedAtoms.get("H")

    # Build the parity condition as a column expression instead of
    # assembling source text and passing it to eval().
    hydrogen_parity = CarbonHydrogen.H % 2
    if odd_even == "even":
        parity_condition = hydrogen_parity == 0
    else:
        parity_condition = hydrogen_parity != 0

    return (
        self.sql_db.session.query(CarbonHydrogen)
        .filter(
            CarbonHydrogen.C >= user_min_c,
            CarbonHydrogen.H >= user_min_h,
            CarbonHydrogen.C <= user_max_c,
            CarbonHydrogen.H <= user_max_h,
            parity_condition,
        )
        .all()
    )

Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.

Parameters
  • settings (object): An object containing user-defined settings.
  • odd_even (str): A string indicating whether to retrieve even or odd hydrogen atoms.
Returns
  • list: A list of CarbonHydrogen objects that satisfy the specified conditions.
def add_carbonsHydrogens(self, settings, existing_classes_objs):
def add_carbonsHydrogens(self, settings, existing_classes_objs):
    """Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.

    Every (C, H) pair inside the user ranges that is not yet stored is
    inserted as a CarbonHydrogen row. Each new pair is then combined with
    every existing heteroatom class of matching hydrogen parity, and the
    combinations that fall inside the m/z and DBE limits are inserted as
    MolecularFormulaLink rows.

    Parameters
    ----------
    settings : object
        An object containing user-defined settings. Reads ``usedAtoms``
        ("C" and "H" ranges), ``min_mz``, ``max_mz``, ``min_dbe`` and
        ``max_dbe``.
    existing_classes_objs : list
        A list of HeteroAtoms objects.

    Notes
    -----
    New ids continue after the current CarbonHydrogen row count. The session
    is committed after the CarbonHydrogen inserts and again after the
    MolecularFormulaLink inserts. Nothing is written when no pair is missing.
    """
    session = self.sql_db.session
    usedAtoms = settings.usedAtoms

    user_min_c, user_max_c = usedAtoms.get("C")
    user_min_h, user_max_h = usedAtoms.get("H")

    database = session.query(
        func.max(CarbonHydrogen.C).label("max_c"),
        func.min(CarbonHydrogen.C).label("min_c"),
        func.max(CarbonHydrogen.H).label("max_h"),
        func.min(CarbonHydrogen.H).label("min_h"),
    ).first()

    stored_limits = (database.max_c, database.min_c, database.max_h, database.min_h)
    if stored_limits == (user_max_c, user_min_c, user_max_h, user_min_h):
        # all data is already available at the database
        return

    next_id = session.query(CarbonHydrogen.C).count() + 1

    # (C, H) pairs already stored; a set of tuples gives O(1) membership.
    stored_pairs = {(obj.C, obj.H) for obj in session.query(CarbonHydrogen).all()}

    carbon_mass = Atoms.atomic_masses.get("C")
    hydrogen_mass = Atoms.atomic_masses.get("H")

    list_ch_obj_to_add = []
    new_ch_by_parity = {"even": [], "odd": []}

    # Ranges (rather than sets) keep the id assignment order deterministic.
    for C, H in itertools.product(
        range(user_min_c, user_max_c + 1), range(user_min_h, user_max_h + 1)
    ):
        if (C, H) in stored_pairs:
            continue

        label = "even" if H % 2 == 0 else "odd"
        new_ch_by_parity[label].append(
            {
                "id": next_id,
                "mass": (C * carbon_mass) + (H * hydrogen_mass),
                "dbe": C - (H / 2) + 1,
            }
        )
        list_ch_obj_to_add.append({"C": C, "H": H, "id": next_id})
        next_id += 1

    if not list_ch_obj_to_add:
        return

    # insert carbon hydrogen objs
    for insert_chunk in chunks(list_ch_obj_to_add, self.sql_db.chunks_count):
        session.execute(CarbonHydrogen.__table__.insert().values(insert_chunk))
    session.commit()

    list_molecular_form = []
    for classe_obj in existing_classes_objs:
        classe_dict = classe_obj.to_dict()
        classe_mass = self.calc_mz(classe_dict)
        classe_dbe = self.calc_dbe_class(classe_dict)

        # Only new C/H rows with the hydrogen parity required by this class
        # are combined with it.
        odd_even_label = self.get_h_odd_or_even(classe_dict)

        for ch_dict in new_ch_by_parity[odd_even_label]:
            mass = ch_dict["mass"] + classe_mass
            dbe = ch_dict["dbe"] + classe_dbe

            if not settings.min_mz <= mass <= settings.max_mz:
                continue
            if not settings.min_dbe <= dbe <= settings.max_dbe:
                continue

            list_molecular_form.append(
                {
                    "heteroAtoms_id": classe_obj.id,
                    "carbonHydrogen_id": ch_dict["id"],
                    "mass": mass,
                    "DBE": dbe,
                }
            )

    for insert_chunk in chunks(list_molecular_form, self.sql_db.chunks_count):
        session.execute(MolecularFormulaLink.__table__.insert().values(insert_chunk))
    session.commit()

Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.

Parameters
  • settings (object): An object containing user-defined settings.
  • existing_classes_objs (list): A list of HeteroAtoms objects.
def runworker(*args, **kw):
def timed(*args, **kw):
    """Call the wrapped ``method`` and report how long it took.

    ``print_time`` is removed from the keyword arguments before the call and
    falls back to the enclosing ``print_time`` value. When a ``log_time``
    mapping is passed, the elapsed milliseconds are stored in it under
    ``log_name`` (default: the upper-cased method name); otherwise the timing
    is printed when printing is enabled.
    """
    # Extract print_time from kwargs if provided
    local_print_time = kw.pop("print_time", print_time)

    started = time.time()
    result = method(*args, **kw)
    elapsed_ms = (time.time() - started) * 1000

    if "log_time" in kw:
        label = kw.get("log_name", method.__name__.upper())
        kw["log_time"][label] = int(elapsed_ms)
    elif local_print_time:
        print("%r  %2.2f ms" % (method.__name__, elapsed_ms))

    return result

Run the molecular formula lookup table worker.

Parameters
  • molecular_search_settings (object): An object containing user-defined settings.
  • kwargs (dict): A dictionary of keyword arguments. Most notably, the print_time argument which is passed to the timeit decorator.
Returns
  • list: A list of tuples with the class name and the class dictionary.
def get_classes_in_order(self, molecular_search_settings):
454    def get_classes_in_order(self, molecular_search_settings):
455        """Get the classes in order
456
457        Parameters
458        ----------
459        molecular_search_settings : object
460            An object containing user-defined settings.
461
462        Returns
463        -------
464        dict
465            A dictionary of classes in order.
466            structure is  ('HC', {'HC': 1})
467        """
468
469        usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
470
471        usedAtoms.pop("C")
472        usedAtoms.pop("H")
473
474        min_n, max_n = usedAtoms.get("N") if usedAtoms.get("N") else (0, 0)
475        min_o, max_o = usedAtoms.get("O") if usedAtoms.get("O") else (0, 0)
476        min_s, max_s = usedAtoms.get("S") if usedAtoms.get("S") else (0, 0)
477        min_p, max_p = usedAtoms.get("P") if usedAtoms.get("P") else (0, 0)
478
479        possible_n = [n for n in range(min_n, max_n + 1)]
480        possible_o = [o for o in range(min_o, max_o + 1)]
481        possible_s = [s for s in range(min_s, max_s + 1)]
482        possible_p = [p for p in range(min_p, max_p + 1)]
483
484        atoms_in_order = ["N", "O", "S", "P"]
485
486        classe_in_order = {}
487
488        all_atoms_tuples = itertools.product(
489            possible_n, possible_o, possible_s, possible_p
490        )
491
492        for atom in atoms_in_order:
493            usedAtoms.pop(atom, None)
494
495        for selected_atom, min_max_tuple in usedAtoms.items():
496            min_x = min_max_tuple[0]
497            max_x = min_max_tuple[1]
498
499            possible_x = [x for x in range(min_x, max_x + 1)]
500
501            all_atoms_tuples = itertools.product(all_atoms_tuples, possible_x)
502            all_atoms_tuples = [
503                all_atoms_combined[0] + (all_atoms_combined[1],)
504                for all_atoms_combined in all_atoms_tuples
505            ]
506            atoms_in_order.append(selected_atom)
507
508        for all_atoms_tuple in all_atoms_tuples:
509            classe_str = ""
510            classe_dict = {}
511
512            for each_atoms_index, atom_number in enumerate(all_atoms_tuple):
513                if atom_number != 0:
514                    classe_dict[atoms_in_order[each_atoms_index]] = atom_number
515
516            if not classe_dict:
517                classe_in_order["HC"] = {"HC": ""}
518                continue
519
520            classe_str = json.dumps(classe_dict)
521
522            if len(classe_str) > 0:
523                classe_in_order[classe_str] = classe_dict
524
525        classe_in_order_dict = self.sort_classes(atoms_in_order, classe_in_order)
526
527        return classe_in_order_dict

Get the classes in order

Parameters
  • molecular_search_settings (object): An object containing user-defined settings.
Returns
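  • dict: A dictionary of classes in order, keyed by the JSON-encoded class string; for example, the key '{"N": 1, "O": 2}' maps to {'N': 1, 'O': 2}, and the class without heteroatoms maps to {'HC': ''}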
@staticmethod
def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]:
529    @staticmethod
530    def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]:
531        """Sort the classes in order
532
533        Parameters
534        ----------
535        atoms_in_order : list
536            A list of atoms in order.
537        combination_dict : dict
538            A dictionary of classes.
539
540        Returns
541        -------
542        dict
543            A dictionary of classes in order.
544        """
545        # ensures atoms are always in the order defined at atoms_in_order list
546        join_dict_classes = dict()
547        atoms_in_order = ["N", "S", "P", "O"] + atoms_in_order[4:] + ["HC"]
548
549        sort_method = lambda atoms_keys: [atoms_in_order.index(atoms_keys)]
550        for class_str, class_dict in combination_dict.items():
551            sorted_dict_keys = sorted(class_dict, key=sort_method)
552            class_dict = {atom: class_dict[atom] for atom in sorted_dict_keys}
553            class_str = json.dumps(class_dict)
554            # using json for the new database, class
555            # class_str = ' '.join([atom + str(class_dict[atom]) for atom in sorted_dict_keys])
556            join_dict_classes[class_str] = class_dict
557
558        return join_dict_classes

Sort the classes in order

Parameters
  • atoms_in_order (list): A list of atoms in order.
  • combination_dict (dict): A dictionary of classes.
Returns
  • dict: A dictionary of classes in order.
@staticmethod
def get_fixed_initial_number_of_hydrogen(min_h, odd_even):
560    @staticmethod
561    def get_fixed_initial_number_of_hydrogen(min_h, odd_even):
562        """Get the fixed initial number of hydrogen atoms
563
564        Parameters
565        ----------
566        min_h : int
567            The minimum number of hydrogen atoms.
568        odd_even : str
569            A string indicating whether to retrieve even or odd hydrogen atoms.
570        """
571        remaining_h = min_h % 2
572
573        if odd_even == "even":
574            if remaining_h == 0:
575                return remaining_h
576
577            else:
578                return remaining_h + 1
579
580        else:
581            if remaining_h == 0:
582                return remaining_h + 1
583
584            else:
585                return remaining_h

Get the fixed initial number of hydrogen atoms

Parameters
  • min_h (int): The minimum number of hydrogen atoms.
  • odd_even (str): A string indicating whether to retrieve even or odd hydrogen atoms.
def calc_mz(self, datadict, class_mass=0):
def calc_mz(self, datadict, class_mass=0):
    """Add the mass of every atom in a class dictionary to a starting mass.

    Parameters
    ----------
    datadict : dict
        Atom symbols mapped to atom counts. The "HC" entry is skipped.
    class_mass : int
        The mass to start from.

    Returns
    -------
    float
        ``class_mass`` plus, for each atom, its ``Atoms.atomic_masses`` value
        multiplied by its count.
    """
    total = class_mass

    for atom, count in datadict.items():
        if atom == "HC":
            continue
        total = total + Atoms.atomic_masses[atom] * count

    return total

Calculate the mass-to-charge ratio (m/z) of a molecular formula.

Parameters
  • datadict (dict): A dictionary of classes.
  • class_mass (int): The mass of the class.
Returns
  • float: The mass-to-charge ratio (m/z) of a molecular formula.
def calc_dbe_class(self, datadict):
def calc_dbe_class(self, datadict):
    """Compute the DBE contribution of the atoms in a class dictionary.

    Parameters
    ----------
    datadict : dict
        Atom symbols mapped to atom counts. The "HC" entry is skipped.

    Returns
    -------
    float
        Half the sum of ``count * (valence - 2)`` over the atoms whose
        valence is positive.
    """
    doubled_dbe = 0

    for atom, count in datadict.items():
        if atom == "HC":
            continue

        n_atom = int(count)

        # Digits are stripped from the key before the valence lookup.
        element = "".join(char for char in atom if not char.isdigit())
        valence = MSParameters.molecular_search.used_atom_valences.get(element)

        # When the valence is given as a tuple, its first entry is used.
        if type(valence) is tuple:
            valence = valence[0]

        if valence > 0:
            doubled_dbe = doubled_dbe + (n_atom * (valence - 2))

    return 0.5 * doubled_dbe

Calculate the double bond equivalent (DBE) of a molecular formula.

Parameters
  • datadict (dict): A dictionary of classes.
Returns
  • float: The double bond equivalent (DBE) of a molecular formula.
def populate_combinations(self, classe_tuple, settings):
644    def populate_combinations(self, classe_tuple, settings):
645        """Populate the combinations
646
647        Parameters
648        ----------
649        classe_tuple : tuple
650            A tuple containing the class name, the class dictionary, and the class ID.
651        settings : object
652            An object containing user-defined settings.
653
654        Returns
655        -------
656        list
657            A list of molecular formula data dictionaries.
658        """
659        ion_charge = 0
660
661        class_dict = classe_tuple[1]
662        odd_or_even = self.get_h_odd_or_even(class_dict)
663
664        return self.get_mol_formulas(odd_or_even, classe_tuple, settings)

Populate the combinations

Parameters
  • classe_tuple (tuple): A tuple containing the class name, the class dictionary, and the class ID.
  • settings (object): An object containing user-defined settings.
Returns
  • list: A list of molecular formula data dictionaries.
def get_or_add(self, SomeClass, kw):
def get_or_add(self, SomeClass, kw):
    """Fetch the first row matching ``kw`` or build a new object from it.

    Parameters
    ----------
    SomeClass : object
        The mapped class to query and, if needed, instantiate.
    kw : dict
        Keyword arguments used both as the ``filter_by`` criteria and as the
        constructor arguments.

    Returns
    -------
    object
        The first matching object, or ``SomeClass(**kw)`` when the query
        yields nothing. A newly built object is not added to the session
        by this method.
    """
    match = self.sql_db.session.query(SomeClass).filter_by(**kw).first()
    return match if match else SomeClass(**kw)

Get or add a class

Parameters
  • SomeClass (object): A class object.
  • kw (dict): A dictionary of classes.
Returns
  • object: A class object.
def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
686    def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
687        """Get the molecular formulas
688
689        Parameters
690        ----------
691        odd_even_tag : str
692            A string indicating whether to retrieve even or odd hydrogen atoms.
693        classe_tuple : tuple
694
695        settings : object
696            An object containing user-defined settings.
697
698        Returns
699        -------
700        list
701            A list of molecular formula data dictionaries.
702
703        """
704        class_str = classe_tuple[0]
705        class_dict = classe_tuple[1]
706        classe_id = classe_tuple[2]
707
708        results = list()
709
710        if "HC" in class_dict:
711            del class_dict["HC"]
712
713        class_dbe = self.calc_dbe_class(class_dict)
714        class_mass = self.calc_mz(class_dict)
715
716        carbonHydrogen_mass = (
717            self.odd_ch_mass if odd_even_tag == "odd" else self.even_ch_mass
718        )
719        carbonHydrogen_dbe = (
720            self.odd_ch_dbe if odd_even_tag == "odd" else self.even_ch_dbe
721        )
722        carbonHydrogen_id = self.odd_ch_id if odd_even_tag == "odd" else self.even_ch_id
723
724        for index, carbonHydrogen_obj in enumerate(carbonHydrogen_id):
725            mass = carbonHydrogen_mass[index] + class_mass
726            dbe = carbonHydrogen_dbe[index] + class_dbe
727
728            if settings.min_mz <= mass <= settings.max_mz:
729                if settings.min_dbe <= dbe <= settings.max_dbe:
730                    molecularFormula = {
731                        "heteroAtoms_id": classe_id,
732                        "carbonHydrogen_id": carbonHydrogen_id[index],
733                        "mass": mass,
734                        "DBE": dbe,
735                    }
736
737                    results.append(molecularFormula)
738
739        return results

Get the molecular formulas

Parameters
  • odd_even_tag (str): A string indicating whether to retrieve even or odd hydrogen atoms.
  • classe_tuple (tuple): A tuple containing the class name, the class dictionary, and the class ID.

  • settings (object): An object containing user-defined settings.

Returns
  • list: A list of molecular formula data dictionaries.
def get_h_odd_or_even(self, class_dict):
741    def get_h_odd_or_even(self, class_dict):
742        """Get the hydrogen odd or even
743
744        Parameters
745        ----------
746        class_dict : dict
747            A dictionary of classes.
748
749        Returns
750        -------
751        str
752            A string indicating whether to retrieve even or odd hydrogen atoms.
753        """
754
755        HAS_NITROGEN = "N" in class_dict.keys()
756
757        number_of_halogen = self.get_total_halogen_atoms(class_dict)
758        number_of_hetero = self.get_total_heteroatoms(class_dict)
759
760        if number_of_halogen > 0:
761            HAS_HALOGEN = True
762
763        else:
764            HAS_HALOGEN = False
765
766        if HAS_HALOGEN:
767            remaining_halogen = number_of_halogen % 2
768
769        else:
770            remaining_halogen = 0
771
772        if number_of_hetero > 0:
773            HAS_OTHER_HETERO = True
774
775            total_hetero_valence = self.get_total_hetero_valence(class_dict)
776
777        else:
778            HAS_OTHER_HETERO = False
779
780            total_hetero_valence = 0
781
782        if HAS_OTHER_HETERO:
783            remaining_hetero_valence = total_hetero_valence % 2
784
785        else:
786            remaining_hetero_valence = 0
787
788        if HAS_NITROGEN and not HAS_OTHER_HETERO:
789            number_of_n = class_dict.get("N")
790            remaining_n = number_of_n % 2
791
792        elif HAS_NITROGEN and HAS_OTHER_HETERO:
793            number_of_n = class_dict.get("N")
794            remaining_n = (number_of_n + remaining_hetero_valence) % 2
795
796        elif HAS_OTHER_HETERO and not HAS_NITROGEN:
797            remaining_n = remaining_hetero_valence
798
799        else:
800            remaining_n = -1
801
802        if remaining_n > 0.0:
803            if HAS_NITROGEN or HAS_OTHER_HETERO:
804                if HAS_HALOGEN:
805                    if remaining_halogen == 0:
806                        return "odd"
807                    else:
808                        return "even"
809
810                else:
811                    return "odd"
812
813        elif remaining_n == 0.0:
814            if HAS_NITROGEN or HAS_OTHER_HETERO:
815                if HAS_HALOGEN:
816                    if remaining_halogen == 0:
817                        return "even"
818                    else:
819                        return "odd"
820
821                else:
822                    return "even"
823
824        else:
825            if HAS_HALOGEN:
826                if remaining_halogen == 0:
827                    return "even"
828                else:
829                    return "odd"
830
831            else:
832                return "even"

Get the hydrogen odd or even

Parameters
  • class_dict (dict): A dictionary of classes.
Returns
  • str: A string indicating whether to retrieve even or odd hydrogen atoms.
@staticmethod
def get_total_heteroatoms(class_dict):
834    @staticmethod
835    def get_total_heteroatoms(class_dict):
836        """Get the total number of heteroatoms other than N, F, Cl, Br
837
838        Parameters
839        ----------
840        class_dict : dict
841            A dictionary of classes.
842
843        Returns
844        -------
845        int
846            The total number of heteroatoms.
847        """
848
849        total_number = 0
850
851        for atom in class_dict.keys():
852            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
853                total_number = total_number + class_dict.get(atom)
854
855        return total_number

Get the total number of heteroatoms other than N, F, Cl, Br

Parameters
  • class_dict (dict): A dictionary of classes.
Returns
  • int: The total number of heteroatoms.
@staticmethod
def get_total_hetero_valence(class_dict):
857    @staticmethod
858    def get_total_hetero_valence(class_dict):
859        """Get the total valence of heteroatoms other than N, F, Cl, Br
860
861        Parameters
862        ----------
863        class_dict : dict
864            A dictionary of classes.
865
866        Returns
867        -------
868        int
869            The total heteroatom valence.
870        """
871        total_valence = 0
872
873        for atom in class_dict.keys():
874            if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]:
875                clean_atom = "".join([i for i in atom if not i.isdigit()])
876
877                atom_valence = MSParameters.molecular_search.used_atom_valences.get(
878                    clean_atom
879                )
880
881                if type(atom_valence) is tuple:
882                    atom_valence = atom_valence[0]
883
884                n_atom = int(class_dict.get(atom))
885
886                n_atom_valence = atom_valence * n_atom
887
888                total_valence = total_valence + n_atom_valence
889
890        return total_valence

Get the total valence of heteroatoms other than N, F, Cl, Br

Parameters
  • class_dict (dict): A dictionary of classes.
Returns
  • int: The total heteroatom valence.
@staticmethod
def get_total_halogen_atoms(class_dict):
892    @staticmethod
893    def get_total_halogen_atoms(class_dict):
894        """Get the total number of halogen atoms
895
896        Parameters
897        ----------
898        class_dict : dict
899            A dictionary of classes.
900
901        Returns
902        -------
903        int
904            The total number of halogen atoms.
905        """
906        atoms = ["F", "Cl", "Br"]
907
908        total_number = 0
909
910        for atom in atoms:
911            if atom in class_dict.keys():
912                total_number = total_number + class_dict.get(atom)
913
914        return total_number

Get the total number of halogen atoms

Parameters
  • class_dict (dict): A dictionary of classes.
Returns
  • int: The total number of halogen atoms.