corems.mass_spectra.input.rawFileReader
1__author__ = "Yuri E. Corilo" 2__date__ = "Jun 09, 2021" 3 4 5from warnings import warn 6import warnings 7from collections import defaultdict 8 9from matplotlib import axes 10from corems.encapsulation.factory.processingSetting import LiquidChromatographSetting 11 12import numpy as np 13import sys 14import site 15from pathlib import Path 16import datetime 17import importlib.util 18import os 19 20import clr 21import pandas as pd 22from s3path import S3Path 23 24 25from typing import Any, Dict, List, Optional, Tuple 26from corems.encapsulation.constant import Labels 27from corems.mass_spectra.factory.lc_class import MassSpectraBase, LCMSBase 28from corems.mass_spectra.factory.chromat_data import EIC_Data, TIC_Data 29from corems.mass_spectrum.factory.MassSpectrumClasses import ( 30 MassSpecProfile, 31 MassSpecCentroid, 32) 33from corems.encapsulation.factory.parameters import LCMSParameters, default_parameters 34from corems.mass_spectra.input.parserbase import SpectraParserInterface 35 36# Add the path of the Thermo .NET libraries to the system path 37spec = importlib.util.find_spec("corems") 38sys.path.append(str(Path(os.path.dirname(spec.origin)).parent) + "/ext_lib/dotnet/") 39 40clr.AddReference("ThermoFisher.CommonCore.RawFileReader") 41clr.AddReference("ThermoFisher.CommonCore.Data") 42clr.AddReference("ThermoFisher.CommonCore.MassPrecisionEstimator") 43 44from ThermoFisher.CommonCore.RawFileReader import RawFileReaderAdapter 45from ThermoFisher.CommonCore.Data import ToleranceUnits, Extensions 46from ThermoFisher.CommonCore.Data.Business import ( 47 ChromatogramTraceSettings, 48 TraceType, 49 MassOptions, 50) 51from ThermoFisher.CommonCore.Data.Business import ChromatogramSignal, Range 52from ThermoFisher.CommonCore.Data.Business import Device 53from ThermoFisher.CommonCore.Data.Interfaces import IChromatogramSettings 54from ThermoFisher.CommonCore.Data.Business import MassOptions, FileHeaderReaderFactory 55from ThermoFisher.CommonCore.Data.FilterEnums import 
MSOrderType 56from System.Collections.Generic import List 57 58 59class ThermoBaseClass: 60 """Class for parsing Thermo Raw files and extracting information from them. 61 62 Parameters: 63 ----------- 64 file_location : str or pathlib.Path or s3path.S3Path 65 Thermo Raw file path or S3 path. 66 67 Attributes: 68 ----------- 69 file_path : str or pathlib.Path or s3path.S3Path 70 The file path of the Thermo Raw file. 71 parameters : LCMSParameters 72 The LCMS parameters for the Thermo Raw file. 73 chromatogram_settings : LiquidChromatographSetting 74 The chromatogram settings for the Thermo Raw file. 75 scans : list or tuple 76 The selected scans for the Thermo Raw file. 77 start_scan : int 78 The starting scan number for the Thermo Raw file. 79 end_scan : int 80 The ending scan number for the Thermo Raw file. 81 82 Methods: 83 -------- 84 * set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter 85 Convert the user-passed MS Type string to a Thermo MSOrderType object. 86 * get_instrument_info() -> dict 87 Get the instrument information from the Thermo Raw file. 88 * get_creation_time() -> datetime.datetime 89 Extract the creation date stamp from the .RAW file and return it as a formatted datetime object. 90 * remove_temp_file() 91 Remove the temporary file if the path is from S3Path. 92 * get_polarity_mode(scan_number: int) -> int 93 Get the polarity mode for the given scan number. 94 * get_filter_for_scan_num(scan_number: int) -> List[str] 95 Get the filter for the given scan number. 96 * check_full_scan(scan_number: int) -> bool 97 Check if the given scan number is a full scan. 98 * get_all_filters() -> Tuple[Dict[int, str], List[str]] 99 Get all scan filters for the Thermo Raw file. 100 * get_scan_header(scan: int) -> Dict[str, Any] 101 Get the full dictionary of scan header metadata for the given scan number. 
102 * get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]] 103 Get the retention time, intensity, and scan number from the given trace. 104 * get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d', 105 peak_detection: bool = True, smooth: bool = True, plot: bool = False, 106 ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes] 107 Get the extracted ion chromatograms (EICs) for the target m/z values. 108 109 """ 110 111 def __init__(self, file_location): 112 """file_location: srt pathlib.Path or s3path.S3Path 113 Thermo Raw file path 114 """ 115 # Thread.__init__(self) 116 if isinstance(file_location, str): 117 file_path = Path(file_location) 118 119 elif isinstance(file_location, S3Path): 120 temp_dir = Path("tmp/") 121 temp_dir.mkdir(exist_ok=True) 122 123 file_path = temp_dir / file_location.name 124 with open(file_path, "wb") as fh: 125 fh.write(file_location.read_bytes()) 126 127 else: 128 file_path = file_location 129 130 self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path)) 131 132 if not self.iRawDataPlus.IsOpen: 133 raise FileNotFoundError( 134 "Unable to access the RAW file using the RawFileReader class!" 135 ) 136 137 # Check for any errors in the RAW file 138 if self.iRawDataPlus.IsError: 139 raise IOError( 140 "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path) 141 ) 142 143 self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1) 144 145 self.file_path = file_location 146 self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path)) 147 148 # removing tmp file 149 150 self._init_settings() 151 152 def _init_settings(self): 153 """ 154 Initialize the LCMSParameters object. 155 """ 156 self._parameters = LCMSParameters() 157 158 @property 159 def parameters(self) -> LCMSParameters: 160 """ 161 Get or set the LCMSParameters object. 
162 """ 163 return self._parameters 164 165 @parameters.setter 166 def parameters(self, instance_LCMSParameters: LCMSParameters): 167 self._parameters = instance_LCMSParameters 168 169 @property 170 def chromatogram_settings(self) -> LiquidChromatographSetting: 171 """ 172 Get or set the LiquidChromatographSetting object. 173 """ 174 return self.parameters.lc_ms 175 176 @chromatogram_settings.setter 177 def chromatogram_settings( 178 self, instance_LiquidChromatographSetting: LiquidChromatographSetting 179 ): 180 self.parameters.lc_ms = instance_LiquidChromatographSetting 181 182 @property 183 def scans(self) -> list | tuple: 184 """scans : list or tuple 185 If list uses Thermo AverageScansInScanRange for selected scans, ortherwise uses Thermo AverageScans for a scan range 186 """ 187 return self.chromatogram_settings.scans 188 189 @property 190 def start_scan(self) -> int: 191 """ 192 Get the starting scan number for the Thermo Raw file. 193 """ 194 if self.scans[0] == -1: 195 return self.iRawDataPlus.RunHeaderEx.FirstSpectrum 196 else: 197 return self.scans[0] 198 199 @property 200 def end_scan(self) -> int: 201 """ 202 Get the ending scan number for the Thermo Raw file. 203 """ 204 if self.scans[-1] == -1: 205 return self.iRawDataPlus.RunHeaderEx.LastSpectrum 206 else: 207 return self.scans[-1] 208 209 def set_msordertype(self, scanFilter, mstype: str = "ms1"): 210 """ 211 Function to convert user passed string MS Type to Thermo MSOrderType object 212 Limited to MS1 through MS10. 213 214 Parameters: 215 ----------- 216 scanFilter : Thermo.ScanFilter 217 The scan filter object. 
218 mstype : str, optional 219 The MS Type string, by default 'ms1' 220 221 """ 222 mstype = mstype.upper() 223 # Check that a valid mstype is passed 224 if (int(mstype.split("MS")[1]) > 10) or (int(mstype.split("MS")[1]) < 1): 225 warn("MS Type not valid, must be between MS1 and MS10") 226 227 msordertypedict = { 228 "MS1": MSOrderType.Ms, 229 "MS2": MSOrderType.Ms2, 230 "MS3": MSOrderType.Ms3, 231 "MS4": MSOrderType.Ms4, 232 "MS5": MSOrderType.Ms5, 233 "MS6": MSOrderType.Ms6, 234 "MS7": MSOrderType.Ms7, 235 "MS8": MSOrderType.Ms8, 236 "MS9": MSOrderType.Ms9, 237 "MS10": MSOrderType.Ms10, 238 } 239 scanFilter.MSOrder = msordertypedict[mstype] 240 return scanFilter 241 242 def get_instrument_info(self) -> dict: 243 """ 244 Get the instrument information from the Thermo Raw file. 245 246 Returns: 247 -------- 248 dict 249 A dictionary with the keys 'model', and 'serial_number'. 250 """ 251 instrumentData = self.iRawDataPlus.GetInstrumentData() 252 return { 253 "model": instrumentData.Model, 254 "serial_number": instrumentData.SerialNumber 255 } 256 257 def get_creation_time(self) -> datetime.datetime: 258 """ 259 Extract the creation date stamp from the .RAW file 260 Return formatted creation date stamp. 261 262 """ 263 credate = self.iRawDataPlus.CreationDate.get_Ticks() 264 credate = datetime.datetime(1, 1, 1) + datetime.timedelta( 265 microseconds=credate / 10 266 ) 267 return credate 268 269 def remove_temp_file(self) -> None: 270 """if the path is from S3Path data cannot be serialized to io.ByteStream and 271 a temporary copy is stored at the temp dir 272 use this function only at the end of your execution scrip 273 some LCMS class methods depend on this file 274 """ 275 276 self.file_path.unlink() 277 278 def close_file(self) -> None: 279 """ 280 Close the Thermo Raw file. 281 """ 282 self.iRawDataPlus.Dispose() 283 284 def get_polarity_mode(self, scan_number: int) -> int: 285 """ 286 Get the polarity mode for the given scan number. 
287 288 Parameters: 289 ----------- 290 scan_number : int 291 The scan number. 292 293 Raises: 294 ------- 295 Exception 296 If the polarity mode is unknown. 297 298 """ 299 polarity_symbol = self.get_filter_for_scan_num(scan_number)[1] 300 301 if polarity_symbol == "+": 302 return 1 303 # return 'POSITIVE_ION_MODE' 304 305 elif polarity_symbol == "-": 306 return -1 307 308 else: 309 raise Exception("Polarity Mode Unknown, please set it manually") 310 311 def get_filter_for_scan_num(self, scan_number: int) -> List[str]: 312 """ 313 Returns the closest matching run time that corresponds to scan_number for the current 314 controller. This function is only supported for MS device controllers. 315 e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]'] 316 317 Parameters: 318 ----------- 319 scan_number : int 320 The scan number. 321 322 """ 323 scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number) 324 325 return str(scan_label).split() 326 327 def get_ms_level_for_scan_num(self, scan_number: int) -> str: 328 """ 329 Get the MS order for the given scan number. 330 331 Parameters: 332 ----------- 333 scan_number : int 334 The scan number 335 336 Returns: 337 -------- 338 int 339 The MS order type (1 for MS, 2 for MS2, etc.) 
340 """ 341 scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number) 342 343 msordertype = { 344 MSOrderType.Ms: 1, 345 MSOrderType.Ms2: 2, 346 MSOrderType.Ms3: 3, 347 MSOrderType.Ms4: 4, 348 MSOrderType.Ms5: 5, 349 MSOrderType.Ms6: 6, 350 MSOrderType.Ms7: 7, 351 MSOrderType.Ms8: 8, 352 MSOrderType.Ms9: 9, 353 MSOrderType.Ms10: 10, 354 } 355 356 if scan_filter.MSOrder in msordertype: 357 return msordertype[scan_filter.MSOrder] 358 else: 359 raise Exception("MS Order Type not found") 360 361 def check_full_scan(self, scan_number: int) -> bool: 362 # scan_filter.ScanMode 0 = FULL 363 scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number) 364 365 return scan_filter.ScanMode == MSOrderType.Ms 366 367 def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]: 368 """ 369 Get all scan filters. 370 This function is only supported for MS device controllers. 371 e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]'] 372 373 """ 374 375 scanrange = range(self.start_scan, self.end_scan + 1) 376 scanfiltersdic = {} 377 scanfilterslist = [] 378 for scan_number in scanrange: 379 scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number) 380 scanfiltersdic[scan_number] = scan_label 381 scanfilterslist.append(scan_label) 382 scanfilterset = list(set(scanfilterslist)) 383 return scanfiltersdic, scanfilterset 384 385 def get_scan_header(self, scan: int) -> Dict[str, Any]: 386 """ 387 Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc. 388 389 Parameters: 390 ----------- 391 scan : int 392 The scan number. 
393 394 """ 395 header = self.iRawDataPlus.GetTrailerExtraInformation(scan) 396 397 header_dic = {} 398 for i in range(header.Length): 399 header_dic.update({header.Labels[i]: header.Values[i]}) 400 return header_dic 401 402 @staticmethod 403 def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]: 404 """trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal""" 405 return list(trace.Times), list(trace.Intensities), list(trace.Scans) 406 407 def get_eics( 408 self, 409 target_mzs: List[float], 410 tic_data: Dict[str, Any], 411 ms_type="MS !d", 412 peak_detection=False, 413 smooth=False, 414 plot=False, 415 ax: Optional[axes.Axes] = None, 416 legend=False, 417 ) -> Tuple[Dict[float, EIC_Data], axes.Axes]: 418 """ms_type: str ('MS', MS2') 419 start_scan: int default -1 will select the lowest available 420 end_scan: int default -1 will select the highest available 421 422 returns: 423 424 chroma: dict{target_mz: EIC_Data( 425 Scans: [int] 426 original thermo scan numbers 427 Time: [floats] 428 list of retention times 429 TIC: [floats] 430 total ion chromatogram 431 Apexes: [int] 432 original thermo apex scan number after peak picking 433 ) 434 435 """ 436 # If peak_detection or smooth is True, raise exception 437 if peak_detection or smooth: 438 raise Exception("Peak detection and smoothing are no longer implemented in this function") 439 440 options = MassOptions() 441 options.ToleranceUnits = ToleranceUnits.ppm 442 options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm 443 444 all_chroma_settings = [] 445 446 for target_mz in target_mzs: 447 settings = ChromatogramTraceSettings(TraceType.MassRange) 448 settings.Filter = ms_type 449 settings.MassRanges = [Range(target_mz, target_mz)] 450 451 chroma_settings = IChromatogramSettings(settings) 452 453 all_chroma_settings.append(chroma_settings) 454 455 # chroma_settings2 = IChromatogramSettings(settings) 456 # print(chroma_settings.FragmentMass) 457 # 
print(chroma_settings.FragmentMass) 458 # print(chroma_settings) 459 # print(chroma_settings) 460 461 data = self.iRawDataPlus.GetChromatogramData( 462 all_chroma_settings, self.start_scan, self.end_scan, options 463 ) 464 465 traces = ChromatogramSignal.FromChromatogramData(data) 466 467 chroma = {} 468 469 if plot: 470 from matplotlib.transforms import Bbox 471 import matplotlib.pyplot as plt 472 473 if not ax: 474 # ax = plt.gca() 475 # ax.clear() 476 fig, ax = plt.subplots() 477 478 else: 479 fig = plt.gcf() 480 481 # plt.show() 482 483 for i, trace in enumerate(traces): 484 if trace.Length > 0: 485 rt, eic, scans = self.get_rt_time_from_trace(trace) 486 if smooth: 487 eic = self.smooth_tic(eic) 488 489 chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic) 490 if plot: 491 ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i])) 492 493 if peak_detection: 494 # max_eic = self.get_max_eic(chroma) 495 max_signal = max(tic_data.tic) 496 497 for eic_data in chroma.values(): 498 eic = eic_data.eic 499 time = eic_data.time 500 501 if len(eic) != len(tic_data.tic): 502 warn( 503 "The software assumes same lenth of TIC and EIC, this does not seems to be the case and the results mass spectrum selected by the scan number might not be correct" 504 ) 505 506 if eic.max() > 0: 507 centroid_eics = self.eic_centroid_detector(time, eic, max_signal) 508 eic_data.apexes = [i for i in centroid_eics] 509 510 if plot: 511 for peak_indexes in eic_data.apexes: 512 apex_index = peak_indexes[1] 513 ax.plot( 514 time[apex_index], 515 eic[apex_index], 516 marker="x", 517 linewidth=0, 518 ) 519 520 if plot: 521 ax.set_xlabel("Time (min)") 522 ax.set_ylabel("a.u.") 523 ax.set_title(ms_type + " EIC") 524 ax.tick_params(axis="both", which="major", labelsize=12) 525 ax.axes.spines["top"].set_visible(False) 526 ax.axes.spines["right"].set_visible(False) 527 528 if legend: 529 legend = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1)) 530 fig.subplots_adjust(right=0.76) 531 
# ax.set_prop_cycle(color=plt.cm.gist_rainbow(np.linspace(0, 1, len(traces)))) 532 533 d = {"down": 30, "up": -30} 534 535 def func(evt): 536 if legend.contains(evt): 537 bbox = legend.get_bbox_to_anchor() 538 bbox = Bbox.from_bounds( 539 bbox.x0, bbox.y0 + d[evt.button], bbox.width, bbox.height 540 ) 541 tr = legend.axes.transAxes.inverted() 542 legend.set_bbox_to_anchor(bbox.transformed(tr)) 543 fig.canvas.draw_idle() 544 545 fig.canvas.mpl_connect("scroll_event", func) 546 return chroma, ax 547 else: 548 return chroma, None 549 rt = [] 550 tic = [] 551 scans = [] 552 for i in range(traces[0].Length): 553 # print(trace[0].HasBasePeakData,trace[0].EndTime ) 554 555 # print(" {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] )) 556 rt.append(traces[0].Times[i]) 557 tic.append(traces[0].Intensities[i]) 558 scans.append(traces[0].Scans[i]) 559 560 return traces 561 # plot_chroma(rt, tic) 562 # plt.show() 563 564 def get_tic( 565 self, 566 ms_type="MS !d", 567 peak_detection=False, # This wont work right now 568 smooth=False, # This wont work right now 569 plot=False, 570 ax=None, 571 trace_type="TIC", 572 ) -> Tuple[TIC_Data, axes.Axes]: 573 """ms_type: str ('MS !d', 'MS2', None) 574 if you use None you get all scans. 
575 peak_detection: bool 576 smooth: bool 577 plot: bool 578 ax: matplotlib axis object 579 trace_type: str ('TIC','BPC') 580 581 returns: 582 chroma: dict 583 { 584 Scan: [int] 585 original thermo scan numberMS 586 Time: [floats] 587 list of retention times 588 TIC: [floats] 589 total ion chromatogram 590 Apexes: [int] 591 original thermo apex scan number after peak picking 592 } 593 """ 594 if trace_type == "TIC": 595 settings = ChromatogramTraceSettings(TraceType.TIC) 596 elif trace_type == "BPC": 597 settings = ChromatogramTraceSettings(TraceType.BasePeak) 598 else: 599 raise ValueError(f"{trace_type} undefined") 600 if ms_type == "all": 601 settings.Filter = None 602 else: 603 settings.Filter = ms_type 604 605 chroma_settings = IChromatogramSettings(settings) 606 607 data = self.iRawDataPlus.GetChromatogramData( 608 [chroma_settings], self.start_scan, self.end_scan 609 ) 610 611 trace = ChromatogramSignal.FromChromatogramData(data) 612 613 data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[]) 614 615 if trace[0].Length > 0: 616 for i in range(trace[0].Length): 617 # print(trace[0].HasBasePeakData,trace[0].EndTime ) 618 619 # print(" {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] )) 620 data.time.append(trace[0].Times[i]) 621 data.tic.append(trace[0].Intensities[i]) 622 data.scans.append(trace[0].Scans[i]) 623 624 # print(trace[0].Scans[i]) 625 if smooth: 626 data.tic = self.smooth_tic(data.tic) 627 628 else: 629 data.tic = np.array(data.tic) 630 631 if peak_detection: 632 centroid_peak_indexes = [ 633 i for i in self.centroid_detector(data.time, data.tic) 634 ] 635 636 data.apexes = centroid_peak_indexes 637 638 if plot: 639 if not ax: 640 import matplotlib.pyplot as plt 641 642 ax = plt.gca() 643 # fig, ax = plt.subplots(figsize=(6, 3)) 644 645 ax.plot(data.time, data.tic, label=trace_type) 646 ax.set_xlabel("Time (min)") 647 ax.set_ylabel("a.u.") 648 if peak_detection: 649 for peak_indexes in data.apexes: 650 apex_index = 
peak_indexes[1] 651 ax.plot( 652 data.time[apex_index], 653 data.tic[apex_index], 654 marker="x", 655 linewidth=0, 656 ) 657 658 # plt.show() 659 if trace_type == "BPC": 660 data.bpc = data.tic 661 data.tic = [] 662 return data, ax 663 if trace_type == "BPC": 664 data.bpc = data.tic 665 data.tic = [] 666 return data, None 667 668 else: 669 return None, None 670 671 def get_average_mass_spectrum( 672 self, 673 spectrum_mode: str = "profile", 674 auto_process: bool = True, 675 ppm_tolerance: float = 5.0, 676 ms_type: str = "MS1", 677 ) -> MassSpecProfile | MassSpecCentroid: 678 """ 679 Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method 680 or a scan list using Thermo's AverageScans method 681 spectrum_mode: str 682 centroid or profile mass spectrum 683 auto_process: bool 684 If true performs peak picking, and noise threshold calculation after creation of mass spectrum object 685 ms_type: str 686 String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10. 687 Internal function converts to Thermo MSOrderType class. 
688 689 """ 690 691 def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool): 692 mz_list = list(averageScan.SegmentedScan.Positions) 693 abund_list = list(averageScan.SegmentedScan.Intensities) 694 695 data_dict = { 696 Labels.mz: mz_list, 697 Labels.abundance: abund_list, 698 } 699 700 return MassSpecProfile(data_dict, d_params, auto_process=auto_process) 701 702 def get_centroid_mass_spec(averageScan, d_params: dict): 703 noise = list(averageScan.centroidScan.Noises) 704 705 baselines = list(averageScan.centroidScan.Baselines) 706 707 rp = list(averageScan.centroidScan.Resolutions) 708 709 magnitude = list(averageScan.centroidScan.Intensities) 710 711 mz = list(averageScan.centroidScan.Masses) 712 713 array_noise_std = (np.array(noise) - np.array(baselines)) / 3 714 l_signal_to_noise = np.array(magnitude) / array_noise_std 715 716 d_params["baseline_noise"] = np.average(array_noise_std) 717 718 d_params["baseline_noise_std"] = np.std(array_noise_std) 719 720 data_dict = { 721 Labels.mz: mz, 722 Labels.abundance: magnitude, 723 Labels.rp: rp, 724 Labels.s2n: list(l_signal_to_noise), 725 } 726 727 mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False) 728 729 return mass_spec 730 731 d_params = self.set_metadata( 732 firstScanNumber=self.start_scan, lastScanNumber=self.end_scan 733 ) 734 735 # Create the mass options object that will be used when averaging the scans 736 options = MassOptions() 737 options.ToleranceUnits = ToleranceUnits.ppm 738 options.Tolerance = ppm_tolerance 739 740 # Get the scan filter for the first scan. 
This scan filter will be used to located 741 # scans within the given scan range of the same type 742 scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan) 743 744 # force it to only look for the MSType 745 scanFilter = self.set_msordertype(scanFilter, ms_type) 746 747 if isinstance(self.scans, tuple): 748 averageScan = Extensions.AverageScansInScanRange( 749 self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options 750 ) 751 752 if averageScan: 753 if spectrum_mode == "profile": 754 mass_spec = get_profile_mass_spec( 755 averageScan, d_params, auto_process 756 ) 757 758 return mass_spec 759 760 elif spectrum_mode == "centroid": 761 if averageScan.HasCentroidStream: 762 mass_spec = get_centroid_mass_spec(averageScan, d_params) 763 764 return mass_spec 765 766 else: 767 raise ValueError( 768 "No Centroind data available for the selected scans" 769 ) 770 else: 771 raise ValueError("spectrum_mode must be 'profile' or centroid") 772 else: 773 raise ValueError("No data found for the selected scans") 774 775 elif isinstance(self.scans, list): 776 d_params = self.set_metadata(scans_list=self.scans) 777 778 scans = List[int]() 779 for scan in self.scans: 780 scans.Add(scan) 781 782 averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options) 783 784 if averageScan: 785 if spectrum_mode == "profile": 786 mass_spec = get_profile_mass_spec( 787 averageScan, d_params, auto_process 788 ) 789 790 return mass_spec 791 792 elif spectrum_mode == "centroid": 793 if averageScan.HasCentroidStream: 794 mass_spec = get_centroid_mass_spec(averageScan, d_params) 795 796 return mass_spec 797 798 else: 799 raise ValueError( 800 "No Centroind data available for the selected scans" 801 ) 802 803 else: 804 raise ValueError("spectrum_mode must be 'profile' or centroid") 805 806 else: 807 raise ValueError("No data found for the selected scans") 808 809 else: 810 raise ValueError("scans must be a list intergers or a tuple if integers") 811 812 def 
set_metadata( 813 self, 814 firstScanNumber=0, 815 lastScanNumber=0, 816 scans_list=False, 817 label=Labels.thermo_profile, 818 ): 819 """ 820 Collect metadata to be ingested in the mass spectrum object 821 822 scans_list: list[int] or false 823 lastScanNumber: int 824 firstScanNumber: int 825 """ 826 827 d_params = default_parameters(self.file_path) 828 829 # assumes scans is full scan or reduced profile scan 830 831 d_params["label"] = label 832 833 if scans_list: 834 d_params["scan_number"] = scans_list 835 836 d_params["polarity"] = self.get_polarity_mode(scans_list[0]) 837 838 else: 839 d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber) 840 841 d_params["polarity"] = self.get_polarity_mode(firstScanNumber) 842 843 d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model 844 845 d_params["acquisition_time"] = self.get_creation_time() 846 847 d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name 848 849 return d_params 850 851 def get_instrument_methods(self, parse_strings: bool = True): 852 """ 853 This function will extract the instrument methods embedded in the raw file 854 855 First it will check if there are any instrument methods, if not returning None 856 Then it will get the total number of instrument methods. 857 For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary 858 If this fails, it will return just the string object. 859 860 This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail. 861 862 Parameters: 863 ----------- 864 parse_strings: bool 865 If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string. 866 867 Returns: 868 -------- 869 List[Dict[str, Any]] or List 870 A list of dictionaries containing the instrument methods, or a list of strings if parsing fails. 
871 """ 872 873 if not self.iRawDataPlus.HasInstrumentMethod: 874 raise ValueError( 875 "Raw Data file does not have any instrument methods attached" 876 ) 877 return None 878 else: 879 880 def parse_instrument_method(data): 881 lines = data.split("\r\n") 882 method = {} 883 current_section = None 884 sub_section = None 885 886 for line in lines: 887 if not line.strip(): # Skip empty lines 888 continue 889 if ( 890 line.startswith("----") 891 or line.endswith("Settings") 892 or line.endswith("Summary") 893 or line.startswith("Experiment") 894 or line.startswith("Scan Event") 895 ): 896 current_section = line.replace("-", "").strip() 897 method[current_section] = {} 898 sub_section = None 899 elif line.startswith("\t"): 900 if "\t\t" in line: 901 indent_level = line.count("\t") 902 key_value = line.strip() 903 904 if indent_level == 2: 905 if sub_section: 906 key, value = ( 907 key_value.split("=", 1) 908 if "=" in key_value 909 else (key_value, None) 910 ) 911 method[current_section][sub_section][ 912 key.strip() 913 ] = value.strip() if value else None 914 elif indent_level == 3: 915 scan_type, key_value = ( 916 key_value.split(" ", 1) 917 if " " in key_value 918 else (key_value, None) 919 ) 920 method.setdefault(current_section, {}).setdefault( 921 sub_section, {} 922 ).setdefault(scan_type, {}) 923 924 if key_value: 925 key, value = ( 926 key_value.split("=", 1) 927 if "=" in key_value 928 else (key_value, None) 929 ) 930 method[current_section][sub_section][scan_type][ 931 key.strip() 932 ] = value.strip() if value else None 933 else: 934 key_value = line.strip() 935 if "=" in key_value: 936 key, value = key_value.split("=", 1) 937 method.setdefault(current_section, {})[key.strip()] = ( 938 value.strip() 939 ) 940 else: 941 sub_section = key_value 942 else: 943 if ":" in line: 944 key, value = line.split(":", 1) 945 method[current_section][key.strip()] = value.strip() 946 else: 947 method[current_section][line] = {} 948 949 return method 950 951 
count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount 952 # TODO make this code better... 953 instrument_methods = [] 954 for i in range(count_instrument_methods): 955 instrument_method_string = self.iRawDataPlus.GetInstrumentMethod(i) 956 if parse_strings: 957 try: 958 instrument_method_dict = parse_instrument_method( 959 instrument_method_string 960 ) 961 except: # if it fails for any reason 962 instrument_method_dict = instrument_method_string 963 else: 964 instrument_method_dict = instrument_method_string 965 instrument_methods.append(instrument_method_dict) 966 return instrument_methods 967 968 def get_tune_method(self): 969 """ 970 This code will extract the tune method from the raw file 971 It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. 972 It attempts to parse out section headers and sub-sections, but may not work for all instrument types. 973 It will also not return Labels (keys) where the value is blank 974 975 Returns: 976 -------- 977 Dict[str, Any] 978 A dictionary containing the tune method information 979 980 Raises: 981 ------- 982 ValueError 983 If no tune methods are found in the raw file 984 985 """ 986 tunemethodcount = self.iRawDataPlus.GetTuneDataCount() 987 if tunemethodcount == 0: 988 raise ValueError("No tune methods found in the raw data file") 989 return None 990 elif tunemethodcount > 1: 991 warnings.warn( 992 "Multiple tune methods found in the raw data file, returning the 1st" 993 ) 994 995 header = self.iRawDataPlus.GetTuneData(0) 996 997 header_dic = {} 998 current_section = None 999 1000 for i in range(header.Length): 1001 label = header.Labels[i] 1002 value = header.Values[i] 1003 1004 # Check for section headers 1005 if "===" in label or ( 1006 (value == "" or value is None) and not label.endswith(":") 1007 ): 1008 # This is a section header 1009 section_name = ( 1010 label.replace("=", "").replace(":", "").strip() 1011 ) # Clean the label if it 
contains '=' 1012 header_dic[section_name] = {} 1013 current_section = section_name 1014 else: 1015 if current_section: 1016 header_dic[current_section][label] = value 1017 else: 1018 header_dic[label] = value 1019 return header_dic 1020 1021 def get_status_log(self, retention_time: float = 0): 1022 """ 1023 This code will extract the status logs from the raw file 1024 It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. 1025 It attempts to parse out section headers and sub-sections, but may not work for all instrument types. 1026 It will also not return Labels (keys) where the value is blank 1027 1028 Parameters: 1029 ----------- 1030 retention_time: float 1031 The retention time in minutes to extract the status log data from. 1032 Will use the closest retention time found. Default 0. 1033 1034 Returns: 1035 -------- 1036 Dict[str, Any] 1037 A dictionary containing the status log information 1038 1039 Raises: 1040 ------- 1041 ValueError 1042 If no status logs are found in the raw file 1043 1044 """ 1045 tunemethodcount = self.iRawDataPlus.GetStatusLogEntriesCount() 1046 if tunemethodcount == 0: 1047 raise ValueError("No status logs found in the raw data file") 1048 return None 1049 1050 header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time) 1051 1052 header_dic = {} 1053 current_section = None 1054 1055 for i in range(header.Length): 1056 label = header.Labels[i] 1057 value = header.Values[i] 1058 1059 # Check for section headers 1060 if "===" in label or ( 1061 (value == "" or value is None) and not label.endswith(":") 1062 ): 1063 # This is a section header 1064 section_name = ( 1065 label.replace("=", "").replace(":", "").strip() 1066 ) # Clean the label if it contains '=' 1067 header_dic[section_name] = {} 1068 current_section = section_name 1069 else: 1070 if current_section: 1071 header_dic[current_section][label] = value 1072 else: 1073 header_dic[label] = value 1074 return 
header_dic 1075 1076 def get_error_logs(self): 1077 """ 1078 This code will extract the error logs from the raw file 1079 1080 Returns: 1081 -------- 1082 Dict[float, str] 1083 A dictionary containing the error log information with the retention time as the key 1084 1085 Raises: 1086 ------- 1087 ValueError 1088 If no error logs are found in the raw file 1089 """ 1090 1091 error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount 1092 if error_log_count == 0: 1093 raise ValueError("No error logs found in the raw data file") 1094 return None 1095 1096 error_logs = {} 1097 1098 for i in range(error_log_count): 1099 error_log_item = self.iRawDataPlus.GetErrorLogItem(i) 1100 rt = error_log_item.RetentionTime 1101 message = error_log_item.Message 1102 # Use the index `i` as the unique ID key 1103 error_logs[i] = {"rt": rt, "message": message} 1104 return error_logs 1105 1106 def get_sample_information(self): 1107 """ 1108 This code will extract the sample information from the raw file 1109 1110 Returns: 1111 -------- 1112 Dict[str, Any] 1113 A dictionary containing the sample information 1114 Note that UserText field may not be handled properly and may need further processing 1115 """ 1116 sminfo = self.iRawDataPlus.SampleInformation 1117 smdict = {} 1118 smdict["Comment"] = sminfo.Comment 1119 smdict["SampleId"] = sminfo.SampleId 1120 smdict["SampleName"] = sminfo.SampleName 1121 smdict["Vial"] = sminfo.Vial 1122 smdict["InjectionVolume"] = sminfo.InjectionVolume 1123 smdict["Barcode"] = sminfo.Barcode 1124 smdict["BarcodeStatus"] = str(sminfo.BarcodeStatus) 1125 smdict["CalibrationLevel"] = sminfo.CalibrationLevel 1126 smdict["DilutionFactor"] = sminfo.DilutionFactor 1127 smdict["InstrumentMethodFile"] = sminfo.InstrumentMethodFile 1128 smdict["RawFileName"] = sminfo.RawFileName 1129 smdict["CalibrationFile"] = sminfo.CalibrationFile 1130 smdict["IstdAmount"] = sminfo.IstdAmount 1131 smdict["RowNumber"] = sminfo.RowNumber 1132 smdict["Path"] = sminfo.Path 1133 
smdict["ProcessingMethodFile"] = sminfo.ProcessingMethodFile 1134 smdict["SampleType"] = str(sminfo.SampleType) 1135 smdict["SampleWeight"] = sminfo.SampleWeight 1136 smdict["UserText"] = { 1137 "UserText": [x for x in sminfo.UserText] 1138 } # [0] #This may not work - needs debugging with 1139 return smdict 1140 1141 def get_instrument_data(self): 1142 """ 1143 This code will extract the instrument data from the raw file 1144 1145 Returns: 1146 -------- 1147 Dict[str, Any] 1148 A dictionary containing the instrument data 1149 """ 1150 instrument_data = self.iRawDataPlus.GetInstrumentData() 1151 id_dict = {} 1152 id_dict["Name"] = instrument_data.Name 1153 id_dict["Model"] = instrument_data.Model 1154 id_dict["SerialNumber"] = instrument_data.SerialNumber 1155 id_dict["SoftwareVersion"] = instrument_data.SoftwareVersion 1156 id_dict["HardwareVersion"] = instrument_data.HardwareVersion 1157 id_dict["ChannelLabels"] = { 1158 "ChannelLabels": [x for x in instrument_data.ChannelLabels] 1159 } 1160 id_dict["Flags"] = instrument_data.Flags 1161 id_dict["AxisLabelY"] = instrument_data.AxisLabelY 1162 id_dict["AxisLabelX"] = instrument_data.AxisLabelX 1163 return id_dict 1164 1165 def get_centroid_msms_data(self, scan): 1166 """ 1167 .. deprecated:: 2.0 1168 This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality. 1169 """ 1170 1171 warnings.warn( 1172 "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. 
" 1173 "Please use `get_average_mass_spectrum()` instead.", 1174 DeprecationWarning, 1175 ) 1176 1177 d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid) 1178 1179 centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False) 1180 1181 noise = list(centroidStream.Noises) 1182 1183 baselines = list(centroidStream.Baselines) 1184 1185 rp = list(centroidStream.Resolutions) 1186 1187 magnitude = list(centroidStream.Intensities) 1188 1189 mz = list(centroidStream.Masses) 1190 1191 # charge = scans_labels[5] 1192 array_noise_std = (np.array(noise) - np.array(baselines)) / 3 1193 l_signal_to_noise = np.array(magnitude) / array_noise_std 1194 1195 d_params["baseline_noise"] = np.average(array_noise_std) 1196 1197 d_params["baseline_noise_std"] = np.std(array_noise_std) 1198 1199 data_dict = { 1200 Labels.mz: mz, 1201 Labels.abundance: magnitude, 1202 Labels.rp: rp, 1203 Labels.s2n: list(l_signal_to_noise), 1204 } 1205 1206 mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False) 1207 mass_spec.settings.noise_threshold_method = "relative_abundance" 1208 mass_spec.settings.noise_threshold_min_relative_abundance = 1 1209 mass_spec.process_mass_spec() 1210 return mass_spec 1211 1212 def get_average_mass_spectrum_by_scanlist( 1213 self, 1214 scans_list: List[int], 1215 auto_process: bool = True, 1216 ppm_tolerance: float = 5.0, 1217 ) -> MassSpecProfile: 1218 """ 1219 Averages selected scans mass spectra using Thermo's AverageScans method 1220 scans_list: list[int] 1221 auto_process: bool 1222 If true performs peak picking, and noise threshold calculation after creation of mass spectrum object 1223 Returns: 1224 MassSpecProfile 1225 1226 .. deprecated:: 2.0 1227 This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality. 1228 """ 1229 1230 warnings.warn( 1231 "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. 
" 1232 "Please use `get_average_mass_spectrum()` instead.", 1233 DeprecationWarning, 1234 ) 1235 1236 d_params = self.set_metadata(scans_list=scans_list) 1237 1238 # assumes scans is full scan or reduced profile scan 1239 1240 scans = List[int]() 1241 for scan in scans_list: 1242 scans.Add(scan) 1243 1244 # Create the mass options object that will be used when averaging the scans 1245 options = MassOptions() 1246 options.ToleranceUnits = ToleranceUnits.ppm 1247 options.Tolerance = ppm_tolerance 1248 1249 # Get the scan filter for the first scan. This scan filter will be used to located 1250 # scans within the given scan range of the same type 1251 1252 averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options) 1253 1254 len_data = averageScan.SegmentedScan.Positions.Length 1255 1256 mz_list = list(averageScan.SegmentedScan.Positions) 1257 abund_list = list(averageScan.SegmentedScan.Intensities) 1258 1259 data_dict = { 1260 Labels.mz: mz_list, 1261 Labels.abundance: abund_list, 1262 } 1263 1264 mass_spec = MassSpecProfile(data_dict, d_params, auto_process=auto_process) 1265 1266 return mass_spec 1267 1268 1269class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, SpectraParserInterface): 1270 """A class for parsing Thermo RAW mass spectrometry data files and instatiating MassSpectraBase or LCMSBase objects 1271 1272 Parameters 1273 ---------- 1274 file_location : str or Path 1275 The path to the RAW file to be parsed. 1276 analyzer : str, optional 1277 The type of mass analyzer used in the instrument. Default is "Unknown". 1278 instrument_label : str, optional 1279 The name of the instrument used to acquire the data. Default is "Unknown". 1280 sample_name : str, optional 1281 The name of the sample being analyzed. If not provided, the stem of the file_location path will be used. 1282 1283 Attributes 1284 ---------- 1285 file_location : Path 1286 The path to the RAW file being parsed. 
1287 analyzer : str 1288 The type of mass analyzer used in the instrument. 1289 instrument_label : str 1290 The name of the instrument used to acquire the data. 1291 sample_name : str 1292 The name of the sample being analyzed. 1293 1294 Methods 1295 ------- 1296 * run(spectra=True). 1297 Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe. 1298 * get_mass_spectrum_from_scan(scan_number, polarity, auto_process=True) 1299 Parses the RAW file and returns a MassSpecBase object from a single scan. 1300 * get_mass_spectra_obj(). 1301 Parses the RAW file and instantiates a MassSpectraBase object. 1302 * get_lcms_obj(). 1303 Parses the RAW file and instantiates an LCMSBase object. 1304 * get_icr_transient_times(). 1305 Return a list for transient time targets for all scans, or selected scans range 1306 1307 Inherits from ThermoBaseClass and SpectraParserInterface 1308 """ 1309 1310 def __init__( 1311 self, 1312 file_location, 1313 analyzer="Unknown", 1314 instrument_label="Unknown", 1315 sample_name=None, 1316 ): 1317 super().__init__(file_location) 1318 if isinstance(file_location, str): 1319 # if obj is a string it defaults to create a Path obj, pass the S3Path if needed 1320 file_location = Path(file_location) 1321 if not file_location.exists(): 1322 raise FileExistsError("File does not exist: " + str(file_location)) 1323 1324 self.file_location = file_location 1325 self.analyzer = analyzer 1326 self.instrument_label = instrument_label 1327 1328 if sample_name: 1329 self.sample_name = sample_name 1330 else: 1331 self.sample_name = file_location.stem 1332 1333 def load(self): 1334 pass 1335 1336 def get_scan_df(self): 1337 # This automatically brings in all the data 1338 self.chromatogram_settings.scans = (-1, -1) 1339 1340 # Get scan df info; starting with TIC data 1341 tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False) 1342 tic_data = { 1343 "scan": tic_data.scans, 1344 "scan_time": 
tic_data.time, 1345 "tic": tic_data.tic, 1346 } 1347 scan_df = pd.DataFrame.from_dict(tic_data) 1348 scan_df["ms_level"] = None 1349 1350 # get scan text 1351 scan_filter_df = pd.DataFrame.from_dict( 1352 self.get_all_filters()[0], orient="index" 1353 ) 1354 scan_filter_df.reset_index(inplace=True) 1355 scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True) 1356 1357 scan_df = scan_df.merge(scan_filter_df, on="scan", how="left") 1358 scan_df["scan_window_lower"] = scan_df.scan_text.str.extract( 1359 r"\[(\d+\.\d+)-\d+\.\d+\]" 1360 ) 1361 scan_df["scan_window_upper"] = scan_df.scan_text.str.extract( 1362 r"\[\d+\.\d+-(\d+\.\d+)\]" 1363 ) 1364 scan_df["polarity"] = np.where( 1365 scan_df.scan_text.str.contains(" - "), "negative", "positive" 1366 ) 1367 scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@") 1368 scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float) 1369 1370 # Assign each scan as centroid or profile and add ms_level 1371 scan_df["ms_format"] = None 1372 for i in scan_df.scan.to_list(): 1373 scan_df.loc[scan_df.scan == i, "ms_level"] = self.get_ms_level_for_scan_num(i) 1374 if self.iRawDataPlus.IsCentroidScanFromScanNumber(i): 1375 scan_df.loc[scan_df.scan == i, "ms_format"] = "centroid" 1376 else: 1377 scan_df.loc[scan_df.scan == i, "ms_format"] = "profile" 1378 1379 return scan_df 1380 1381 def get_ms_raw(self, spectra, scan_df): 1382 if spectra == "all": 1383 scan_df_forspec = scan_df 1384 elif spectra == "ms1": 1385 scan_df_forspec = scan_df[scan_df.ms_level == 1] 1386 elif spectra == "ms2": 1387 scan_df_forspec = scan_df[scan_df.ms_level == 2] 1388 else: 1389 raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'") 1390 1391 # Result container 1392 res = {} 1393 1394 # Row count container 1395 counter = {} 1396 1397 # Column name container 1398 cols = {} 1399 1400 # set at float32 1401 dtype = np.float32 1402 1403 # First pass: get nrows 1404 N = defaultdict(lambda: 0) 1405 for i in 
scan_df_forspec.scan.to_list(): 1406 level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0] 1407 scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i) 1408 profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber( 1409 i, scanStatistics 1410 ) 1411 abun = list(profileStream.Intensities) 1412 abun = np.array(abun)[np.where(np.array(abun) > 0)[0]] 1413 1414 N[level] += len(abun) 1415 1416 # Second pass: parse 1417 for i in scan_df_forspec.scan.to_list(): 1418 scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i) 1419 profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber( 1420 i, scanStatistics 1421 ) 1422 abun = list(profileStream.Intensities) 1423 mz = list(profileStream.Positions) 1424 1425 # Get index of abun that are > 0 1426 inx = np.where(np.array(abun) > 0)[0] 1427 mz = np.array(mz)[inx] 1428 mz = np.float32(mz) 1429 abun = np.array(abun)[inx] 1430 abun = np.float32(abun) 1431 1432 level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0] 1433 1434 # Number of rows 1435 n = len(mz) 1436 1437 # No measurements 1438 if n == 0: 1439 continue 1440 1441 # Dimension check 1442 if len(mz) != len(abun): 1443 warnings.warn("m/z and intensity array dimension mismatch") 1444 continue 1445 1446 # Scan/frame info 1447 id_dict = i 1448 1449 # Columns 1450 cols[level] = ["scan", "mz", "intensity"] 1451 m = len(cols[level]) 1452 1453 # Subarray init 1454 arr = np.empty((n, m), dtype=dtype) 1455 inx = 0 1456 1457 # Populate scan/frame info 1458 arr[:, inx] = i 1459 inx += 1 1460 1461 # Populate m/z 1462 arr[:, inx] = mz 1463 inx += 1 1464 1465 # Populate intensity 1466 arr[:, inx] = abun 1467 inx += 1 1468 1469 # Initialize output container 1470 if level not in res: 1471 res[level] = np.empty((N[level], m), dtype=dtype) 1472 counter[level] = 0 1473 1474 # Insert subarray 1475 res[level][counter[level] : counter[level] + n, :] = arr 1476 counter[level] += n 1477 1478 # Construct ms1 and ms2 mz 
dataframes 1479 for level in res.keys(): 1480 res[level] = pd.DataFrame(res[level]) 1481 res[level].columns = cols[level] 1482 # rename keys in res to add 'ms' prefix 1483 res = {f"ms{key}": value for key, value in res.items()} 1484 1485 return res 1486 1487 def run(self, spectra="all", scan_df=None): 1488 """ 1489 Extracts mass spectra data from a raw file. 1490 1491 Parameters 1492 ---------- 1493 spectra : str, optional 1494 Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2. 1495 scan_df : pandas.DataFrame, optional 1496 Scan dataframe. If not provided, the scan dataframe is created from the mzML file. 1497 1498 Returns 1499 ------- 1500 tuple 1501 A tuple containing two elements: 1502 - A dictionary containing mass spectra data, separated by MS level. 1503 - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level, 1504 scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable). 1505 """ 1506 # Prepare scan_df 1507 if scan_df is None: 1508 scan_df = self.get_scan_df() 1509 1510 # Prepare mass spectra data 1511 if spectra != "none": 1512 res = self.get_ms_raw(spectra=spectra, scan_df=scan_df) 1513 else: 1514 res = None 1515 1516 return res, scan_df 1517 1518 def get_mass_spectrum_from_scan( 1519 self, scan_number, spectrum_mode, auto_process=True 1520 ): 1521 """Instatiate a MassSpecBase object from a single scan number from the binary file, currently only supports profile mode. 1522 1523 Parameters 1524 ---------- 1525 scan_number : int 1526 The scan number to extract the mass spectrum from. 1527 polarity : int 1528 The polarity of the scan. 1 for positive mode, -1 for negative mode. 1529 spectrum_mode : str 1530 The type of mass spectrum to extract. Must be 'profile' or 'centroid'. 1531 auto_process : bool, optional 1532 If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True. 
1533 1534 Returns 1535 ------- 1536 MassSpecProfile | MassSpecCentroid 1537 The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum. 1538 """ 1539 1540 if spectrum_mode == "profile": 1541 scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number) 1542 profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber( 1543 scan_number, scanStatistics 1544 ) 1545 abun = list(profileStream.Intensities) 1546 mz = list(profileStream.Positions) 1547 data_dict = { 1548 Labels.mz: mz, 1549 Labels.abundance: abun, 1550 } 1551 d_params = self.set_metadata( 1552 firstScanNumber=scan_number, 1553 lastScanNumber=scan_number, 1554 scans_list=False, 1555 label=Labels.thermo_profile, 1556 ) 1557 mass_spectrum_obj = MassSpecProfile( 1558 data_dict, d_params, auto_process=auto_process 1559 ) 1560 1561 elif spectrum_mode == "centroid": 1562 centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False) 1563 if centroid_scan.Masses is not None: 1564 mz = list(centroid_scan.Masses) 1565 abun = list(centroid_scan.Intensities) 1566 rp = list(centroid_scan.Resolutions) 1567 magnitude = list(centroid_scan.Intensities) 1568 noise = list(centroid_scan.Noises) 1569 baselines = list(centroid_scan.Baselines) 1570 array_noise_std = (np.array(noise) - np.array(baselines)) / 3 1571 l_signal_to_noise = np.array(magnitude) / array_noise_std 1572 data_dict = { 1573 Labels.mz: mz, 1574 Labels.abundance: abun, 1575 Labels.rp: rp, 1576 Labels.s2n: list(l_signal_to_noise), 1577 } 1578 else: # For CID MS2, the centroid data are stored in the profile data location, they do not have any associated rp or baseline data, but they should be treated as centroid data 1579 scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber( 1580 scan_number 1581 ) 1582 profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber( 1583 scan_number, scanStatistics 1584 ) 1585 abun = list(profileStream.Intensities) 1586 mz = list(profileStream.Positions) 1587 data_dict = 
{ 1588 Labels.mz: mz, 1589 Labels.abundance: abun, 1590 Labels.rp: [np.nan] * len(mz), 1591 Labels.s2n: [np.nan] * len(mz), 1592 } 1593 d_params = self.set_metadata( 1594 firstScanNumber=scan_number, 1595 lastScanNumber=scan_number, 1596 scans_list=False, 1597 label=Labels.thermo_centroid, 1598 ) 1599 mass_spectrum_obj = MassSpecCentroid( 1600 data_dict, d_params, auto_process=auto_process 1601 ) 1602 1603 return mass_spectrum_obj 1604 1605 def get_mass_spectra_obj(self): 1606 """Instatiate a MassSpectraBase object from the binary data file file. 1607 1608 Returns 1609 ------- 1610 MassSpectraBase 1611 The MassSpectra object containing the parsed mass spectra. The object is instatiated with the mzML file, analyzer, instrument, sample name, and scan dataframe. 1612 """ 1613 _, scan_df = self.run(spectra="none") 1614 mass_spectra_obj = MassSpectraBase( 1615 self.file_location, 1616 self.analyzer, 1617 self.instrument_label, 1618 self.sample_name, 1619 self, 1620 ) 1621 scan_df = scan_df.set_index("scan", drop=False) 1622 mass_spectra_obj.scan_df = scan_df 1623 1624 return mass_spectra_obj 1625 1626 def get_lcms_obj(self, spectra="all"): 1627 """Instatiates a LCMSBase object from the mzML file. 1628 1629 Parameters 1630 ---------- 1631 spectra : str, optional 1632 Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2". 1633 1634 Returns 1635 ------- 1636 LCMSBase 1637 LCMS object containing mass spectra data. The object is instatiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specifified), polarity, as well as the attributes holding the scans, retention times, and tics. 
1638 """ 1639 _, scan_df = self.run(spectra="none") # first run it to just get scan info 1640 res, scan_df = self.run( 1641 scan_df=scan_df, spectra=spectra 1642 ) # second run to parse data 1643 lcms_obj = LCMSBase( 1644 self.file_location, 1645 self.analyzer, 1646 self.instrument_label, 1647 self.sample_name, 1648 self, 1649 ) 1650 if spectra != "none": 1651 for key in res: 1652 key_int = int(key.replace("ms", "")) 1653 res[key] = res[key][res[key].intensity > 0] 1654 res[key] = ( 1655 res[key].sort_values(by=["scan", "mz"]).reset_index(drop=True) 1656 ) 1657 lcms_obj._ms_unprocessed[key_int] = res[key] 1658 lcms_obj.scan_df = scan_df.set_index("scan", drop=False) 1659 # Check if polarity is mixed 1660 if len(set(scan_df.polarity)) > 1: 1661 raise ValueError("Mixed polarities detected in scan data") 1662 lcms_obj.polarity = scan_df.polarity[0] 1663 lcms_obj._scans_number_list = list(scan_df.scan) 1664 lcms_obj._retention_time_list = list(scan_df.scan_time) 1665 lcms_obj._tic_list = list(scan_df.tic) 1666 1667 return lcms_obj 1668 1669 def get_icr_transient_times(self): 1670 """Return a list for transient time targets for all scans, or selected scans range 1671 1672 Notes 1673 -------- 1674 Resolving Power and Transient time targets based on 7T FT-ICR MS system 1675 """ 1676 1677 res_trans_time = { 1678 "50": 0.384, 1679 "100000": 0.768, 1680 "200000": 1.536, 1681 "400000": 3.072, 1682 "750000": 6.144, 1683 "1000000": 12.288, 1684 } 1685 1686 firstScanNumber = self.start_scan 1687 1688 lastScanNumber = self.end_scan 1689 1690 transient_time_list = [] 1691 1692 for scan in range(firstScanNumber, lastScanNumber): 1693 scan_header = self.get_scan_header(scan) 1694 1695 rp_target = scan_header["FT Resolution:"] 1696 1697 transient_time = res_trans_time.get(rp_target) 1698 1699 transient_time_list.append(transient_time) 1700 1701 # print(transient_time, rp_target) 1702 1703 return transient_time_list
class ThermoBaseClass:
    """Class for parsing Thermo Raw files and extracting information from them.

    Parameters:
    -----------
    file_location : str or pathlib.Path or s3path.S3Path
        Thermo Raw file path or S3 path.

    Attributes:
    -----------
    file_path : str or pathlib.Path or s3path.S3Path
        The file location exactly as passed by the caller.
    parameters : LCMSParameters
        The LCMS parameters for the Thermo Raw file.
    chromatogram_settings : LiquidChromatographSetting
        The chromatogram settings for the Thermo Raw file.
    scans : list or tuple
        The selected scans for the Thermo Raw file.
    start_scan : int
        The starting scan number for the Thermo Raw file.
    end_scan : int
        The ending scan number for the Thermo Raw file.

    Methods:
    --------
    * set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter
        Convert the user-passed MS Type string to a Thermo MSOrderType object.
    * get_instrument_info() -> dict
        Get the instrument information from the Thermo Raw file.
    * get_creation_time() -> datetime.datetime
        Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
    * remove_temp_file()
        Remove the temporary file if the path is from S3Path.
    * get_polarity_mode(scan_number: int) -> int
        Get the polarity mode for the given scan number.
    * get_filter_for_scan_num(scan_number: int) -> List[str]
        Get the filter for the given scan number.
    * check_full_scan(scan_number: int) -> bool
        Check if the given scan number is a full scan.
    * get_all_filters() -> Tuple[Dict[int, str], List[str]]
        Get all scan filters for the Thermo Raw file.
    * get_scan_header(scan: int) -> Dict[str, Any]
        Get the full dictionary of scan header metadata for the given scan number.
    * get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]
        Get the retention time, intensity, and scan number from the given trace.
    * get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d',
             peak_detection: bool = False, smooth: bool = False, plot: bool = False,
             ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes]
        Get the extracted ion chromatograms (EICs) for the target m/z values.

    """

    def __init__(self, file_location):
        """Open the RAW file with the Thermo RawFileReader.

        Parameters:
        -----------
        file_location : str or pathlib.Path or s3path.S3Path
            Thermo Raw file path.  An S3Path is first copied to a local
            ``tmp/`` directory because the reader needs a local file.

        Raises:
        -------
        FileNotFoundError
            If the reader could not open the file.
        IOError
            If the reader reports an error for the file.
        """
        # Local copy created for S3 input; None when no copy was made.
        self._temp_file_path = None

        if isinstance(file_location, str):
            file_path = Path(file_location)

        elif isinstance(file_location, S3Path):
            temp_dir = Path("tmp/")
            temp_dir.mkdir(exist_ok=True)

            file_path = temp_dir / file_location.name
            with open(file_path, "wb") as fh:
                fh.write(file_location.read_bytes())
            self._temp_file_path = file_path

        else:
            file_path = file_location

        self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path))

        if not self.iRawDataPlus.IsOpen:
            raise FileNotFoundError(
                "Unable to access the RAW file using the RawFileReader class!"
            )

        # Check for any errors in the RAW file
        if self.iRawDataPlus.IsError:
            raise IOError(
                "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path)
            )

        # Select the first MS device as the active instrument.
        self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1)

        # Kept as the caller-supplied location (not the local temp copy).
        self.file_path = file_location
        self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path))

        self._init_settings()

    def _init_settings(self):
        """
        Initialize the LCMSParameters object.
        """
        self._parameters = LCMSParameters()

    @property
    def parameters(self) -> LCMSParameters:
        """
        Get or set the LCMSParameters object.
        """
        return self._parameters

    @parameters.setter
    def parameters(self, instance_LCMSParameters: LCMSParameters):
        self._parameters = instance_LCMSParameters

    @property
    def chromatogram_settings(self) -> LiquidChromatographSetting:
        """
        Get or set the LiquidChromatographSetting object.
        """
        return self.parameters.lc_ms

    @chromatogram_settings.setter
    def chromatogram_settings(
        self, instance_LiquidChromatographSetting: LiquidChromatographSetting
    ):
        self.parameters.lc_ms = instance_LiquidChromatographSetting

    @property
    def scans(self) -> list | tuple:
        """scans : list or tuple
        If list uses Thermo AverageScansInScanRange for selected scans, otherwise uses Thermo AverageScans for a scan range
        """
        return self.chromatogram_settings.scans

    @property
    def start_scan(self) -> int:
        """
        Get the starting scan number for the Thermo Raw file.

        A first `scans` entry of -1 means "first spectrum in the file".
        """
        if self.scans[0] == -1:
            return self.iRawDataPlus.RunHeaderEx.FirstSpectrum
        else:
            return self.scans[0]

    @property
    def end_scan(self) -> int:
        """
        Get the ending scan number for the Thermo Raw file.

        A last `scans` entry of -1 means "last spectrum in the file".
        """
        if self.scans[-1] == -1:
            return self.iRawDataPlus.RunHeaderEx.LastSpectrum
        else:
            return self.scans[-1]

    def set_msordertype(self, scanFilter, mstype: str = "ms1"):
        """
        Function to convert user passed string MS Type to Thermo MSOrderType object
        Limited to MS1 through MS10.

        Parameters:
        -----------
        scanFilter : Thermo.ScanFilter
            The scan filter object.
        mstype : str, optional
            The MS Type string, by default 'ms1'

        Returns:
        --------
        Thermo.ScanFilter
            The same scan filter object with its ``MSOrder`` set.

        Raises:
        -------
        KeyError
            If the MS type is outside MS1..MS10 (a warning is emitted first).
        """
        mstype = mstype.upper()
        # Check that a valid mstype is passed
        ms_order = int(mstype.split("MS")[1])
        if ms_order > 10 or ms_order < 1:
            warn("MS Type not valid, must be between MS1 and MS10")

        msordertypedict = {
            "MS1": MSOrderType.Ms,
            "MS2": MSOrderType.Ms2,
            "MS3": MSOrderType.Ms3,
            "MS4": MSOrderType.Ms4,
            "MS5": MSOrderType.Ms5,
            "MS6": MSOrderType.Ms6,
            "MS7": MSOrderType.Ms7,
            "MS8": MSOrderType.Ms8,
            "MS9": MSOrderType.Ms9,
            "MS10": MSOrderType.Ms10,
        }
        scanFilter.MSOrder = msordertypedict[mstype]
        return scanFilter

    def get_instrument_info(self) -> dict:
        """
        Get the instrument information from the Thermo Raw file.

        Returns:
        --------
        dict
            A dictionary with the keys 'model', and 'serial_number'.
        """
        instrumentData = self.iRawDataPlus.GetInstrumentData()
        return {
            "model": instrumentData.Model,
            "serial_number": instrumentData.SerialNumber,
        }

    def get_creation_time(self) -> datetime.datetime:
        """
        Extract the creation date stamp from the .RAW file
        Return formatted creation date stamp.

        The tick count is divided by 10 to obtain microseconds and added to
        0001-01-01.
        """
        credate = self.iRawDataPlus.CreationDate.get_Ticks()
        credate = datetime.datetime(1, 1, 1) + datetime.timedelta(
            microseconds=credate / 10
        )
        return credate

    def remove_temp_file(self) -> None:
        """Remove the temporary local copy created for an S3Path input.

        If the path is from S3Path the data cannot be serialized to
        io.ByteStream and a temporary copy is stored at the temp dir.
        Use this function only at the end of your execution script;
        some LCMS class methods depend on this file.

        Only the temporary copy is deleted, never the location supplied by
        the caller.  It is a no-op when no temporary copy exists.
        """
        temp_file = getattr(self, "_temp_file_path", None)
        if temp_file is not None:
            temp_file.unlink(missing_ok=True)
            self._temp_file_path = None

    def close_file(self) -> None:
        """
        Close the Thermo Raw file.
        """
        self.iRawDataPlus.Dispose()

    def get_polarity_mode(self, scan_number: int) -> int:
        """
        Get the polarity mode for the given scan number.

        The polarity symbol is read from the second token of the scan filter.

        Parameters:
        -----------
        scan_number : int
            The scan number.

        Returns:
        --------
        int
            1 for positive mode ("+"), -1 for negative mode ("-").

        Raises:
        -------
        Exception
            If the polarity mode is unknown.

        """
        polarity_symbol = self.get_filter_for_scan_num(scan_number)[1]

        if polarity_symbol == "+":
            return 1

        elif polarity_symbol == "-":
            return -1

        else:
            raise Exception("Polarity Mode Unknown, please set it manually")

    def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
        """
        Return the scan event string of `scan_number`, split on whitespace.
        This function is only supported for MS device controllers.
        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

        Parameters:
        -----------
        scan_number : int
            The scan number.

        """
        scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)

        return str(scan_label).split()

    def get_ms_level_for_scan_num(self, scan_number: int) -> int:
        """
        Get the MS order for the given scan number.

        Parameters:
        -----------
        scan_number : int
            The scan number

        Returns:
        --------
        int
            The MS order type (1 for MS, 2 for MS2, etc.)

        Raises:
        -------
        Exception
            If the scan's MS order is not one of MS1..MS10.
        """
        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)

        msordertype = {
            MSOrderType.Ms: 1,
            MSOrderType.Ms2: 2,
            MSOrderType.Ms3: 3,
            MSOrderType.Ms4: 4,
            MSOrderType.Ms5: 5,
            MSOrderType.Ms6: 6,
            MSOrderType.Ms7: 7,
            MSOrderType.Ms8: 8,
            MSOrderType.Ms9: 9,
            MSOrderType.Ms10: 10,
        }

        if scan_filter.MSOrder in msordertype:
            return msordertype[scan_filter.MSOrder]
        else:
            raise Exception("MS Order Type not found")

    def check_full_scan(self, scan_number: int) -> bool:
        """Check if the given scan number is a full scan.

        Parameters:
        -----------
        scan_number : int
            The scan number.
        """
        # scan_filter.ScanMode 0 = FULL
        # NOTE(review): ScanMode is compared against an MSOrderType member;
        # confirm this is the intended enum.
        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)

        return scan_filter.ScanMode == MSOrderType.Ms

    def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
        """
        Get all scan filters.
        This function is only supported for MS device controllers.
        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

        Returns:
        --------
        Tuple[Dict[int, str], List[str]]
            A dictionary mapping each scan number in
            ``start_scan..end_scan`` (inclusive) to its scan event string, and
            the list of distinct scan event strings (unordered).
        """

        scanrange = range(self.start_scan, self.end_scan + 1)
        scanfiltersdic = {}
        scanfilterslist = []
        for scan_number in scanrange:
            scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
            scanfiltersdic[scan_number] = scan_label
            scanfilterslist.append(scan_label)
        scanfilterset = list(set(scanfilterslist))
        return scanfiltersdic, scanfilterset

    def get_scan_header(self, scan: int) -> Dict[str, Any]:
        """
        Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.

        Parameters:
        -----------
        scan : int
            The scan number.

        Returns:
        --------
        Dict[str, Any]
            Trailer extra information of the scan as label -> value.
        """
        header = self.iRawDataPlus.GetTrailerExtraInformation(scan)

        return {header.Labels[i]: header.Values[i] for i in range(header.Length)}

    @staticmethod
    def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
        """trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal

        Returns the trace's times, intensities and scans as Python lists.
        """
        return list(trace.Times), list(trace.Intensities), list(trace.Scans)

    def get_eics(
        self,
        target_mzs: List[float],
        tic_data: Dict[str, Any],
        ms_type="MS !d",
        peak_detection=False,
        smooth=False,
        plot=False,
        ax: Optional[axes.Axes] = None,
        legend=False,
    ) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
        """ms_type: str ('MS', MS2')
        start_scan: int default -1 will select the lowest available
        end_scan: int default -1 will select the highest available

        returns:

            chroma: dict{target_mz: EIC_Data(
                                        Scans: [int]
                                            original thermo scan numbers
                                        Time: [floats]
                                            list of retention times
                                        TIC: [floats]
                                            total ion chromatogram
                                        Apexes: [int]
                                            original thermo apex scan number after peak picking
                                        )

        """
        # If peak_detection or smooth is True, raise exception
        if peak_detection or smooth:
            raise Exception("Peak detection and smoothing are no longer implemented in this function")

        options = MassOptions()
        options.ToleranceUnits = ToleranceUnits.ppm
        options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm

        all_chroma_settings = []

        for target_mz in target_mzs:
            settings = ChromatogramTraceSettings(TraceType.MassRange)
            settings.Filter = ms_type
            settings.MassRanges = [Range(target_mz, target_mz)]

            chroma_settings = IChromatogramSettings(settings)

            all_chroma_settings.append(chroma_settings)

        # chroma_settings2 = IChromatogramSettings(settings)
        # print(chroma_settings.FragmentMass)
        # print(chroma_settings.FragmentMass)
        # print(chroma_settings)
        # print(chroma_settings)

        data = self.iRawDataPlus.GetChromatogramData(
            all_chroma_settings, self.start_scan, self.end_scan, options
        )

        traces = ChromatogramSignal.FromChromatogramData(data)

        chroma = {}

        if plot:
            from matplotlib.transforms import Bbox
            import matplotlib.pyplot as plt

            if not ax:
                # ax = plt.gca()
                # ax.clear()
                fig, ax = plt.subplots()

            else:
                fig = plt.gcf()

            # plt.show()

        for i, trace in enumerate(traces):
            if trace.Length > 0:
                rt, eic, scans = self.get_rt_time_from_trace(trace)
                if smooth:
                    eic = self.smooth_tic(eic)

                chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
                if plot:
                    ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))

        if peak_detection:
            # max_eic = self.get_max_eic(chroma)
            max_signal = max(tic_data.tic)

            for eic_data in chroma.values():
                eic = eic_data.eic
                time = eic_data.time

                if len(eic) != len(tic_data.tic):
                    warn(
                        "The software assumes same lenth of TIC and EIC, this does not seems to be the case and the results mass spectrum selected by the scan number might not be correct"
                    )

                if eic.max() > 0:
                    centroid_eics = self.eic_centroid_detector(time, eic, max_signal)
                    eic_data.apexes = [i for i in centroid_eics]

                    if plot:
                        for peak_indexes in eic_data.apexes:
                            apex_index = peak_indexes[1]
                            ax.plot(
                                time[apex_index],
                                eic[apex_index],
                                marker="x",
                                linewidth=0,
                            )

        if plot:
            ax.set_xlabel("Time (min)")
            ax.set_ylabel("a.u.")
            ax.set_title(ms_type + " EIC")
            ax.tick_params(axis="both", which="major", labelsize=12)
            ax.axes.spines["top"].set_visible(False)
            ax.axes.spines["right"].set_visible(False)

            if legend:
                legend = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1))
                fig.subplots_adjust(right=0.76)
# ax.set_prop_cycle(color=plt.cm.gist_rainbow(np.linspace(0, 1, len(traces)))) 533 534 d = {"down": 30, "up": -30} 535 536 def func(evt): 537 if legend.contains(evt): 538 bbox = legend.get_bbox_to_anchor() 539 bbox = Bbox.from_bounds( 540 bbox.x0, bbox.y0 + d[evt.button], bbox.width, bbox.height 541 ) 542 tr = legend.axes.transAxes.inverted() 543 legend.set_bbox_to_anchor(bbox.transformed(tr)) 544 fig.canvas.draw_idle() 545 546 fig.canvas.mpl_connect("scroll_event", func) 547 return chroma, ax 548 else: 549 return chroma, None 550 rt = [] 551 tic = [] 552 scans = [] 553 for i in range(traces[0].Length): 554 # print(trace[0].HasBasePeakData,trace[0].EndTime ) 555 556 # print(" {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] )) 557 rt.append(traces[0].Times[i]) 558 tic.append(traces[0].Intensities[i]) 559 scans.append(traces[0].Scans[i]) 560 561 return traces 562 # plot_chroma(rt, tic) 563 # plt.show() 564 565 def get_tic( 566 self, 567 ms_type="MS !d", 568 peak_detection=False, # This wont work right now 569 smooth=False, # This wont work right now 570 plot=False, 571 ax=None, 572 trace_type="TIC", 573 ) -> Tuple[TIC_Data, axes.Axes]: 574 """ms_type: str ('MS !d', 'MS2', None) 575 if you use None you get all scans. 
576 peak_detection: bool 577 smooth: bool 578 plot: bool 579 ax: matplotlib axis object 580 trace_type: str ('TIC','BPC') 581 582 returns: 583 chroma: dict 584 { 585 Scan: [int] 586 original thermo scan numberMS 587 Time: [floats] 588 list of retention times 589 TIC: [floats] 590 total ion chromatogram 591 Apexes: [int] 592 original thermo apex scan number after peak picking 593 } 594 """ 595 if trace_type == "TIC": 596 settings = ChromatogramTraceSettings(TraceType.TIC) 597 elif trace_type == "BPC": 598 settings = ChromatogramTraceSettings(TraceType.BasePeak) 599 else: 600 raise ValueError(f"{trace_type} undefined") 601 if ms_type == "all": 602 settings.Filter = None 603 else: 604 settings.Filter = ms_type 605 606 chroma_settings = IChromatogramSettings(settings) 607 608 data = self.iRawDataPlus.GetChromatogramData( 609 [chroma_settings], self.start_scan, self.end_scan 610 ) 611 612 trace = ChromatogramSignal.FromChromatogramData(data) 613 614 data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[]) 615 616 if trace[0].Length > 0: 617 for i in range(trace[0].Length): 618 # print(trace[0].HasBasePeakData,trace[0].EndTime ) 619 620 # print(" {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] )) 621 data.time.append(trace[0].Times[i]) 622 data.tic.append(trace[0].Intensities[i]) 623 data.scans.append(trace[0].Scans[i]) 624 625 # print(trace[0].Scans[i]) 626 if smooth: 627 data.tic = self.smooth_tic(data.tic) 628 629 else: 630 data.tic = np.array(data.tic) 631 632 if peak_detection: 633 centroid_peak_indexes = [ 634 i for i in self.centroid_detector(data.time, data.tic) 635 ] 636 637 data.apexes = centroid_peak_indexes 638 639 if plot: 640 if not ax: 641 import matplotlib.pyplot as plt 642 643 ax = plt.gca() 644 # fig, ax = plt.subplots(figsize=(6, 3)) 645 646 ax.plot(data.time, data.tic, label=trace_type) 647 ax.set_xlabel("Time (min)") 648 ax.set_ylabel("a.u.") 649 if peak_detection: 650 for peak_indexes in data.apexes: 651 apex_index = 
peak_indexes[1] 652 ax.plot( 653 data.time[apex_index], 654 data.tic[apex_index], 655 marker="x", 656 linewidth=0, 657 ) 658 659 # plt.show() 660 if trace_type == "BPC": 661 data.bpc = data.tic 662 data.tic = [] 663 return data, ax 664 if trace_type == "BPC": 665 data.bpc = data.tic 666 data.tic = [] 667 return data, None 668 669 else: 670 return None, None 671 672 def get_average_mass_spectrum( 673 self, 674 spectrum_mode: str = "profile", 675 auto_process: bool = True, 676 ppm_tolerance: float = 5.0, 677 ms_type: str = "MS1", 678 ) -> MassSpecProfile | MassSpecCentroid: 679 """ 680 Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method 681 or a scan list using Thermo's AverageScans method 682 spectrum_mode: str 683 centroid or profile mass spectrum 684 auto_process: bool 685 If true performs peak picking, and noise threshold calculation after creation of mass spectrum object 686 ms_type: str 687 String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10. 688 Internal function converts to Thermo MSOrderType class. 
689 690 """ 691 692 def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool): 693 mz_list = list(averageScan.SegmentedScan.Positions) 694 abund_list = list(averageScan.SegmentedScan.Intensities) 695 696 data_dict = { 697 Labels.mz: mz_list, 698 Labels.abundance: abund_list, 699 } 700 701 return MassSpecProfile(data_dict, d_params, auto_process=auto_process) 702 703 def get_centroid_mass_spec(averageScan, d_params: dict): 704 noise = list(averageScan.centroidScan.Noises) 705 706 baselines = list(averageScan.centroidScan.Baselines) 707 708 rp = list(averageScan.centroidScan.Resolutions) 709 710 magnitude = list(averageScan.centroidScan.Intensities) 711 712 mz = list(averageScan.centroidScan.Masses) 713 714 array_noise_std = (np.array(noise) - np.array(baselines)) / 3 715 l_signal_to_noise = np.array(magnitude) / array_noise_std 716 717 d_params["baseline_noise"] = np.average(array_noise_std) 718 719 d_params["baseline_noise_std"] = np.std(array_noise_std) 720 721 data_dict = { 722 Labels.mz: mz, 723 Labels.abundance: magnitude, 724 Labels.rp: rp, 725 Labels.s2n: list(l_signal_to_noise), 726 } 727 728 mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False) 729 730 return mass_spec 731 732 d_params = self.set_metadata( 733 firstScanNumber=self.start_scan, lastScanNumber=self.end_scan 734 ) 735 736 # Create the mass options object that will be used when averaging the scans 737 options = MassOptions() 738 options.ToleranceUnits = ToleranceUnits.ppm 739 options.Tolerance = ppm_tolerance 740 741 # Get the scan filter for the first scan. 
This scan filter will be used to located 742 # scans within the given scan range of the same type 743 scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan) 744 745 # force it to only look for the MSType 746 scanFilter = self.set_msordertype(scanFilter, ms_type) 747 748 if isinstance(self.scans, tuple): 749 averageScan = Extensions.AverageScansInScanRange( 750 self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options 751 ) 752 753 if averageScan: 754 if spectrum_mode == "profile": 755 mass_spec = get_profile_mass_spec( 756 averageScan, d_params, auto_process 757 ) 758 759 return mass_spec 760 761 elif spectrum_mode == "centroid": 762 if averageScan.HasCentroidStream: 763 mass_spec = get_centroid_mass_spec(averageScan, d_params) 764 765 return mass_spec 766 767 else: 768 raise ValueError( 769 "No Centroind data available for the selected scans" 770 ) 771 else: 772 raise ValueError("spectrum_mode must be 'profile' or centroid") 773 else: 774 raise ValueError("No data found for the selected scans") 775 776 elif isinstance(self.scans, list): 777 d_params = self.set_metadata(scans_list=self.scans) 778 779 scans = List[int]() 780 for scan in self.scans: 781 scans.Add(scan) 782 783 averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options) 784 785 if averageScan: 786 if spectrum_mode == "profile": 787 mass_spec = get_profile_mass_spec( 788 averageScan, d_params, auto_process 789 ) 790 791 return mass_spec 792 793 elif spectrum_mode == "centroid": 794 if averageScan.HasCentroidStream: 795 mass_spec = get_centroid_mass_spec(averageScan, d_params) 796 797 return mass_spec 798 799 else: 800 raise ValueError( 801 "No Centroind data available for the selected scans" 802 ) 803 804 else: 805 raise ValueError("spectrum_mode must be 'profile' or centroid") 806 807 else: 808 raise ValueError("No data found for the selected scans") 809 810 else: 811 raise ValueError("scans must be a list intergers or a tuple if integers") 812 813 def 
set_metadata( 814 self, 815 firstScanNumber=0, 816 lastScanNumber=0, 817 scans_list=False, 818 label=Labels.thermo_profile, 819 ): 820 """ 821 Collect metadata to be ingested in the mass spectrum object 822 823 scans_list: list[int] or false 824 lastScanNumber: int 825 firstScanNumber: int 826 """ 827 828 d_params = default_parameters(self.file_path) 829 830 # assumes scans is full scan or reduced profile scan 831 832 d_params["label"] = label 833 834 if scans_list: 835 d_params["scan_number"] = scans_list 836 837 d_params["polarity"] = self.get_polarity_mode(scans_list[0]) 838 839 else: 840 d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber) 841 842 d_params["polarity"] = self.get_polarity_mode(firstScanNumber) 843 844 d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model 845 846 d_params["acquisition_time"] = self.get_creation_time() 847 848 d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name 849 850 return d_params 851 852 def get_instrument_methods(self, parse_strings: bool = True): 853 """ 854 This function will extract the instrument methods embedded in the raw file 855 856 First it will check if there are any instrument methods, if not returning None 857 Then it will get the total number of instrument methods. 858 For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary 859 If this fails, it will return just the string object. 860 861 This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail. 862 863 Parameters: 864 ----------- 865 parse_strings: bool 866 If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string. 867 868 Returns: 869 -------- 870 List[Dict[str, Any]] or List 871 A list of dictionaries containing the instrument methods, or a list of strings if parsing fails. 
872 """ 873 874 if not self.iRawDataPlus.HasInstrumentMethod: 875 raise ValueError( 876 "Raw Data file does not have any instrument methods attached" 877 ) 878 return None 879 else: 880 881 def parse_instrument_method(data): 882 lines = data.split("\r\n") 883 method = {} 884 current_section = None 885 sub_section = None 886 887 for line in lines: 888 if not line.strip(): # Skip empty lines 889 continue 890 if ( 891 line.startswith("----") 892 or line.endswith("Settings") 893 or line.endswith("Summary") 894 or line.startswith("Experiment") 895 or line.startswith("Scan Event") 896 ): 897 current_section = line.replace("-", "").strip() 898 method[current_section] = {} 899 sub_section = None 900 elif line.startswith("\t"): 901 if "\t\t" in line: 902 indent_level = line.count("\t") 903 key_value = line.strip() 904 905 if indent_level == 2: 906 if sub_section: 907 key, value = ( 908 key_value.split("=", 1) 909 if "=" in key_value 910 else (key_value, None) 911 ) 912 method[current_section][sub_section][ 913 key.strip() 914 ] = value.strip() if value else None 915 elif indent_level == 3: 916 scan_type, key_value = ( 917 key_value.split(" ", 1) 918 if " " in key_value 919 else (key_value, None) 920 ) 921 method.setdefault(current_section, {}).setdefault( 922 sub_section, {} 923 ).setdefault(scan_type, {}) 924 925 if key_value: 926 key, value = ( 927 key_value.split("=", 1) 928 if "=" in key_value 929 else (key_value, None) 930 ) 931 method[current_section][sub_section][scan_type][ 932 key.strip() 933 ] = value.strip() if value else None 934 else: 935 key_value = line.strip() 936 if "=" in key_value: 937 key, value = key_value.split("=", 1) 938 method.setdefault(current_section, {})[key.strip()] = ( 939 value.strip() 940 ) 941 else: 942 sub_section = key_value 943 else: 944 if ":" in line: 945 key, value = line.split(":", 1) 946 method[current_section][key.strip()] = value.strip() 947 else: 948 method[current_section][line] = {} 949 950 return method 951 952 
count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount 953 # TODO make this code better... 954 instrument_methods = [] 955 for i in range(count_instrument_methods): 956 instrument_method_string = self.iRawDataPlus.GetInstrumentMethod(i) 957 if parse_strings: 958 try: 959 instrument_method_dict = parse_instrument_method( 960 instrument_method_string 961 ) 962 except: # if it fails for any reason 963 instrument_method_dict = instrument_method_string 964 else: 965 instrument_method_dict = instrument_method_string 966 instrument_methods.append(instrument_method_dict) 967 return instrument_methods 968 969 def get_tune_method(self): 970 """ 971 This code will extract the tune method from the raw file 972 It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. 973 It attempts to parse out section headers and sub-sections, but may not work for all instrument types. 974 It will also not return Labels (keys) where the value is blank 975 976 Returns: 977 -------- 978 Dict[str, Any] 979 A dictionary containing the tune method information 980 981 Raises: 982 ------- 983 ValueError 984 If no tune methods are found in the raw file 985 986 """ 987 tunemethodcount = self.iRawDataPlus.GetTuneDataCount() 988 if tunemethodcount == 0: 989 raise ValueError("No tune methods found in the raw data file") 990 return None 991 elif tunemethodcount > 1: 992 warnings.warn( 993 "Multiple tune methods found in the raw data file, returning the 1st" 994 ) 995 996 header = self.iRawDataPlus.GetTuneData(0) 997 998 header_dic = {} 999 current_section = None 1000 1001 for i in range(header.Length): 1002 label = header.Labels[i] 1003 value = header.Values[i] 1004 1005 # Check for section headers 1006 if "===" in label or ( 1007 (value == "" or value is None) and not label.endswith(":") 1008 ): 1009 # This is a section header 1010 section_name = ( 1011 label.replace("=", "").replace(":", "").strip() 1012 ) # Clean the label if it 
contains '=' 1013 header_dic[section_name] = {} 1014 current_section = section_name 1015 else: 1016 if current_section: 1017 header_dic[current_section][label] = value 1018 else: 1019 header_dic[label] = value 1020 return header_dic 1021 1022 def get_status_log(self, retention_time: float = 0): 1023 """ 1024 This code will extract the status logs from the raw file 1025 It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. 1026 It attempts to parse out section headers and sub-sections, but may not work for all instrument types. 1027 It will also not return Labels (keys) where the value is blank 1028 1029 Parameters: 1030 ----------- 1031 retention_time: float 1032 The retention time in minutes to extract the status log data from. 1033 Will use the closest retention time found. Default 0. 1034 1035 Returns: 1036 -------- 1037 Dict[str, Any] 1038 A dictionary containing the status log information 1039 1040 Raises: 1041 ------- 1042 ValueError 1043 If no status logs are found in the raw file 1044 1045 """ 1046 tunemethodcount = self.iRawDataPlus.GetStatusLogEntriesCount() 1047 if tunemethodcount == 0: 1048 raise ValueError("No status logs found in the raw data file") 1049 return None 1050 1051 header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time) 1052 1053 header_dic = {} 1054 current_section = None 1055 1056 for i in range(header.Length): 1057 label = header.Labels[i] 1058 value = header.Values[i] 1059 1060 # Check for section headers 1061 if "===" in label or ( 1062 (value == "" or value is None) and not label.endswith(":") 1063 ): 1064 # This is a section header 1065 section_name = ( 1066 label.replace("=", "").replace(":", "").strip() 1067 ) # Clean the label if it contains '=' 1068 header_dic[section_name] = {} 1069 current_section = section_name 1070 else: 1071 if current_section: 1072 header_dic[current_section][label] = value 1073 else: 1074 header_dic[label] = value 1075 return 
header_dic 1076 1077 def get_error_logs(self): 1078 """ 1079 This code will extract the error logs from the raw file 1080 1081 Returns: 1082 -------- 1083 Dict[float, str] 1084 A dictionary containing the error log information with the retention time as the key 1085 1086 Raises: 1087 ------- 1088 ValueError 1089 If no error logs are found in the raw file 1090 """ 1091 1092 error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount 1093 if error_log_count == 0: 1094 raise ValueError("No error logs found in the raw data file") 1095 return None 1096 1097 error_logs = {} 1098 1099 for i in range(error_log_count): 1100 error_log_item = self.iRawDataPlus.GetErrorLogItem(i) 1101 rt = error_log_item.RetentionTime 1102 message = error_log_item.Message 1103 # Use the index `i` as the unique ID key 1104 error_logs[i] = {"rt": rt, "message": message} 1105 return error_logs 1106 1107 def get_sample_information(self): 1108 """ 1109 This code will extract the sample information from the raw file 1110 1111 Returns: 1112 -------- 1113 Dict[str, Any] 1114 A dictionary containing the sample information 1115 Note that UserText field may not be handled properly and may need further processing 1116 """ 1117 sminfo = self.iRawDataPlus.SampleInformation 1118 smdict = {} 1119 smdict["Comment"] = sminfo.Comment 1120 smdict["SampleId"] = sminfo.SampleId 1121 smdict["SampleName"] = sminfo.SampleName 1122 smdict["Vial"] = sminfo.Vial 1123 smdict["InjectionVolume"] = sminfo.InjectionVolume 1124 smdict["Barcode"] = sminfo.Barcode 1125 smdict["BarcodeStatus"] = str(sminfo.BarcodeStatus) 1126 smdict["CalibrationLevel"] = sminfo.CalibrationLevel 1127 smdict["DilutionFactor"] = sminfo.DilutionFactor 1128 smdict["InstrumentMethodFile"] = sminfo.InstrumentMethodFile 1129 smdict["RawFileName"] = sminfo.RawFileName 1130 smdict["CalibrationFile"] = sminfo.CalibrationFile 1131 smdict["IstdAmount"] = sminfo.IstdAmount 1132 smdict["RowNumber"] = sminfo.RowNumber 1133 smdict["Path"] = sminfo.Path 1134 
smdict["ProcessingMethodFile"] = sminfo.ProcessingMethodFile 1135 smdict["SampleType"] = str(sminfo.SampleType) 1136 smdict["SampleWeight"] = sminfo.SampleWeight 1137 smdict["UserText"] = { 1138 "UserText": [x for x in sminfo.UserText] 1139 } # [0] #This may not work - needs debugging with 1140 return smdict 1141 1142 def get_instrument_data(self): 1143 """ 1144 This code will extract the instrument data from the raw file 1145 1146 Returns: 1147 -------- 1148 Dict[str, Any] 1149 A dictionary containing the instrument data 1150 """ 1151 instrument_data = self.iRawDataPlus.GetInstrumentData() 1152 id_dict = {} 1153 id_dict["Name"] = instrument_data.Name 1154 id_dict["Model"] = instrument_data.Model 1155 id_dict["SerialNumber"] = instrument_data.SerialNumber 1156 id_dict["SoftwareVersion"] = instrument_data.SoftwareVersion 1157 id_dict["HardwareVersion"] = instrument_data.HardwareVersion 1158 id_dict["ChannelLabels"] = { 1159 "ChannelLabels": [x for x in instrument_data.ChannelLabels] 1160 } 1161 id_dict["Flags"] = instrument_data.Flags 1162 id_dict["AxisLabelY"] = instrument_data.AxisLabelY 1163 id_dict["AxisLabelX"] = instrument_data.AxisLabelX 1164 return id_dict 1165 1166 def get_centroid_msms_data(self, scan): 1167 """ 1168 .. deprecated:: 2.0 1169 This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality. 1170 """ 1171 1172 warnings.warn( 1173 "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. 
" 1174 "Please use `get_average_mass_spectrum()` instead.", 1175 DeprecationWarning, 1176 ) 1177 1178 d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid) 1179 1180 centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False) 1181 1182 noise = list(centroidStream.Noises) 1183 1184 baselines = list(centroidStream.Baselines) 1185 1186 rp = list(centroidStream.Resolutions) 1187 1188 magnitude = list(centroidStream.Intensities) 1189 1190 mz = list(centroidStream.Masses) 1191 1192 # charge = scans_labels[5] 1193 array_noise_std = (np.array(noise) - np.array(baselines)) / 3 1194 l_signal_to_noise = np.array(magnitude) / array_noise_std 1195 1196 d_params["baseline_noise"] = np.average(array_noise_std) 1197 1198 d_params["baseline_noise_std"] = np.std(array_noise_std) 1199 1200 data_dict = { 1201 Labels.mz: mz, 1202 Labels.abundance: magnitude, 1203 Labels.rp: rp, 1204 Labels.s2n: list(l_signal_to_noise), 1205 } 1206 1207 mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False) 1208 mass_spec.settings.noise_threshold_method = "relative_abundance" 1209 mass_spec.settings.noise_threshold_min_relative_abundance = 1 1210 mass_spec.process_mass_spec() 1211 return mass_spec 1212 1213 def get_average_mass_spectrum_by_scanlist( 1214 self, 1215 scans_list: List[int], 1216 auto_process: bool = True, 1217 ppm_tolerance: float = 5.0, 1218 ) -> MassSpecProfile: 1219 """ 1220 Averages selected scans mass spectra using Thermo's AverageScans method 1221 scans_list: list[int] 1222 auto_process: bool 1223 If true performs peak picking, and noise threshold calculation after creation of mass spectrum object 1224 Returns: 1225 MassSpecProfile 1226 1227 .. deprecated:: 2.0 1228 This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality. 1229 """ 1230 1231 warnings.warn( 1232 "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. 
" 1233 "Please use `get_average_mass_spectrum()` instead.", 1234 DeprecationWarning, 1235 ) 1236 1237 d_params = self.set_metadata(scans_list=scans_list) 1238 1239 # assumes scans is full scan or reduced profile scan 1240 1241 scans = List[int]() 1242 for scan in scans_list: 1243 scans.Add(scan) 1244 1245 # Create the mass options object that will be used when averaging the scans 1246 options = MassOptions() 1247 options.ToleranceUnits = ToleranceUnits.ppm 1248 options.Tolerance = ppm_tolerance 1249 1250 # Get the scan filter for the first scan. This scan filter will be used to located 1251 # scans within the given scan range of the same type 1252 1253 averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options) 1254 1255 len_data = averageScan.SegmentedScan.Positions.Length 1256 1257 mz_list = list(averageScan.SegmentedScan.Positions) 1258 abund_list = list(averageScan.SegmentedScan.Intensities) 1259 1260 data_dict = { 1261 Labels.mz: mz_list, 1262 Labels.abundance: abund_list, 1263 } 1264 1265 mass_spec = MassSpecProfile(data_dict, d_params, auto_process=auto_process) 1266 1267 return mass_spec
Class for parsing Thermo Raw files and extracting information from them.
Parameters:
file_location : str or pathlib.Path or s3path.S3Path Thermo Raw file path or S3 path.
Attributes:
file_path : str or pathlib.Path or s3path.S3Path The file path of the Thermo Raw file. parameters : LCMSParameters The LCMS parameters for the Thermo Raw file. chromatogram_settings : LiquidChromatographSetting The chromatogram settings for the Thermo Raw file. scans : list or tuple The selected scans for the Thermo Raw file. start_scan : int The starting scan number for the Thermo Raw file. end_scan : int The ending scan number for the Thermo Raw file.
Methods:
- set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter Convert the user-passed MS Type string to a Thermo MSOrderType object.
- get_instrument_info() -> dict Get the instrument information from the Thermo Raw file.
- get_creation_time() -> datetime.datetime Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
- remove_temp_file() Remove the temporary file if the path is from S3Path.
- get_polarity_mode(scan_number: int) -> int Get the polarity mode for the given scan number.
- get_filter_for_scan_num(scan_number: int) -> List[str] Get the filter for the given scan number.
- check_full_scan(scan_number: int) -> bool Check if the given scan number is a full scan.
- get_all_filters() -> Tuple[Dict[int, str], List[str]] Get all scan filters for the Thermo Raw file.
- get_scan_header(scan: int) -> Dict[str, Any] Get the full dictionary of scan header metadata for the given scan number.
- get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]] Get the retention time, intensity, and scan number from the given trace.
- get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d', peak_detection: bool = False, smooth: bool = False, plot: bool = False, ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes] Get the extracted ion chromatograms (EICs) for the target m/z values. Peak detection and smoothing are no longer implemented in this function; passing True for either raises an exception.
def __init__(self, file_location):
    """
    Open a Thermo Raw file and select its first MS device.

    Parameters:
    -----------
    file_location : str or pathlib.Path or s3path.S3Path
        Thermo Raw file path. A str is converted to a pathlib.Path. An S3Path
        is first copied into a local ``tmp/`` directory (created if missing)
        and the reader is opened on that local copy.

    Raises:
    -------
    FileNotFoundError
        If the reader reports that the file is not open.
    IOError
        If the reader reports an error for the file.
    """
    local_path = file_location

    if isinstance(file_location, str):
        local_path = Path(file_location)
    elif isinstance(file_location, S3Path):
        download_dir = Path("tmp/")
        download_dir.mkdir(exist_ok=True)

        local_path = download_dir / file_location.name
        with open(local_path, "wb") as local_copy:
            local_copy.write(file_location.read_bytes())

    reader = RawFileReaderAdapter.FileFactory(str(local_path))
    self.iRawDataPlus = reader

    if not reader.IsOpen:
        raise FileNotFoundError(
            "Unable to access the RAW file using the RawFileReader class!"
        )

    # Check for any errors in the RAW file
    if reader.IsError:
        raise IOError(
            "Error opening ({}) - {}".format(reader.FileError, local_path)
        )

    self.res = reader.SelectInstrument(Device.MS, 1)

    # NOTE: file_path keeps the location exactly as the caller passed it,
    # not the local copy made for an S3Path.
    self.file_path = file_location
    self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(local_path))

    self._init_settings()
