corems.molecular_id.factory.MolecularLookupTable
__author__ = "Yuri E. Corilo"
__date__ = "Jul 02, 2019"

import contextlib
import cProfile
import io
import itertools
import json
import multiprocessing
import pstats
from copy import deepcopy
from typing import Dict

from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
from tqdm import tqdm

from corems import chunks, timeit
from corems.encapsulation.constant import Atoms
from corems.encapsulation.factory.parameters import MSParameters
from corems.encapsulation.factory.processingSetting import MolecularLookupDictSettings
from corems.molecular_id.factory.molecularSQL import (
    CarbonHydrogen,
    HeteroAtoms,
    MolecularFormulaLink,
    MolForm_SQL,
)

# Class-dictionary keys that are NOT counted as "other" heteroatoms when the
# hydrogen parity of a class is derived (see get_total_heteroatoms and
# get_total_hetero_valence).
_NON_HETERO_KEYS = ("HC", "C", "H", "O", "N", "F", "Cl", "Br")

# Halogens counted by get_total_halogen_atoms.
_HALOGEN_KEYS = ("F", "Cl", "Br")


@contextlib.contextmanager
def profiled():
    """A context manager for profiling.

    Profiles the body of the ``with`` statement with cProfile and prints the
    statistics, sorted by cumulative time, to stdout when the body finishes.
    The profiler is disabled and the report printed even if the body raises;
    the exception then propagates unchanged.
    """
    pr = cProfile.Profile()
    pr.enable()
    try:
        yield
    finally:
        pr.disable()
        s = io.StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
        ps.print_stats()
        # uncomment this to see who's calling what
        # ps.print_callers()
        print(s.getvalue())


def insert_database_worker(args):
    """Inserts data into the database.

    Intended to run in a worker process: it builds its own engine and session
    from the URL instead of sharing the parent's connection.

    Parameters
    ----------
    args : tuple
        ``(results, url)`` where ``results`` is a list of row dictionaries for
        the MolecularFormulaLink table and ``url`` is the database URL. A falsy
        ``url`` falls back to ``sqlite:///db/molformulas.sqlite``.
    """
    results, url = args

    if not url:
        url = "sqlite:///db/molformulas.sqlite"

    if url.startswith("sqlite"):
        engine = create_engine(url, echo=False)
    else:
        engine = create_engine(url, echo=False, isolation_level="AUTOCOMMIT")

    # Release the session and the engine even when the insert fails, so a
    # failing worker does not leave connections open.
    try:
        session = sessionmaker(bind=engine)()
        try:
            insert_query = MolecularFormulaLink.__table__.insert().values(results)
            session.execute(insert_query)
            session.commit()
        finally:
            session.close()
    finally:
        engine.dispose()


class MolecularCombinations:
    """A class for generating molecular formula combinations.

    Parameters
    ----------
    sql_db : MolForm_SQL, optional
        The molecular formula database wrapper. When omitted (or falsy) a new
        ``MolForm_SQL()`` is created.

    Attributes
    ----------
    sql_db : MolForm_SQL
        The molecular formula database wrapper.
    len_existing_classes : int
        The number of distinct class names found in the database
        (set by check_database_get_class_list).
    odd_ch_id : list
        A list of odd carbon and hydrogen atom IDs.
    odd_ch_dict : list
        A list of odd carbon and hydrogen atom dictionaries.
    odd_ch_mass : list
        A list of odd carbon and hydrogen atom masses.
    odd_ch_dbe : list
        A list of odd carbon and hydrogen atom double bond equivalents.
    even_ch_id : list
        A list of even carbon and hydrogen atom IDs.
    even_ch_dict : list
        A list of even carbon and hydrogen atom dictionaries.
    even_ch_mass : list
        A list of even carbon and hydrogen atom masses.
    even_ch_dbe : list
        A list of even carbon and hydrogen atom double bond equivalents.

    The ``odd_*`` / ``even_*`` attributes are only set by runworker, and only
    when there are classes to create.

    Methods
    -------
    * cProfile_worker(args)
        A cProfile worker for the get_mol_formulas function.
    * check_database_get_class_list(molecular_search_settings)
        Checks if the database has all the classes, if not create the missing classes.
    * get_carbonsHydrogens(settings, odd_even)
        Retrieves carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
    * add_carbonsHydrogens(settings, existing_classes_objs)
        Adds carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
    * runworker(molecular_search_settings)
        Runs the molecular formula lookup table worker.
    * get_classes_in_order(molecular_search_settings)
        Gets the classes in order.
    * sort_classes(atoms_in_order, combination_dict)
        Sorts the classes in order.
    * get_fixed_initial_number_of_hydrogen(min_h, odd_even)
        Gets the fixed initial number of hydrogen atoms.
    * calc_mz(datadict, class_mass=0)
        Sums the atomic masses of the atoms of a class.
    * calc_dbe_class(datadict)
        Calculates the double bond equivalent (DBE) contribution of a class.
    * populate_combinations(classe_tuple, settings)
        Populates the combinations.
    * get_or_add(SomeClass, kw)
        Gets or adds a class.
    * get_mol_formulas(odd_even_tag, classe_tuple, settings)
        Gets the molecular formulas.
    * get_h_odd_or_even(class_dict)
        Gets the hydrogen odd or even.
    * get_total_heteroatoms(class_dict)
        Gets the total number of heteroatoms other than O, N, F, Cl, and Br
    * get_total_hetero_valence(class_dict)
        Gets the total valence of heteroatoms other than O, N, F, Cl, and Br
    * get_total_halogen_atoms(class_dict)
        Gets the total number of halogen atoms.
    """

    def __init__(self, sql_db=None):
        self.sql_db = sql_db if sql_db else MolForm_SQL()

    def cProfile_worker(self, args):
        """cProfile worker for the get_mol_formulas function

        Runs ``self.get_mol_formulas(*args)`` under cProfile and writes the
        statistics to the file ``mf_database_cprofile.prof``.
        """
        cProfile.runctx(
            "self.get_mol_formulas(*args)",
            globals(),
            locals(),
            "mf_database_cprofile.prof",
        )

    def check_database_get_class_list(self, molecular_search_settings):
        """check if the database has all the classes, if not create the missing classes

        Rows for the missing classes are inserted into the HeteroAtoms table
        through the session but NOT committed here; the caller commits.

        Parameters
        ----------
        molecular_search_settings : object
            An object containing user-defined settings.

        Returns
        -------
        tuple
            ``(classes_list, class_to_create, existing_classes_objs)`` where
            ``classes_list`` is a list of ``(class name, class dictionary)``
            tuples for every requested class, ``class_to_create`` is a list of
            ``(class name, class dictionary, class id)`` tuples for the classes
            that were missing, and ``existing_classes_objs`` is the list of
            HeteroAtoms objects that were already in the database.
        """
        classes_dict = self.get_classes_in_order(molecular_search_settings)

        existing_classes_objs = self.sql_db.session.query(HeteroAtoms).distinct().all()
        existing_classes_str = {classe.name for classe in existing_classes_objs}
        self.len_existing_classes = len(existing_classes_str)

        class_to_create = set(classes_dict) - existing_classes_str

        # New ids continue after the number of rows already in the table.
        class_count = len(existing_classes_objs)

        data_classes = []
        all_class_to_create = []
        for index, class_str in enumerate(class_to_create):
            class_dict = classes_dict[class_str]
            class_id = class_count + index + 1
            data_classes.append(
                {
                    "name": class_str,
                    "id": class_id,
                    "halogensCount": self.get_total_halogen_atoms(class_dict),
                }
            )
            all_class_to_create.append((class_str, class_dict, class_id))

        if data_classes:
            for insert_chunk in chunks(data_classes, self.sql_db.chunks_count):
                insert_query = HeteroAtoms.__table__.insert().values(insert_chunk)
                self.sql_db.session.execute(insert_query)

        return (
            list(classes_dict.items()),
            all_class_to_create,
            existing_classes_objs,
        )

    def get_carbonsHydrogens(self, settings, odd_even):
        """Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.

        Parameters
        ----------
        settings : object
            An object containing user-defined settings.
        odd_even : str
            ``"even"`` selects rows with an even number of hydrogens; any
            other value selects rows with an odd number.

        Returns
        -------
        list
            A list of CarbonHydrogen objects that satisfy the specified conditions.
        """
        usedAtoms = settings.usedAtoms
        user_min_c, user_max_c = usedAtoms.get("C")
        user_min_h, user_max_h = usedAtoms.get("H")

        # Build the parity filter as a SQL expression instead of eval()-ing a
        # concatenated source string.
        hydrogen_parity = CarbonHydrogen.H % 2
        if odd_even == "even":
            parity_filter = hydrogen_parity == 0
        else:
            parity_filter = hydrogen_parity != 0

        return (
            self.sql_db.session.query(CarbonHydrogen)
            .filter(
                CarbonHydrogen.C >= user_min_c,
                CarbonHydrogen.H >= user_min_h,
                CarbonHydrogen.C <= user_max_c,
                CarbonHydrogen.H <= user_max_h,
                parity_filter,
            )
            .all()
        )

    def add_carbonsHydrogens(self, settings, existing_classes_objs):
        """Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.

        When the C/H ranges stored in the database differ from the requested
        ranges, every missing (C, H) pair is inserted into the CarbonHydrogen
        table and combined with each already-existing class to add the
        corresponding MolecularFormulaLink rows that fall inside the mass and
        DBE limits of ``settings``. Both inserts are committed.

        Parameters
        ----------
        settings : object
            An object containing user-defined settings.
        existing_classes_objs : list
            A list of HeteroAtoms objects.
        """
        usedAtoms = settings.usedAtoms

        user_min_c, user_max_c = usedAtoms.get("C")
        user_min_h, user_max_h = usedAtoms.get("H")

        database = self.sql_db.session.query(
            func.max(CarbonHydrogen.C).label("max_c"),
            func.min(CarbonHydrogen.C).label("min_c"),
            func.max(CarbonHydrogen.H).label("max_h"),
            func.min(CarbonHydrogen.H).label("min_h"),
        ).first()

        if (
            database.max_c == user_max_c
            and database.min_c == user_min_c
            and database.max_h == user_max_h
            and database.min_h == user_min_h
        ):
            # all data is already available at the database
            return

        current_count = self.sql_db.session.query(CarbonHydrogen.C).count()
        existing_pairs = {
            (obj.C, obj.H) for obj in self.sql_db.session.query(CarbonHydrogen).all()
        }

        carbon_mass = Atoms.atomic_masses.get("C")
        hydrogen_mass = Atoms.atomic_masses.get("H")

        # Full records (with mass/dbe) grouped by hydrogen parity, used below
        # to build the formula links; rows_to_insert holds the table columns.
        new_ch_by_parity = {"even": [], "odd": []}
        rows_to_insert = []
        all_pairs = itertools.product(
            range(user_min_c, user_max_c + 1), range(user_min_h, user_max_h + 1)
        )
        for C, H in all_pairs:
            if (C, H) in existing_pairs:
                continue

            # ids continue after the rows already stored
            new_id = current_count + len(rows_to_insert) + 1
            label = "even" if H % 2 == 0 else "odd"
            new_ch_by_parity[label].append(
                {
                    "C": C,
                    "H": H,
                    "mass": (C * carbon_mass) + (H * hydrogen_mass),
                    "dbe": C - (H / 2) + 1,
                    "id": new_id,
                }
            )
            rows_to_insert.append({"C": C, "H": H, "id": new_id})

        if not rows_to_insert:
            return

        # insert carbon hydrogen objs
        for insert_chunk in chunks(rows_to_insert, self.sql_db.chunks_count):
            insert_query = CarbonHydrogen.__table__.insert().values(insert_chunk)
            self.sql_db.session.execute(insert_query)
        self.sql_db.session.commit()

        list_molecular_form = []
        for classe_obj in existing_classes_objs:
            classe_dict = classe_obj.to_dict()
            classe_mass = self.calc_mz(classe_dict)
            classe_dbe = self.calc_dbe_class(classe_dict)

            odd_even_label = self.get_h_odd_or_even(classe_dict)

            for ch_dict in new_ch_by_parity.get(odd_even_label):
                mass = ch_dict.get("mass") + classe_mass
                dbe = ch_dict.get("dbe") + classe_dbe

                if not settings.min_mz <= mass <= settings.max_mz:
                    continue
                if not settings.min_dbe <= dbe <= settings.max_dbe:
                    continue

                list_molecular_form.append(
                    {
                        "heteroAtoms_id": classe_obj.id,
                        "carbonHydrogen_id": ch_dict.get("id"),
                        "mass": mass,
                        "DBE": dbe,
                    }
                )

        for insert_chunk in chunks(list_molecular_form, self.sql_db.chunks_count):
            insert_query = MolecularFormulaLink.__table__.insert().values(insert_chunk)
            self.sql_db.session.execute(insert_query)
        self.sql_db.session.commit()

    @timeit(print_time=True)
    def runworker(self, molecular_search_settings, **kwargs):
        """Run the molecular formula lookup table worker.

        Parameters
        ----------
        molecular_search_settings : object
            An object containing user-defined settings.
        kwargs : dict
            A dictionary of keyword arguments.
            Most notably, the print_time argument which is passed to the timeit decorator.

        Returns
        -------
        list
            A list of tuples with the class name and the class dictionary.
        """
        verbose = molecular_search_settings.verbose_processing

        classes_list, class_to_create, existing_classes_objs = (
            self.check_database_get_class_list(molecular_search_settings)
        )

        settings = MolecularLookupDictSettings()
        settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
        settings.url_database = molecular_search_settings.url_database
        settings.db_jobs = molecular_search_settings.db_jobs

        self.add_carbonsHydrogens(settings, existing_classes_objs)

        if class_to_create:
            # commits the HeteroAtoms rows added by check_database_get_class_list
            self.sql_db.session.commit()

            odd_ch_obj = self.get_carbonsHydrogens(settings, "odd")
            self.odd_ch_id = [obj.id for obj in odd_ch_obj]
            self.odd_ch_dict = [{"C": obj.C, "H": obj.H} for obj in odd_ch_obj]
            self.odd_ch_mass = [obj.mass for obj in odd_ch_obj]
            self.odd_ch_dbe = [obj.dbe for obj in odd_ch_obj]

            even_ch_obj = self.get_carbonsHydrogens(settings, "even")
            self.even_ch_id = [obj.id for obj in even_ch_obj]
            self.even_ch_dict = [{"C": obj.C, "H": obj.H} for obj in even_ch_obj]
            self.even_ch_mass = [obj.mass for obj in even_ch_obj]
            self.even_ch_dbe = [obj.dbe for obj in even_ch_obj]

            # Rows are only accumulated for the multi-process path; the
            # single-job path inserts each class as soon as it is computed.
            all_results = []
            for class_tuple in tqdm(class_to_create, disable=not verbose):
                results = self.populate_combinations(class_tuple, settings)
                if settings.db_jobs == 1:
                    for chunk in chunks(results, self.sql_db.chunks_count):
                        insert_query = MolecularFormulaLink.__table__.insert().values(
                            chunk
                        )
                        self.sql_db.session.execute(insert_query)
                elif settings.db_jobs > 1:
                    all_results.extend(results)
            self.sql_db.session.commit()

            # each chunk takes ~600Mb of memory, so if using 8 processes the total free memory needs to be 5GB
            if settings.db_jobs > 1:
                list_insert_chunks = list(chunks(all_results, self.sql_db.chunks_count))
                print(
                    "Started database insert using {} iterations for a total of {} rows".format(
                        len(list_insert_chunks), len(all_results)
                    )
                )
                worker_args = [
                    (chunk, settings.url_database) for chunk in list_insert_chunks
                ]
                # The with-block guarantees the pool is torn down if a worker
                # raises; on success it is closed and joined explicitly first.
                with multiprocessing.Pool(settings.db_jobs) as pool:
                    inserts = pool.imap_unordered(insert_database_worker, worker_args)
                    for _ in tqdm(
                        inserts, total=len(worker_args), disable=not verbose
                    ):
                        pass
                    pool.close()
                    pool.join()

        return classes_list

    def get_classes_in_order(self, molecular_search_settings):
        """Get the classes in order

        Builds every combination of heteroatom counts allowed by
        ``usedAtoms`` (C and H excluded) and returns them keyed by their JSON
        representation.

        Parameters
        ----------
        molecular_search_settings : object
            An object containing user-defined settings.

        Returns
        -------
        dict
            A dictionary of classes in order, mapping the class name (the
            JSON dump of the class dictionary) to the class dictionary. The
            class without heteroatoms is stored as ``"HC": {"HC": ""}``.
        """
        usedAtoms = deepcopy(molecular_search_settings.usedAtoms)

        usedAtoms.pop("C")
        usedAtoms.pop("H")

        # N, O, S, P always come first; a missing or empty range means 0 atoms.
        atoms_in_order = ["N", "O", "S", "P"]
        possible_counts = []
        for atom in atoms_in_order:
            min_x, max_x = usedAtoms.pop(atom, None) or (0, 0)
            possible_counts.append(range(min_x, max_x + 1))

        # every remaining atom is appended in the order given by the settings
        for selected_atom, (min_x, max_x) in list(usedAtoms.items()):
            possible_counts.append(range(min_x, max_x + 1))
            atoms_in_order.append(selected_atom)

        classe_in_order = {}
        for atom_counts in itertools.product(*possible_counts):
            classe_dict = {
                atom: atom_number
                for atom, atom_number in zip(atoms_in_order, atom_counts)
                if atom_number != 0
            }

            if not classe_dict:
                classe_in_order["HC"] = {"HC": ""}
                continue

            classe_in_order[json.dumps(classe_dict)] = classe_dict

        return self.sort_classes(atoms_in_order, classe_in_order)

    @staticmethod
    def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]:
        """Sort the classes in order

        Re-orders the atoms inside every class dictionary as N, S, P, O,
        followed by the atoms from position 4 onwards of ``atoms_in_order``,
        and re-keys each class by the JSON dump of the re-ordered dictionary.

        Parameters
        ----------
        atoms_in_order : list
            A list of atoms in order.
        combination_dict : dict
            A dictionary of classes.

        Returns
        -------
        dict
            A dictionary of classes in order.
        """
        # ensures atoms are always in the order defined at atoms_in_order list
        atoms_in_order = ["N", "S", "P", "O"] + atoms_in_order[4:] + ["HC"]

        join_dict_classes = {}
        for class_dict in combination_dict.values():
            sorted_dict_keys = sorted(class_dict, key=atoms_in_order.index)
            sorted_class_dict = {atom: class_dict[atom] for atom in sorted_dict_keys}
            # using json for the new database, class
            join_dict_classes[json.dumps(sorted_class_dict)] = sorted_class_dict

        return join_dict_classes

    @staticmethod
    def get_fixed_initial_number_of_hydrogen(min_h, odd_even):
        """Get the fixed initial number of hydrogen atoms

        Parameters
        ----------
        min_h : int
            The minimum number of hydrogen atoms.
        odd_even : str
            A string indicating whether to retrieve even or odd hydrogen atoms.

        Returns
        -------
        int
            For ``"even"``: 0 when ``min_h`` is even, otherwise 2.
            For any other value: 1.
        """
        remaining_h = min_h % 2

        if odd_even == "even":
            return remaining_h if remaining_h == 0 else remaining_h + 1

        return remaining_h + 1 if remaining_h == 0 else remaining_h

    def calc_mz(self, datadict, class_mass=0):
        """Sum the atomic masses of the atoms of a class.

        Parameters
        ----------
        datadict : dict
            A class dictionary mapping atom symbols to counts. The ``"HC"``
            key is ignored.
        class_mass : int
            A mass to start the sum from.

        Returns
        -------
        float
            ``class_mass`` plus the atomic mass of every atom times its count.
        """
        mass = class_mass

        for atom, atom_count in datadict.items():
            if atom != "HC":
                mass = mass + Atoms.atomic_masses[atom] * atom_count

        return mass

    def calc_dbe_class(self, datadict):
        """Calculate the double bond equivalent (DBE) contribution of a class.

        Parameters
        ----------
        datadict : dict
            A class dictionary mapping atom symbols to counts. The ``"HC"``
            key is ignored.

        Returns
        -------
        float
            Half of the sum of ``count * (valence - 2)`` over the atoms with a
            positive valence.
        """
        init_dbe = 0
        for atom, atom_count in datadict.items():
            if atom == "HC":
                continue

            n_atom = int(atom_count)

            # drop digits from the key (e.g. isotope labels) before the lookup
            clean_atom = "".join([i for i in atom if not i.isdigit()])

            valencia = MSParameters.molecular_search.used_atom_valences.get(clean_atom)

            # when several valences are configured the first one is used
            if isinstance(valencia, tuple):
                valencia = valencia[0]

            if valencia > 0:
                init_dbe = init_dbe + (n_atom * (valencia - 2))

        return 0.5 * init_dbe

    def populate_combinations(self, classe_tuple, settings):
        """Populate the combinations

        Parameters
        ----------
        classe_tuple : tuple
            A tuple containing the class name, the class dictionary, and the class ID.
        settings : object
            An object containing user-defined settings.

        Returns
        -------
        list
            A list of molecular formula data dictionaries.
        """
        odd_or_even = self.get_h_odd_or_even(classe_tuple[1])

        return self.get_mol_formulas(odd_or_even, classe_tuple, settings)

    def get_or_add(self, SomeClass, kw):
        """Get or add a class

        Parameters
        ----------
        SomeClass : object
            A class object.
        kw : dict
            Keyword arguments used both to filter the query and, when nothing
            matches, to build a new instance.

        Returns
        -------
        object
            The first matching object from the database, or a new
            ``SomeClass(**kw)`` instance (which is not added to the session
            here).
        """
        obj = self.sql_db.session.query(SomeClass).filter_by(**kw).first()
        if not obj:
            obj = SomeClass(**kw)
        return obj

    def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
        """Get the molecular formulas

        Combines the class with every cached carbon/hydrogen entry of the
        requested parity and keeps the combinations inside the mass and DBE
        limits of ``settings``. Requires the ``odd_ch_*`` / ``even_ch_*``
        attributes set by runworker.

        Parameters
        ----------
        odd_even_tag : str
            ``"odd"`` uses the odd-hydrogen entries; any other value uses the
            even-hydrogen entries.
        classe_tuple : tuple
            A tuple containing the class name, the class dictionary, and the class ID.
        settings : object
            An object containing user-defined settings.

        Returns
        -------
        list
            A list of molecular formula data dictionaries.
        """
        # Work on a copy without the "HC" marker: the class dictionary is
        # shared with the list returned by runworker and must not be mutated.
        class_dict = {
            atom: count for atom, count in classe_tuple[1].items() if atom != "HC"
        }
        classe_id = classe_tuple[2]

        class_dbe = self.calc_dbe_class(class_dict)
        class_mass = self.calc_mz(class_dict)

        if odd_even_tag == "odd":
            carbonHydrogen_id = self.odd_ch_id
            carbonHydrogen_mass = self.odd_ch_mass
            carbonHydrogen_dbe = self.odd_ch_dbe
        else:
            carbonHydrogen_id = self.even_ch_id
            carbonHydrogen_mass = self.even_ch_mass
            carbonHydrogen_dbe = self.even_ch_dbe

        results = []
        for index, ch_id in enumerate(carbonHydrogen_id):
            mass = carbonHydrogen_mass[index] + class_mass
            dbe = carbonHydrogen_dbe[index] + class_dbe

            if not settings.min_mz <= mass <= settings.max_mz:
                continue
            if not settings.min_dbe <= dbe <= settings.max_dbe:
                continue

            results.append(
                {
                    "heteroAtoms_id": classe_id,
                    "carbonHydrogen_id": ch_id,
                    "mass": mass,
                    "DBE": dbe,
                }
            )

        return results

    def get_h_odd_or_even(self, class_dict):
        """Get the hydrogen odd or even

        The parity starts from the nitrogen count plus the parity of the total
        valence of the other heteroatoms, and is flipped when the class holds
        an odd number of halogen atoms.

        Parameters
        ----------
        class_dict : dict
            A dictionary of classes.

        Returns
        -------
        str
            ``"odd"`` or ``"even"``, the hydrogen parity to combine with this
            class.
        """
        has_nitrogen = "N" in class_dict
        number_of_halogen = self.get_total_halogen_atoms(class_dict)
        has_other_hetero = self.get_total_heteroatoms(class_dict) > 0

        if has_other_hetero:
            remaining_hetero_valence = self.get_total_hetero_valence(class_dict) % 2
        else:
            remaining_hetero_valence = 0

        if has_nitrogen:
            remaining_n = (class_dict.get("N") + remaining_hetero_valence) % 2
        elif has_other_hetero:
            remaining_n = remaining_hetero_valence
        else:
            # neither nitrogen nor other heteroatoms: treated as even
            remaining_n = 0

        base_is_odd = remaining_n > 0
        halogen_is_odd = number_of_halogen > 0 and number_of_halogen % 2 != 0

        # an odd number of halogens flips the parity
        return "odd" if base_is_odd != halogen_is_odd else "even"

    @staticmethod
    def get_total_heteroatoms(class_dict):
        """Get the total number of heteroatoms other than O, N, F, Cl, Br

        Parameters
        ----------
        class_dict : dict
            A dictionary of classes.

        Returns
        -------
        int
            The summed counts of every key that is not one of
            HC, C, H, O, N, F, Cl, Br.
        """
        return sum(
            atom_count
            for atom, atom_count in class_dict.items()
            if atom not in _NON_HETERO_KEYS
        )

    @staticmethod
    def get_total_hetero_valence(class_dict):
        """Get the total valence of heteroatoms other than O, N, F, Cl, Br

        Parameters
        ----------
        class_dict : dict
            A dictionary of classes.

        Returns
        -------
        int
            The sum of ``valence * count`` over every key that is not one of
            HC, C, H, O, N, F, Cl, Br.
        """
        total_valence = 0

        for atom, atom_count in class_dict.items():
            if atom in _NON_HETERO_KEYS:
                continue

            # drop digits from the key (e.g. isotope labels) before the lookup
            clean_atom = "".join([i for i in atom if not i.isdigit()])

            atom_valence = MSParameters.molecular_search.used_atom_valences.get(
                clean_atom
            )

            # when several valences are configured the first one is used
            if isinstance(atom_valence, tuple):
                atom_valence = atom_valence[0]

            total_valence = total_valence + atom_valence * int(atom_count)

        return total_valence

    @staticmethod
    def get_total_halogen_atoms(class_dict):
        """Get the total number of halogen atoms

        Parameters
        ----------
        class_dict : dict
            A dictionary of classes.

        Returns
        -------
        int
            The summed counts of F, Cl and Br in the class.
        """
        return sum(class_dict[atom] for atom in _HALOGEN_KEYS if atom in class_dict)
@contextlib.contextmanager
def profiled():
    """A context manager for profiling.

    Profiles the body of the ``with`` statement with cProfile and prints the
    statistics, sorted by cumulative time, to stdout when the body finishes.
    The profiler is disabled and the report printed even if the body raises;
    the exception then propagates unchanged.
    """
    pr = cProfile.Profile()
    pr.enable()
    try:
        yield
    finally:
        # Always stop profiling: without this an exception in the body would
        # leave the profiler enabled and the report would never be printed.
        pr.disable()
        s = io.StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
        ps.print_stats()
        # uncomment this to see who's calling what
        # ps.print_callers()
        print(s.getvalue())
A context manager for profiling.
def insert_database_worker(args):
    """Inserts data into the database.

    Builds its own engine and session from the URL, inserts the rows into the
    MolecularFormulaLink table and commits.

    Parameters
    ----------
    args : tuple
        ``(results, url)`` where ``results`` is a list of row dictionaries for
        the MolecularFormulaLink table and ``url`` is the database URL. A falsy
        ``url`` falls back to ``sqlite:///db/molformulas.sqlite``.
    """
    results, url = args

    if not url:
        url = "sqlite:///db/molformulas.sqlite"

    if url.startswith("sqlite"):
        engine = create_engine(url, echo=False)
    else:
        engine = create_engine(url, echo=False, isolation_level="AUTOCOMMIT")

    # Release the session and the engine even when the insert fails, so a
    # failing call does not leave connections open.
    try:
        session = sessionmaker(bind=engine)()
        try:
            insert_query = MolecularFormulaLink.__table__.insert().values(results)
            session.execute(insert_query)
            session.commit()
        finally:
            session.close()
    finally:
        engine.dispose()
Inserts data into the database.
67class MolecularCombinations: 68 """A class for generating molecular formula combinations. 69 70 Parameters 71 ---------- 72 molecular_search_settings : object 73 An object containing user-defined settings. 74 75 Attributes 76 ---------- 77 sql_db : MolForm_SQL 78 The SQLite database object. 79 len_existing_classes : int 80 The number of existing classes in the SQLite database. 81 odd_ch_id : list 82 A list of odd carbon and hydrogen atom IDs. 83 odd_ch_dict : list 84 A list of odd carbon and hydrogen atom dictionaries. 85 odd_ch_mass : list 86 A list of odd carbon and hydrogen atom masses. 87 odd_ch_dbe : list 88 A list of odd carbon and hydrogen atom double bond equivalents. 89 even_ch_id : list 90 A list of even carbon and hydrogen atom IDs. 91 even_ch_dict : list 92 A list of even carbon and hydrogen atom dictionaries. 93 even_ch_mass : list 94 A list of even carbon and hydrogen atom masses. 95 even_ch_dbe : list 96 A list of even carbon and hydrogen atom double bond equivalents. 97 98 Methods 99 ------- 100 * cProfile_worker(args) 101 A cProfile worker for the get_mol_formulas function. 102 * check_database_get_class_list(molecular_search_settings) 103 Checks if the database has all the classes, if not create the missing classes. 104 * get_carbonsHydrogens(settings, odd_even) 105 Retrieves carbon and hydrogen atoms from the molecular lookup table based on user-defined settings. 106 * add_carbonsHydrogens(settings, existing_classes_objs) 107 Adds carbon and hydrogen atoms to the molecular lookup table based on user-defined settings. 108 * runworker(molecular_search_settings) 109 Runs the molecular formula lookup table worker. 110 * get_classes_in_order(molecular_search_settings) 111 Gets the classes in order. 112 * sort_classes(atoms_in_order, combination_dict) 113 Sorts the classes in order. 114 * get_fixed_initial_number_of_hydrogen(min_h, odd_even) 115 Gets the fixed initial number of hydrogen atoms. 
116 * calc_mz(datadict, class_mass=0) 117 Calculates the mass-to-charge ratio (m/z) of a molecular formula. 118 * calc_dbe_class(datadict) 119 Calculates the double bond equivalent (DBE) of a molecular formula. 120 * populate_combinations(classe_tuple, settings) 121 Populates the combinations. 122 * get_or_add(SomeClass, kw) 123 Gets or adds a class. 124 * get_mol_formulas(odd_even_tag, classe_tuple, settings) 125 Gets the molecular formulas. 126 * get_h_odd_or_even(class_dict) 127 Gets the hydrogen odd or even. 128 * get_total_halogen_atoms(class_dict) 129 Gets the total number of halogen atoms. 130 * get_total_hetero_valence(class_dict) 131 Gets the total valence of heteroatoms other than N, F, Cl, and Br 132 """ 133 134 def __init__(self, sql_db=None): 135 if not sql_db: 136 self.sql_db = MolForm_SQL() 137 else: 138 self.sql_db = sql_db 139 140 def cProfile_worker(self, args): 141 """cProfile worker for the get_mol_formulas function""" 142 cProfile.runctx( 143 "self.get_mol_formulas(*args)", 144 globals(), 145 locals(), 146 "mf_database_cprofile.prof", 147 ) 148 149 def check_database_get_class_list(self, molecular_search_settings): 150 """check if the database has all the classes, if not create the missing classes 151 152 Parameters 153 ---------- 154 molecular_search_settings : object 155 An object containing user-defined settings. 
156 157 Returns 158 ------- 159 list 160 list of tuples with the class name and the class dictionary 161 """ 162 all_class_to_create = [] 163 164 classes_dict = self.get_classes_in_order(molecular_search_settings) 165 166 class_str_set = set(classes_dict.keys()) 167 168 existing_classes_objs = self.sql_db.session.query(HeteroAtoms).distinct().all() 169 170 existing_classes_str = set([classe.name for classe in existing_classes_objs]) 171 172 self.len_existing_classes = len(existing_classes_str) 173 174 class_to_create = class_str_set - existing_classes_str 175 176 class_count = len(existing_classes_objs) 177 178 data_classes = list() 179 for index, class_str in enumerate(class_to_create): 180 class_dict = classes_dict.get(class_str) 181 halogen_count = self.get_total_halogen_atoms(class_dict) 182 data_classes.append( 183 { 184 "name": class_str, 185 "id": class_count + index + 1, 186 "halogensCount": halogen_count, 187 } 188 ) 189 190 # data_classes = [{"name":class_str, "id":class_count+ index + 1} for index, class_str in enumerate(class_to_create)] 191 192 if data_classes: 193 list_insert_chunks = chunks(data_classes, self.sql_db.chunks_count) 194 for insert_chunk in list_insert_chunks: 195 insert_query = HeteroAtoms.__table__.insert().values(insert_chunk) 196 self.sql_db.session.execute(insert_query) 197 198 for index, class_str in enumerate(class_to_create): 199 class_tuple = ( 200 class_str, 201 classes_dict.get(class_str), 202 class_count + index + 1, 203 ) 204 205 all_class_to_create.append(class_tuple) 206 207 return ( 208 [(c_s, c_d) for c_s, c_d in classes_dict.items()], 209 all_class_to_create, 210 existing_classes_objs, 211 ) 212 213 def get_carbonsHydrogens(self, settings, odd_even): 214 """Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings. 215 216 Parameters 217 ---------- 218 settings : object 219 An object containing user-defined settings. 
220 odd_even : str 221 A string indicating whether to retrieve even or odd hydrogen atoms. 222 223 Returns 224 ------- 225 list 226 A list of CarbonHydrogen objects that satisfy the specified conditions. 227 """ 228 operator = "==" if odd_even == "even" else "!=" 229 usedAtoms = settings.usedAtoms 230 user_min_c, user_max_c = usedAtoms.get("C") 231 user_min_h, user_max_h = usedAtoms.get("H") 232 233 return eval( 234 "self.sql_db.session.query(CarbonHydrogen).filter(" 235 "CarbonHydrogen.C >= user_min_c," 236 "CarbonHydrogen.H >= user_min_h," 237 "CarbonHydrogen.C <= user_max_c," 238 "CarbonHydrogen.H <= user_max_h," 239 "CarbonHydrogen.H % 2" + operator + "0).all()" 240 ) 241 242 def add_carbonsHydrogens(self, settings, existing_classes_objs): 243 """Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings. 244 245 Parameters 246 ---------- 247 settings : object 248 An object containing user-defined settings. 249 existing_classes_objs : list 250 A list of HeteroAtoms objects. 
251 """ 252 usedAtoms = settings.usedAtoms 253 254 user_min_c, user_max_c = usedAtoms.get("C") 255 user_min_h, user_max_h = usedAtoms.get("H") 256 257 query_obj = self.sql_db.session.query( 258 func.max(CarbonHydrogen.C).label("max_c"), 259 func.min(CarbonHydrogen.C).label("min_c"), 260 func.max(CarbonHydrogen.H).label("max_h"), 261 func.min(CarbonHydrogen.H).label("min_h"), 262 ) 263 264 database = query_obj.first() 265 if ( 266 database.max_c == user_max_c 267 and database.min_c == user_min_c 268 and database.max_h == user_max_h 269 and database.min_h == user_min_h 270 ): 271 # all data is already available at the database 272 pass 273 274 else: 275 current_count = self.sql_db.session.query(CarbonHydrogen.C).count() 276 277 databaseCarbonHydrogen = self.sql_db.session.query(CarbonHydrogen).all() 278 279 userCarbon = set(range(user_min_c, user_max_c + 1)) 280 userHydrogen = set(range(user_min_h, user_max_h + 1)) 281 282 carbon_hydrogen_objs_database = {} 283 for obj in databaseCarbonHydrogen: 284 str_data = "C:{},H:{}".format(obj.C, obj.H) 285 carbon_hydrogen_objs_database[str_data] = str_data 286 287 carbon_hydrogen_objs_to_create = {"even": {}, "odd": {}} 288 289 list_ch_obj_to_add = list() 290 i = 0 291 for comb in itertools.product(userCarbon, userHydrogen): 292 C = comb[0] 293 H = comb[1] 294 data = { 295 "C": C, 296 "H": H, 297 } 298 299 data_insert = { 300 "C": C, 301 "H": H, 302 } 303 304 str_data = "C:{},H:{}".format(C, H) 305 306 if not str_data in carbon_hydrogen_objs_database.keys(): 307 label = "even" if comb[1] % 2 == 0 else "odd" 308 data["mass"] = (C * Atoms.atomic_masses.get("C")) + ( 309 H * Atoms.atomic_masses.get("H") 310 ) 311 data["dbe"] = C - (H / 2) + 1 312 data["id"] = i + current_count + 1 313 data_insert["id"] = i + current_count + 1 314 i = i + 1 315 carbon_hydrogen_objs_to_create[label][str_data] = data 316 317 list_ch_obj_to_add.append(data_insert) 318 319 if list_ch_obj_to_add: 320 # insert carbon hydrogen objs 321 list_insert_chunks 
= chunks( 322 list_ch_obj_to_add, self.sql_db.chunks_count 323 ) 324 for insert_chunk in list_insert_chunks: 325 insert_query = CarbonHydrogen.__table__.insert().values( 326 insert_chunk 327 ) 328 self.sql_db.session.execute(insert_query) 329 self.sql_db.session.commit() 330 331 list_molecular_form = list() 332 for classe_obj in existing_classes_objs: 333 classe_dict = classe_obj.to_dict() 334 classe_mass = self.calc_mz(classe_dict) 335 classe_dbe = self.calc_dbe_class(classe_dict) 336 337 odd_even_label = self.get_h_odd_or_even(classe_dict) 338 339 ch_datalist = carbon_hydrogen_objs_to_create.get( 340 odd_even_label 341 ).values() 342 343 for ch_dict in ch_datalist: 344 mass = ch_dict.get("mass") + classe_mass 345 dbe = ch_dict.get("dbe") + classe_dbe 346 347 if settings.min_mz <= mass <= settings.max_mz: 348 if settings.min_dbe <= dbe <= settings.max_dbe: 349 list_molecular_form.append( 350 { 351 "heteroAtoms_id": classe_obj.id, 352 "carbonHydrogen_id": ch_dict.get("id"), 353 "mass": mass, 354 "DBE": dbe, 355 } 356 ) 357 358 list_insert_chunks = chunks( 359 list_molecular_form, self.sql_db.chunks_count 360 ) 361 for insert_chunk in list_insert_chunks: 362 insert_query = MolecularFormulaLink.__table__.insert().values( 363 insert_chunk 364 ) 365 self.sql_db.session.execute(insert_query) 366 self.sql_db.session.commit() 367 368 @timeit(print_time=True) 369 def runworker(self, molecular_search_settings, **kwargs): 370 """Run the molecular formula lookup table worker. 371 372 Parameters 373 ---------- 374 molecular_search_settings : object 375 An object containing user-defined settings. 376 kwargs : dict 377 A dictionary of keyword arguments. 378 Most notably, the print_time argument which is passed to the timeit decorator. 379 380 Returns 381 ------- 382 list 383 A list of tuples with the class name and the class dictionary. 
384 385 386 """ 387 verbose = molecular_search_settings.verbose_processing 388 389 classes_list, class_to_create, existing_classes_objs = ( 390 self.check_database_get_class_list(molecular_search_settings) 391 ) 392 393 settings = MolecularLookupDictSettings() 394 settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms) 395 settings.url_database = molecular_search_settings.url_database 396 settings.db_jobs = molecular_search_settings.db_jobs 397 398 self.add_carbonsHydrogens(settings, existing_classes_objs) 399 400 if class_to_create: 401 settings = MolecularLookupDictSettings() 402 settings.usedAtoms = deepcopy(molecular_search_settings.usedAtoms) 403 settings.url_database = molecular_search_settings.url_database 404 settings.db_jobs = molecular_search_settings.db_jobs 405 406 self.sql_db.session.commit() 407 odd_ch_obj = self.get_carbonsHydrogens(settings, "odd") 408 self.odd_ch_id = [obj.id for obj in odd_ch_obj] 409 self.odd_ch_dict = [{"C": obj.C, "H": obj.H} for obj in odd_ch_obj] 410 self.odd_ch_mass = [obj.mass for obj in odd_ch_obj] 411 self.odd_ch_dbe = [obj.dbe for obj in odd_ch_obj] 412 413 even_ch_obj = self.get_carbonsHydrogens(settings, "even") 414 self.even_ch_id = [obj.id for obj in even_ch_obj] 415 self.even_ch_dict = [{"C": obj.C, "H": obj.H} for obj in even_ch_obj] 416 self.even_ch_mass = [obj.mass for obj in even_ch_obj] 417 self.even_ch_dbe = [obj.dbe for obj in even_ch_obj] 418 419 all_results = list() 420 for class_tuple in tqdm(class_to_create, disable = not verbose): 421 results = self.populate_combinations(class_tuple, settings) 422 all_results.extend(results) 423 if settings.db_jobs == 1: 424 # if len(all_results) >= self.sql_db.chunks_count: 425 list_insert_chunks = list(chunks(results, self.sql_db.chunks_count)) 426 for chunk in list_insert_chunks: 427 insert_query = MolecularFormulaLink.__table__.insert().values( 428 chunk 429 ) 430 self.sql_db.session.execute(insert_query) 431 # all_results = list() 432 
self.sql_db.session.commit() 433 # each chunk takes ~600Mb of memory, so if using 8 processes the total free memory needs to be 5GB 434 if settings.db_jobs > 1: 435 list_insert_chunks = list(chunks(all_results, self.sql_db.chunks_count)) 436 print( 437 "Started database insert using {} iterations for a total of {} rows".format( 438 len(list_insert_chunks), len(all_results) 439 ) 440 ) 441 worker_args = [ 442 (chunk, settings.url_database) for chunk in list_insert_chunks 443 ] 444 p = multiprocessing.Pool(settings.db_jobs) 445 for class_list in tqdm( 446 p.imap_unordered(insert_database_worker, worker_args), disable= not verbose 447 ): 448 pass 449 p.close() 450 p.join() 451 452 return classes_list 453 454 def get_classes_in_order(self, molecular_search_settings): 455 """Get the classes in order 456 457 Parameters 458 ---------- 459 molecular_search_settings : object 460 An object containing user-defined settings. 461 462 Returns 463 ------- 464 dict 465 A dictionary of classes in order. 
466 structure is ('HC', {'HC': 1}) 467 """ 468 469 usedAtoms = deepcopy(molecular_search_settings.usedAtoms) 470 471 usedAtoms.pop("C") 472 usedAtoms.pop("H") 473 474 min_n, max_n = usedAtoms.get("N") if usedAtoms.get("N") else (0, 0) 475 min_o, max_o = usedAtoms.get("O") if usedAtoms.get("O") else (0, 0) 476 min_s, max_s = usedAtoms.get("S") if usedAtoms.get("S") else (0, 0) 477 min_p, max_p = usedAtoms.get("P") if usedAtoms.get("P") else (0, 0) 478 479 possible_n = [n for n in range(min_n, max_n + 1)] 480 possible_o = [o for o in range(min_o, max_o + 1)] 481 possible_s = [s for s in range(min_s, max_s + 1)] 482 possible_p = [p for p in range(min_p, max_p + 1)] 483 484 atoms_in_order = ["N", "O", "S", "P"] 485 486 classe_in_order = {} 487 488 all_atoms_tuples = itertools.product( 489 possible_n, possible_o, possible_s, possible_p 490 ) 491 492 for atom in atoms_in_order: 493 usedAtoms.pop(atom, None) 494 495 for selected_atom, min_max_tuple in usedAtoms.items(): 496 min_x = min_max_tuple[0] 497 max_x = min_max_tuple[1] 498 499 possible_x = [x for x in range(min_x, max_x + 1)] 500 501 all_atoms_tuples = itertools.product(all_atoms_tuples, possible_x) 502 all_atoms_tuples = [ 503 all_atoms_combined[0] + (all_atoms_combined[1],) 504 for all_atoms_combined in all_atoms_tuples 505 ] 506 atoms_in_order.append(selected_atom) 507 508 for all_atoms_tuple in all_atoms_tuples: 509 classe_str = "" 510 classe_dict = {} 511 512 for each_atoms_index, atom_number in enumerate(all_atoms_tuple): 513 if atom_number != 0: 514 classe_dict[atoms_in_order[each_atoms_index]] = atom_number 515 516 if not classe_dict: 517 classe_in_order["HC"] = {"HC": ""} 518 continue 519 520 classe_str = json.dumps(classe_dict) 521 522 if len(classe_str) > 0: 523 classe_in_order[classe_str] = classe_dict 524 525 classe_in_order_dict = self.sort_classes(atoms_in_order, classe_in_order) 526 527 return classe_in_order_dict 528 529 @staticmethod 530 def sort_classes(atoms_in_order, combination_dict) -> 
Dict[str, Dict[str, int]]: 531 """Sort the classes in order 532 533 Parameters 534 ---------- 535 atoms_in_order : list 536 A list of atoms in order. 537 combination_dict : dict 538 A dictionary of classes. 539 540 Returns 541 ------- 542 dict 543 A dictionary of classes in order. 544 """ 545 # ensures atoms are always in the order defined at atoms_in_order list 546 join_dict_classes = dict() 547 atoms_in_order = ["N", "S", "P", "O"] + atoms_in_order[4:] + ["HC"] 548 549 sort_method = lambda atoms_keys: [atoms_in_order.index(atoms_keys)] 550 for class_str, class_dict in combination_dict.items(): 551 sorted_dict_keys = sorted(class_dict, key=sort_method) 552 class_dict = {atom: class_dict[atom] for atom in sorted_dict_keys} 553 class_str = json.dumps(class_dict) 554 # using json for the new database, class 555 # class_str = ' '.join([atom + str(class_dict[atom]) for atom in sorted_dict_keys]) 556 join_dict_classes[class_str] = class_dict 557 558 return join_dict_classes 559 560 @staticmethod 561 def get_fixed_initial_number_of_hydrogen(min_h, odd_even): 562 """Get the fixed initial number of hydrogen atoms 563 564 Parameters 565 ---------- 566 min_h : int 567 The minimum number of hydrogen atoms. 568 odd_even : str 569 A string indicating whether to retrieve even or odd hydrogen atoms. 570 """ 571 remaining_h = min_h % 2 572 573 if odd_even == "even": 574 if remaining_h == 0: 575 return remaining_h 576 577 else: 578 return remaining_h + 1 579 580 else: 581 if remaining_h == 0: 582 return remaining_h + 1 583 584 else: 585 return remaining_h 586 587 def calc_mz(self, datadict, class_mass=0): 588 """Calculate the mass-to-charge ratio (m/z) of a molecular formula. 589 590 Parameters 591 ---------- 592 datadict : dict 593 A dictionary of classes. 594 class_mass : int 595 The mass of the class. 596 597 Returns 598 ------- 599 float 600 The mass-to-charge ratio (m/z) of a molecular formula. 
601 """ 602 mass = class_mass 603 604 for atom in datadict.keys(): 605 if atom != "HC": 606 mass = mass + Atoms.atomic_masses[atom] * datadict.get(atom) 607 608 return mass 609 610 def calc_dbe_class(self, datadict): 611 """Calculate the double bond equivalent (DBE) of a molecular formula. 612 613 Parameters 614 ---------- 615 datadict : dict 616 A dictionary of classes. 617 618 Returns 619 ------- 620 float 621 The double bond equivalent (DBE) of a molecular formula. 622 """ 623 init_dbe = 0 624 for atom in datadict.keys(): 625 if atom == "HC": 626 continue 627 628 n_atom = int(datadict.get(atom)) 629 630 clean_atom = "".join([i for i in atom if not i.isdigit()]) 631 632 valencia = MSParameters.molecular_search.used_atom_valences.get(clean_atom) 633 634 if type(valencia) is tuple: 635 valencia = valencia[0] 636 if valencia > 0: 637 # print atom, valencia, n_atom, init_dbe 638 init_dbe = init_dbe + (n_atom * (valencia - 2)) 639 else: 640 continue 641 642 return 0.5 * init_dbe 643 644 def populate_combinations(self, classe_tuple, settings): 645 """Populate the combinations 646 647 Parameters 648 ---------- 649 classe_tuple : tuple 650 A tuple containing the class name, the class dictionary, and the class ID. 651 settings : object 652 An object containing user-defined settings. 653 654 Returns 655 ------- 656 list 657 A list of molecular formula data dictionaries. 658 """ 659 ion_charge = 0 660 661 class_dict = classe_tuple[1] 662 odd_or_even = self.get_h_odd_or_even(class_dict) 663 664 return self.get_mol_formulas(odd_or_even, classe_tuple, settings) 665 666 def get_or_add(self, SomeClass, kw): 667 """Get or add a class 668 669 Parameters 670 ---------- 671 SomeClass : object 672 A class object. 673 kw : dict 674 A dictionary of classes. 675 676 Returns 677 ------- 678 object 679 A class object. 
680 """ 681 obj = self.sql_db.session.query(SomeClass).filter_by(**kw).first() 682 if not obj: 683 obj = SomeClass(**kw) 684 return obj 685 686 def get_mol_formulas(self, odd_even_tag, classe_tuple, settings): 687 """Get the molecular formulas 688 689 Parameters 690 ---------- 691 odd_even_tag : str 692 A string indicating whether to retrieve even or odd hydrogen atoms. 693 classe_tuple : tuple 694 695 settings : object 696 An object containing user-defined settings. 697 698 Returns 699 ------- 700 list 701 A list of molecular formula data dictionaries. 702 703 """ 704 class_str = classe_tuple[0] 705 class_dict = classe_tuple[1] 706 classe_id = classe_tuple[2] 707 708 results = list() 709 710 if "HC" in class_dict: 711 del class_dict["HC"] 712 713 class_dbe = self.calc_dbe_class(class_dict) 714 class_mass = self.calc_mz(class_dict) 715 716 carbonHydrogen_mass = ( 717 self.odd_ch_mass if odd_even_tag == "odd" else self.even_ch_mass 718 ) 719 carbonHydrogen_dbe = ( 720 self.odd_ch_dbe if odd_even_tag == "odd" else self.even_ch_dbe 721 ) 722 carbonHydrogen_id = self.odd_ch_id if odd_even_tag == "odd" else self.even_ch_id 723 724 for index, carbonHydrogen_obj in enumerate(carbonHydrogen_id): 725 mass = carbonHydrogen_mass[index] + class_mass 726 dbe = carbonHydrogen_dbe[index] + class_dbe 727 728 if settings.min_mz <= mass <= settings.max_mz: 729 if settings.min_dbe <= dbe <= settings.max_dbe: 730 molecularFormula = { 731 "heteroAtoms_id": classe_id, 732 "carbonHydrogen_id": carbonHydrogen_id[index], 733 "mass": mass, 734 "DBE": dbe, 735 } 736 737 results.append(molecularFormula) 738 739 return results 740 741 def get_h_odd_or_even(self, class_dict): 742 """Get the hydrogen odd or even 743 744 Parameters 745 ---------- 746 class_dict : dict 747 A dictionary of classes. 748 749 Returns 750 ------- 751 str 752 A string indicating whether to retrieve even or odd hydrogen atoms. 
753 """ 754 755 HAS_NITROGEN = "N" in class_dict.keys() 756 757 number_of_halogen = self.get_total_halogen_atoms(class_dict) 758 number_of_hetero = self.get_total_heteroatoms(class_dict) 759 760 if number_of_halogen > 0: 761 HAS_HALOGEN = True 762 763 else: 764 HAS_HALOGEN = False 765 766 if HAS_HALOGEN: 767 remaining_halogen = number_of_halogen % 2 768 769 else: 770 remaining_halogen = 0 771 772 if number_of_hetero > 0: 773 HAS_OTHER_HETERO = True 774 775 total_hetero_valence = self.get_total_hetero_valence(class_dict) 776 777 else: 778 HAS_OTHER_HETERO = False 779 780 total_hetero_valence = 0 781 782 if HAS_OTHER_HETERO: 783 remaining_hetero_valence = total_hetero_valence % 2 784 785 else: 786 remaining_hetero_valence = 0 787 788 if HAS_NITROGEN and not HAS_OTHER_HETERO: 789 number_of_n = class_dict.get("N") 790 remaining_n = number_of_n % 2 791 792 elif HAS_NITROGEN and HAS_OTHER_HETERO: 793 number_of_n = class_dict.get("N") 794 remaining_n = (number_of_n + remaining_hetero_valence) % 2 795 796 elif HAS_OTHER_HETERO and not HAS_NITROGEN: 797 remaining_n = remaining_hetero_valence 798 799 else: 800 remaining_n = -1 801 802 if remaining_n > 0.0: 803 if HAS_NITROGEN or HAS_OTHER_HETERO: 804 if HAS_HALOGEN: 805 if remaining_halogen == 0: 806 return "odd" 807 else: 808 return "even" 809 810 else: 811 return "odd" 812 813 elif remaining_n == 0.0: 814 if HAS_NITROGEN or HAS_OTHER_HETERO: 815 if HAS_HALOGEN: 816 if remaining_halogen == 0: 817 return "even" 818 else: 819 return "odd" 820 821 else: 822 return "even" 823 824 else: 825 if HAS_HALOGEN: 826 if remaining_halogen == 0: 827 return "even" 828 else: 829 return "odd" 830 831 else: 832 return "even" 833 834 @staticmethod 835 def get_total_heteroatoms(class_dict): 836 """Get the total number of heteroatoms other than N, F, Cl, Br 837 838 Parameters 839 ---------- 840 class_dict : dict 841 A dictionary of classes. 842 843 Returns 844 ------- 845 int 846 The total number of heteroatoms. 
847 """ 848 849 total_number = 0 850 851 for atom in class_dict.keys(): 852 if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]: 853 total_number = total_number + class_dict.get(atom) 854 855 return total_number 856 857 @staticmethod 858 def get_total_hetero_valence(class_dict): 859 """Get the total valence of heteroatoms other than N, F, Cl, Br 860 861 Parameters 862 ---------- 863 class_dict : dict 864 A dictionary of classes. 865 866 Returns 867 ------- 868 int 869 The total heteroatom valence. 870 """ 871 total_valence = 0 872 873 for atom in class_dict.keys(): 874 if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]: 875 clean_atom = "".join([i for i in atom if not i.isdigit()]) 876 877 atom_valence = MSParameters.molecular_search.used_atom_valences.get( 878 clean_atom 879 ) 880 881 if type(atom_valence) is tuple: 882 atom_valence = atom_valence[0] 883 884 n_atom = int(class_dict.get(atom)) 885 886 n_atom_valence = atom_valence * n_atom 887 888 total_valence = total_valence + n_atom_valence 889 890 return total_valence 891 892 @staticmethod 893 def get_total_halogen_atoms(class_dict): 894 """Get the total number of halogen atoms 895 896 Parameters 897 ---------- 898 class_dict : dict 899 A dictionary of classes. 900 901 Returns 902 ------- 903 int 904 The total number of halogen atoms. 905 """ 906 atoms = ["F", "Cl", "Br"] 907 908 total_number = 0 909 910 for atom in atoms: 911 if atom in class_dict.keys(): 912 total_number = total_number + class_dict.get(atom) 913 914 return total_number
A class for generating molecular formula combinations.
Parameters
- sql_db (MolForm_SQL, optional): The database object to use. When omitted, a new MolForm_SQL instance is created.
Attributes
- sql_db (MolForm_SQL): The SQLite database object.
- len_existing_classes (int): The number of existing classes in the SQLite database.
- odd_ch_id (list): IDs of the CarbonHydrogen entries that have an odd number of hydrogen atoms.
- odd_ch_dict (list): {"C": ..., "H": ...} dictionaries for the entries that have an odd number of hydrogen atoms.
- odd_ch_mass (list): Masses of the entries that have an odd number of hydrogen atoms.
- odd_ch_dbe (list): Double bond equivalents of the entries that have an odd number of hydrogen atoms.
- even_ch_id (list): IDs of the CarbonHydrogen entries that have an even number of hydrogen atoms.
- even_ch_dict (list): {"C": ..., "H": ...} dictionaries for the entries that have an even number of hydrogen atoms.
- even_ch_mass (list): Masses of the entries that have an even number of hydrogen atoms.
- even_ch_dbe (list): Double bond equivalents of the entries that have an even number of hydrogen atoms.
Methods
- cProfile_worker(args) A cProfile worker for the get_mol_formulas function.
- check_database_get_class_list(molecular_search_settings) Checks if the database has all the classes, if not create the missing classes.
- get_carbonsHydrogens(settings, odd_even) Retrieves carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
- add_carbonsHydrogens(settings, existing_classes_objs) Adds carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
- runworker(molecular_search_settings) Runs the molecular formula lookup table worker.
- get_classes_in_order(molecular_search_settings) Gets the classes in order.
- sort_classes(atoms_in_order, combination_dict) Sorts the classes in order.
- get_fixed_initial_number_of_hydrogen(min_h, odd_even) Gets the fixed initial number of hydrogen atoms.
- calc_mz(datadict, class_mass=0) Calculates the mass-to-charge ratio (m/z) of a molecular formula.
- calc_dbe_class(datadict) Calculates the double bond equivalent (DBE) of a molecular formula.
- populate_combinations(classe_tuple, settings) Populates the combinations.
- get_or_add(SomeClass, kw) Gets or adds a class.
- get_mol_formulas(odd_even_tag, classe_tuple, settings) Gets the molecular formulas.
- get_h_odd_or_even(class_dict) Determines whether a class pairs with an odd or an even number of hydrogen atoms.
- get_total_halogen_atoms(class_dict) Gets the total number of halogen atoms.
- get_total_hetero_valence(class_dict) Gets the total valence of heteroatoms other than O, N, F, Cl, and Br.
def cProfile_worker(self, args):
    """Profile a single ``get_mol_formulas`` call with cProfile.

    Parameters
    ----------
    args : tuple
        Positional arguments forwarded to ``self.get_mol_formulas``.

    Notes
    -----
    The profiling statistics are written to ``mf_database_cprofile.prof``;
    nothing is returned.
    """
    # Names the profiled statement needs to resolve when it is executed
    statement_namespace = {"self": self, "args": args}
    output_file = "mf_database_cprofile.prof"

    cProfile.runctx(
        "self.get_mol_formulas(*args)",
        globals(),
        statement_namespace,
        output_file,
    )
cProfile worker for the get_mol_formulas function
def check_database_get_class_list(self, molecular_search_settings):
    """Check if the database has all the classes; create the missing ones.

    The requested classes are derived from the settings, compared by name
    with the HeteroAtoms rows already stored, and an insert is issued for
    every class that is missing. The inserts are executed on the session
    but not committed here.

    Parameters
    ----------
    molecular_search_settings : object
        An object containing user-defined settings.

    Returns
    -------
    tuple
        A 3-tuple ``(classes_list, class_to_create, existing_classes_objs)``:

        * ``classes_list`` - list of ``(class name, class dictionary)``
          tuples for every requested class.
        * ``class_to_create`` - list of ``(class name, class dictionary,
          class id)`` tuples for the classes inserted by this call.
        * ``existing_classes_objs`` - list of HeteroAtoms objects that
          were already in the database.
    """
    classes_dict = self.get_classes_in_order(molecular_search_settings)

    existing_classes_objs = self.sql_db.session.query(HeteroAtoms).distinct().all()
    existing_classes_str = {classe.name for classe in existing_classes_objs}
    self.len_existing_classes = len(existing_classes_str)

    # Follow the order of classes_dict instead of iterating a set difference:
    # set order for strings changes between interpreter runs, which made the
    # id given to each new class non-reproducible.
    class_to_create = [
        class_str
        for class_str in classes_dict
        if class_str not in existing_classes_str
    ]

    # New ids continue after the number of rows already present
    class_count = len(existing_classes_objs)

    data_classes = []
    all_class_to_create = []
    for class_id, class_str in enumerate(class_to_create, start=class_count + 1):
        class_dict = classes_dict[class_str]
        data_classes.append(
            {
                "name": class_str,
                "id": class_id,
                "halogensCount": self.get_total_halogen_atoms(class_dict),
            }
        )
        all_class_to_create.append((class_str, class_dict, class_id))

    if data_classes:
        for insert_chunk in chunks(data_classes, self.sql_db.chunks_count):
            insert_query = HeteroAtoms.__table__.insert().values(insert_chunk)
            self.sql_db.session.execute(insert_query)

    return (
        list(classes_dict.items()),
        all_class_to_create,
        existing_classes_objs,
    )
Check whether the database has all the classes; if not, create the missing classes.
Parameters
- molecular_search_settings (object): An object containing user-defined settings.
Returns
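- tuple: A 3-tuple containing (1) a list of tuples with the class name and the class dictionary for every requested class, (2) a list of (class name, class dictionary, class id) tuples for the classes that had to be created, and (3) the list of HeteroAtoms objects that already existed in the database.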
def get_carbonsHydrogens(self, settings, odd_even):
    """Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.

    Parameters
    ----------
    settings : object
        An object containing user-defined settings. Its ``usedAtoms``
        mapping must provide ``(min, max)`` pairs for "C" and "H".
    odd_even : str
        "even" selects entries with an even number of hydrogen atoms;
        any other value selects entries with an odd number.

    Returns
    -------
    list
        A list of CarbonHydrogen objects that satisfy the specified conditions.
    """
    usedAtoms = settings.usedAtoms
    user_min_c, user_max_c = usedAtoms.get("C")
    user_min_h, user_max_h = usedAtoms.get("H")

    # Express the parity test as a SQL expression directly; the previous
    # implementation assembled Python source text and ran it through eval().
    hydrogen_parity = CarbonHydrogen.H % 2
    if odd_even == "even":
        parity_filter = hydrogen_parity == 0
    else:
        parity_filter = hydrogen_parity != 0

    return (
        self.sql_db.session.query(CarbonHydrogen)
        .filter(
            CarbonHydrogen.C >= user_min_c,
            CarbonHydrogen.H >= user_min_h,
            CarbonHydrogen.C <= user_max_c,
            CarbonHydrogen.H <= user_max_h,
            parity_filter,
        )
        .all()
    )
Retrieve carbon and hydrogen atoms from the molecular lookup table based on user-defined settings.
Parameters
- settings (object): An object containing user-defined settings.
- odd_even (str): A string indicating whether to retrieve even or odd hydrogen atoms.
Returns
- list: A list of CarbonHydrogen objects that satisfy the specified conditions.
def add_carbonsHydrogens(self, settings, existing_classes_objs):
    """Add missing carbon/hydrogen combinations to the molecular lookup table.

    When the stored minimum and maximum C and H values already equal the
    requested ones, nothing is done. Otherwise every (C, H) pair in the
    requested ranges that is not stored yet is inserted, and each new pair
    is linked to the already existing heteroatom classes whose hydrogen
    parity matches and whose combined mass and DBE fall within the limits
    in ``settings``.

    Parameters
    ----------
    settings : object
        An object containing user-defined settings.
    existing_classes_objs : list
        A list of HeteroAtoms objects.
    """
    usedAtoms = settings.usedAtoms
    lowest_c, highest_c = usedAtoms.get("C")
    lowest_h, highest_h = usedAtoms.get("H")

    stored = self.sql_db.session.query(
        func.max(CarbonHydrogen.C).label("max_c"),
        func.min(CarbonHydrogen.C).label("min_c"),
        func.max(CarbonHydrogen.H).label("max_h"),
        func.min(CarbonHydrogen.H).label("min_h"),
    ).first()

    stored_bounds = (stored.max_c, stored.min_c, stored.max_h, stored.min_h)
    if stored_bounds == (highest_c, lowest_c, highest_h, lowest_h):
        # all data is already available at the database
        return

    # New ids continue after the number of rows currently stored
    next_id = self.sql_db.session.query(CarbonHydrogen.C).count() + 1

    already_stored = {
        "C:{},H:{}".format(row.C, row.H)
        for row in self.sql_db.session.query(CarbonHydrogen).all()
    }

    carbon_counts = set(range(lowest_c, highest_c + 1))
    hydrogen_counts = set(range(lowest_h, highest_h + 1))
    carbon_mass = Atoms.atomic_masses.get("C")
    hydrogen_mass = Atoms.atomic_masses.get("H")

    # Full records of the new pairs, grouped by hydrogen parity
    new_by_parity = {"even": {}, "odd": {}}
    # Reduced records (C, H, id) that are sent to the CarbonHydrogen table
    rows_to_insert = []

    for C, H in itertools.product(carbon_counts, hydrogen_counts):
        pair_key = "C:{},H:{}".format(C, H)
        if pair_key in already_stored:
            continue

        parity = "odd" if H % 2 != 0 else "even"
        new_by_parity[parity][pair_key] = {
            "C": C,
            "H": H,
            "mass": (C * carbon_mass) + (H * hydrogen_mass),
            "dbe": C - (H / 2) + 1,
            "id": next_id,
        }
        rows_to_insert.append({"C": C, "H": H, "id": next_id})
        next_id += 1

    if rows_to_insert:
        # insert carbon hydrogen objs
        for batch in chunks(rows_to_insert, self.sql_db.chunks_count):
            self.sql_db.session.execute(
                CarbonHydrogen.__table__.insert().values(batch)
            )
        self.sql_db.session.commit()

        # Link the new pairs to the classes that were already in the database
        formula_rows = []
        for classe_obj in existing_classes_objs:
            classe_dict = classe_obj.to_dict()
            classe_mass = self.calc_mz(classe_dict)
            classe_dbe = self.calc_dbe_class(classe_dict)
            parity = self.get_h_odd_or_even(classe_dict)

            for ch_record in new_by_parity.get(parity).values():
                mass = ch_record.get("mass") + classe_mass
                dbe = ch_record.get("dbe") + classe_dbe

                within_limits = (
                    settings.min_mz <= mass <= settings.max_mz
                    and settings.min_dbe <= dbe <= settings.max_dbe
                )
                if within_limits:
                    formula_rows.append(
                        {
                            "heteroAtoms_id": classe_obj.id,
                            "carbonHydrogen_id": ch_record.get("id"),
                            "mass": mass,
                            "DBE": dbe,
                        }
                    )

        for batch in chunks(formula_rows, self.sql_db.chunks_count):
            self.sql_db.session.execute(
                MolecularFormulaLink.__table__.insert().values(batch)
            )
        self.sql_db.session.commit()
Add carbon and hydrogen atoms to the molecular lookup table based on user-defined settings.
Parameters
- settings (object): An object containing user-defined settings.
- existing_classes_objs (list): A list of HeteroAtoms objects.
def timed(*args, **kw):
    """Call the wrapped method and record or print how long it took.

    ``method`` and the default ``print_time`` are taken from the enclosing
    scope. A ``print_time`` keyword given at call time overrides that
    default and is removed before the wrapped method is called.

    Returns
    -------
    object
        Whatever the wrapped method returns.
    """
    # Per-call override of the printing flag; not forwarded to the method
    show_elapsed = kw.pop('print_time', print_time)

    started = time.time()
    result = method(*args, **kw)
    elapsed_ms = (time.time() - started) * 1000

    if "log_time" in kw:
        # Store the elapsed milliseconds in the caller-supplied mapping
        entry_name = kw.get("log_name", method.__name__.upper())
        kw["log_time"][entry_name] = int(elapsed_ms)
    elif show_elapsed:
        print("%r %2.2f ms" % (method.__name__, elapsed_ms))

    return result
Run the molecular formula lookup table worker.
Parameters
- molecular_search_settings (object): An object containing user-defined settings.
- kwargs (dict): A dictionary of keyword arguments. Most notably, the print_time argument which is passed to the timeit decorator.
Returns
- list: A list of tuples with the class name and the class dictionary.
def get_classes_in_order(self, molecular_search_settings):
    """Enumerate every heteroatom class allowed by the settings.

    Carbon and hydrogen are left out. N, O, S and P come first (an atom
    without limits counts as 0..0), followed by any other atom in the
    order it appears in ``usedAtoms``. Each combination of atom counts
    becomes one class; atoms with a count of zero are omitted from it.

    Parameters
    ----------
    molecular_search_settings : object
        An object containing user-defined settings.

    Returns
    -------
    dict
        The result of ``self.sort_classes`` applied to a dictionary that
        maps the JSON string of each class to its atom-count dictionary.
        The class without heteroatoms is entered as ``"HC": {"HC": ""}``.
    """
    # Work on a copy so the caller's settings are left untouched
    usedAtoms = deepcopy(molecular_search_settings.usedAtoms)
    usedAtoms.pop("C")
    usedAtoms.pop("H")

    atoms_in_order = ["N", "O", "S", "P"]

    count_ranges = []
    for symbol in list(atoms_in_order):
        lowest, highest = usedAtoms.pop(symbol, None) or (0, 0)
        count_ranges.append(range(lowest, highest + 1))

    # Whatever is left are the additional atoms chosen by the user
    for symbol, limits in usedAtoms.items():
        count_ranges.append(range(limits[0], limits[1] + 1))
        atoms_in_order.append(symbol)

    classe_in_order = {}
    for counts in itertools.product(*count_ranges):
        classe_dict = {
            symbol: count
            for symbol, count in zip(atoms_in_order, counts)
            if count != 0
        }

        if classe_dict:
            classe_in_order[json.dumps(classe_dict)] = classe_dict
        else:
            # No heteroatom present: the hydrocarbon class
            classe_in_order["HC"] = {"HC": ""}

    return self.sort_classes(atoms_in_order, classe_in_order)
Get the classes in order
Parameters
- molecular_search_settings (object): An object containing user-defined settings.
Returns
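- dict: A dictionary of classes in order, mapping each JSON-encoded class string to its atom-count dictionary, e.g. '{"N": 1, "O": 2}': {'N': 1, 'O': 2}; the class without heteroatoms has the value {'HC': ''}.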
529 @staticmethod 530 def sort_classes(atoms_in_order, combination_dict) -> Dict[str, Dict[str, int]]: 531 """Sort the classes in order 532 533 Parameters 534 ---------- 535 atoms_in_order : list 536 A list of atoms in order. 537 combination_dict : dict 538 A dictionary of classes. 539 540 Returns 541 ------- 542 dict 543 A dictionary of classes in order. 544 """ 545 # ensures atoms are always in the order defined at atoms_in_order list 546 join_dict_classes = dict() 547 atoms_in_order = ["N", "S", "P", "O"] + atoms_in_order[4:] + ["HC"] 548 549 sort_method = lambda atoms_keys: [atoms_in_order.index(atoms_keys)] 550 for class_str, class_dict in combination_dict.items(): 551 sorted_dict_keys = sorted(class_dict, key=sort_method) 552 class_dict = {atom: class_dict[atom] for atom in sorted_dict_keys} 553 class_str = json.dumps(class_dict) 554 # using json for the new database, class 555 # class_str = ' '.join([atom + str(class_dict[atom]) for atom in sorted_dict_keys]) 556 join_dict_classes[class_str] = class_dict 557 558 return join_dict_classes
Sort the classes in order
Parameters
- atoms_in_order (list): A list of atoms in order.
- combination_dict (dict): A dictionary of classes.
Returns
- dict: A dictionary of classes in order.
560 @staticmethod 561 def get_fixed_initial_number_of_hydrogen(min_h, odd_even): 562 """Get the fixed initial number of hydrogen atoms 563 564 Parameters 565 ---------- 566 min_h : int 567 The minimum number of hydrogen atoms. 568 odd_even : str 569 A string indicating whether to retrieve even or odd hydrogen atoms. 570 """ 571 remaining_h = min_h % 2 572 573 if odd_even == "even": 574 if remaining_h == 0: 575 return remaining_h 576 577 else: 578 return remaining_h + 1 579 580 else: 581 if remaining_h == 0: 582 return remaining_h + 1 583 584 else: 585 return remaining_h
Get the fixed initial number of hydrogen atoms
Parameters
- min_h (int): The minimum number of hydrogen atoms.
- odd_even (str): A string indicating whether to retrieve even or odd hydrogen atoms.
def calc_mz(self, datadict, class_mass=0):
    """Add up the mass contributed by the atoms of a class.

    Parameters
    ----------
    datadict : dict
        Mapping of atom symbol to atom count. An "HC" entry is ignored.
    class_mass : int or float
        Starting value the atom masses are added to (default 0).

    Returns
    -------
    float
        ``class_mass`` plus, for every atom, its entry in
        ``Atoms.atomic_masses`` multiplied by its count. No charge is
        applied here.
    """
    total_mass = class_mass

    for symbol, count in datadict.items():
        if symbol == "HC":
            # placeholder entry, carries no mass
            continue
        total_mass = total_mass + Atoms.atomic_masses[symbol] * count

    return total_mass
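Calculate the mass of a class: class_mass plus the atomic mass of each atom multiplied by its count. The 'HC' entry is skipped and no charge is applied, so the result is a mass rather than a true mass-to-charge ratio (m/z).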
Parameters
- datadict (dict): A dictionary of classes.
- class_mass (int): The mass of the class.
Returns
- float: The mass-to-charge ratio (m/z) of a molecular formula.
def calc_dbe_class(self, datadict):
    """Compute the double bond equivalent (DBE) contribution of a class.

    Parameters
    ----------
    datadict : dict
        Mapping of atom symbol to atom count. An "HC" entry is ignored.

    Returns
    -------
    float
        Half of the sum of ``count * (valence - 2)`` over all atoms whose
        valence is positive.
    """
    twice_dbe = 0

    for symbol, raw_count in datadict.items():
        if symbol == "HC":
            continue

        count = int(raw_count)

        # digits are stripped from the key before the valence lookup
        # (presumably isotope mass numbers such as "13C" -- TODO confirm)
        element = "".join(char for char in symbol if not char.isdigit())

        valence = MSParameters.molecular_search.used_atom_valences.get(element)

        # when several valences are configured, the first one is used
        if type(valence) is tuple:
            valence = valence[0]

        if valence > 0:
            twice_dbe = twice_dbe + (count * (valence - 2))

    return 0.5 * twice_dbe
Calculate the double bond equivalent (DBE) of a molecular formula.
Parameters
- datadict (dict): A dictionary of classes.
Returns
- float: The double bond equivalent (DBE) of a molecular formula.
def populate_combinations(self, classe_tuple, settings):
    """Build the molecular formula records for one heteroatom class.

    Parameters
    ----------
    classe_tuple : tuple
        A tuple containing the class name, the class dictionary, and the
        class ID.
    settings : object
        An object containing user-defined settings.

    Returns
    -------
    list
        Whatever ``self.get_mol_formulas`` returns for this class: a list
        of molecular formula data dictionaries.
    """
    # the class dictionary decides whether the odd or even hydrogen series
    # of carbon/hydrogen combinations is used
    hydrogen_parity = self.get_h_odd_or_even(classe_tuple[1])

    return self.get_mol_formulas(hydrogen_parity, classe_tuple, settings)
Populate the combinations
Parameters
- classe_tuple (tuple): A tuple containing the class name, the class dictionary, and the class ID.
- settings (object): An object containing user-defined settings.
Returns
- list: A list of molecular formula data dictionaries.
def get_or_add(self, SomeClass, kw):
    """Fetch a matching row or build a new instance.

    Parameters
    ----------
    SomeClass : object
        The class to query for and, if needed, to instantiate.
    kw : dict
        Keyword arguments used both as ``filter_by`` criteria and as
        constructor arguments.

    Returns
    -------
    object
        The first row of ``SomeClass`` matching ``kw`` in
        ``self.sql_db.session``, or a new ``SomeClass(**kw)`` when the
        query yields nothing. The new instance is not added to the
        session here.
    """
    existing = self.sql_db.session.query(SomeClass).filter_by(**kw).first()
    if existing:
        return existing

    return SomeClass(**kw)
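Get the first database row of SomeClass that matches kw, or build a new SomeClass instance from kw when no row is found (the new instance is not added to the session by this method).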
Parameters
- SomeClass (object): A class object.
- kw (dict): Keyword arguments used both as the query filter criteria and as the constructor arguments.
Returns
- object: A class object.
def get_mol_formulas(self, odd_even_tag, classe_tuple, settings):
    """Combine one heteroatom class with the carbon/hydrogen series.

    Parameters
    ----------
    odd_even_tag : str
        "odd" selects the odd-hydrogen carbon/hydrogen series; any other
        value selects the even series.
    classe_tuple : tuple
        A tuple containing the class name, the class dictionary, and the
        class ID. The class dictionary is not modified.
    settings : object
        An object containing user-defined settings; ``min_mz``,
        ``max_mz``, ``min_dbe`` and ``max_dbe`` are read.

    Returns
    -------
    list
        A list of molecular formula data dictionaries with the keys
        "heteroAtoms_id", "carbonHydrogen_id", "mass" and "DBE", one for
        every carbon/hydrogen entry whose combined mass and DBE fall
        inside the configured (inclusive) limits.
    """
    class_dict = classe_tuple[1]
    classe_id = classe_tuple[2]

    # Drop the "HC" placeholder on a copy: deleting it from class_dict
    # would silently alter the dictionary owned by the caller.
    hetero_counts = {
        atom: count for atom, count in class_dict.items() if atom != "HC"
    }

    class_dbe = self.calc_dbe_class(hetero_counts)
    class_mass = self.calc_mz(hetero_counts)

    # pick the pre-computed carbon/hydrogen series with matching H parity
    if odd_even_tag == "odd":
        ch_masses = self.odd_ch_mass
        ch_dbes = self.odd_ch_dbe
        ch_ids = self.odd_ch_id
    else:
        ch_masses = self.even_ch_mass
        ch_dbes = self.even_ch_dbe
        ch_ids = self.even_ch_id

    results = []

    # the three series are parallel lists indexed by the same position
    for index, ch_id in enumerate(ch_ids):
        mass = ch_masses[index] + class_mass
        dbe = ch_dbes[index] + class_dbe

        if not settings.min_mz <= mass <= settings.max_mz:
            continue
        if not settings.min_dbe <= dbe <= settings.max_dbe:
            continue

        results.append(
            {
                "heteroAtoms_id": classe_id,
                "carbonHydrogen_id": ch_id,
                "mass": mass,
                "DBE": dbe,
            }
        )

    return results
Get the molecular formulas
Parameters
- odd_even_tag (str): A string indicating whether to retrieve even or odd hydrogen atoms.
- classe_tuple (tuple): A tuple containing the class name, the class dictionary, and the class ID.
- settings (object): An object containing user-defined settings.
Returns
- list: A list of molecular formula data dictionaries.
def get_h_odd_or_even(self, class_dict):
    """Decide whether a class pairs with an odd or even hydrogen count.

    Parameters
    ----------
    class_dict : dict
        Mapping of atom symbol to atom count for one class.

    Returns
    -------
    str
        "odd" or "even". The base answer comes from the parity of the
        nitrogen count plus the total valence of the other heteroatoms;
        an odd number of halogen atoms flips it.
    """
    has_nitrogen = "N" in class_dict

    halogen_total = self.get_total_halogen_atoms(class_dict)
    hetero_total = self.get_total_heteroatoms(class_dict)

    has_halogen = halogen_total > 0
    halogen_parity = halogen_total % 2 if has_halogen else 0

    has_other_hetero = hetero_total > 0
    if has_other_hetero:
        hetero_valence_parity = self.get_total_hetero_valence(class_dict) % 2
    else:
        hetero_valence_parity = 0

    if has_nitrogen:
        # hetero_valence_parity is 0 when no other heteroatom is present,
        # so this covers both the N-only and the N-plus-hetero case
        combined_parity = (class_dict.get("N") + hetero_valence_parity) % 2
    elif has_other_hetero:
        combined_parity = hetero_valence_parity
    else:
        # neither nitrogen nor other heteroatoms: treated like even parity
        combined_parity = -1

    hydrogen_is_odd = combined_parity > 0

    # an odd number of halogens swaps the answer
    if has_halogen and halogen_parity != 0:
        hydrogen_is_odd = not hydrogen_is_odd

    return "odd" if hydrogen_is_odd else "even"
Get the hydrogen odd or even
Parameters
- class_dict (dict): A dictionary of classes.
Returns
- str: A string indicating whether to retrieve even or odd hydrogen atoms.
834 @staticmethod 835 def get_total_heteroatoms(class_dict): 836 """Get the total number of heteroatoms other than N, F, Cl, Br 837 838 Parameters 839 ---------- 840 class_dict : dict 841 A dictionary of classes. 842 843 Returns 844 ------- 845 int 846 The total number of heteroatoms. 847 """ 848 849 total_number = 0 850 851 for atom in class_dict.keys(): 852 if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]: 853 total_number = total_number + class_dict.get(atom) 854 855 return total_number
Get the total number of heteroatoms other than O, N, F, Cl, Br (C, H and the 'HC' placeholder are ignored as well)
Parameters
- class_dict (dict): A dictionary of classes.
Returns
- int: The total number of heteroatoms.
857 @staticmethod 858 def get_total_hetero_valence(class_dict): 859 """Get the total valence of heteroatoms other than N, F, Cl, Br 860 861 Parameters 862 ---------- 863 class_dict : dict 864 A dictionary of classes. 865 866 Returns 867 ------- 868 int 869 The total heteroatom valence. 870 """ 871 total_valence = 0 872 873 for atom in class_dict.keys(): 874 if atom not in ["HC", "C", "H", "O", "N", "F", "Cl", "Br"]: 875 clean_atom = "".join([i for i in atom if not i.isdigit()]) 876 877 atom_valence = MSParameters.molecular_search.used_atom_valences.get( 878 clean_atom 879 ) 880 881 if type(atom_valence) is tuple: 882 atom_valence = atom_valence[0] 883 884 n_atom = int(class_dict.get(atom)) 885 886 n_atom_valence = atom_valence * n_atom 887 888 total_valence = total_valence + n_atom_valence 889 890 return total_valence
Get the total valence of heteroatoms other than O, N, F, Cl, Br (C, H and the 'HC' placeholder are ignored as well)
Parameters
- class_dict (dict): A dictionary of classes.
Returns
- int: The total heteroatom valence.
892 @staticmethod 893 def get_total_halogen_atoms(class_dict): 894 """Get the total number of halogen atoms 895 896 Parameters 897 ---------- 898 class_dict : dict 899 A dictionary of classes. 900 901 Returns 902 ------- 903 int 904 The total number of halogen atoms. 905 """ 906 atoms = ["F", "Cl", "Br"] 907 908 total_number = 0 909 910 for atom in atoms: 911 if atom in class_dict.keys(): 912 total_number = total_number + class_dict.get(atom) 913 914 return total_number
Get the total number of halogen atoms
Parameters
- class_dict (dict): A dictionary of classes.
Returns
- int: The total number of halogen atoms.