file_location: str, pathlib.Path or s3path.S3Path. Thermo Raw file path.
Get or set the LiquidChromatographSetting object.
scans : list or tuple If a tuple, uses Thermo AverageScansInScanRange over the scan range; if a list, uses Thermo AverageScans for the selected scans.
def set_msordertype(self, scanFilter, mstype: str = "ms1"):
    """
    Set the MS order of a Thermo scan filter from a user-passed MS Type string.

    Limited to MS1 through MS10. Matching is case-insensitive and ignores
    surrounding whitespace.

    Parameters:
    -----------
    scanFilter : Thermo.ScanFilter
        The scan filter object. Its ``MSOrder`` attribute is overwritten in place.
    mstype : str, optional
        The MS Type string ('ms1', 'MS2', ..., 'ms10'), by default 'ms1'

    Returns:
    --------
    Thermo.ScanFilter
        The same ``scanFilter`` object that was passed in.

    Raises:
    -------
    ValueError
        If ``mstype`` is not of the form 'MS<n>' with n between 1 and 10.
    """
    label = str(mstype).strip().upper()
    digits = label[2:] if label.startswith("MS") else ""

    # Validate before touching MSOrderType so every malformed input fails the
    # same way, instead of IndexError / KeyError depending on its shape.
    if not (digits.isascii() and digits.isdigit() and 1 <= int(digits) <= 10):
        raise ValueError(
            "MS Type not valid, must be between MS1 and MS10 (got {!r})".format(
                mstype
            )
        )

    level = int(digits)
    # MSOrderType names the first level "Ms" and the higher ones "Ms2".."Ms10".
    member_name = "Ms" if level == 1 else "Ms{}".format(level)
    scanFilter.MSOrder = getattr(MSOrderType, member_name)
    return scanFilter
Function to convert user passed string MS Type to Thermo MSOrderType object Limited to MS1 through MS10.
Parameters:
scanFilter : Thermo.ScanFilter The scan filter object. mstype : str, optional The MS Type string, by default 'ms1'
def get_instrument_info(self) -> dict:
    """
    Read the instrument model and serial number from the Thermo Raw file.

    Returns:
    --------
    dict
        A dictionary with the keys 'model' and 'serial_number', filled from
        the ``Model`` and ``SerialNumber`` attributes of the instrument data.
    """
    instrument = self.iRawDataPlus.GetInstrumentData()

    info = {}
    info["model"] = instrument.Model
    info["serial_number"] = instrument.SerialNumber
    return info
Get the instrument information from the Thermo Raw file.
Returns:
dict A dictionary with the keys 'model', and 'serial_number'.
def get_creation_time(self) -> datetime.datetime:
    """
    Extract the creation date stamp from the .RAW file.

    The tick count is divided by ten to get microseconds and added to
    0001-01-01 00:00:00. Any sub-microsecond remainder is dropped.

    Returns:
    --------
    datetime.datetime
        The creation time as a naive datetime (no timezone attached).
    """
    ticks = int(self.iRawDataPlus.CreationDate.get_Ticks())

    # Integer division on purpose: at present-day tick counts a float quotient
    # cannot represent every microsecond, so `ticks / 10` shifted the result
    # by a few microseconds.
    microseconds = ticks // 10

    return datetime.datetime(1, 1, 1) + datetime.timedelta(microseconds=microseconds)
Extract the creation date stamp from the .RAW file Return formatted creation date stamp.
def remove_temp_file(self) -> None:
    """
    Remove the temporary local copy made for a file opened from an S3Path.

    When the file location is an S3Path, the constructor stores a local copy
    at ``tmp/<file name>`` and opens the reader on it. Use this function only
    at the end of your execution script; some LCMS class methods depend on
    this file.

    ``self.file_path`` holds the location as the caller passed it, so calling
    ``unlink()`` on it would target the S3 object (or the user's own raw
    file) rather than the temporary copy. The temporary path is therefore
    rebuilt here. For any other kind of location no temporary copy exists
    and nothing is removed.
    """
    if not isinstance(self.file_path, S3Path):
        return

    # Same relative location the constructor writes the copy to; assumes the
    # working directory has not changed since then.
    temp_copy = Path("tmp/") / self.file_path.name
    # missing_ok keeps a repeated cleanup call from raising.
    temp_copy.unlink(missing_ok=True)
If the path is an S3Path, the data cannot be serialized to io.ByteStream, so a temporary copy is stored in the temp dir. Use this function only at the end of your execution script; some LCMS class methods depend on this file.
def close_file(self) -> None:
    """
    Close the Thermo Raw file by disposing of the underlying reader object.
    """
    reader = self.iRawDataPlus
    reader.Dispose()
Close the Thermo Raw file.
def get_polarity_mode(self, scan_number: int) -> int:
    """
    Translate the polarity symbol of a scan's filter into a signed integer.

    The symbol is the second token of the list returned by
    ``get_filter_for_scan_num``.

    Parameters:
    -----------
    scan_number : int
        The scan number.

    Returns:
    --------
    int
        1 when the symbol is '+', -1 when it is '-'.

    Raises:
    -------
    Exception
        If the polarity symbol is neither '+' nor '-'.
    """
    sign_by_symbol = {"+": 1, "-": -1}

    symbol = self.get_filter_for_scan_num(scan_number)[1]
    if symbol not in sign_by_symbol:
        raise Exception("Polarity Mode Unknown, please set it manually")

    return sign_by_symbol[symbol]
Get the polarity mode for the given scan number.
Parameters:
scan_number : int The scan number.
Raises:
Exception If the polarity mode is unknown.
def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
    """
    Return the scan event string of a scan, split on whitespace.

    This function is only supported for MS device controllers.
    e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

    Parameters:
    -----------
    scan_number : int
        The scan number.

    Returns:
    --------
    List[str]
        The whitespace-separated tokens of the scan event string.
    """
    event_text = str(self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number))
    return event_text.split()
Returns the scan event (filter) string for scan_number on the current controller, split on whitespace into a list of tokens. This function is only supported for MS device controllers. e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
Parameters:
scan_number : int The scan number.
def get_ms_level_for_scan_num(self, scan_number: int) -> int:
    """
    Get the MS order for the given scan number.

    Parameters:
    -----------
    scan_number : int
        The scan number

    Returns:
    --------
    int
        The MS order type (1 for MS, 2 for MS2, etc., up to 10)

    Raises:
    -------
    Exception
        If the scan filter's MS order is not one of MS through MS10.
    """
    scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)

    # MSOrderType names the first level "Ms" and the higher ones "Ms2".."Ms10".
    level_by_order = {
        getattr(MSOrderType, "Ms" if level == 1 else "Ms{}".format(level)): level
        for level in range(1, 11)
    }

    ms_order = scan_filter.MSOrder
    if ms_order not in level_by_order:
        raise Exception("MS Order Type not found")
    return level_by_order[ms_order]
Get the MS order for the given scan number.
Parameters:
scan_number : int The scan number
Returns:
int The MS order type (1 for MS, 2 for MS2, etc.)
def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
    """
    Collect the scan event string of every scan from start_scan to end_scan.

    This function is only supported for MS device controllers.

    Returns:
    --------
    Tuple[Dict[int, str], List[str]]
        A dictionary mapping each scan number (start_scan..end_scan,
        inclusive) to its scan event string, and a list of the distinct
        scan event strings (in no particular order).

    """
    labels_by_scan = {
        scan_number: self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
        for scan_number in range(self.start_scan, self.end_scan + 1)
    }
    distinct_labels = list(set(labels_by_scan.values()))
    return labels_by_scan, distinct_labels
Get all scan filters. This function is only supported for MS device controllers. e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
def get_scan_header(self, scan: int) -> Dict[str, Any]:
    """
    Build a dictionary from the trailer extra information of a scan
    (scan header meta data, i.e. AGC status, ion injection time, etc.).

    Parameters:
    -----------
    scan : int
        The scan number.

    Returns:
    --------
    Dict[str, Any]
        Each trailer label mapped to its value.

    """
    trailer = self.iRawDataPlus.GetTrailerExtraInformation(scan)
    return {
        trailer.Labels[position]: trailer.Values[position]
        for position in range(trailer.Length)
    }
Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.
Parameters:
scan : int The scan number.
@staticmethod
def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
    """
    Unpack a chromatogram trace into plain Python lists.

    Parameters:
    -----------
    trace : ThermoFisher.CommonCore.Data.Business.ChromatogramSignal
        The trace to unpack.

    Returns:
    --------
    Tuple[List[float], List[float], List[int]]
        The trace's Times, Intensities and Scans, in that order.
    """
    times = list(trace.Times)
    intensities = list(trace.Intensities)
    scan_numbers = list(trace.Scans)
    return times, intensities, scan_numbers
trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal
def get_eics(
    self,
    target_mzs: List[float],
    tic_data: Dict[str, Any],
    ms_type="MS !d",
    peak_detection=False,
    smooth=False,
    plot=False,
    ax: Optional[axes.Axes] = None,
    legend=False,
) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
    """
    Extract one ion chromatogram (EIC) per target m/z over start_scan..end_scan.

    The mass tolerance is `self.chromatogram_settings.eic_tolerance_ppm` (ppm).

    Parameters:
    -----------
    target_mzs : List[float]
        The m/z values to extract.
    tic_data : Dict[str, Any]
        Not used; kept so existing callers keep working.
    ms_type : str
        Scan filter applied to every trace, e.g. 'MS !d' or 'MS2'.
    peak_detection : bool
        Must be False; no longer implemented here.
    smooth : bool
        Must be False; no longer implemented here.
    plot : bool
        If True, draw every non-empty EIC.
    ax : matplotlib Axes, optional
        Axes to draw on; a new figure is created when omitted.
    legend : bool
        If True (and plot is True), add a legend that can be moved
        vertically with the mouse wheel.

    Returns:
    --------
    Tuple[Dict[float, EIC_Data], Axes or None]
        A dictionary mapping each target m/z with a non-empty trace to its
        EIC_Data (scans, time, eic), and the Axes used (None when plot is
        False).

    Raises:
    -------
    Exception
        If peak_detection or smooth is True.
    """
    if peak_detection or smooth:
        raise Exception("Peak detection and smoothing are no longer implemented in this function")

    options = MassOptions()
    options.ToleranceUnits = ToleranceUnits.ppm
    options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm

    all_chroma_settings = []
    for target_mz in target_mzs:
        settings = ChromatogramTraceSettings(TraceType.MassRange)
        settings.Filter = ms_type
        settings.MassRanges = [Range(target_mz, target_mz)]
        all_chroma_settings.append(IChromatogramSettings(settings))

    data = self.iRawDataPlus.GetChromatogramData(
        all_chroma_settings, self.start_scan, self.end_scan, options
    )
    traces = ChromatogramSignal.FromChromatogramData(data)

    fig = None
    if plot:
        from matplotlib.transforms import Bbox
        import matplotlib.pyplot as plt

        if ax is None:
            fig, ax = plt.subplots()
        else:
            # Use the figure that owns the supplied axes; the current pyplot
            # figure may be a different one.
            fig = ax.figure

    chroma = {}
    for i, trace in enumerate(traces):
        if trace.Length > 0:
            rt, eic, scans = self.get_rt_time_from_trace(trace)
            chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
            if plot:
                ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))

    if not plot:
        return chroma, None

    ax.set_xlabel("Time (min)")
    ax.set_ylabel("a.u.")
    ax.set_title(ms_type + " EIC")
    ax.tick_params(axis="both", which="major", labelsize=12)
    ax.axes.spines["top"].set_visible(False)
    ax.axes.spines["right"].set_visible(False)

    if legend:
        legend_box = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1))
        fig.subplots_adjust(right=0.76)

        # Vertical shift applied to the legend per scroll step.
        scroll_step = {"down": 30, "up": -30}

        def func(evt):
            # contains() returns a (bool, dict) tuple, which is always truthy;
            # test the flag so only scrolling over the legend moves it.
            if legend_box.contains(evt)[0]:
                bbox = legend_box.get_bbox_to_anchor()
                bbox = Bbox.from_bounds(
                    bbox.x0, bbox.y0 + scroll_step[evt.button], bbox.width, bbox.height
                )
                tr = legend_box.axes.transAxes.inverted()
                legend_box.set_bbox_to_anchor(bbox.transformed(tr))
                fig.canvas.draw_idle()

        fig.canvas.mpl_connect("scroll_event", func)

    return chroma, ax
ms_type: str ('MS', 'MS2') start_scan: int default -1 will select the lowest available end_scan: int default -1 will select the highest available
returns:
chroma: dict{target_mz: EIC_Data(
Scans: [int]
original thermo scan numbers
Time: [floats]
list of retention times
TIC: [floats]
total ion chromatogram
Apexes: [int]
original thermo apex scan number after peak picking
)
def get_tic(
    self,
    ms_type="MS !d",
    peak_detection=False,  # This wont work right now
    smooth=False,  # This wont work right now
    plot=False,
    ax=None,
    trace_type="TIC",
) -> Tuple[TIC_Data, axes.Axes]:
    """
    Read the total ion (TIC) or base peak (BPC) chromatogram over
    start_scan..end_scan.

    Parameters:
    -----------
    ms_type : str
        Scan filter, e.g. 'MS !d' or 'MS2'. Use 'all' (or None) to apply no
        filter and get all scans.
    peak_detection : bool
        If True, store the output of `self.centroid_detector` in `apexes`.
    smooth : bool
        If True, pass the intensities through `self.smooth_tic`.
    plot : bool
        If True, draw the trace (and the apexes when peak_detection is True).
    ax : matplotlib axis object, optional
        Axes to draw on; the current pyplot axes are used when omitted.
    trace_type : str
        'TIC' or 'BPC'.

    Returns:
    --------
    Tuple[TIC_Data, Axes]
        The chromatogram data (scans, time, and the intensities in `tic`, or
        in `bpc` when trace_type is 'BPC') and the Axes used. The second
        item is None when plot is False; both items are None when the trace
        is empty.

    Raises:
    -------
    ValueError
        If trace_type is neither 'TIC' nor 'BPC'.
    """
    if trace_type not in ("TIC", "BPC"):
        raise ValueError(f"{trace_type} undefined")

    wanted_trace = TraceType.TIC if trace_type == "TIC" else TraceType.BasePeak
    settings = ChromatogramTraceSettings(wanted_trace)
    settings.Filter = None if ms_type == "all" else ms_type

    raw_chromatogram = self.iRawDataPlus.GetChromatogramData(
        [IChromatogramSettings(settings)], self.start_scan, self.end_scan
    )
    traces = ChromatogramSignal.FromChromatogramData(raw_chromatogram)

    chromatogram = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[])

    signal = traces[0]
    if not signal.Length > 0:
        return None, None

    for position in range(signal.Length):
        chromatogram.time.append(signal.Times[position])
        chromatogram.tic.append(signal.Intensities[position])
        chromatogram.scans.append(signal.Scans[position])

    if smooth:
        chromatogram.tic = self.smooth_tic(chromatogram.tic)
    else:
        chromatogram.tic = np.array(chromatogram.tic)

    if peak_detection:
        chromatogram.apexes = list(
            self.centroid_detector(chromatogram.time, chromatogram.tic)
        )

    if plot:
        if not ax:
            import matplotlib.pyplot as plt

            ax = plt.gca()

        ax.plot(chromatogram.time, chromatogram.tic, label=trace_type)
        ax.set_xlabel("Time (min)")
        ax.set_ylabel("a.u.")
        if peak_detection:
            for peak_indexes in chromatogram.apexes:
                apex_index = peak_indexes[1]
                ax.plot(
                    chromatogram.time[apex_index],
                    chromatogram.tic[apex_index],
                    marker="x",
                    linewidth=0,
                )

    # The intensities are collected in `tic`; move them to `bpc` for a base
    # peak trace (done after plotting, which reads `tic`).
    if trace_type == "BPC":
        chromatogram.bpc = chromatogram.tic
        chromatogram.tic = []

    return chromatogram, (ax if plot else None)
ms_type: str ('MS !d', 'MS2', None) if you use None you get all scans. peak_detection: bool smooth: bool plot: bool ax: matplotlib axis object trace_type: str ('TIC','BPC')
returns: chroma: dict { Scan: [int] original thermo scan numbers Time: [floats] list of retention times TIC: [floats] total ion chromatogram Apexes: [int] original thermo apex scan number after peak picking }
def get_average_mass_spectrum(
    self,
    spectrum_mode: str = "profile",
    auto_process: bool = True,
    ppm_tolerance: float = 5.0,
    ms_type: str = "MS1",
) -> MassSpecProfile | MassSpecCentroid:
    """
    Average mass spectra over a scan range (when `self.scans` is a tuple,
    via Thermo's AverageScansInScanRange) or over a scan list (when
    `self.scans` is a list, via Thermo's AverageScans).

    Parameters:
    -----------
    spectrum_mode : str
        'centroid' or 'profile' mass spectrum.
    auto_process : bool
        Passed to MassSpecProfile; if true performs peak picking and noise
        threshold calculation after creation of the mass spectrum object.
        Centroid spectra are always created with auto_process=False.
    ppm_tolerance : float
        Mass tolerance, in ppm, used when averaging the scans.
    ms_type : str
        String of form 'ms1' or 'ms2' or 'MS3' etc. Converted to a Thermo
        scan filter by `self.set_msordertype`.

    Returns:
    --------
    MassSpecProfile | MassSpecCentroid
        The averaged mass spectrum.

    Raises:
    -------
    ValueError
        If no averaged scan is produced, if centroid data is requested but
        not available, if spectrum_mode is not recognised, or if
        `self.scans` is neither a tuple nor a list.
    """

    def profile_from(average_scan, params: dict):
        segmented = average_scan.SegmentedScan
        data_dict = {
            Labels.mz: list(segmented.Positions),
            Labels.abundance: list(segmented.Intensities),
        }
        return MassSpecProfile(data_dict, params, auto_process=auto_process)

    def centroid_from(average_scan, params: dict):
        stream = average_scan.centroidScan
        noise = list(stream.Noises)
        baselines = list(stream.Baselines)
        rp = list(stream.Resolutions)
        magnitude = list(stream.Intensities)
        mz = list(stream.Masses)

        noise_std = (np.array(noise) - np.array(baselines)) / 3
        signal_to_noise = np.array(magnitude) / noise_std

        params["baseline_noise"] = np.average(noise_std)
        params["baseline_noise_std"] = np.std(noise_std)

        data_dict = {
            Labels.mz: mz,
            Labels.abundance: magnitude,
            Labels.rp: rp,
            Labels.s2n: list(signal_to_noise),
        }
        return MassSpecCentroid(data_dict, params, auto_process=False)

    def spectrum_from(average_scan, params: dict):
        # Shared tail of both averaging paths.
        if not average_scan:
            raise ValueError("No data found for the selected scans")
        if spectrum_mode == "profile":
            return profile_from(average_scan, params)
        if spectrum_mode == "centroid":
            if not average_scan.HasCentroidStream:
                raise ValueError("No Centroind data available for the selected scans")
            return centroid_from(average_scan, params)
        raise ValueError("spectrum_mode must be 'profile' or centroid")

    d_params = self.set_metadata(
        firstScanNumber=self.start_scan, lastScanNumber=self.end_scan
    )

    # Mass options used when averaging the scans
    options = MassOptions()
    options.ToleranceUnits = ToleranceUnits.ppm
    options.Tolerance = ppm_tolerance

    # Start from the filter of the first scan and force it to the requested
    # MS order, so only scans of that type are averaged over the range
    scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan)
    scanFilter = self.set_msordertype(scanFilter, ms_type)

    if isinstance(self.scans, tuple):
        averageScan = Extensions.AverageScansInScanRange(
            self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options
        )
        return spectrum_from(averageScan, d_params)

    if isinstance(self.scans, list):
        d_params = self.set_metadata(scans_list=self.scans)

        scans = List[int]()
        for scan in self.scans:
            scans.Add(scan)

        averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
        return spectrum_from(averageScan, d_params)

    raise ValueError("scans must be a list intergers or a tuple if integers")
Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method or a scan list using Thermo's AverageScans method spectrum_mode: str centroid or profile mass spectrum auto_process: bool If true performs peak picking, and noise threshold calculation after creation of mass spectrum object ms_type: str String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10. Internal function converts to Thermo MSOrderType class.
def set_metadata(
    self,
    firstScanNumber=0,
    lastScanNumber=0,
    scans_list=False,
    label=Labels.thermo_profile,
):
    """
    Collect metadata to be ingested in the mass spectrum object.

    Parameters:
    -----------
    firstScanNumber : int
        First scan of the range; used when scans_list is falsy.
    lastScanNumber : int
        Last scan of the range; used when scans_list is falsy.
    scans_list : list[int] or False
        Explicit scans; when given, it is stored as the scan number and its
        first entry determines the polarity.
    label : str
        Stored under "label".

    Returns:
    --------
    dict
        The default parameters for `self.file_path`, with "label",
        "scan_number", "polarity", "analyzer", "acquisition_time" and
        "instrument_label" filled in.
    """
    d_params = default_parameters(self.file_path)

    # assumes scans is full scan or reduced profile scan
    d_params["label"] = label

    if scans_list:
        scan_number, polarity_scan = scans_list, scans_list[0]
    else:
        scan_number = f"{firstScanNumber}-{lastScanNumber}"
        polarity_scan = firstScanNumber

    d_params["scan_number"] = scan_number
    d_params["polarity"] = self.get_polarity_mode(polarity_scan)
    d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model
    d_params["acquisition_time"] = self.get_creation_time()
    d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name

    return d_params
Collect metadata to be ingested in the mass spectrum object
scans_list: list[int] or false lastScanNumber: int firstScanNumber: int
def get_instrument_methods(self, parse_strings: bool = True):
    """
    Extract the instrument methods embedded in the raw file.

    For each embedded method the plaintext string is read and, when
    parse_strings is True, parsed into a nested dictionary. If parsing a
    method fails, the raw string is returned for that method instead.

    This has been tested on data from an Orbitrap ID-X with embedded MS and
    LC methods, but other instrument types may fail.

    Parameters:
    -----------
    parse_strings: bool
        If True, will attempt to parse the instrument methods into a
        dictionary. If False, will return the raw strings.

    Returns:
    --------
    List[Dict[str, Any]] or List
        One entry per instrument method: a dictionary, or the raw string if
        parsing was not requested or failed.

    Raises:
    -------
    ValueError
        If the raw file has no instrument methods attached.
    """
    if not self.iRawDataPlus.HasInstrumentMethod:
        raise ValueError(
            "Raw Data file does not have any instrument methods attached"
        )

    def split_pair(text):
        """Split 'key=value' once; the value is None when there is no '='."""
        return text.split("=", 1) if "=" in text else (text, None)

    def parse_instrument_method(data):
        method = {}
        current_section = None
        sub_section = None

        for line in data.split("\r\n"):
            if not line.strip():  # Skip empty lines
                continue

            if line.startswith(("----", "Experiment", "Scan Event")) or line.endswith(
                ("Settings", "Summary")
            ):
                # Section header: start a fresh section
                current_section = line.replace("-", "").strip()
                method[current_section] = {}
                sub_section = None
            elif line.startswith("\t"):
                key_value = line.strip()
                if "\t\t" in line:
                    indent_level = line.count("\t")
                    if indent_level == 2:
                        if sub_section:
                            key, value = split_pair(key_value)
                            # Create the sub-section on first use; a
                            # sub-section header alone does not create it.
                            method.setdefault(current_section, {}).setdefault(
                                sub_section, {}
                            )[key.strip()] = value.strip() if value else None
                    elif indent_level == 3:
                        scan_type, key_value = (
                            key_value.split(" ", 1)
                            if " " in key_value
                            else (key_value, None)
                        )
                        scan_entry = (
                            method.setdefault(current_section, {})
                            .setdefault(sub_section, {})
                            .setdefault(scan_type, {})
                        )
                        if key_value:
                            key, value = split_pair(key_value)
                            scan_entry[key.strip()] = value.strip() if value else None
                elif "=" in key_value:
                    key, value = key_value.split("=", 1)
                    method.setdefault(current_section, {})[key.strip()] = value.strip()
                else:
                    # A single-tab line without '=' names a sub-section
                    sub_section = key_value
            elif ":" in line:
                key, value = line.split(":", 1)
                method[current_section][key.strip()] = value.strip()
            else:
                method[current_section][line] = {}

        return method

    instrument_methods = []
    for i in range(self.iRawDataPlus.InstrumentMethodsCount):
        method_text = self.iRawDataPlus.GetInstrumentMethod(i)
        parsed = method_text
        if parse_strings:
            try:
                parsed = parse_instrument_method(method_text)
            except Exception:
                # Best effort: unknown layouts fall back to the raw string.
                # Exception (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate.
                parsed = method_text
        instrument_methods.append(parsed)
    return instrument_methods
This function will extract the instrument methods embedded in the raw file
First it will check if there are any instrument methods, raising a ValueError if there are none. Then it will get the total number of instrument methods. For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary. If this fails, it will return just the string object.
This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.
Parameters:
parse_strings: bool If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.
Returns:
List[Dict[str, Any]] or List A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.
def get_tune_method(self):
    """
    Extract the first tune method from the raw file as a dictionary.

    It has been tested on data from a Thermo Orbitrap ID-X, Astral and
    Q-Exactive, but may fail on other instrument types.

    A label is treated as a section header when it contains '===', or when
    its value is blank and it does not end with ':'. Labels that follow a
    section header are stored inside that section; labels seen before any
    section header are stored at the top level.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the tune method information

    Raises:
    -------
    ValueError
        If no tune methods are found in the raw file

    """
    tune_count = self.iRawDataPlus.GetTuneDataCount()
    if tune_count == 0:
        raise ValueError("No tune methods found in the raw data file")
    if tune_count > 1:
        warnings.warn(
            "Multiple tune methods found in the raw data file, returning the 1st"
        )

    header = self.iRawDataPlus.GetTuneData(0)

    header_dic = {}
    current_section = None

    for i in range(header.Length):
        label = header.Labels[i]
        value = header.Values[i]

        is_section_header = "===" in label or (
            (value == "" or value is None) and not label.endswith(":")
        )
        if is_section_header:
            # Strip the '=' decoration and any ':' from the section name
            current_section = label.replace("=", "").replace(":", "").strip()
            header_dic[current_section] = {}
        elif current_section:
            header_dic[current_section][label] = value
        else:
            header_dic[label] = value
    return header_dic
This code will extract the tune method from the raw file It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. It attempts to parse out section headers and sub-sections, but may not work for all instrument types. It will also not return Labels (keys) where the value is blank
Returns:
Dict[str, Any] A dictionary containing the tune method information
Raises:
ValueError If no tune methods are found in the raw file
def get_status_log(self, retention_time: float = 0):
    """
    Extract the status log for a retention time from the raw file.

    It has been tested on data from a Thermo Orbitrap ID-X, Astral and
    Q-Exactive, but may fail on other instrument types.

    A label is treated as a section header when it contains '===', or when
    its value is blank and it does not end with ':'. Labels that follow a
    section header are stored inside that section; labels seen before any
    section header are stored at the top level.

    Parameters:
    -----------
    retention_time: float
        The retention time in minutes to extract the status log data from.
        Will use the closest retention time found. Default 0.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the status log information

    Raises:
    -------
    ValueError
        If no status logs are found in the raw file

    """
    status_log_count = self.iRawDataPlus.GetStatusLogEntriesCount()
    if status_log_count == 0:
        raise ValueError("No status logs found in the raw data file")

    header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time)

    header_dic = {}
    current_section = None

    for i in range(header.Length):
        label = header.Labels[i]
        value = header.Values[i]

        is_section_header = "===" in label or (
            (value == "" or value is None) and not label.endswith(":")
        )
        if is_section_header:
            # Strip the '=' decoration and any ':' from the section name
            current_section = label.replace("=", "").replace(":", "").strip()
            header_dic[current_section] = {}
        elif current_section:
            header_dic[current_section][label] = value
        else:
            header_dic[label] = value
    return header_dic
This code will extract the status logs from the raw file It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. It attempts to parse out section headers and sub-sections, but may not work for all instrument types. It will also not return Labels (keys) where the value is blank
Parameters:
retention_time: float The retention time in minutes to extract the status log data from. Will use the closest retention time found. Default 0.
Returns:
Dict[str, Any] A dictionary containing the status log information
Raises:
ValueError If no status logs are found in the raw file
def get_error_logs(self):
    """
    Extract the error logs from the raw file.

    Returns:
    --------
    Dict[int, Dict[str, Any]]
        A dictionary keyed by the error log index; each value is a
        dictionary holding the retention time under "rt" and the message
        under "message".

    Raises:
    -------
    ValueError
        If no error logs are found in the raw file
    """
    error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount
    if error_log_count == 0:
        raise ValueError("No error logs found in the raw data file")

    error_logs = {}
    for i in range(error_log_count):
        error_log_item = self.iRawDataPlus.GetErrorLogItem(i)
        # The index is the key because it is unique per entry
        error_logs[i] = {
            "rt": error_log_item.RetentionTime,
            "message": error_log_item.Message,
        }
    return error_logs
This code will extract the error logs from the raw file
Returns:
Dict[int, Dict[str, Any]] A dictionary keyed by the error log index; each value holds the retention time ("rt") and the message ("message") of that entry
Raises:
ValueError If no error logs are found in the raw file
def get_sample_information(self):
    """
    Copy the sample information of the raw file into a dictionary.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the sample information. BarcodeStatus and
        SampleType are converted to strings; UserText is stored as
        {"UserText": [...]}.
        Note that UserText field may not be handled properly and may need further processing
    """
    sminfo = self.iRawDataPlus.SampleInformation

    # Fields copied in this order; the ones in `as_text` are stringified.
    field_names = (
        "Comment",
        "SampleId",
        "SampleName",
        "Vial",
        "InjectionVolume",
        "Barcode",
        "BarcodeStatus",
        "CalibrationLevel",
        "DilutionFactor",
        "InstrumentMethodFile",
        "RawFileName",
        "CalibrationFile",
        "IstdAmount",
        "RowNumber",
        "Path",
        "ProcessingMethodFile",
        "SampleType",
        "SampleWeight",
    )
    as_text = ("BarcodeStatus", "SampleType")

    smdict = {}
    for name in field_names:
        value = getattr(sminfo, name)
        smdict[name] = str(value) if name in as_text else value

    smdict["UserText"] = {"UserText": list(sminfo.UserText)}
    return smdict
This code will extract the sample information from the raw file
Returns:
Dict[str, Any] A dictionary containing the sample information Note that UserText field may not be handled properly and may need further processing
def get_instrument_data(self):
    """
    Copy the instrument data of the raw file into a dictionary.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the instrument data. ChannelLabels is
        stored as {"ChannelLabels": [...]}.
    """
    info = self.iRawDataPlus.GetInstrumentData()
    return {
        "Name": info.Name,
        "Model": info.Model,
        "SerialNumber": info.SerialNumber,
        "SoftwareVersion": info.SoftwareVersion,
        "HardwareVersion": info.HardwareVersion,
        "ChannelLabels": {"ChannelLabels": list(info.ChannelLabels)},
        "Flags": info.Flags,
        "AxisLabelY": info.AxisLabelY,
        "AxisLabelX": info.AxisLabelX,
    }
This code will extract the instrument data from the raw file
Returns:
Dict[str, Any] A dictionary containing the instrument data
def get_centroid_msms_data(self, scan):
    """
    Build a processed centroid mass spectrum from the centroid stream of a
    single scan.

    The spectrum is processed with the "relative_abundance" noise threshold
    method and a minimum relative abundance of 1.

    .. deprecated:: 2.0
        This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
    """
    warnings.warn(
        "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
        "Please use `get_average_mass_spectrum()` instead.",
        DeprecationWarning,
    )

    d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid)

    stream = self.iRawDataPlus.GetCentroidStream(scan, False)
    noise = list(stream.Noises)
    baselines = list(stream.Baselines)
    rp = list(stream.Resolutions)
    magnitude = list(stream.Intensities)
    mz = list(stream.Masses)

    # Noise standard deviation is taken as (noise - baseline) / 3
    noise_std = (np.array(noise) - np.array(baselines)) / 3
    signal_to_noise = np.array(magnitude) / noise_std

    d_params["baseline_noise"] = np.average(noise_std)
    d_params["baseline_noise_std"] = np.std(noise_std)

    data_dict = {
        Labels.mz: mz,
        Labels.abundance: magnitude,
        Labels.rp: rp,
        Labels.s2n: list(signal_to_noise),
    }

    mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
    mass_spec.settings.noise_threshold_method = "relative_abundance"
    mass_spec.settings.noise_threshold_min_relative_abundance = 1
    mass_spec.process_mass_spec()
    return mass_spec
Deprecated since version 2.0:
This function will be removed in CoreMS 2.0. Please use get_average_mass_spectrum()
instead for similar functionality.
def get_average_mass_spectrum_by_scanlist(
    self,
    scans_list: List[int],
    auto_process: bool = True,
    ppm_tolerance: float = 5.0,
) -> MassSpecProfile:
    """
    Averages selected scans mass spectra using Thermo's AverageScans method

    Parameters:
    -----------
    scans_list: list[int]
        The scans to average.
    auto_process: bool
        If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
    ppm_tolerance: float
        Mass tolerance, in ppm, used when averaging the scans.

    Returns:
    --------
    MassSpecProfile
        The averaged profile mass spectrum.

    .. deprecated:: 2.0
        This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
    """
    warnings.warn(
        "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
        "Please use `get_average_mass_spectrum()` instead.",
        DeprecationWarning,
    )

    # assumes scans is full scan or reduced profile scan
    d_params = self.set_metadata(scans_list=scans_list)

    scans = List[int]()
    for scan in scans_list:
        scans.Add(scan)

    # Create the mass options object that will be used when averaging the scans
    options = MassOptions()
    options.ToleranceUnits = ToleranceUnits.ppm
    options.Tolerance = ppm_tolerance

    averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)

    data_dict = {
        Labels.mz: list(averageScan.SegmentedScan.Positions),
        Labels.abundance: list(averageScan.SegmentedScan.Intensities),
    }

    return MassSpecProfile(data_dict, d_params, auto_process=auto_process)
Averages selected scans mass spectra using Thermo's AverageScans method scans_list: list[int] auto_process: bool If true performs peak picking, and noise threshold calculation after creation of mass spectrum object Returns: MassSpecProfile
Deprecated since version 2.0.
This function will be removed in CoreMS 2.0. Please use get_average_mass_spectrum()
instead for similar functionality.
class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, SpectraParserInterface):
    """A class for parsing Thermo RAW mass spectrometry data files and instantiating MassSpectraBase or LCMSBase objects

    Parameters
    ----------
    file_location : str or Path
        The path to the RAW file to be parsed.
    analyzer : str, optional
        The type of mass analyzer used in the instrument. Default is "Unknown".
    instrument_label : str, optional
        The name of the instrument used to acquire the data. Default is "Unknown".
    sample_name : str, optional
        The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.

    Attributes
    ----------
    file_location : Path
        The path to the RAW file being parsed.
    analyzer : str
        The type of mass analyzer used in the instrument.
    instrument_label : str
        The name of the instrument used to acquire the data.
    sample_name : str
        The name of the sample being analyzed.

    Methods
    -------
    * run(spectra="all", scan_df=None).
        Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
    * get_mass_spectrum_from_scan(scan_number, spectrum_mode, auto_process=True)
        Parses the RAW file and returns a MassSpecProfile or MassSpecCentroid object from a single scan.
    * get_mass_spectra_obj().
        Parses the RAW file and instantiates a MassSpectraBase object.
    * get_lcms_obj().
        Parses the RAW file and instantiates an LCMSBase object.
    * get_icr_transient_times().
        Return a list for transient time targets for all scans, or selected scans range

    Inherits from ThermoBaseClass and SpectraParserInterface
    """

    def __init__(
        self,
        file_location,
        analyzer="Unknown",
        instrument_label="Unknown",
        sample_name=None,
    ):
        super().__init__(file_location)
        # A plain string becomes a pathlib.Path; Path/S3Path objects are used as given.
        location = (
            Path(file_location) if isinstance(file_location, str) else file_location
        )
        # NOTE(review): FileExistsError is an odd type for a missing file, but it
        # is kept because callers may already catch it.
        if not location.exists():
            raise FileExistsError("File does not exist: " + str(location))

        self.file_location = location
        self.analyzer = analyzer
        self.instrument_label = instrument_label
        self.sample_name = sample_name if sample_name else location.stem

    def load(self):
        """No-op; present so the class provides a `load` method."""
        pass

    def get_scan_df(self):
        """Return scan data as a pandas DataFrame.

        Returns
        -------
        pandas.DataFrame
            One row per scan with the columns scan, scan_time, tic, ms_level,
            scan_text, scan_window_lower, scan_window_upper, polarity,
            precursor_mz and ms_format. The scan window bounds are kept as the
            strings captured from the scan text.
        """
        # This automatically brings in all the data
        self.chromatogram_settings.scans = (-1, -1)

        # Get scan df info; starting with TIC data
        tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
        scan_df = pd.DataFrame.from_dict(
            {
                "scan": tic_data.scans,
                "scan_time": tic_data.time,
                "tic": tic_data.tic,
            }
        )
        # Placeholder so ms_level keeps its position right after `tic`.
        scan_df["ms_level"] = None

        # get scan text
        scan_filter_df = pd.DataFrame.from_dict(
            self.get_all_filters()[0], orient="index"
        )
        scan_filter_df.reset_index(inplace=True)
        scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)

        scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
        scan_df["scan_window_lower"] = scan_df.scan_text.str.extract(
            r"\[(\d+\.\d+)-\d+\.\d+\]"
        )
        scan_df["scan_window_upper"] = scan_df.scan_text.str.extract(
            r"\[\d+\.\d+-(\d+\.\d+)\]"
        )
        scan_df["polarity"] = np.where(
            scan_df.scan_text.str.contains(" - "), "negative", "positive"
        )
        scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@")
        scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float)

        # Collect ms_level and centroid/profile per scan in one pass and assign
        # column-wise; a boolean mask per scan would make this quadratic.
        levels = []
        formats = []
        for scan_number in scan_df.scan.to_list():
            levels.append(self.get_ms_level_for_scan_num(scan_number))
            if self.iRawDataPlus.IsCentroidScanFromScanNumber(scan_number):
                formats.append("centroid")
            else:
                formats.append("profile")
        # dtype=object matches columns that start as None and are filled in place.
        scan_df["ms_level"] = pd.Series(levels, index=scan_df.index, dtype=object)
        scan_df["ms_format"] = pd.Series(formats, index=scan_df.index, dtype=object)

        return scan_df

    def get_ms_raw(self, spectra, scan_df):
        """Return a dictionary of mass spectra data as a pandas DataFrame.

        Parameters
        ----------
        spectra : str
            'all', 'ms1' or 'ms2'; selects the scans by the `ms_level` column.
        scan_df : pandas.DataFrame
            Scan table with at least the `scan` and `ms_level` columns.

        Returns
        -------
        dict
            Maps 'ms<level>' to a float32 DataFrame with the columns scan, mz
            and intensity. Only points with intensity > 0 are kept.

        Raises
        ------
        ValueError
            If `spectra` is not one of the accepted values.
        """
        if spectra == "all":
            scan_df_forspec = scan_df
        elif spectra == "ms1":
            scan_df_forspec = scan_df[scan_df.ms_level == 1]
        elif spectra == "ms2":
            scan_df_forspec = scan_df[scan_df.ms_level == 2]
        else:
            raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")

        # set at float32
        dtype = np.float32
        columns = ["scan", "mz", "intensity"]

        scan_numbers = scan_df_forspec.scan.to_list()
        # scan -> ms level, built once. setdefault keeps the first row's level
        # if a scan number were to appear twice.
        level_by_scan = {}
        for scan_number, level in zip(
            scan_numbers, scan_df_forspec.ms_level.to_list()
        ):
            level_by_scan.setdefault(scan_number, level)

        def read_scan(scan_number):
            """Fetch the segmented scan stream for one scan number."""
            stats = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
            return self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan_number, stats
            )

        # First pass: count the rows each level needs, to preallocate the output.
        n_rows = defaultdict(int)
        for scan_number in scan_numbers:
            intensities = np.array(list(read_scan(scan_number).Intensities))
            n_rows[level_by_scan[scan_number]] += int(
                np.count_nonzero(intensities > 0)
            )

        # Second pass: parse
        res = {}
        filled = {}
        for scan_number in scan_numbers:
            stream = read_scan(scan_number)
            abun = np.array(list(stream.Intensities))
            mz = np.array(list(stream.Positions))

            # Keep only points with abundance > 0
            keep = np.where(abun > 0)[0]
            mz = mz[keep].astype(dtype)
            abun = abun[keep].astype(dtype)

            # No measurements
            n = len(mz)
            if n == 0:
                continue

            # Dimension check
            if len(mz) != len(abun):
                warnings.warn("m/z and intensity array dimension mismatch")
                continue

            level = level_by_scan[scan_number]
            if level not in res:
                res[level] = np.empty((n_rows[level], len(columns)), dtype=dtype)
                filled[level] = 0

            start = filled[level]
            stop = start + n
            res[level][start:stop, 0] = scan_number
            res[level][start:stop, 1] = mz
            res[level][start:stop, 2] = abun
            filled[level] = stop

        # Trim to the rows actually written so no uninitialised rows leak out;
        # keys get the 'ms' prefix.
        return {
            f"ms{level}": pd.DataFrame(data[: filled[level]], columns=columns)
            for level, data in res.items()
        }

    def run(self, spectra="all", scan_df=None):
        """
        Extracts mass spectra data from a raw file.

        Parameters
        ----------
        spectra : str, optional
            Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2.
        scan_df : pandas.DataFrame, optional
            Scan dataframe. If not provided, the scan dataframe is created from the RAW file.

        Returns
        -------
        tuple
            A tuple containing two elements:
            - A dictionary containing mass spectra data, separated by MS level (None when spectra is "none").
            - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level,
                scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
        """
        if scan_df is None:
            scan_df = self.get_scan_df()

        if spectra == "none":
            return None, scan_df
        return self.get_ms_raw(spectra=spectra, scan_df=scan_df), scan_df

    def get_mass_spectrum_from_scan(
        self, scan_number, spectrum_mode, auto_process=True
    ):
        """Instantiate a mass spectrum object from a single scan number from the binary file.

        Parameters
        ----------
        scan_number : int
            The scan number to extract the mass spectrum from.
        spectrum_mode : str
            The type of mass spectrum to extract. Must be 'profile' or 'centroid'.
        auto_process : bool, optional
            If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.

        Returns
        -------
        MassSpecProfile | MassSpecCentroid
            The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.

        Raises
        ------
        ValueError
            If `spectrum_mode` is neither 'profile' nor 'centroid'.
        """
        # Reject unknown modes up front instead of failing later on an unbound local.
        if spectrum_mode not in ("profile", "centroid"):
            raise ValueError(
                f"spectrum_mode must be 'profile' or 'centroid', got {spectrum_mode!r}"
            )

        def read_segmented_scan():
            """Return (mz, abundance) lists from the segmented scan stream."""
            scan_statistics = self.iRawDataPlus.GetScanStatsForScanNumber(
                scan_number
            )
            stream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan_number, scan_statistics
            )
            return list(stream.Positions), list(stream.Intensities)

        if spectrum_mode == "profile":
            mz, abun = read_segmented_scan()
            data_dict = {
                Labels.mz: mz,
                Labels.abundance: abun,
            }
            d_params = self.set_metadata(
                firstScanNumber=scan_number,
                lastScanNumber=scan_number,
                scans_list=False,
                label=Labels.thermo_profile,
            )
            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)

        centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
        if centroid_scan.Masses is not None:
            abun = list(centroid_scan.Intensities)
            noise = np.array(list(centroid_scan.Noises))
            baselines = np.array(list(centroid_scan.Baselines))
            # Noise std is taken as (noise - baseline) / 3.
            array_noise_std = (noise - baselines) / 3
            l_signal_to_noise = np.array(abun) / array_noise_std
            data_dict = {
                Labels.mz: list(centroid_scan.Masses),
                Labels.abundance: abun,
                Labels.rp: list(centroid_scan.Resolutions),
                Labels.s2n: list(l_signal_to_noise),
            }
        else:
            # For CID MS2, the centroid data are stored in the profile data
            # location, they do not have any associated rp or baseline data,
            # but they should be treated as centroid data
            mz, abun = read_segmented_scan()
            data_dict = {
                Labels.mz: mz,
                Labels.abundance: abun,
                Labels.rp: [np.nan] * len(mz),
                Labels.s2n: [np.nan] * len(mz),
            }
        d_params = self.set_metadata(
            firstScanNumber=scan_number,
            lastScanNumber=scan_number,
            scans_list=False,
            label=Labels.thermo_centroid,
        )
        return MassSpecCentroid(data_dict, d_params, auto_process=auto_process)

    def get_mass_spectra_obj(self):
        """Instantiate a MassSpectraBase object from the binary data file.

        Returns
        -------
        MassSpectraBase
            The MassSpectra object containing the parsed mass spectra. The object is instantiated with the file location, analyzer, instrument, sample name, and scan dataframe.
        """
        _, scan_df = self.run(spectra="none")
        mass_spectra_obj = MassSpectraBase(
            self.file_location,
            self.analyzer,
            self.instrument_label,
            self.sample_name,
            self,
        )
        mass_spectra_obj.scan_df = scan_df.set_index("scan", drop=False)
        return mass_spectra_obj

    def get_lcms_obj(self, spectra="all"):
        """Instantiates a LCMSBase object from the RAW file.

        Parameters
        ----------
        spectra : str, optional
            Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2".

        Returns
        -------
        LCMSBase
            LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.

        Raises
        ------
        ValueError
            If the scan data contain more than one polarity.
        """
        _, scan_df = self.run(spectra="none")  # first run it to just get scan info

        # Check polarity before the costly spectra parse so a mixed-polarity
        # file fails fast.
        if len(set(scan_df.polarity)) > 1:
            raise ValueError("Mixed polarities detected in scan data")

        res, scan_df = self.run(
            scan_df=scan_df, spectra=spectra
        )  # second run to parse data
        lcms_obj = LCMSBase(
            self.file_location,
            self.analyzer,
            self.instrument_label,
            self.sample_name,
            self,
        )
        if spectra != "none":
            for key, frame in res.items():
                ms_level = int(key.replace("ms", ""))
                frame = frame[frame.intensity > 0]
                frame = frame.sort_values(by=["scan", "mz"]).reset_index(drop=True)
                lcms_obj._ms_unprocessed[ms_level] = frame
        lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
        # Positional access: does not depend on the index holding the label 0.
        lcms_obj.polarity = scan_df.polarity.iloc[0]
        lcms_obj._scans_number_list = list(scan_df.scan)
        lcms_obj._retention_time_list = list(scan_df.scan_time)
        lcms_obj._tic_list = list(scan_df.tic)

        return lcms_obj

    def get_icr_transient_times(self):
        """Return a list for transient time targets for all scans, or selected scans range

        Notes
        --------
        Resolving Power and Transient time targets based on 7T FT-ICR MS system
        """
        # NOTE(review): the other keys double along with the times, so "50"
        # looks like a typo for "50000" — confirm before changing.
        res_trans_time = {
            "50": 0.384,
            "100000": 0.768,
            "200000": 1.536,
            "400000": 3.072,
            "750000": 6.144,
            "1000000": 12.288,
        }

        first_scan = self.start_scan
        last_scan = self.end_scan

        # NOTE(review): range() excludes `last_scan`; confirm whether end_scan
        # is meant to be inclusive.
        # Resolutions missing from the table yield None.
        return [
            res_trans_time.get(self.get_scan_header(scan)["FT Resolution:"])
            for scan in range(first_scan, last_scan)
        ]
A class for parsing Thermo RAW mass spectrometry data files and instantiating MassSpectraBase or LCMSBase objects
Parameters
- file_location (str or Path): The path to the RAW file to be parsed.
- analyzer (str, optional): The type of mass analyzer used in the instrument. Default is "Unknown".
- instrument_label (str, optional): The name of the instrument used to acquire the data. Default is "Unknown".
- sample_name (str, optional): The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.
Attributes
- file_location (Path): The path to the RAW file being parsed.
- analyzer (str): The type of mass analyzer used in the instrument.
- instrument_label (str): The name of the instrument used to acquire the data.
- sample_name (str): The name of the sample being analyzed.
Methods
- run(spectra="all", scan_df=None). Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
- get_mass_spectrum_from_scan(scan_number, spectrum_mode, auto_process=True) Parses the RAW file and returns a MassSpecProfile or MassSpecCentroid object from a single scan.
- get_mass_spectra_obj(). Parses the RAW file and instantiates a MassSpectraBase object.
- get_lcms_obj(). Parses the RAW file and instantiates an LCMSBase object.
- get_icr_transient_times(). Return a list for transient time targets for all scans, or selected scans range
Inherits from ThermoBaseClass and SpectraParserInterface
def __init__(
    self,
    file_location,
    analyzer="Unknown",
    instrument_label="Unknown",
    sample_name=None,
):
    """Open the RAW file through the base class and record descriptive metadata.

    Parameters
    ----------
    file_location : str, pathlib.Path or s3path.S3Path
        Thermo RAW file path. A plain string is converted to a pathlib.Path.
    analyzer : str, optional
        The type of mass analyzer used in the instrument. Default is "Unknown".
    instrument_label : str, optional
        The name of the instrument used to acquire the data. Default is "Unknown".
    sample_name : str, optional
        The name of the sample. Falls back to the stem of `file_location`.

    Raises
    ------
    FileExistsError
        If `file_location` does not exist.
    """
    super().__init__(file_location)

    # A plain string becomes a pathlib.Path; pass an S3Path explicitly if needed.
    location = Path(file_location) if isinstance(file_location, str) else file_location

    # NOTE(review): FileExistsError is an odd type for a missing file, but it
    # is kept because callers may already catch it.
    if not location.exists():
        raise FileExistsError("File does not exist: " + str(location))

    self.file_location = location
    self.analyzer = analyzer
    self.instrument_label = instrument_label
    self.sample_name = sample_name if sample_name else location.stem
file_location: str, pathlib.Path or s3path.S3Path — path to the Thermo RAW file
def get_scan_df(self):
    """Return scan data as a pandas DataFrame.

    Returns
    -------
    pandas.DataFrame
        One row per scan with the columns scan, scan_time, tic, ms_level,
        scan_text, scan_window_lower, scan_window_upper, polarity,
        precursor_mz and ms_format. The scan window bounds are kept as the
        strings captured from the scan text.
    """
    # This automatically brings in all the data
    self.chromatogram_settings.scans = (-1, -1)

    # Get scan df info; starting with TIC data
    tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
    scan_df = pd.DataFrame.from_dict(
        {
            "scan": tic_data.scans,
            "scan_time": tic_data.time,
            "tic": tic_data.tic,
        }
    )
    # Placeholder so ms_level keeps its position right after `tic`.
    scan_df["ms_level"] = None

    # get scan text
    scan_filter_df = pd.DataFrame.from_dict(self.get_all_filters()[0], orient="index")
    scan_filter_df.reset_index(inplace=True)
    scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)

    scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
    scan_df["scan_window_lower"] = scan_df.scan_text.str.extract(
        r"\[(\d+\.\d+)-\d+\.\d+\]"
    )
    scan_df["scan_window_upper"] = scan_df.scan_text.str.extract(
        r"\[\d+\.\d+-(\d+\.\d+)\]"
    )
    scan_df["polarity"] = np.where(
        scan_df.scan_text.str.contains(" - "), "negative", "positive"
    )
    scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@")
    scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float)

    # Collect ms_level and centroid/profile per scan in one pass and assign
    # column-wise; a boolean mask per scan would make this quadratic.
    levels = []
    formats = []
    for scan_number in scan_df.scan.to_list():
        levels.append(self.get_ms_level_for_scan_num(scan_number))
        if self.iRawDataPlus.IsCentroidScanFromScanNumber(scan_number):
            formats.append("centroid")
        else:
            formats.append("profile")
    # dtype=object matches columns that start as None and are filled in place.
    scan_df["ms_level"] = pd.Series(levels, index=scan_df.index, dtype=object)
    scan_df["ms_format"] = pd.Series(formats, index=scan_df.index, dtype=object)

    return scan_df
Return scan data as a pandas DataFrame.
def get_ms_raw(self, spectra, scan_df):
    """Return a dictionary of mass spectra data as a pandas DataFrame.

    Parameters
    ----------
    spectra : str
        'all', 'ms1' or 'ms2'; selects the scans by the `ms_level` column.
    scan_df : pandas.DataFrame
        Scan table with at least the `scan` and `ms_level` columns.

    Returns
    -------
    dict
        Maps 'ms<level>' to a float32 DataFrame with the columns scan, mz and
        intensity. Only points with intensity > 0 are kept.

    Raises
    ------
    ValueError
        If `spectra` is not one of the accepted values.
    """
    if spectra == "all":
        scan_df_forspec = scan_df
    elif spectra == "ms1":
        scan_df_forspec = scan_df[scan_df.ms_level == 1]
    elif spectra == "ms2":
        scan_df_forspec = scan_df[scan_df.ms_level == 2]
    else:
        raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")

    # set at float32
    dtype = np.float32
    columns = ["scan", "mz", "intensity"]

    scan_numbers = scan_df_forspec.scan.to_list()
    # scan -> ms level, built once. setdefault keeps the first row's level if
    # a scan number were to appear twice.
    level_by_scan = {}
    for scan_number, level in zip(scan_numbers, scan_df_forspec.ms_level.to_list()):
        level_by_scan.setdefault(scan_number, level)

    def read_scan(scan_number):
        """Fetch the segmented scan stream for one scan number."""
        stats = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
        return self.iRawDataPlus.GetSegmentedScanFromScanNumber(scan_number, stats)

    # First pass: count the rows each level needs, to preallocate the output.
    n_rows = defaultdict(int)
    for scan_number in scan_numbers:
        intensities = np.array(list(read_scan(scan_number).Intensities))
        n_rows[level_by_scan[scan_number]] += int(np.count_nonzero(intensities > 0))

    # Second pass: parse
    res = {}
    filled = {}
    for scan_number in scan_numbers:
        stream = read_scan(scan_number)
        abun = np.array(list(stream.Intensities))
        mz = np.array(list(stream.Positions))

        # Keep only points with abundance > 0
        keep = np.where(abun > 0)[0]
        mz = mz[keep].astype(dtype)
        abun = abun[keep].astype(dtype)

        # No measurements
        n = len(mz)
        if n == 0:
            continue

        # Dimension check
        if len(mz) != len(abun):
            warnings.warn("m/z and intensity array dimension mismatch")
            continue

        level = level_by_scan[scan_number]
        if level not in res:
            res[level] = np.empty((n_rows[level], len(columns)), dtype=dtype)
            filled[level] = 0

        start = filled[level]
        stop = start + n
        res[level][start:stop, 0] = scan_number
        res[level][start:stop, 1] = mz
        res[level][start:stop, 2] = abun
        filled[level] = stop

    # Trim to the rows actually written so no uninitialised rows leak out;
    # keys get the 'ms' prefix.
    return {
        f"ms{level}": pd.DataFrame(data[: filled[level]], columns=columns)
        for level, data in res.items()
    }
Return a dictionary of mass spectra data as a pandas DataFrame.
def run(self, spectra="all", scan_df=None):
    """
    Extracts mass spectra data from a raw file.

    Parameters
    ----------
    spectra : str, optional
        Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2.
    scan_df : pandas.DataFrame, optional
        Scan dataframe. If not provided, it is built with `get_scan_df()`.

    Returns
    -------
    tuple
        A tuple containing two elements:
        - A dictionary containing mass spectra data, separated by MS level, or None when spectra is "none".
        - The scan dataframe (the one passed in, or the one built here).
    """
    # Build the scan table only when the caller did not supply one.
    scans = self.get_scan_df() if scan_df is None else scan_df

    # "none" skips spectra parsing entirely.
    if spectra == "none":
        return None, scans

    return self.get_ms_raw(spectra=spectra, scan_df=scans), scans
Extracts mass spectra data from a raw file.
Parameters
- spectra (str, optional): Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2.
- scan_df (pandas.DataFrame, optional): Scan dataframe. If not provided, the scan dataframe is created from the RAW file.
Returns
- tuple: A tuple containing two elements:
- A dictionary containing mass spectra data, separated by MS level.
- A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level, scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
def get_mass_spectrum_from_scan(self, scan_number, spectrum_mode, auto_process=True):
    """Instantiate a mass spectrum object from a single scan number from the binary file.

    Parameters
    ----------
    scan_number : int
        The scan number to extract the mass spectrum from.
    spectrum_mode : str
        The type of mass spectrum to extract. Must be 'profile' or 'centroid'.
    auto_process : bool, optional
        If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.

    Returns
    -------
    MassSpecProfile | MassSpecCentroid
        The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.

    Raises
    ------
    ValueError
        If `spectrum_mode` is neither 'profile' nor 'centroid'.
    """
    # Reject unknown modes up front instead of failing later on an unbound local.
    if spectrum_mode not in ("profile", "centroid"):
        raise ValueError(
            f"spectrum_mode must be 'profile' or 'centroid', got {spectrum_mode!r}"
        )

    def read_segmented_scan():
        """Return (mz, abundance) lists from the segmented scan stream."""
        scan_statistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
        stream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
            scan_number, scan_statistics
        )
        return list(stream.Positions), list(stream.Intensities)

    if spectrum_mode == "profile":
        mz, abun = read_segmented_scan()
        data_dict = {
            Labels.mz: mz,
            Labels.abundance: abun,
        }
        d_params = self.set_metadata(
            firstScanNumber=scan_number,
            lastScanNumber=scan_number,
            scans_list=False,
            label=Labels.thermo_profile,
        )
        return MassSpecProfile(data_dict, d_params, auto_process=auto_process)

    centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
    if centroid_scan.Masses is not None:
        abun = list(centroid_scan.Intensities)
        noise = np.array(list(centroid_scan.Noises))
        baselines = np.array(list(centroid_scan.Baselines))
        # Noise std is taken as (noise - baseline) / 3.
        array_noise_std = (noise - baselines) / 3
        l_signal_to_noise = np.array(abun) / array_noise_std
        data_dict = {
            Labels.mz: list(centroid_scan.Masses),
            Labels.abundance: abun,
            Labels.rp: list(centroid_scan.Resolutions),
            Labels.s2n: list(l_signal_to_noise),
        }
    else:
        # For CID MS2, the centroid data are stored in the profile data
        # location, they do not have any associated rp or baseline data, but
        # they should be treated as centroid data
        mz, abun = read_segmented_scan()
        data_dict = {
            Labels.mz: mz,
            Labels.abundance: abun,
            Labels.rp: [np.nan] * len(mz),
            Labels.s2n: [np.nan] * len(mz),
        }
    d_params = self.set_metadata(
        firstScanNumber=scan_number,
        lastScanNumber=scan_number,
        scans_list=False,
        label=Labels.thermo_centroid,
    )
    return MassSpecCentroid(data_dict, d_params, auto_process=auto_process)
Instantiate a MassSpecProfile or MassSpecCentroid object from a single scan number from the binary file; both profile and centroid modes are supported.
Parameters
- scan_number (int): The scan number to extract the mass spectrum from.
- polarity (int): The polarity of the scan. 1 for positive mode, -1 for negative mode.
- spectrum_mode (str): The type of mass spectrum to extract. Must be 'profile' or 'centroid'.
- auto_process (bool, optional): If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.
Returns
- MassSpecProfile | MassSpecCentroid: The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.
def get_mass_spectra_obj(self):
    """Instantiate a MassSpectraBase object from the binary data file.

    Returns
    -------
    MassSpectraBase
        Built from the file location, analyzer, instrument label, sample name
        and this parser; its `scan_df` is the scan table indexed by scan
        number (the `scan` column is kept).
    """
    # Only the scan table is needed here, so spectra parsing is skipped.
    scan_table = self.run(spectra="none")[1]

    spectra_obj = MassSpectraBase(
        self.file_location,
        self.analyzer,
        self.instrument_label,
        self.sample_name,
        self,
    )
    spectra_obj.scan_df = scan_table.set_index("scan", drop=False)
    return spectra_obj
Instantiate a MassSpectraBase object from the binary data file.
Returns
- MassSpectraBase: The MassSpectra object containing the parsed mass spectra. The object is instantiated with the file location, analyzer, instrument, sample name, and scan dataframe.
def get_lcms_obj(self, spectra="all"):
    """Instantiates a LCMSBase object from the RAW file.

    Parameters
    ----------
    spectra : str, optional
        Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2".

    Returns
    -------
    LCMSBase
        LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.

    Raises
    ------
    ValueError
        If the scan data contain more than one polarity.
    """
    _, scan_df = self.run(spectra="none")  # first run it to just get scan info

    # Check polarity before the costly spectra parse so a mixed-polarity file
    # fails fast.
    if len(set(scan_df.polarity)) > 1:
        raise ValueError("Mixed polarities detected in scan data")

    res, scan_df = self.run(
        scan_df=scan_df, spectra=spectra
    )  # second run to parse data
    lcms_obj = LCMSBase(
        self.file_location,
        self.analyzer,
        self.instrument_label,
        self.sample_name,
        self,
    )
    if spectra != "none":
        for key, frame in res.items():
            ms_level = int(key.replace("ms", ""))
            frame = frame[frame.intensity > 0]
            frame = frame.sort_values(by=["scan", "mz"]).reset_index(drop=True)
            lcms_obj._ms_unprocessed[ms_level] = frame
    lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
    # Positional access: does not depend on the index holding the label 0.
    lcms_obj.polarity = scan_df.polarity.iloc[0]
    lcms_obj._scans_number_list = list(scan_df.scan)
    lcms_obj._retention_time_list = list(scan_df.scan_time)
    lcms_obj._tic_list = list(scan_df.tic)

    return lcms_obj
Instantiates a LCMSBase object from the RAW file.
Parameters
- spectra (str, optional): Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2".
Returns
- LCMSBase: LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.
def get_icr_transient_times(self):
    """Return a list for transient time targets for all scans, or selected scans range

    Each scan's "FT Resolution:" header value is looked up in a fixed table;
    values missing from the table yield None.

    Notes
    --------
    Resolving Power and Transient time targets based on 7T FT-ICR MS system
    """
    # NOTE(review): the other keys double along with the times, so "50" looks
    # like a typo for "50000" — confirm before changing.
    res_trans_time = {
        "50": 0.384,
        "100000": 0.768,
        "200000": 1.536,
        "400000": 3.072,
        "750000": 6.144,
        "1000000": 12.288,
    }

    first_scan = self.start_scan
    last_scan = self.end_scan

    # NOTE(review): range() excludes `last_scan`; confirm whether end_scan is
    # meant to be inclusive.
    return [
        res_trans_time.get(self.get_scan_header(scan)["FT Resolution:"])
        for scan in range(first_scan, last_scan)
    ]
Return a list for transient time targets for all scans, or selected scans range
Notes
Resolving Power and Transient time targets based on 7T FT-ICR MS system
Inherited Members
- ThermoBaseClass
- iRawDataPlus
- res
- file_path
- iFileHeader
- parameters
- chromatogram_settings
- scans
- start_scan
- end_scan
- set_msordertype
- get_instrument_info
- get_creation_time
- remove_temp_file
- close_file
- get_polarity_mode
- get_filter_for_scan_num
- get_ms_level_for_scan_num
- check_full_scan
- get_all_filters
- get_scan_header
- get_rt_time_from_trace
- get_eics
- get_tic
- get_average_mass_spectrum
- set_metadata
- get_instrument_methods
- get_tune_method
- get_status_log
- get_error_logs
- get_sample_information
- get_instrument_data
- get_centroid_msms_data
- get_average_mass_spectrum_by_scanlist