corems.mass_spectra.input.rawFileReader
__author__ = "Yuri E. Corilo"
__date__ = "Jun 09, 2021"


from warnings import warn
import warnings
from collections import defaultdict

from matplotlib import axes
from corems.encapsulation.factory.processingSetting import LiquidChromatographSetting

import numpy as np
import sys
import site
from pathlib import Path
import datetime
import importlib.util
import os

import clr
import pandas as pd
from s3path import S3Path


from typing import Any, Dict, List, Optional, Tuple
from corems.encapsulation.constant import Labels
from corems.mass_spectra.factory.lc_class import MassSpectraBase, LCMSBase
from corems.mass_spectra.factory.chromat_data import EIC_Data, TIC_Data
from corems.mass_spectrum.factory.MassSpectrumClasses import (
    MassSpecProfile,
    MassSpecCentroid,
)
from corems.encapsulation.factory.parameters import LCMSParameters, default_parameters
from corems.mass_spectra.input.parserbase import SpectraParserInterface

# Add the path of the Thermo .NET libraries to the system path
spec = importlib.util.find_spec("corems")
sys.path.append(str(Path(os.path.dirname(spec.origin)).parent) + "/ext_lib/dotnet/")

clr.AddReference("ThermoFisher.CommonCore.RawFileReader")
clr.AddReference("ThermoFisher.CommonCore.Data")
clr.AddReference("ThermoFisher.CommonCore.MassPrecisionEstimator")

from ThermoFisher.CommonCore.RawFileReader import RawFileReaderAdapter
from ThermoFisher.CommonCore.Data import ToleranceUnits, Extensions
from ThermoFisher.CommonCore.Data.Business import (
    ChromatogramTraceSettings,
    TraceType,
    MassOptions,
)
from ThermoFisher.CommonCore.Data.Business import ChromatogramSignal, Range
from ThermoFisher.CommonCore.Data.Business import Device
from ThermoFisher.CommonCore.Data.Interfaces import IChromatogramSettings
from ThermoFisher.CommonCore.Data.Business import MassOptions, FileHeaderReaderFactory
from ThermoFisher.CommonCore.Data.FilterEnums import MSOrderType

# NOTE: this import rebinds the name ``List`` that was imported from ``typing``
# above. From here on ``List`` is the .NET generic list; it is instantiated as
# ``List[int]()`` in ``get_average_mass_spectrum``.
from System.Collections.Generic import List


class ThermoBaseClass:
    """Class for parsing Thermo Raw files and extracting information from them.

    Parameters:
    -----------
    file_location : str or pathlib.Path or s3path.S3Path
        Thermo Raw file path or S3 path.

    Attributes:
    -----------
    file_path : str or pathlib.Path or s3path.S3Path
        The file location exactly as passed by the caller.
    parameters : LCMSParameters
        The LCMS parameters for the Thermo Raw file.
    chromatogram_settings : LiquidChromatographSetting
        The chromatogram settings for the Thermo Raw file.
    scans : list or tuple
        The selected scans for the Thermo Raw file.
    start_scan : int
        The starting scan number for the Thermo Raw file.
    end_scan : int
        The ending scan number for the Thermo Raw file.

    Methods:
    --------
    * set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter
        Convert the user-passed MS Type string to a Thermo MSOrderType object.
    * get_creation_time() -> datetime.datetime
        Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
    * remove_temp_file()
        Remove the temporary file if the path is from S3Path.
    * get_polarity_mode(scan_number: int) -> int
        Get the polarity mode for the given scan number.
    * get_filter_for_scan_num(scan_number: int) -> List[str]
        Get the filter for the given scan number.
    * check_full_scan(scan_number: int) -> bool
        Check if the given scan number is a full scan.
    * get_all_filters() -> Tuple[Dict[int, str], List[str]]
        Get all scan filters for the Thermo Raw file.
    * get_scan_header(scan: int) -> Dict[str, Any]
        Get the full dictionary of scan header metadata for the given scan number.
    * get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]
        Get the retention time, intensity, and scan number from the given trace.
    * get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d',
             peak_detection: bool = False, smooth: bool = False, plot: bool = False,
             ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes]
        Get the extracted ion chromatograms (EICs) for the target m/z values.

    """

    def __init__(self, file_location):
        """Open a Thermo Raw file and select its first MS device.

        Parameters:
        -----------
        file_location : str or pathlib.Path or s3path.S3Path
            Thermo Raw file path. An S3Path is first copied to a local
            ``tmp/`` directory because the reader is given a local path string.

        Raises:
        -------
        FileNotFoundError
            If the reader reports that the file is not open.
        IOError
            If the reader reports an error for the file.
        """
        # Path of the local temporary copy; only set for S3 inputs so that
        # remove_temp_file() never touches the caller's own file.
        self._temp_file_path = None

        if isinstance(file_location, str):
            file_path = Path(file_location)

        elif isinstance(file_location, S3Path):
            temp_dir = Path("tmp/")
            temp_dir.mkdir(exist_ok=True)

            file_path = temp_dir / file_location.name
            with open(file_path, "wb") as fh:
                fh.write(file_location.read_bytes())
            self._temp_file_path = file_path

        else:
            file_path = file_location

        self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path))

        if not self.iRawDataPlus.IsOpen:
            raise FileNotFoundError(
                "Unable to access the RAW file using the RawFileReader class!"
            )

        # Check for any errors in the RAW file
        if self.iRawDataPlus.IsError:
            raise IOError(
                "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path)
            )

        self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1)

        # Keep the location as given by the caller (not the temporary copy).
        self.file_path = file_location
        self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path))

        self._init_settings()

    def _init_settings(self):
        """
        Initialize the LCMSParameters object.
        """
        self._parameters = LCMSParameters()

    @property
    def parameters(self) -> LCMSParameters:
        """
        Get or set the LCMSParameters object.
        """
        return self._parameters

    @parameters.setter
    def parameters(self, instance_LCMSParameters: LCMSParameters):
        self._parameters = instance_LCMSParameters

    @property
    def chromatogram_settings(self) -> LiquidChromatographSetting:
        """
        Get or set the LiquidChromatographSetting object.
        """
        return self.parameters.lc_ms

    @chromatogram_settings.setter
    def chromatogram_settings(
        self, instance_LiquidChromatographSetting: LiquidChromatographSetting
    ):
        self.parameters.lc_ms = instance_LiquidChromatographSetting

    @property
    def scans(self) -> list | tuple:
        """scans : list or tuple
        If list uses Thermo AverageScans for the selected scans, otherwise
        (tuple) uses Thermo AverageScansInScanRange for a scan range.
        """
        return self.chromatogram_settings.scans

    @property
    def start_scan(self) -> int:
        """
        Get the starting scan number for the Thermo Raw file.

        A first entry of -1 in ``scans`` selects the first spectrum of the run.
        """
        if self.scans[0] == -1:
            return self.iRawDataPlus.RunHeaderEx.FirstSpectrum
        return self.scans[0]

    @property
    def end_scan(self) -> int:
        """
        Get the ending scan number for the Thermo Raw file.

        A last entry of -1 in ``scans`` selects the last spectrum of the run.
        """
        if self.scans[-1] == -1:
            return self.iRawDataPlus.RunHeaderEx.LastSpectrum
        return self.scans[-1]

    def set_msordertype(self, scanFilter, mstype: str = "ms1"):
        """
        Function to convert user passed string MS Type to Thermo MSOrderType object
        Limited to MS1 through MS10.

        Parameters:
        -----------
        scanFilter : Thermo.ScanFilter
            The scan filter object.
        mstype : str, optional
            The MS Type string, by default 'ms1'

        Returns:
        --------
        The same scan filter object with its ``MSOrder`` set.

        Raises:
        -------
        KeyError
            If ``mstype`` is not one of 'MS1' ... 'MS10' (case-insensitive).
        """
        mstype = mstype.upper()
        # Check that a valid mstype is passed
        ms_level = int(mstype.split("MS")[1])
        if not 1 <= ms_level <= 10:
            warn("MS Type not valid, must be between MS1 and MS10")

        msordertypedict = {
            "MS1": MSOrderType.Ms,
            "MS2": MSOrderType.Ms2,
            "MS3": MSOrderType.Ms3,
            "MS4": MSOrderType.Ms4,
            "MS5": MSOrderType.Ms5,
            "MS6": MSOrderType.Ms6,
            "MS7": MSOrderType.Ms7,
            "MS8": MSOrderType.Ms8,
            "MS9": MSOrderType.Ms9,
            "MS10": MSOrderType.Ms10,
        }
        scanFilter.MSOrder = msordertypedict[mstype]
        return scanFilter

    def get_creation_time(self) -> datetime.datetime:
        """
        Extract the creation date stamp from the .RAW file
        Return formatted creation date stamp.

        """
        ticks = self.iRawDataPlus.CreationDate.get_Ticks()
        # The tick count is divided by 10 to obtain microseconds and added to
        # 0001-01-01.
        return datetime.datetime(1, 1, 1) + datetime.timedelta(microseconds=ticks / 10)

    def remove_temp_file(self) -> None:
        """Remove the temporary local copy created for an S3Path input.

        If the path is from S3Path the data cannot be serialized to
        io.ByteStream and a temporary copy is stored in the temp dir.
        Use this function only at the end of your execution script;
        some LCMS class methods depend on this file.

        Does nothing when no temporary copy was created (str / Path input).
        """
        # getattr: instances built without running __init__ have no attribute.
        temp_file_path = getattr(self, "_temp_file_path", None)
        if temp_file_path is not None:
            temp_file_path.unlink(missing_ok=True)
            self._temp_file_path = None

    def close_file(self) -> None:
        """
        Close the Thermo Raw file.
        """
        self.iRawDataPlus.Dispose()

    def get_polarity_mode(self, scan_number: int) -> int:
        """
        Get the polarity mode for the given scan number.

        Parameters:
        -----------
        scan_number : int
            The scan number.

        Returns:
        --------
        int
            1 when the second token of the scan filter is '+', -1 when it is '-'.

        Raises:
        -------
        Exception
            If the polarity mode is unknown.

        """
        polarity_symbol = self.get_filter_for_scan_num(scan_number)[1]

        if polarity_symbol == "+":
            return 1

        if polarity_symbol == "-":
            return -1

        raise Exception("Polarity Mode Unknown, please set it manually")

    def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
        """
        Return the scan event string of ``scan_number`` split on whitespace.
        This function is only supported for MS device controllers.
        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

        Parameters:
        -----------
        scan_number : int
            The scan number.

        """
        scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)

        return str(scan_label).split()

    def get_ms_level_for_scan_num(self, scan_number: int) -> int:
        """
        Get the MS order for the given scan number.

        Parameters:
        -----------
        scan_number : int
            The scan number

        Returns:
        --------
        int
            The MS order type (1 for MS, 2 for MS2, etc.)

        Raises:
        -------
        Exception
            If the MS order of the scan filter is not MS1 through MS10.
        """
        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)

        msordertype = {
            MSOrderType.Ms: 1,
            MSOrderType.Ms2: 2,
            MSOrderType.Ms3: 3,
            MSOrderType.Ms4: 4,
            MSOrderType.Ms5: 5,
            MSOrderType.Ms6: 6,
            MSOrderType.Ms7: 7,
            MSOrderType.Ms8: 8,
            MSOrderType.Ms9: 9,
            MSOrderType.Ms10: 10,
        }

        if scan_filter.MSOrder in msordertype:
            return msordertype[scan_filter.MSOrder]
        raise Exception("MS Order Type not found")

    def check_full_scan(self, scan_number: int) -> bool:
        """
        Check if the given scan number is a full scan.

        Parameters:
        -----------
        scan_number : int
            The scan number.
        """
        # scan_filter.ScanMode 0 = FULL
        # NOTE(review): ScanMode is compared with an MSOrderType member here,
        # which does not obviously match the "0 = FULL" remark above - confirm
        # the intended enum against the RawFileReader API.
        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)

        return scan_filter.ScanMode == MSOrderType.Ms

    def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
        """
        Get all scan filters between ``start_scan`` and ``end_scan`` (inclusive).
        This function is only supported for MS device controllers.
        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

        Returns:
        --------
        Tuple[dict, list]
            A dictionary mapping each scan number to its scan event string, and
            a list of the distinct scan event strings (in no particular order).
        """
        scanfiltersdic = {
            scan_number: self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
            for scan_number in range(self.start_scan, self.end_scan + 1)
        }
        scanfilterset = list(set(scanfiltersdic.values()))
        return scanfiltersdic, scanfilterset

    def get_scan_header(self, scan: int) -> Dict[str, Any]:
        """
        Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.

        Parameters:
        -----------
        scan : int
            The scan number.

        """
        header = self.iRawDataPlus.GetTrailerExtraInformation(scan)

        return {header.Labels[i]: header.Values[i] for i in range(header.Length)}

    @staticmethod
    def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
        """Return the times, intensities and scans of a trace as Python lists.

        trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal
        """
        return list(trace.Times), list(trace.Intensities), list(trace.Scans)

    def get_eics(
        self,
        target_mzs: List[float],
        tic_data: Dict[str, Any],
        ms_type="MS !d",
        peak_detection=False,
        smooth=False,
        plot=False,
        ax: Optional[axes.Axes] = None,
        legend=False,
    ) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
        """Get the extracted ion chromatograms (EICs) for the target m/z values.

        The scan range is taken from ``start_scan`` / ``end_scan`` and the mass
        tolerance (ppm) from ``chromatogram_settings.eic_tolerance_ppm``.

        Parameters:
        -----------
        target_mzs : list of float
            The m/z values to extract.
        tic_data :
            Unused; kept for backward compatibility of the signature.
        ms_type : str
            Thermo scan filter string, e.g. 'MS !d' or 'MS2'.
        peak_detection, smooth : bool
            No longer implemented here; passing True raises an Exception.
        plot : bool
            If True, plot every non-empty EIC.
        ax : matplotlib axis object, optional
            Axis to plot on; a new figure is created when omitted.
        legend : bool
            If True (and ``plot`` is True), add a scrollable legend.

        Returns:
        --------
        chroma : dict {target_mz: EIC_Data(scans, time, eic)}
            One entry per target m/z whose trace is not empty.
        ax : matplotlib axis object or None
            The axis used for plotting, or None when ``plot`` is False.

        Raises:
        -------
        Exception
            If ``peak_detection`` or ``smooth`` is True.
        """
        # If peak_detection or smooth is True, raise exception
        if peak_detection or smooth:
            raise Exception("Peak detection and smoothing are no longer implemented in this function")

        options = MassOptions()
        options.ToleranceUnits = ToleranceUnits.ppm
        options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm

        all_chroma_settings = []
        for target_mz in target_mzs:
            settings = ChromatogramTraceSettings(TraceType.MassRange)
            settings.Filter = ms_type
            settings.MassRanges = [Range(target_mz, target_mz)]
            all_chroma_settings.append(IChromatogramSettings(settings))

        data = self.iRawDataPlus.GetChromatogramData(
            all_chroma_settings, self.start_scan, self.end_scan, options
        )

        traces = ChromatogramSignal.FromChromatogramData(data)

        chroma = {}

        if plot:
            from matplotlib.transforms import Bbox
            import matplotlib.pyplot as plt

            if not ax:
                fig, ax = plt.subplots()
            else:
                fig = plt.gcf()

        # Traces are indexed in the same order as target_mzs.
        for i, trace in enumerate(traces):
            if trace.Length > 0:
                rt, eic, scans = self.get_rt_time_from_trace(trace)

                chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
                if plot:
                    ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))

        if not plot:
            return chroma, None

        ax.set_xlabel("Time (min)")
        ax.set_ylabel("a.u.")
        ax.set_title(ms_type + " EIC")
        ax.tick_params(axis="both", which="major", labelsize=12)
        ax.axes.spines["top"].set_visible(False)
        ax.axes.spines["right"].set_visible(False)

        if legend:
            legend_artist = ax.legend(
                loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1)
            )
            fig.subplots_adjust(right=0.76)

            # Vertical shift applied to the legend per scroll step.
            scroll_step = {"down": 30, "up": -30}

            def func(evt):
                # Scroll the legend when the mouse wheel is used over it.
                if legend_artist.contains(evt):
                    bbox = legend_artist.get_bbox_to_anchor()
                    bbox = Bbox.from_bounds(
                        bbox.x0,
                        bbox.y0 + scroll_step[evt.button],
                        bbox.width,
                        bbox.height,
                    )
                    tr = legend_artist.axes.transAxes.inverted()
                    legend_artist.set_bbox_to_anchor(bbox.transformed(tr))
                    fig.canvas.draw_idle()

            fig.canvas.mpl_connect("scroll_event", func)

        return chroma, ax

    def get_tic(
        self,
        ms_type="MS !d",
        peak_detection=False,  # This wont work right now
        smooth=False,  # This wont work right now
        plot=False,
        ax=None,
        trace_type="TIC",
    ) -> Tuple[TIC_Data, axes.Axes]:
        """Get the total ion (or base peak) chromatogram of the selected scans.

        Parameters:
        -----------
        ms_type : str ('MS !d', 'MS2', 'all', None)
            Thermo scan filter string; with 'all' or None the filter is set to
            None (all scans).
        peak_detection : bool
            Calls ``self.centroid_detector``, which is not defined in the
            visible part of this class (flagged above as not working).
        smooth : bool
            Calls ``self.smooth_tic``, which is not defined in the visible part
            of this class (flagged above as not working).
        plot : bool
            If True, plot the chromatogram.
        ax : matplotlib axis object
            Axis to plot on; the current axis is used when omitted.
        trace_type : str ('TIC', 'BPC')

        Returns:
        --------
        data : TIC_Data or None
            time: retention times, scans: original thermo scan numbers,
            tic: intensities (stored in ``bpc`` instead, with ``tic`` emptied,
            when ``trace_type`` is 'BPC'), apexes: result of peak picking.
            None when the trace is empty.
        ax : matplotlib axis object or None
            The axis used for plotting; None when ``plot`` is False or the
            trace is empty.

        Raises:
        -------
        ValueError
            If ``trace_type`` is neither 'TIC' nor 'BPC'.
        """
        if trace_type == "TIC":
            settings = ChromatogramTraceSettings(TraceType.TIC)
        elif trace_type == "BPC":
            settings = ChromatogramTraceSettings(TraceType.BasePeak)
        else:
            raise ValueError(f"{trace_type} undefined")

        settings.Filter = None if ms_type == "all" else ms_type

        chroma_settings = IChromatogramSettings(settings)

        chroma_data = self.iRawDataPlus.GetChromatogramData(
            [chroma_settings], self.start_scan, self.end_scan
        )

        signal = ChromatogramSignal.FromChromatogramData(chroma_data)[0]

        if not signal.Length > 0:
            return None, None

        times, intensities, scans = self.get_rt_time_from_trace(signal)
        data = TIC_Data(time=times, scans=scans, tic=intensities, bpc=[], apexes=[])

        if smooth:
            data.tic = self.smooth_tic(data.tic)
        else:
            data.tic = np.array(data.tic)

        if peak_detection:
            data.apexes = list(self.centroid_detector(data.time, data.tic))

        if plot:
            if not ax:
                import matplotlib.pyplot as plt

                ax = plt.gca()

            ax.plot(data.time, data.tic, label=trace_type)
            ax.set_xlabel("Time (min)")
            ax.set_ylabel("a.u.")
            if peak_detection:
                for peak_indexes in data.apexes:
                    apex_index = peak_indexes[1]
                    ax.plot(
                        data.time[apex_index],
                        data.tic[apex_index],
                        marker="x",
                        linewidth=0,
                    )

        # For a base peak chromatogram the trace is reported in ``bpc``.
        if trace_type == "BPC":
            data.bpc = data.tic
            data.tic = []

        # A caller-supplied axis is only returned when plotting was requested.
        return data, (ax if plot else None)

    def get_average_mass_spectrum(
        self,
        spectrum_mode: str = "profile",
        auto_process: bool = True,
        ppm_tolerance: float = 5.0,
        ms_type: str = "MS1",
    ) -> MassSpecProfile | MassSpecCentroid:
        """
        Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method
        (when ``scans`` is a tuple) or a scan list using Thermo's AverageScans method
        (when ``scans`` is a list).

        spectrum_mode: str
            centroid or profile mass spectrum
        auto_process: bool
            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
            (only passed on for profile spectra; centroid spectra are created with auto_process=False)
        ppm_tolerance: float
            Mass tolerance in ppm used when averaging the scans.
        ms_type: str
            String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10.
            Internal function converts to Thermo MSOrderType class.
            Only used for a scan range (tuple).

        Raises:
        -------
        ValueError
            If ``scans`` is neither a list nor a tuple, if averaging yields no
            data, if ``spectrum_mode`` is unknown, or if centroid data was
            requested but is not available.
        """

        def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool):
            # Build a profile spectrum from the segmented scan of the average.
            mz_list = list(averageScan.SegmentedScan.Positions)
            abund_list = list(averageScan.SegmentedScan.Intensities)

            data_dict = {
                Labels.mz: mz_list,
                Labels.abundance: abund_list,
            }

            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)

        def get_centroid_mass_spec(averageScan, d_params: dict):
            # NOTE(review): the attribute is spelled ``centroidScan`` here while
            # ``SegmentedScan`` above is capitalised - confirm the casing
            # against the RawFileReader Scan API.
            centroid = averageScan.centroidScan

            noise = list(centroid.Noises)
            baselines = list(centroid.Baselines)
            rp = list(centroid.Resolutions)
            magnitude = list(centroid.Intensities)
            mz = list(centroid.Masses)

            array_noise_std = (np.array(noise) - np.array(baselines)) / 3
            l_signal_to_noise = np.array(magnitude) / array_noise_std

            d_params["baseline_noise"] = np.average(array_noise_std)

            d_params["baseline_noise_std"] = np.std(array_noise_std)

            data_dict = {
                Labels.mz: mz,
                Labels.abundance: magnitude,
                Labels.rp: rp,
                Labels.s2n: list(l_signal_to_noise),
            }

            return MassSpecCentroid(data_dict, d_params, auto_process=False)

        # Create the mass options object that will be used when averaging the scans
        options = MassOptions()
        options.ToleranceUnits = ToleranceUnits.ppm
        options.Tolerance = ppm_tolerance

        if isinstance(self.scans, tuple):
            d_params = self.set_metadata(
                firstScanNumber=self.start_scan, lastScanNumber=self.end_scan
            )

            # Get the scan filter for the first scan. This scan filter will be used to locate
            # scans within the given scan range of the same type
            scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan)

            # force it to only look for the MSType
            scanFilter = self.set_msordertype(scanFilter, ms_type)

            averageScan = Extensions.AverageScansInScanRange(
                self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options
            )

        elif isinstance(self.scans, list):
            d_params = self.set_metadata(scans_list=self.scans)

            # AverageScans is given a .NET list of scan numbers.
            scans = List[int]()
            for scan in self.scans:
                scans.Add(scan)

            averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)

        else:
            raise ValueError("scans must be a list intergers or a tuple if integers")

        if not averageScan:
            raise ValueError("No data found for the selected scans")

        if spectrum_mode == "profile":
            return get_profile_mass_spec(averageScan, d_params, auto_process)

        if spectrum_mode == "centroid":
            if averageScan.HasCentroidStream:
                return get_centroid_mass_spec(averageScan, d_params)
            raise ValueError("No Centroind data available for the selected scans")

        raise ValueError("spectrum_mode must be 'profile' or centroid")

    def set_metadata(
        self,
        firstScanNumber=0,
        lastScanNumber=0,
        scans_list=False,
        label=Labels.thermo_profile,
    ):
        """
        Collect metadata to be ingested in the mass spectrum object

        scans_list: list[int] or false
            When given, it is stored as the scan number and the polarity is
            read from its first scan.
        lastScanNumber: int
        firstScanNumber: int
            Without ``scans_list`` the scan number is stored as
            "first-last" and the polarity is read from ``firstScanNumber``.
        label: str
            Stored under the "label" key.

        Returns the parameter dictionary.
        """

        d_params = default_parameters(self.file_path)

        # assumes scans is full scan or reduced profile scan

        d_params["label"] = label

        if scans_list:
            d_params["scan_number"] = scans_list

            d_params["polarity"] = self.get_polarity_mode(scans_list[0])

        else:
            d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber)

            d_params["polarity"] = self.get_polarity_mode(firstScanNumber)

        d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model

        d_params["acquisition_time"] = self.get_creation_time()

        d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name

        return d_params

    def get_instrument_methods(self, parse_strings: bool = True):
        """
        This function will extract the instrument methods embedded in the raw file

        First it will check if there are any instrument methods, raising a ValueError if not.
        Then it will get the total number of instrument methods.
        For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary
        If this fails, it will return just the string object.

        This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.

        Parameters:
        -----------
        parse_strings: bool
            If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.

        Returns:
        --------
        List[Dict[str, Any]] or List
            A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.

        Raises:
        -------
        ValueError
            If the raw file has no instrument method attached.
        """

        if not self.iRawDataPlus.HasInstrumentMethod:
            raise ValueError(
                "Raw Data file does not have any instrument methods attached"
            )

        def parse_instrument_method(data):
            # Parse the tab-indented plaintext method into nested dictionaries:
            # section -> (sub-section ->) key -> value.
            lines = data.split("\r\n")
            method = {}
            current_section = None
            sub_section = None

            for line in lines:
                if not line.strip():  # Skip empty lines
                    continue
                if line.startswith(
                    ("----", "Experiment", "Scan Event")
                ) or line.endswith(("Settings", "Summary")):
                    current_section = line.replace("-", "").strip()
                    method[current_section] = {}
                    sub_section = None
                elif line.startswith("\t"):
                    if "\t\t" in line:
                        indent_level = line.count("\t")
                        key_value = line.strip()

                        if indent_level == 2:
                            if sub_section:
                                key, value = (
                                    key_value.split("=", 1)
                                    if "=" in key_value
                                    else (key_value, None)
                                )
                                method[current_section][sub_section][key.strip()] = (
                                    value.strip() if value else None
                                )
                        elif indent_level == 3:
                            scan_type, key_value = (
                                key_value.split(" ", 1)
                                if " " in key_value
                                else (key_value, None)
                            )
                            method.setdefault(current_section, {}).setdefault(
                                sub_section, {}
                            ).setdefault(scan_type, {})

                            if key_value:
                                key, value = (
                                    key_value.split("=", 1)
                                    if "=" in key_value
                                    else (key_value, None)
                                )
                                method[current_section][sub_section][scan_type][
                                    key.strip()
                                ] = value.strip() if value else None
                    else:
                        key_value = line.strip()
                        if "=" in key_value:
                            key, value = key_value.split("=", 1)
                            method.setdefault(current_section, {})[key.strip()] = (
                                value.strip()
                            )
                        else:
                            sub_section = key_value
                else:
                    if ":" in line:
                        key, value = line.split(":", 1)
                        method[current_section][key.strip()] = value.strip()
                    else:
                        method[current_section][line] = {}

            return method

        count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount
        # TODO make this code better...
        instrument_methods = []
        for i in range(count_instrument_methods):
            instrument_method = self.iRawDataPlus.GetInstrumentMethod(i)
            if parse_strings:
                try:
                    instrument_method = parse_instrument_method(instrument_method)
                except Exception:
                    # Deliberate best effort: if parsing fails for any reason
                    # the raw string is kept.
                    pass
            instrument_methods.append(instrument_method)
        return instrument_methods

    @staticmethod
    def _parse_header_sections(header) -> Dict[str, Any]:
        """Group the label/value pairs of a Thermo header entry into sections.

        ``header`` must expose ``Length``, ``Labels`` and ``Values``. A label
        containing '===' - or one with a blank value that does not end with
        ':' - starts a new section; subsequent pairs are stored inside the
        current section, or at the top level before any section was seen.
        """
        header_dic = {}
        current_section = None

        for i in range(header.Length):
            label = header.Labels[i]
            value = header.Values[i]

            # Check for section headers
            if "===" in label or (
                (value == "" or value is None) and not label.endswith(":")
            ):
                # This is a section header; clean the label of '=' and ':'
                section_name = label.replace("=", "").replace(":", "").strip()
                header_dic[section_name] = {}
                current_section = section_name
            elif current_section:
                header_dic[current_section][label] = value
            else:
                header_dic[label] = value
        return header_dic

    def get_tune_method(self):
        """
        This code will extract the tune method from the raw file
        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
        It will also not return Labels (keys) where the value is blank

        Returns:
        --------
        Dict[str, Any]
            A dictionary containing the tune method information

        Raises:
        -------
        ValueError
            If no tune methods are found in the raw file

        """
        tunemethodcount = self.iRawDataPlus.GetTuneDataCount()
        if tunemethodcount == 0:
            raise ValueError("No tune methods found in the raw data file")
        if tunemethodcount > 1:
            warnings.warn(
                "Multiple tune methods found in the raw data file, returning the 1st"
            )

        return self._parse_header_sections(self.iRawDataPlus.GetTuneData(0))

    def get_status_log(self, retention_time: float = 0):
        """
        This code will extract the status logs from the raw file
        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
        It will also not return Labels (keys) where the value is blank

        Parameters:
        -----------
        retention_time: float
            The retention time in minutes to extract the status log data from.
            Will use the closest retention time found. Default 0.

        Returns:
        --------
        Dict[str, Any]
            A dictionary containing the status log information

        Raises:
        -------
        ValueError
            If no status logs are found in the raw file

        """
        if self.iRawDataPlus.GetStatusLogEntriesCount() == 0:
            raise ValueError("No status logs found in the raw data file")

        return self._parse_header_sections(
            self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time)
        )

1059 def get_error_logs(self): 1060 """ 1061 This code will extract the error logs from the raw file 1062 1063 Returns: 1064 -------- 1065 Dict[float, str] 1066 A dictionary containing the error log information with the retention time as the key 1067 1068 Raises: 1069 ------- 1070 ValueError 1071 If no error logs are found in the raw file 1072 """ 1073 1074 error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount 1075 if error_log_count == 0: 1076 raise ValueError("No error logs found in the raw data file") 1077 return None 1078 1079 error_logs = {} 1080 1081 for i in range(error_log_count): 1082 error_log_item = self.iRawDataPlus.GetErrorLogItem(i) 1083 rt = error_log_item.RetentionTime 1084 message = error_log_item.Message 1085 # Use the index `i` as the unique ID key 1086 error_logs[i] = {"rt": rt, "message": message} 1087 return error_logs 1088 1089 def get_sample_information(self): 1090 """ 1091 This code will extract the sample information from the raw file 1092 1093 Returns: 1094 -------- 1095 Dict[str, Any] 1096 A dictionary containing the sample information 1097 Note that UserText field may not be handled properly and may need further processing 1098 """ 1099 sminfo = self.iRawDataPlus.SampleInformation 1100 smdict = {} 1101 smdict["Comment"] = sminfo.Comment 1102 smdict["SampleId"] = sminfo.SampleId 1103 smdict["SampleName"] = sminfo.SampleName 1104 smdict["Vial"] = sminfo.Vial 1105 smdict["InjectionVolume"] = sminfo.InjectionVolume 1106 smdict["Barcode"] = sminfo.Barcode 1107 smdict["BarcodeStatus"] = str(sminfo.BarcodeStatus) 1108 smdict["CalibrationLevel"] = sminfo.CalibrationLevel 1109 smdict["DilutionFactor"] = sminfo.DilutionFactor 1110 smdict["InstrumentMethodFile"] = sminfo.InstrumentMethodFile 1111 smdict["RawFileName"] = sminfo.RawFileName 1112 smdict["CalibrationFile"] = sminfo.CalibrationFile 1113 smdict["IstdAmount"] = sminfo.IstdAmount 1114 smdict["RowNumber"] = sminfo.RowNumber 1115 smdict["Path"] = sminfo.Path 1116 
smdict["ProcessingMethodFile"] = sminfo.ProcessingMethodFile 1117 smdict["SampleType"] = str(sminfo.SampleType) 1118 smdict["SampleWeight"] = sminfo.SampleWeight 1119 smdict["UserText"] = { 1120 "UserText": [x for x in sminfo.UserText] 1121 } # [0] #This may not work - needs debugging with 1122 return smdict 1123 1124 def get_instrument_data(self): 1125 """ 1126 This code will extract the instrument data from the raw file 1127 1128 Returns: 1129 -------- 1130 Dict[str, Any] 1131 A dictionary containing the instrument data 1132 """ 1133 instrument_data = self.iRawDataPlus.GetInstrumentData() 1134 id_dict = {} 1135 id_dict["Name"] = instrument_data.Name 1136 id_dict["Model"] = instrument_data.Model 1137 id_dict["SerialNumber"] = instrument_data.SerialNumber 1138 id_dict["SoftwareVersion"] = instrument_data.SoftwareVersion 1139 id_dict["HardwareVersion"] = instrument_data.HardwareVersion 1140 id_dict["ChannelLabels"] = { 1141 "ChannelLabels": [x for x in instrument_data.ChannelLabels] 1142 } 1143 id_dict["Flags"] = instrument_data.Flags 1144 id_dict["AxisLabelY"] = instrument_data.AxisLabelY 1145 id_dict["AxisLabelX"] = instrument_data.AxisLabelX 1146 return id_dict 1147 1148 def get_centroid_msms_data(self, scan): 1149 """ 1150 .. deprecated:: 2.0 1151 This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality. 1152 """ 1153 1154 warnings.warn( 1155 "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. 
" 1156 "Please use `get_average_mass_spectrum()` instead.", 1157 DeprecationWarning, 1158 ) 1159 1160 d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid) 1161 1162 centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False) 1163 1164 noise = list(centroidStream.Noises) 1165 1166 baselines = list(centroidStream.Baselines) 1167 1168 rp = list(centroidStream.Resolutions) 1169 1170 magnitude = list(centroidStream.Intensities) 1171 1172 mz = list(centroidStream.Masses) 1173 1174 # charge = scans_labels[5] 1175 array_noise_std = (np.array(noise) - np.array(baselines)) / 3 1176 l_signal_to_noise = np.array(magnitude) / array_noise_std 1177 1178 d_params["baseline_noise"] = np.average(array_noise_std) 1179 1180 d_params["baseline_noise_std"] = np.std(array_noise_std) 1181 1182 data_dict = { 1183 Labels.mz: mz, 1184 Labels.abundance: magnitude, 1185 Labels.rp: rp, 1186 Labels.s2n: list(l_signal_to_noise), 1187 } 1188 1189 mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False) 1190 mass_spec.settings.noise_threshold_method = "relative_abundance" 1191 mass_spec.settings.noise_threshold_min_relative_abundance = 1 1192 mass_spec.process_mass_spec() 1193 return mass_spec 1194 1195 def get_average_mass_spectrum_by_scanlist( 1196 self, 1197 scans_list: List[int], 1198 auto_process: bool = True, 1199 ppm_tolerance: float = 5.0, 1200 ) -> MassSpecProfile: 1201 """ 1202 Averages selected scans mass spectra using Thermo's AverageScans method 1203 scans_list: list[int] 1204 auto_process: bool 1205 If true performs peak picking, and noise threshold calculation after creation of mass spectrum object 1206 Returns: 1207 MassSpecProfile 1208 1209 .. deprecated:: 2.0 1210 This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality. 1211 """ 1212 1213 warnings.warn( 1214 "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. 
" 1215 "Please use `get_average_mass_spectrum()` instead.", 1216 DeprecationWarning, 1217 ) 1218 1219 d_params = self.set_metadata(scans_list=scans_list) 1220 1221 # assumes scans is full scan or reduced profile scan 1222 1223 scans = List[int]() 1224 for scan in scans_list: 1225 scans.Add(scan) 1226 1227 # Create the mass options object that will be used when averaging the scans 1228 options = MassOptions() 1229 options.ToleranceUnits = ToleranceUnits.ppm 1230 options.Tolerance = ppm_tolerance 1231 1232 # Get the scan filter for the first scan. This scan filter will be used to located 1233 # scans within the given scan range of the same type 1234 1235 averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options) 1236 1237 len_data = averageScan.SegmentedScan.Positions.Length 1238 1239 mz_list = list(averageScan.SegmentedScan.Positions) 1240 abund_list = list(averageScan.SegmentedScan.Intensities) 1241 1242 data_dict = { 1243 Labels.mz: mz_list, 1244 Labels.abundance: abund_list, 1245 } 1246 1247 mass_spec = MassSpecProfile(data_dict, d_params, auto_process=auto_process) 1248 1249 return mass_spec 1250 1251 1252class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, SpectraParserInterface): 1253 """A class for parsing Thermo RAW mass spectrometry data files and instatiating MassSpectraBase or LCMSBase objects 1254 1255 Parameters 1256 ---------- 1257 file_location : str or Path 1258 The path to the RAW file to be parsed. 1259 analyzer : str, optional 1260 The type of mass analyzer used in the instrument. Default is "Unknown". 1261 instrument_label : str, optional 1262 The name of the instrument used to acquire the data. Default is "Unknown". 1263 sample_name : str, optional 1264 The name of the sample being analyzed. If not provided, the stem of the file_location path will be used. 1265 1266 Attributes 1267 ---------- 1268 file_location : Path 1269 The path to the RAW file being parsed. 
1270 analyzer : str 1271 The type of mass analyzer used in the instrument. 1272 instrument_label : str 1273 The name of the instrument used to acquire the data. 1274 sample_name : str 1275 The name of the sample being analyzed. 1276 1277 Methods 1278 ------- 1279 * run(spectra=True). 1280 Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe. 1281 * get_mass_spectrum_from_scan(scan_number, polarity, auto_process=True) 1282 Parses the RAW file and returns a MassSpecBase object from a single scan. 1283 * get_mass_spectra_obj(). 1284 Parses the RAW file and instantiates a MassSpectraBase object. 1285 * get_lcms_obj(). 1286 Parses the RAW file and instantiates an LCMSBase object. 1287 * get_icr_transient_times(). 1288 Return a list for transient time targets for all scans, or selected scans range 1289 1290 Inherits from ThermoBaseClass and SpectraParserInterface 1291 """ 1292 1293 def __init__( 1294 self, 1295 file_location, 1296 analyzer="Unknown", 1297 instrument_label="Unknown", 1298 sample_name=None, 1299 ): 1300 super().__init__(file_location) 1301 if isinstance(file_location, str): 1302 # if obj is a string it defaults to create a Path obj, pass the S3Path if needed 1303 file_location = Path(file_location) 1304 if not file_location.exists(): 1305 raise FileExistsError("File does not exist: " + str(file_location)) 1306 1307 self.file_location = file_location 1308 self.analyzer = analyzer 1309 self.instrument_label = instrument_label 1310 1311 if sample_name: 1312 self.sample_name = sample_name 1313 else: 1314 self.sample_name = file_location.stem 1315 1316 def load(self): 1317 pass 1318 1319 def get_scan_df(self): 1320 # This automatically brings in all the data 1321 self.chromatogram_settings.scans = (-1, -1) 1322 1323 # Get scan df info; starting with TIC data 1324 tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False) 1325 tic_data = { 1326 "scan": tic_data.scans, 1327 "scan_time": 
tic_data.time, 1328 "tic": tic_data.tic, 1329 } 1330 scan_df = pd.DataFrame.from_dict(tic_data) 1331 scan_df["ms_level"] = None 1332 1333 # get scan text 1334 scan_filter_df = pd.DataFrame.from_dict( 1335 self.get_all_filters()[0], orient="index" 1336 ) 1337 scan_filter_df.reset_index(inplace=True) 1338 scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True) 1339 1340 scan_df = scan_df.merge(scan_filter_df, on="scan", how="left") 1341 scan_df["scan_window_lower"] = scan_df.scan_text.str.extract( 1342 r"\[(\d+\.\d+)-\d+\.\d+\]" 1343 ) 1344 scan_df["scan_window_upper"] = scan_df.scan_text.str.extract( 1345 r"\[\d+\.\d+-(\d+\.\d+)\]" 1346 ) 1347 scan_df["polarity"] = np.where( 1348 scan_df.scan_text.str.contains(" - "), "negative", "positive" 1349 ) 1350 scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@") 1351 scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float) 1352 1353 # Assign each scan as centroid or profile and add ms_level 1354 scan_df["ms_format"] = None 1355 for i in scan_df.scan.to_list(): 1356 scan_df.loc[scan_df.scan == i, "ms_level"] = self.get_ms_level_for_scan_num(i) 1357 if self.iRawDataPlus.IsCentroidScanFromScanNumber(i): 1358 scan_df.loc[scan_df.scan == i, "ms_format"] = "centroid" 1359 else: 1360 scan_df.loc[scan_df.scan == i, "ms_format"] = "profile" 1361 1362 return scan_df 1363 1364 def get_ms_raw(self, spectra, scan_df): 1365 if spectra == "all": 1366 scan_df_forspec = scan_df 1367 elif spectra == "ms1": 1368 scan_df_forspec = scan_df[scan_df.ms_level == 1] 1369 elif spectra == "ms2": 1370 scan_df_forspec = scan_df[scan_df.ms_level == 2] 1371 else: 1372 raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'") 1373 1374 # Result container 1375 res = {} 1376 1377 # Row count container 1378 counter = {} 1379 1380 # Column name container 1381 cols = {} 1382 1383 # set at float32 1384 dtype = np.float32 1385 1386 # First pass: get nrows 1387 N = defaultdict(lambda: 0) 1388 for i in 
scan_df_forspec.scan.to_list(): 1389 level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0] 1390 scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i) 1391 profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber( 1392 i, scanStatistics 1393 ) 1394 abun = list(profileStream.Intensities) 1395 abun = np.array(abun)[np.where(np.array(abun) > 0)[0]] 1396 1397 N[level] += len(abun) 1398 1399 # Second pass: parse 1400 for i in scan_df_forspec.scan.to_list(): 1401 scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i) 1402 profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber( 1403 i, scanStatistics 1404 ) 1405 abun = list(profileStream.Intensities) 1406 mz = list(profileStream.Positions) 1407 1408 # Get index of abun that are > 0 1409 inx = np.where(np.array(abun) > 0)[0] 1410 mz = np.array(mz)[inx] 1411 mz = np.float32(mz) 1412 abun = np.array(abun)[inx] 1413 abun = np.float32(abun) 1414 1415 level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0] 1416 1417 # Number of rows 1418 n = len(mz) 1419 1420 # No measurements 1421 if n == 0: 1422 continue 1423 1424 # Dimension check 1425 if len(mz) != len(abun): 1426 warnings.warn("m/z and intensity array dimension mismatch") 1427 continue 1428 1429 # Scan/frame info 1430 id_dict = i 1431 1432 # Columns 1433 cols[level] = ["scan", "mz", "intensity"] 1434 m = len(cols[level]) 1435 1436 # Subarray init 1437 arr = np.empty((n, m), dtype=dtype) 1438 inx = 0 1439 1440 # Populate scan/frame info 1441 arr[:, inx] = i 1442 inx += 1 1443 1444 # Populate m/z 1445 arr[:, inx] = mz 1446 inx += 1 1447 1448 # Populate intensity 1449 arr[:, inx] = abun 1450 inx += 1 1451 1452 # Initialize output container 1453 if level not in res: 1454 res[level] = np.empty((N[level], m), dtype=dtype) 1455 counter[level] = 0 1456 1457 # Insert subarray 1458 res[level][counter[level] : counter[level] + n, :] = arr 1459 counter[level] += n 1460 1461 # Construct ms1 and ms2 mz 
dataframes 1462 for level in res.keys(): 1463 res[level] = pd.DataFrame(res[level]) 1464 res[level].columns = cols[level] 1465 # rename keys in res to add 'ms' prefix 1466 res = {f"ms{key}": value for key, value in res.items()} 1467 1468 return res 1469 1470 def run(self, spectra="all", scan_df=None): 1471 """ 1472 Extracts mass spectra data from a raw file. 1473 1474 Parameters 1475 ---------- 1476 spectra : str, optional 1477 Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2. 1478 scan_df : pandas.DataFrame, optional 1479 Scan dataframe. If not provided, the scan dataframe is created from the mzML file. 1480 1481 Returns 1482 ------- 1483 tuple 1484 A tuple containing two elements: 1485 - A dictionary containing mass spectra data, separated by MS level. 1486 - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level, 1487 scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable). 1488 """ 1489 # Prepare scan_df 1490 if scan_df is None: 1491 scan_df = self.get_scan_df() 1492 1493 # Prepare mass spectra data 1494 if spectra != "none": 1495 res = self.get_ms_raw(spectra=spectra, scan_df=scan_df) 1496 else: 1497 res = None 1498 1499 return res, scan_df 1500 1501 def get_mass_spectrum_from_scan( 1502 self, scan_number, spectrum_mode, auto_process=True 1503 ): 1504 """Instatiate a MassSpecBase object from a single scan number from the binary file, currently only supports profile mode. 1505 1506 Parameters 1507 ---------- 1508 scan_number : int 1509 The scan number to extract the mass spectrum from. 1510 polarity : int 1511 The polarity of the scan. 1 for positive mode, -1 for negative mode. 1512 spectrum_mode : str 1513 The type of mass spectrum to extract. Must be 'profile' or 'centroid'. 1514 auto_process : bool, optional 1515 If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True. 
1516 1517 Returns 1518 ------- 1519 MassSpecProfile | MassSpecCentroid 1520 The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum. 1521 """ 1522 1523 if spectrum_mode == "profile": 1524 scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number) 1525 profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber( 1526 scan_number, scanStatistics 1527 ) 1528 abun = list(profileStream.Intensities) 1529 mz = list(profileStream.Positions) 1530 data_dict = { 1531 Labels.mz: mz, 1532 Labels.abundance: abun, 1533 } 1534 d_params = self.set_metadata( 1535 firstScanNumber=scan_number, 1536 lastScanNumber=scan_number, 1537 scans_list=False, 1538 label=Labels.thermo_profile, 1539 ) 1540 mass_spectrum_obj = MassSpecProfile( 1541 data_dict, d_params, auto_process=auto_process 1542 ) 1543 1544 elif spectrum_mode == "centroid": 1545 centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False) 1546 if centroid_scan.Masses is not None: 1547 mz = list(centroid_scan.Masses) 1548 abun = list(centroid_scan.Intensities) 1549 rp = list(centroid_scan.Resolutions) 1550 magnitude = list(centroid_scan.Intensities) 1551 noise = list(centroid_scan.Noises) 1552 baselines = list(centroid_scan.Baselines) 1553 array_noise_std = (np.array(noise) - np.array(baselines)) / 3 1554 l_signal_to_noise = np.array(magnitude) / array_noise_std 1555 data_dict = { 1556 Labels.mz: mz, 1557 Labels.abundance: abun, 1558 Labels.rp: rp, 1559 Labels.s2n: list(l_signal_to_noise), 1560 } 1561 else: # For CID MS2, the centroid data are stored in the profile data location, they do not have any associated rp or baseline data, but they should be treated as centroid data 1562 scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber( 1563 scan_number 1564 ) 1565 profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber( 1566 scan_number, scanStatistics 1567 ) 1568 abun = list(profileStream.Intensities) 1569 mz = list(profileStream.Positions) 1570 data_dict = 
{ 1571 Labels.mz: mz, 1572 Labels.abundance: abun, 1573 Labels.rp: [np.nan] * len(mz), 1574 Labels.s2n: [np.nan] * len(mz), 1575 } 1576 d_params = self.set_metadata( 1577 firstScanNumber=scan_number, 1578 lastScanNumber=scan_number, 1579 scans_list=False, 1580 label=Labels.thermo_centroid, 1581 ) 1582 mass_spectrum_obj = MassSpecCentroid( 1583 data_dict, d_params, auto_process=auto_process 1584 ) 1585 1586 return mass_spectrum_obj 1587 1588 def get_mass_spectra_obj(self): 1589 """Instatiate a MassSpectraBase object from the binary data file file. 1590 1591 Returns 1592 ------- 1593 MassSpectraBase 1594 The MassSpectra object containing the parsed mass spectra. The object is instatiated with the mzML file, analyzer, instrument, sample name, and scan dataframe. 1595 """ 1596 _, scan_df = self.run(spectra="none") 1597 mass_spectra_obj = MassSpectraBase( 1598 self.file_location, 1599 self.analyzer, 1600 self.instrument_label, 1601 self.sample_name, 1602 self, 1603 ) 1604 scan_df = scan_df.set_index("scan", drop=False) 1605 mass_spectra_obj.scan_df = scan_df 1606 1607 return mass_spectra_obj 1608 1609 def get_lcms_obj(self, spectra="all"): 1610 """Instatiates a LCMSBase object from the mzML file. 1611 1612 Parameters 1613 ---------- 1614 spectra : str, optional 1615 Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2". 1616 1617 Returns 1618 ------- 1619 LCMSBase 1620 LCMS object containing mass spectra data. The object is instatiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specifified), polarity, as well as the attributes holding the scans, retention times, and tics. 
1621 """ 1622 _, scan_df = self.run(spectra="none") # first run it to just get scan info 1623 res, scan_df = self.run( 1624 scan_df=scan_df, spectra=spectra 1625 ) # second run to parse data 1626 lcms_obj = LCMSBase( 1627 self.file_location, 1628 self.analyzer, 1629 self.instrument_label, 1630 self.sample_name, 1631 self, 1632 ) 1633 if spectra != "none": 1634 for key in res: 1635 key_int = int(key.replace("ms", "")) 1636 res[key] = res[key][res[key].intensity > 0] 1637 res[key] = ( 1638 res[key].sort_values(by=["scan", "mz"]).reset_index(drop=True) 1639 ) 1640 lcms_obj._ms_unprocessed[key_int] = res[key] 1641 lcms_obj.scan_df = scan_df.set_index("scan", drop=False) 1642 # Check if polarity is mixed 1643 if len(set(scan_df.polarity)) > 1: 1644 raise ValueError("Mixed polarities detected in scan data") 1645 lcms_obj.polarity = scan_df.polarity[0] 1646 lcms_obj._scans_number_list = list(scan_df.scan) 1647 lcms_obj._retention_time_list = list(scan_df.scan_time) 1648 lcms_obj._tic_list = list(scan_df.tic) 1649 1650 return lcms_obj 1651 1652 def get_icr_transient_times(self): 1653 """Return a list for transient time targets for all scans, or selected scans range 1654 1655 Notes 1656 -------- 1657 Resolving Power and Transient time targets based on 7T FT-ICR MS system 1658 """ 1659 1660 res_trans_time = { 1661 "50": 0.384, 1662 "100000": 0.768, 1663 "200000": 1.536, 1664 "400000": 3.072, 1665 "750000": 6.144, 1666 "1000000": 12.288, 1667 } 1668 1669 firstScanNumber = self.start_scan 1670 1671 lastScanNumber = self.end_scan 1672 1673 transient_time_list = [] 1674 1675 for scan in range(firstScanNumber, lastScanNumber): 1676 scan_header = self.get_scan_header(scan) 1677 1678 rp_target = scan_header["FT Resolution:"] 1679 1680 transient_time = res_trans_time.get(rp_target) 1681 1682 transient_time_list.append(transient_time) 1683 1684 # print(transient_time, rp_target) 1685 1686 return transient_time_list
60class ThermoBaseClass: 61 """Class for parsing Thermo Raw files and extracting information from them. 62 63 Parameters: 64 ----------- 65 file_location : str or pathlib.Path or s3path.S3Path 66 Thermo Raw file path or S3 path. 67 68 Attributes: 69 ----------- 70 file_path : str or pathlib.Path or s3path.S3Path 71 The file path of the Thermo Raw file. 72 parameters : LCMSParameters 73 The LCMS parameters for the Thermo Raw file. 74 chromatogram_settings : LiquidChromatographSetting 75 The chromatogram settings for the Thermo Raw file. 76 scans : list or tuple 77 The selected scans for the Thermo Raw file. 78 start_scan : int 79 The starting scan number for the Thermo Raw file. 80 end_scan : int 81 The ending scan number for the Thermo Raw file. 82 83 Methods: 84 -------- 85 * set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter 86 Convert the user-passed MS Type string to a Thermo MSOrderType object. 87 * get_creation_time() -> datetime.datetime 88 Extract the creation date stamp from the .RAW file and return it as a formatted datetime object. 89 * remove_temp_file() 90 Remove the temporary file if the path is from S3Path. 91 * get_polarity_mode(scan_number: int) -> int 92 Get the polarity mode for the given scan number. 93 * get_filter_for_scan_num(scan_number: int) -> List[str] 94 Get the filter for the given scan number. 95 * check_full_scan(scan_number: int) -> bool 96 Check if the given scan number is a full scan. 97 * get_all_filters() -> Tuple[Dict[int, str], List[str]] 98 Get all scan filters for the Thermo Raw file. 99 * get_scan_header(scan: int) -> Dict[str, Any] 100 Get the full dictionary of scan header metadata for the given scan number. 101 * get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]] 102 Get the retention time, intensity, and scan number from the given trace. 
103 * get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d', 104 peak_detection: bool = True, smooth: bool = True, plot: bool = False, 105 ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes] 106 Get the extracted ion chromatograms (EICs) for the target m/z values. 107 108 """ 109 110 def __init__(self, file_location): 111 """file_location: srt pathlib.Path or s3path.S3Path 112 Thermo Raw file path 113 """ 114 # Thread.__init__(self) 115 if isinstance(file_location, str): 116 file_path = Path(file_location) 117 118 elif isinstance(file_location, S3Path): 119 temp_dir = Path("tmp/") 120 temp_dir.mkdir(exist_ok=True) 121 122 file_path = temp_dir / file_location.name 123 with open(file_path, "wb") as fh: 124 fh.write(file_location.read_bytes()) 125 126 else: 127 file_path = file_location 128 129 self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path)) 130 131 if not self.iRawDataPlus.IsOpen: 132 raise FileNotFoundError( 133 "Unable to access the RAW file using the RawFileReader class!" 134 ) 135 136 # Check for any errors in the RAW file 137 if self.iRawDataPlus.IsError: 138 raise IOError( 139 "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path) 140 ) 141 142 self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1) 143 144 self.file_path = file_location 145 self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path)) 146 147 # removing tmp file 148 149 self._init_settings() 150 151 def _init_settings(self): 152 """ 153 Initialize the LCMSParameters object. 154 """ 155 self._parameters = LCMSParameters() 156 157 @property 158 def parameters(self) -> LCMSParameters: 159 """ 160 Get or set the LCMSParameters object. 
161 """ 162 return self._parameters 163 164 @parameters.setter 165 def parameters(self, instance_LCMSParameters: LCMSParameters): 166 self._parameters = instance_LCMSParameters 167 168 @property 169 def chromatogram_settings(self) -> LiquidChromatographSetting: 170 """ 171 Get or set the LiquidChromatographSetting object. 172 """ 173 return self.parameters.lc_ms 174 175 @chromatogram_settings.setter 176 def chromatogram_settings( 177 self, instance_LiquidChromatographSetting: LiquidChromatographSetting 178 ): 179 self.parameters.lc_ms = instance_LiquidChromatographSetting 180 181 @property 182 def scans(self) -> list | tuple: 183 """scans : list or tuple 184 If list uses Thermo AverageScansInScanRange for selected scans, ortherwise uses Thermo AverageScans for a scan range 185 """ 186 return self.chromatogram_settings.scans 187 188 @property 189 def start_scan(self) -> int: 190 """ 191 Get the starting scan number for the Thermo Raw file. 192 """ 193 if self.scans[0] == -1: 194 return self.iRawDataPlus.RunHeaderEx.FirstSpectrum 195 else: 196 return self.scans[0] 197 198 @property 199 def end_scan(self) -> int: 200 """ 201 Get the ending scan number for the Thermo Raw file. 202 """ 203 if self.scans[-1] == -1: 204 return self.iRawDataPlus.RunHeaderEx.LastSpectrum 205 else: 206 return self.scans[-1] 207 208 def set_msordertype(self, scanFilter, mstype: str = "ms1"): 209 """ 210 Function to convert user passed string MS Type to Thermo MSOrderType object 211 Limited to MS1 through MS10. 212 213 Parameters: 214 ----------- 215 scanFilter : Thermo.ScanFilter 216 The scan filter object. 
217 mstype : str, optional 218 The MS Type string, by default 'ms1' 219 220 """ 221 mstype = mstype.upper() 222 # Check that a valid mstype is passed 223 if (int(mstype.split("MS")[1]) > 10) or (int(mstype.split("MS")[1]) < 1): 224 warn("MS Type not valid, must be between MS1 and MS10") 225 226 msordertypedict = { 227 "MS1": MSOrderType.Ms, 228 "MS2": MSOrderType.Ms2, 229 "MS3": MSOrderType.Ms3, 230 "MS4": MSOrderType.Ms4, 231 "MS5": MSOrderType.Ms5, 232 "MS6": MSOrderType.Ms6, 233 "MS7": MSOrderType.Ms7, 234 "MS8": MSOrderType.Ms8, 235 "MS9": MSOrderType.Ms9, 236 "MS10": MSOrderType.Ms10, 237 } 238 scanFilter.MSOrder = msordertypedict[mstype] 239 return scanFilter 240 241 def get_creation_time(self) -> datetime.datetime: 242 """ 243 Extract the creation date stamp from the .RAW file 244 Return formatted creation date stamp. 245 246 """ 247 credate = self.iRawDataPlus.CreationDate.get_Ticks() 248 credate = datetime.datetime(1, 1, 1) + datetime.timedelta( 249 microseconds=credate / 10 250 ) 251 return credate 252 253 def remove_temp_file(self) -> None: 254 """if the path is from S3Path data cannot be serialized to io.ByteStream and 255 a temporary copy is stored at the temp dir 256 use this function only at the end of your execution scrip 257 some LCMS class methods depend on this file 258 """ 259 260 self.file_path.unlink() 261 262 def close_file(self) -> None: 263 """ 264 Close the Thermo Raw file. 265 """ 266 self.iRawDataPlus.Dispose() 267 268 def get_polarity_mode(self, scan_number: int) -> int: 269 """ 270 Get the polarity mode for the given scan number. 271 272 Parameters: 273 ----------- 274 scan_number : int 275 The scan number. 276 277 Raises: 278 ------- 279 Exception 280 If the polarity mode is unknown. 
281 282 """ 283 polarity_symbol = self.get_filter_for_scan_num(scan_number)[1] 284 285 if polarity_symbol == "+": 286 return 1 287 # return 'POSITIVE_ION_MODE' 288 289 elif polarity_symbol == "-": 290 return -1 291 292 else: 293 raise Exception("Polarity Mode Unknown, please set it manually") 294 295 def get_filter_for_scan_num(self, scan_number: int) -> List[str]: 296 """ 297 Returns the closest matching run time that corresponds to scan_number for the current 298 controller. This function is only supported for MS device controllers. 299 e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]'] 300 301 Parameters: 302 ----------- 303 scan_number : int 304 The scan number. 305 306 """ 307 scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number) 308 309 return str(scan_label).split() 310 311 def get_ms_level_for_scan_num(self, scan_number: int) -> str: 312 """ 313 Get the MS order for the given scan number. 314 315 Parameters: 316 ----------- 317 scan_number : int 318 The scan number 319 320 Returns: 321 -------- 322 int 323 The MS order type (1 for MS, 2 for MS2, etc.) 324 """ 325 scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number) 326 327 msordertype = { 328 MSOrderType.Ms: 1, 329 MSOrderType.Ms2: 2, 330 MSOrderType.Ms3: 3, 331 MSOrderType.Ms4: 4, 332 MSOrderType.Ms5: 5, 333 MSOrderType.Ms6: 6, 334 MSOrderType.Ms7: 7, 335 MSOrderType.Ms8: 8, 336 MSOrderType.Ms9: 9, 337 MSOrderType.Ms10: 10, 338 } 339 340 if scan_filter.MSOrder in msordertype: 341 return msordertype[scan_filter.MSOrder] 342 else: 343 raise Exception("MS Order Type not found") 344 345 def check_full_scan(self, scan_number: int) -> bool: 346 # scan_filter.ScanMode 0 = FULL 347 scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number) 348 349 return scan_filter.ScanMode == MSOrderType.Ms 350 351 def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]: 352 """ 353 Get all scan filters. 
354 This function is only supported for MS device controllers. 355 e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]'] 356 357 """ 358 359 scanrange = range(self.start_scan, self.end_scan + 1) 360 scanfiltersdic = {} 361 scanfilterslist = [] 362 for scan_number in scanrange: 363 scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number) 364 scanfiltersdic[scan_number] = scan_label 365 scanfilterslist.append(scan_label) 366 scanfilterset = list(set(scanfilterslist)) 367 return scanfiltersdic, scanfilterset 368 369 def get_scan_header(self, scan: int) -> Dict[str, Any]: 370 """ 371 Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc. 372 373 Parameters: 374 ----------- 375 scan : int 376 The scan number. 377 378 """ 379 header = self.iRawDataPlus.GetTrailerExtraInformation(scan) 380 381 header_dic = {} 382 for i in range(header.Length): 383 header_dic.update({header.Labels[i]: header.Values[i]}) 384 return header_dic 385 386 @staticmethod 387 def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]: 388 """trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal""" 389 return list(trace.Times), list(trace.Intensities), list(trace.Scans) 390 391 def get_eics( 392 self, 393 target_mzs: List[float], 394 tic_data: Dict[str, Any], 395 ms_type="MS !d", 396 peak_detection=False, 397 smooth=False, 398 plot=False, 399 ax: Optional[axes.Axes] = None, 400 legend=False, 401 ) -> Tuple[Dict[float, EIC_Data], axes.Axes]: 402 """ms_type: str ('MS', MS2') 403 start_scan: int default -1 will select the lowest available 404 end_scan: int default -1 will select the highest available 405 406 returns: 407 408 chroma: dict{target_mz: EIC_Data( 409 Scans: [int] 410 original thermo scan numbers 411 Time: [floats] 412 list of retention times 413 TIC: [floats] 414 total ion chromatogram 415 Apexes: [int] 416 original thermo apex scan number after peak picking 417 ) 418 419 """ 420 # If 
peak_detection or smooth is True, raise exception 421 if peak_detection or smooth: 422 raise Exception("Peak detection and smoothing are no longer implemented in this function") 423 424 options = MassOptions() 425 options.ToleranceUnits = ToleranceUnits.ppm 426 options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm 427 428 all_chroma_settings = [] 429 430 for target_mz in target_mzs: 431 settings = ChromatogramTraceSettings(TraceType.MassRange) 432 settings.Filter = ms_type 433 settings.MassRanges = [Range(target_mz, target_mz)] 434 435 chroma_settings = IChromatogramSettings(settings) 436 437 all_chroma_settings.append(chroma_settings) 438 439 # chroma_settings2 = IChromatogramSettings(settings) 440 # print(chroma_settings.FragmentMass) 441 # print(chroma_settings.FragmentMass) 442 # print(chroma_settings) 443 # print(chroma_settings) 444 445 data = self.iRawDataPlus.GetChromatogramData( 446 all_chroma_settings, self.start_scan, self.end_scan, options 447 ) 448 449 traces = ChromatogramSignal.FromChromatogramData(data) 450 451 chroma = {} 452 453 if plot: 454 from matplotlib.transforms import Bbox 455 import matplotlib.pyplot as plt 456 457 if not ax: 458 # ax = plt.gca() 459 # ax.clear() 460 fig, ax = plt.subplots() 461 462 else: 463 fig = plt.gcf() 464 465 # plt.show() 466 467 for i, trace in enumerate(traces): 468 if trace.Length > 0: 469 rt, eic, scans = self.get_rt_time_from_trace(trace) 470 if smooth: 471 eic = self.smooth_tic(eic) 472 473 chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic) 474 if plot: 475 ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i])) 476 477 if peak_detection: 478 # max_eic = self.get_max_eic(chroma) 479 max_signal = max(tic_data.tic) 480 481 for eic_data in chroma.values(): 482 eic = eic_data.eic 483 time = eic_data.time 484 485 if len(eic) != len(tic_data.tic): 486 warn( 487 "The software assumes same lenth of TIC and EIC, this does not seems to be the case and the results mass spectrum selected by the scan 
number might not be correct" 488 ) 489 490 if eic.max() > 0: 491 centroid_eics = self.eic_centroid_detector(time, eic, max_signal) 492 eic_data.apexes = [i for i in centroid_eics] 493 494 if plot: 495 for peak_indexes in eic_data.apexes: 496 apex_index = peak_indexes[1] 497 ax.plot( 498 time[apex_index], 499 eic[apex_index], 500 marker="x", 501 linewidth=0, 502 ) 503 504 if plot: 505 ax.set_xlabel("Time (min)") 506 ax.set_ylabel("a.u.") 507 ax.set_title(ms_type + " EIC") 508 ax.tick_params(axis="both", which="major", labelsize=12) 509 ax.axes.spines["top"].set_visible(False) 510 ax.axes.spines["right"].set_visible(False) 511 512 if legend: 513 legend = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1)) 514 fig.subplots_adjust(right=0.76) 515 # ax.set_prop_cycle(color=plt.cm.gist_rainbow(np.linspace(0, 1, len(traces)))) 516 517 d = {"down": 30, "up": -30} 518 519 def func(evt): 520 if legend.contains(evt): 521 bbox = legend.get_bbox_to_anchor() 522 bbox = Bbox.from_bounds( 523 bbox.x0, bbox.y0 + d[evt.button], bbox.width, bbox.height 524 ) 525 tr = legend.axes.transAxes.inverted() 526 legend.set_bbox_to_anchor(bbox.transformed(tr)) 527 fig.canvas.draw_idle() 528 529 fig.canvas.mpl_connect("scroll_event", func) 530 return chroma, ax 531 else: 532 return chroma, None 533 rt = [] 534 tic = [] 535 scans = [] 536 for i in range(traces[0].Length): 537 # print(trace[0].HasBasePeakData,trace[0].EndTime ) 538 539 # print(" {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] )) 540 rt.append(traces[0].Times[i]) 541 tic.append(traces[0].Intensities[i]) 542 scans.append(traces[0].Scans[i]) 543 544 return traces 545 # plot_chroma(rt, tic) 546 # plt.show() 547 548 def get_tic( 549 self, 550 ms_type="MS !d", 551 peak_detection=False, # This wont work right now 552 smooth=False, # This wont work right now 553 plot=False, 554 ax=None, 555 trace_type="TIC", 556 ) -> Tuple[TIC_Data, axes.Axes]: 557 """ms_type: str ('MS !d', 'MS2', None) 558 if you use None 
you get all scans. 559 peak_detection: bool 560 smooth: bool 561 plot: bool 562 ax: matplotlib axis object 563 trace_type: str ('TIC','BPC') 564 565 returns: 566 chroma: dict 567 { 568 Scan: [int] 569 original thermo scan numberMS 570 Time: [floats] 571 list of retention times 572 TIC: [floats] 573 total ion chromatogram 574 Apexes: [int] 575 original thermo apex scan number after peak picking 576 } 577 """ 578 if trace_type == "TIC": 579 settings = ChromatogramTraceSettings(TraceType.TIC) 580 elif trace_type == "BPC": 581 settings = ChromatogramTraceSettings(TraceType.BasePeak) 582 else: 583 raise ValueError(f"{trace_type} undefined") 584 if ms_type == "all": 585 settings.Filter = None 586 else: 587 settings.Filter = ms_type 588 589 chroma_settings = IChromatogramSettings(settings) 590 591 data = self.iRawDataPlus.GetChromatogramData( 592 [chroma_settings], self.start_scan, self.end_scan 593 ) 594 595 trace = ChromatogramSignal.FromChromatogramData(data) 596 597 data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[]) 598 599 if trace[0].Length > 0: 600 for i in range(trace[0].Length): 601 # print(trace[0].HasBasePeakData,trace[0].EndTime ) 602 603 # print(" {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] )) 604 data.time.append(trace[0].Times[i]) 605 data.tic.append(trace[0].Intensities[i]) 606 data.scans.append(trace[0].Scans[i]) 607 608 # print(trace[0].Scans[i]) 609 if smooth: 610 data.tic = self.smooth_tic(data.tic) 611 612 else: 613 data.tic = np.array(data.tic) 614 615 if peak_detection: 616 centroid_peak_indexes = [ 617 i for i in self.centroid_detector(data.time, data.tic) 618 ] 619 620 data.apexes = centroid_peak_indexes 621 622 if plot: 623 if not ax: 624 import matplotlib.pyplot as plt 625 626 ax = plt.gca() 627 # fig, ax = plt.subplots(figsize=(6, 3)) 628 629 ax.plot(data.time, data.tic, label=trace_type) 630 ax.set_xlabel("Time (min)") 631 ax.set_ylabel("a.u.") 632 if peak_detection: 633 for peak_indexes in data.apexes: 634 
apex_index = peak_indexes[1] 635 ax.plot( 636 data.time[apex_index], 637 data.tic[apex_index], 638 marker="x", 639 linewidth=0, 640 ) 641 642 # plt.show() 643 if trace_type == "BPC": 644 data.bpc = data.tic 645 data.tic = [] 646 return data, ax 647 if trace_type == "BPC": 648 data.bpc = data.tic 649 data.tic = [] 650 return data, None 651 652 else: 653 return None, None 654 655 def get_average_mass_spectrum( 656 self, 657 spectrum_mode: str = "profile", 658 auto_process: bool = True, 659 ppm_tolerance: float = 5.0, 660 ms_type: str = "MS1", 661 ) -> MassSpecProfile | MassSpecCentroid: 662 """ 663 Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method 664 or a scan list using Thermo's AverageScans method 665 spectrum_mode: str 666 centroid or profile mass spectrum 667 auto_process: bool 668 If true performs peak picking, and noise threshold calculation after creation of mass spectrum object 669 ms_type: str 670 String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10. 671 Internal function converts to Thermo MSOrderType class. 
672 673 """ 674 675 def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool): 676 mz_list = list(averageScan.SegmentedScan.Positions) 677 abund_list = list(averageScan.SegmentedScan.Intensities) 678 679 data_dict = { 680 Labels.mz: mz_list, 681 Labels.abundance: abund_list, 682 } 683 684 return MassSpecProfile(data_dict, d_params, auto_process=auto_process) 685 686 def get_centroid_mass_spec(averageScan, d_params: dict): 687 noise = list(averageScan.centroidScan.Noises) 688 689 baselines = list(averageScan.centroidScan.Baselines) 690 691 rp = list(averageScan.centroidScan.Resolutions) 692 693 magnitude = list(averageScan.centroidScan.Intensities) 694 695 mz = list(averageScan.centroidScan.Masses) 696 697 array_noise_std = (np.array(noise) - np.array(baselines)) / 3 698 l_signal_to_noise = np.array(magnitude) / array_noise_std 699 700 d_params["baseline_noise"] = np.average(array_noise_std) 701 702 d_params["baseline_noise_std"] = np.std(array_noise_std) 703 704 data_dict = { 705 Labels.mz: mz, 706 Labels.abundance: magnitude, 707 Labels.rp: rp, 708 Labels.s2n: list(l_signal_to_noise), 709 } 710 711 mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False) 712 713 return mass_spec 714 715 d_params = self.set_metadata( 716 firstScanNumber=self.start_scan, lastScanNumber=self.end_scan 717 ) 718 719 # Create the mass options object that will be used when averaging the scans 720 options = MassOptions() 721 options.ToleranceUnits = ToleranceUnits.ppm 722 options.Tolerance = ppm_tolerance 723 724 # Get the scan filter for the first scan. 
This scan filter will be used to located 725 # scans within the given scan range of the same type 726 scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan) 727 728 # force it to only look for the MSType 729 scanFilter = self.set_msordertype(scanFilter, ms_type) 730 731 if isinstance(self.scans, tuple): 732 averageScan = Extensions.AverageScansInScanRange( 733 self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options 734 ) 735 736 if averageScan: 737 if spectrum_mode == "profile": 738 mass_spec = get_profile_mass_spec( 739 averageScan, d_params, auto_process 740 ) 741 742 return mass_spec 743 744 elif spectrum_mode == "centroid": 745 if averageScan.HasCentroidStream: 746 mass_spec = get_centroid_mass_spec(averageScan, d_params) 747 748 return mass_spec 749 750 else: 751 raise ValueError( 752 "No Centroind data available for the selected scans" 753 ) 754 else: 755 raise ValueError("spectrum_mode must be 'profile' or centroid") 756 else: 757 raise ValueError("No data found for the selected scans") 758 759 elif isinstance(self.scans, list): 760 d_params = self.set_metadata(scans_list=self.scans) 761 762 scans = List[int]() 763 for scan in self.scans: 764 scans.Add(scan) 765 766 averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options) 767 768 if averageScan: 769 if spectrum_mode == "profile": 770 mass_spec = get_profile_mass_spec( 771 averageScan, d_params, auto_process 772 ) 773 774 return mass_spec 775 776 elif spectrum_mode == "centroid": 777 if averageScan.HasCentroidStream: 778 mass_spec = get_centroid_mass_spec(averageScan, d_params) 779 780 return mass_spec 781 782 else: 783 raise ValueError( 784 "No Centroind data available for the selected scans" 785 ) 786 787 else: 788 raise ValueError("spectrum_mode must be 'profile' or centroid") 789 790 else: 791 raise ValueError("No data found for the selected scans") 792 793 else: 794 raise ValueError("scans must be a list intergers or a tuple if integers") 795 796 def 
set_metadata( 797 self, 798 firstScanNumber=0, 799 lastScanNumber=0, 800 scans_list=False, 801 label=Labels.thermo_profile, 802 ): 803 """ 804 Collect metadata to be ingested in the mass spectrum object 805 806 scans_list: list[int] or false 807 lastScanNumber: int 808 firstScanNumber: int 809 """ 810 811 d_params = default_parameters(self.file_path) 812 813 # assumes scans is full scan or reduced profile scan 814 815 d_params["label"] = label 816 817 if scans_list: 818 d_params["scan_number"] = scans_list 819 820 d_params["polarity"] = self.get_polarity_mode(scans_list[0]) 821 822 else: 823 d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber) 824 825 d_params["polarity"] = self.get_polarity_mode(firstScanNumber) 826 827 d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model 828 829 d_params["acquisition_time"] = self.get_creation_time() 830 831 d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name 832 833 return d_params 834 835 def get_instrument_methods(self, parse_strings: bool = True): 836 """ 837 This function will extract the instrument methods embedded in the raw file 838 839 First it will check if there are any instrument methods, if not returning None 840 Then it will get the total number of instrument methods. 841 For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary 842 If this fails, it will return just the string object. 843 844 This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail. 845 846 Parameters: 847 ----------- 848 parse_strings: bool 849 If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string. 850 851 Returns: 852 -------- 853 List[Dict[str, Any]] or List 854 A list of dictionaries containing the instrument methods, or a list of strings if parsing fails. 
855 """ 856 857 if not self.iRawDataPlus.HasInstrumentMethod: 858 raise ValueError( 859 "Raw Data file does not have any instrument methods attached" 860 ) 861 return None 862 else: 863 864 def parse_instrument_method(data): 865 lines = data.split("\r\n") 866 method = {} 867 current_section = None 868 sub_section = None 869 870 for line in lines: 871 if not line.strip(): # Skip empty lines 872 continue 873 if ( 874 line.startswith("----") 875 or line.endswith("Settings") 876 or line.endswith("Summary") 877 or line.startswith("Experiment") 878 or line.startswith("Scan Event") 879 ): 880 current_section = line.replace("-", "").strip() 881 method[current_section] = {} 882 sub_section = None 883 elif line.startswith("\t"): 884 if "\t\t" in line: 885 indent_level = line.count("\t") 886 key_value = line.strip() 887 888 if indent_level == 2: 889 if sub_section: 890 key, value = ( 891 key_value.split("=", 1) 892 if "=" in key_value 893 else (key_value, None) 894 ) 895 method[current_section][sub_section][ 896 key.strip() 897 ] = value.strip() if value else None 898 elif indent_level == 3: 899 scan_type, key_value = ( 900 key_value.split(" ", 1) 901 if " " in key_value 902 else (key_value, None) 903 ) 904 method.setdefault(current_section, {}).setdefault( 905 sub_section, {} 906 ).setdefault(scan_type, {}) 907 908 if key_value: 909 key, value = ( 910 key_value.split("=", 1) 911 if "=" in key_value 912 else (key_value, None) 913 ) 914 method[current_section][sub_section][scan_type][ 915 key.strip() 916 ] = value.strip() if value else None 917 else: 918 key_value = line.strip() 919 if "=" in key_value: 920 key, value = key_value.split("=", 1) 921 method.setdefault(current_section, {})[key.strip()] = ( 922 value.strip() 923 ) 924 else: 925 sub_section = key_value 926 else: 927 if ":" in line: 928 key, value = line.split(":", 1) 929 method[current_section][key.strip()] = value.strip() 930 else: 931 method[current_section][line] = {} 932 933 return method 934 935 
count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount 936 # TODO make this code better... 937 instrument_methods = [] 938 for i in range(count_instrument_methods): 939 instrument_method_string = self.iRawDataPlus.GetInstrumentMethod(i) 940 if parse_strings: 941 try: 942 instrument_method_dict = parse_instrument_method( 943 instrument_method_string 944 ) 945 except: # if it fails for any reason 946 instrument_method_dict = instrument_method_string 947 else: 948 instrument_method_dict = instrument_method_string 949 instrument_methods.append(instrument_method_dict) 950 return instrument_methods 951 952 def get_tune_method(self): 953 """ 954 This code will extract the tune method from the raw file 955 It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. 956 It attempts to parse out section headers and sub-sections, but may not work for all instrument types. 957 It will also not return Labels (keys) where the value is blank 958 959 Returns: 960 -------- 961 Dict[str, Any] 962 A dictionary containing the tune method information 963 964 Raises: 965 ------- 966 ValueError 967 If no tune methods are found in the raw file 968 969 """ 970 tunemethodcount = self.iRawDataPlus.GetTuneDataCount() 971 if tunemethodcount == 0: 972 raise ValueError("No tune methods found in the raw data file") 973 return None 974 elif tunemethodcount > 1: 975 warnings.warn( 976 "Multiple tune methods found in the raw data file, returning the 1st" 977 ) 978 979 header = self.iRawDataPlus.GetTuneData(0) 980 981 header_dic = {} 982 current_section = None 983 984 for i in range(header.Length): 985 label = header.Labels[i] 986 value = header.Values[i] 987 988 # Check for section headers 989 if "===" in label or ( 990 (value == "" or value is None) and not label.endswith(":") 991 ): 992 # This is a section header 993 section_name = ( 994 label.replace("=", "").replace(":", "").strip() 995 ) # Clean the label if it contains '=' 
996 header_dic[section_name] = {} 997 current_section = section_name 998 else: 999 if current_section: 1000 header_dic[current_section][label] = value 1001 else: 1002 header_dic[label] = value 1003 return header_dic 1004 1005 def get_status_log(self, retention_time: float = 0): 1006 """ 1007 This code will extract the status logs from the raw file 1008 It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. 1009 It attempts to parse out section headers and sub-sections, but may not work for all instrument types. 1010 It will also not return Labels (keys) where the value is blank 1011 1012 Parameters: 1013 ----------- 1014 retention_time: float 1015 The retention time in minutes to extract the status log data from. 1016 Will use the closest retention time found. Default 0. 1017 1018 Returns: 1019 -------- 1020 Dict[str, Any] 1021 A dictionary containing the status log information 1022 1023 Raises: 1024 ------- 1025 ValueError 1026 If no status logs are found in the raw file 1027 1028 """ 1029 tunemethodcount = self.iRawDataPlus.GetStatusLogEntriesCount() 1030 if tunemethodcount == 0: 1031 raise ValueError("No status logs found in the raw data file") 1032 return None 1033 1034 header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time) 1035 1036 header_dic = {} 1037 current_section = None 1038 1039 for i in range(header.Length): 1040 label = header.Labels[i] 1041 value = header.Values[i] 1042 1043 # Check for section headers 1044 if "===" in label or ( 1045 (value == "" or value is None) and not label.endswith(":") 1046 ): 1047 # This is a section header 1048 section_name = ( 1049 label.replace("=", "").replace(":", "").strip() 1050 ) # Clean the label if it contains '=' 1051 header_dic[section_name] = {} 1052 current_section = section_name 1053 else: 1054 if current_section: 1055 header_dic[current_section][label] = value 1056 else: 1057 header_dic[label] = value 1058 return header_dic 1059 
def get_error_logs(self):
    """
    Extract the error log entries from the raw file.

    Returns:
    --------
    Dict[int, Dict[str, Any]]
        One entry per error log item, keyed by the item's index. Each value
        is a dictionary with the keys "rt" (the item's RetentionTime) and
        "message" (the item's Message).

    Raises:
    -------
    ValueError
        If no error logs are found in the raw file
    """
    error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount
    if error_log_count == 0:
        raise ValueError("No error logs found in the raw data file")

    error_logs = {}
    for i in range(error_log_count):
        error_log_item = self.iRawDataPlus.GetErrorLogItem(i)
        # Use the index `i` as the unique ID key
        error_logs[i] = {
            "rt": error_log_item.RetentionTime,
            "message": error_log_item.Message,
        }
    return error_logs


def get_sample_information(self):
    """
    Extract the sample information from the raw file.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the sample information. BarcodeStatus and
        SampleType are converted with str(); UserText is copied into a list
        stored under a nested "UserText" key.
        Note that the UserText field may not be handled properly and may need
        further processing.
    """
    sminfo = self.iRawDataPlus.SampleInformation
    return {
        "Comment": sminfo.Comment,
        "SampleId": sminfo.SampleId,
        "SampleName": sminfo.SampleName,
        "Vial": sminfo.Vial,
        "InjectionVolume": sminfo.InjectionVolume,
        "Barcode": sminfo.Barcode,
        "BarcodeStatus": str(sminfo.BarcodeStatus),
        "CalibrationLevel": sminfo.CalibrationLevel,
        "DilutionFactor": sminfo.DilutionFactor,
        "InstrumentMethodFile": sminfo.InstrumentMethodFile,
        "RawFileName": sminfo.RawFileName,
        "CalibrationFile": sminfo.CalibrationFile,
        "IstdAmount": sminfo.IstdAmount,
        "RowNumber": sminfo.RowNumber,
        "Path": sminfo.Path,
        "ProcessingMethodFile": sminfo.ProcessingMethodFile,
        "SampleType": str(sminfo.SampleType),
        "SampleWeight": sminfo.SampleWeight,
        # This may not work - needs debugging
        "UserText": {"UserText": list(sminfo.UserText)},
    }


def get_instrument_data(self):
    """
    Extract the instrument data from the raw file.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the instrument data. ChannelLabels is copied
        into a list stored under a nested "ChannelLabels" key.
    """
    instrument_data = self.iRawDataPlus.GetInstrumentData()
    return {
        "Name": instrument_data.Name,
        "Model": instrument_data.Model,
        "SerialNumber": instrument_data.SerialNumber,
        "SoftwareVersion": instrument_data.SoftwareVersion,
        "HardwareVersion": instrument_data.HardwareVersion,
        "ChannelLabels": {"ChannelLabels": list(instrument_data.ChannelLabels)},
        "Flags": instrument_data.Flags,
        "AxisLabelY": instrument_data.AxisLabelY,
        "AxisLabelX": instrument_data.AxisLabelX,
    }


def get_centroid_msms_data(self, scan):
    """
    Build a processed centroid mass spectrum from the centroid stream of one scan.

    Parameters:
    -----------
    scan : int
        The scan number.

    Returns:
    --------
    MassSpecCentroid
        The centroid mass spectrum, processed with a relative-abundance noise
        threshold (minimum relative abundance of 1).

    .. deprecated:: 2.0
        Deprecated as of CoreMS 2.0 and will be removed in a future version.
        Please use `get_average_mass_spectrum()` instead for similar functionality.
    """
    # stacklevel=2 attributes the warning to the caller's line, so it points
    # at the code that needs migrating rather than at this module.
    warnings.warn(
        "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
        "Please use `get_average_mass_spectrum()` instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid)

    # NOTE(review): the meaning of the second argument (False) is not visible
    # in this file - confirm against the RawFileReader API.
    centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False)

    noise = list(centroidStream.Noises)
    baselines = list(centroidStream.Baselines)
    rp = list(centroidStream.Resolutions)
    magnitude = list(centroidStream.Intensities)
    mz = list(centroidStream.Masses)

    # Per-peak noise estimate: (noise - baseline) / 3
    array_noise_std = (np.array(noise) - np.array(baselines)) / 3
    l_signal_to_noise = np.array(magnitude) / array_noise_std

    d_params["baseline_noise"] = np.average(array_noise_std)
    d_params["baseline_noise_std"] = np.std(array_noise_std)

    data_dict = {
        Labels.mz: mz,
        Labels.abundance: magnitude,
        Labels.rp: rp,
        Labels.s2n: list(l_signal_to_noise),
    }

    # Created unprocessed so the noise settings below apply when processing runs.
    mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
    mass_spec.settings.noise_threshold_method = "relative_abundance"
    mass_spec.settings.noise_threshold_min_relative_abundance = 1
    mass_spec.process_mass_spec()
    return mass_spec


def get_average_mass_spectrum_by_scanlist(
    self,
    scans_list: List[int],
    auto_process: bool = True,
    ppm_tolerance: float = 5.0,
) -> MassSpecProfile:
    """
    Average the mass spectra of selected scans using Thermo's AverageScans method.

    Parameters:
    -----------
    scans_list : list[int]
        The scan numbers to average.
    auto_process : bool
        If true performs peak picking, and noise threshold calculation after
        creation of mass spectrum object
    ppm_tolerance : float
        Tolerance, in ppm, set on the MassOptions passed to AverageScans.

    Returns:
    --------
    MassSpecProfile

    .. deprecated:: 2.0
        Deprecated as of CoreMS 2.0 and will be removed in a future version.
        Please use `get_average_mass_spectrum()` instead for similar functionality.
    """
    # stacklevel=2 attributes the warning to the caller's line, so it points
    # at the code that needs migrating rather than at this module.
    warnings.warn(
        "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
        "Please use `get_average_mass_spectrum()` instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    d_params = self.set_metadata(scans_list=scans_list)

    # assumes scans is full scan or reduced profile scan
    scans = List[int]()
    for scan in scans_list:
        scans.Add(scan)

    # Create the mass options object that will be used when averaging the scans
    options = MassOptions()
    options.ToleranceUnits = ToleranceUnits.ppm
    options.Tolerance = ppm_tolerance

    # No scan filter is applied here: exactly the listed scans are averaged.
    averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)

    data_dict = {
        Labels.mz: list(averageScan.SegmentedScan.Positions),
        Labels.abundance: list(averageScan.SegmentedScan.Intensities),
    }

    return MassSpecProfile(data_dict, d_params, auto_process=auto_process)
Class for parsing Thermo Raw files and extracting information from them.
Parameters:
file_location : str or pathlib.Path or s3path.S3Path Thermo Raw file path or S3 path.
Attributes:
file_path : str or pathlib.Path or s3path.S3Path The file path of the Thermo Raw file. parameters : LCMSParameters The LCMS parameters for the Thermo Raw file. chromatogram_settings : LiquidChromatographSetting The chromatogram settings for the Thermo Raw file. scans : list or tuple The selected scans for the Thermo Raw file. start_scan : int The starting scan number for the Thermo Raw file. end_scan : int The ending scan number for the Thermo Raw file.
Methods:
- set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter Convert the user-passed MS Type string to a Thermo MSOrderType object.
- get_creation_time() -> datetime.datetime Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
- remove_temp_file() Remove the temporary file if the path is from S3Path.
- get_polarity_mode(scan_number: int) -> int Get the polarity mode for the given scan number.
- get_filter_for_scan_num(scan_number: int) -> List[str] Get the filter for the given scan number.
- check_full_scan(scan_number: int) -> bool Check if the given scan number is a full scan.
- get_all_filters() -> Tuple[Dict[int, str], List[str]] Get all scan filters for the Thermo Raw file.
- get_scan_header(scan: int) -> Dict[str, Any] Get the full dictionary of scan header metadata for the given scan number.
- get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]] Get the retention time, intensity, and scan number from the given trace.
- get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d', peak_detection: bool = False, smooth: bool = False, plot: bool = False, ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes] Get the extracted ion chromatograms (EICs) for the target m/z values. Peak detection and smoothing are no longer implemented in this function; passing peak_detection=True or smooth=True raises an exception.
def __init__(self, file_location):
    """Open a Thermo Raw file and select its first MS device.

    Parameters:
    -----------
    file_location : str or pathlib.Path or s3path.S3Path
        Thermo Raw file path. An S3Path is first copied to a local ``tmp/``
        directory and the reader is opened on that copy.

    Raises:
    -------
    FileNotFoundError
        If the reader reports that the file is not open.
    IOError
        If the reader reports an error for the file.
    """
    if isinstance(file_location, S3Path):
        # The reader is handed a local path string, so remote objects are
        # downloaded next to the working directory first.
        local_dir = Path("tmp/")
        local_dir.mkdir(exist_ok=True)

        file_path = local_dir / file_location.name
        with open(file_path, "wb") as local_copy:
            local_copy.write(file_location.read_bytes())
    elif isinstance(file_location, str):
        file_path = Path(file_location)
    else:
        file_path = file_location

    raw_path = str(file_path)
    self.iRawDataPlus = RawFileReaderAdapter.FileFactory(raw_path)

    if not self.iRawDataPlus.IsOpen:
        raise FileNotFoundError(
            "Unable to access the RAW file using the RawFileReader class!"
        )

    # Check for any errors in the RAW file
    if self.iRawDataPlus.IsError:
        raise IOError(
            "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path)
        )

    self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1)

    # Keeps the location exactly as the caller passed it (not the local copy).
    self.file_path = file_location
    self.iFileHeader = FileHeaderReaderFactory.ReadFile(raw_path)

    self._init_settings()
file_location: str, pathlib.Path or s3path.S3Path Thermo Raw file path
Get or set the LiquidChromatographSetting object.
scans : list or tuple If tuple, uses Thermo AverageScansInScanRange for a scan range; if list, uses Thermo AverageScans for the selected scans
def set_msordertype(self, scanFilter, mstype: str = "ms1"):
    """
    Set the MS order of a Thermo scan filter from a user-supplied MS type string.
    Limited to MS1 through MS10.

    Parameters:
    -----------
    scanFilter : Thermo.ScanFilter
        The scan filter object; its MSOrder attribute is overwritten.
    mstype : str, optional
        The MS Type string (case-insensitive), by default 'ms1'

    Returns:
    --------
    Thermo.ScanFilter
        The same scan filter object that was passed in.
    """
    mstype = mstype.upper()

    # A level outside 1-10 only triggers a warning here; the table lookup
    # below has no entry for it and raises KeyError.
    ms_level = int(mstype.split("MS")[1])
    if not 1 <= ms_level <= 10:
        warn("MS Type not valid, must be between MS1 and MS10")

    order_by_label = {
        "MS1": MSOrderType.Ms,
        "MS2": MSOrderType.Ms2,
        "MS3": MSOrderType.Ms3,
        "MS4": MSOrderType.Ms4,
        "MS5": MSOrderType.Ms5,
        "MS6": MSOrderType.Ms6,
        "MS7": MSOrderType.Ms7,
        "MS8": MSOrderType.Ms8,
        "MS9": MSOrderType.Ms9,
        "MS10": MSOrderType.Ms10,
    }
    scanFilter.MSOrder = order_by_label[mstype]
    return scanFilter
Function to convert user passed string MS Type to Thermo MSOrderType object Limited to MS1 through MS10.
Parameters:
scanFilter : Thermo.ScanFilter The scan filter object. mstype : str, optional The MS Type string, by default 'ms1'
def get_creation_time(self) -> datetime.datetime:
    """
    Extract the creation date stamp from the .RAW file.

    Returns:
    --------
    datetime.datetime
        The creation date as a naive datetime, computed as 0001-01-01 plus
        the tick count converted to microseconds (ticks divided by 10).
    """
    ticks = self.iRawDataPlus.CreationDate.get_Ticks()
    # Integer division keeps the conversion exact. `ticks / 10` goes through
    # a float, which cannot represent present-day values to the microsecond
    # and shifts the timestamp by a few microseconds.
    return datetime.datetime(1, 1, 1) + datetime.timedelta(microseconds=ticks // 10)
Extract the creation date stamp from the .RAW file Return formatted creation date stamp.
def remove_temp_file(self) -> None:
    """Remove the temporary local copy created for an S3Path input.

    If the path is an S3Path, the data cannot be serialized to io.ByteStream
    and a temporary copy is stored in the temp dir. Use this function only at
    the end of your execution script; some LCMS class methods depend on this
    file.

    Raises:
    -------
    FileNotFoundError
        If the file to remove does not exist.
    """
    if isinstance(self.file_path, S3Path):
        # __init__ stores the original S3 location in self.file_path and
        # writes the download to tmp/<name>. Unlinking self.file_path would
        # delete the remote object, so remove the local copy instead.
        # NOTE(review): "tmp/" is relative, so this assumes the working
        # directory has not changed since the reader was created.
        (Path("tmp/") / self.file_path.name).unlink()
    else:
        # Unchanged behaviour for local inputs.
        self.file_path.unlink()
If the path is an S3Path, the data cannot be serialized to io.ByteStream and a temporary copy is stored in the temp dir. Use this function only at the end of your execution script; some LCMS class methods depend on this file.
def close_file(self) -> None:
    """
    Close the Thermo Raw file by disposing of the underlying reader object.
    """
    raw_file = self.iRawDataPlus
    raw_file.Dispose()
Close the Thermo Raw file.
def get_polarity_mode(self, scan_number: int) -> int:
    """
    Get the polarity mode for the given scan number.

    The polarity symbol is read from the second token of the scan filter
    returned by `get_filter_for_scan_num`.

    Parameters:
    -----------
    scan_number : int
        The scan number.

    Returns:
    --------
    int
        1 for "+" and -1 for "-".

    Raises:
    -------
    Exception
        If the polarity mode is unknown.
    """
    polarity_by_symbol = {"+": 1, "-": -1}

    symbol = self.get_filter_for_scan_num(scan_number)[1]
    if symbol not in polarity_by_symbol:
        raise Exception("Polarity Mode Unknown, please set it manually")
    return polarity_by_symbol[symbol]
Get the polarity mode for the given scan number.
Parameters:
scan_number : int The scan number.
Raises:
Exception If the polarity mode is unknown.
def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
    """
    Return the scan event string for scan_number, split on whitespace into tokens.
    This function is only supported for MS device controllers.
    e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

    Parameters:
    -----------
    scan_number : int
        The scan number.

    Returns:
    --------
    List[str]
        The whitespace-separated tokens of the scan event string.
    """
    event_text = str(self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number))
    tokens = event_text.split()
    return tokens
Returns the scan event string that corresponds to scan_number for the current controller, split on whitespace into a list of tokens. This function is only supported for MS device controllers. e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
Parameters:
scan_number : int The scan number.
def get_ms_level_for_scan_num(self, scan_number: int) -> int:
    """
    Get the MS order for the given scan number.

    Parameters:
    -----------
    scan_number : int
        The scan number

    Returns:
    --------
    int
        The MS order type (1 for MS, 2 for MS2, etc.)

    Raises:
    -------
    Exception
        If the scan filter's MS order is not one of MS1 through MS10.
    """
    scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)

    ms_level_by_order = {
        MSOrderType.Ms: 1,
        MSOrderType.Ms2: 2,
        MSOrderType.Ms3: 3,
        MSOrderType.Ms4: 4,
        MSOrderType.Ms5: 5,
        MSOrderType.Ms6: 6,
        MSOrderType.Ms7: 7,
        MSOrderType.Ms8: 8,
        MSOrderType.Ms9: 9,
        MSOrderType.Ms10: 10,
    }

    # Every level in the table is a non-zero int, so None reliably means
    # "no entry for this MS order".
    ms_level = ms_level_by_order.get(scan_filter.MSOrder)
    if ms_level is None:
        raise Exception("MS Order Type not found")
    return ms_level
Get the MS order for the given scan number.
Parameters:
scan_number : int The scan number
Returns:
int The MS order type (1 for MS, 2 for MS2, etc.)
def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
    """
    Get the scan event string of every scan from start_scan to end_scan.

    Returns:
    --------
    Tuple[Dict[int, str], List[str]]
        A dictionary mapping each scan number (start_scan..end_scan, inclusive)
        to its scan event string, and a list of the distinct scan event strings
        (in no particular order).

    """
    filters_by_scan = {
        scan_number: self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
        for scan_number in range(self.start_scan, self.end_scan + 1)
    }
    # De-duplicate through a set; the resulting order is therefore arbitrary.
    distinct_filters = list(set(filters_by_scan.values()))

    return filters_by_scan, distinct_filters
Get all scan filters. This function is only supported for MS device controllers. e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
def get_scan_header(self, scan: int) -> Dict[str, Any]:
    """
    Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.

    The data comes from `GetTrailerExtraInformation`; each label is paired
    with the value at the same index.

    Parameters:
    -----------
    scan : int
        The scan number.

    Returns:
    --------
    Dict[str, Any]
        Mapping of trailer label to trailer value.

    """
    trailer = self.iRawDataPlus.GetTrailerExtraInformation(scan)

    return {
        trailer.Labels[idx]: trailer.Values[idx] for idx in range(trailer.Length)
    }
Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.
Parameters:
scan : int The scan number.
@staticmethod
def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
    """
    Unpack a chromatogram trace into plain Python lists.

    Parameters:
    -----------
    trace : ThermoFisher.CommonCore.Data.Business.ChromatogramSignal
        The trace to unpack.

    Returns:
    --------
    Tuple[List[float], List[float], List[int]]
        The trace's Times, Intensities and Scans, in that order.
    """
    times = list(trace.Times)
    intensities = list(trace.Intensities)
    scan_numbers = list(trace.Scans)

    return times, intensities, scan_numbers
trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal
def get_eics(
    self,
    target_mzs: List[float],
    tic_data: Dict[str, Any],
    ms_type="MS !d",
    peak_detection=False,
    smooth=False,
    plot=False,
    ax: Optional[axes.Axes] = None,
    legend=False,
) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
    """
    Extract one ion chromatogram (EIC) per target m/z.

    Each target m/z is requested as a mass-range trace between start_scan and
    end_scan, using the ppm tolerance from
    `self.chromatogram_settings.eic_tolerance_ppm`.

    Parameters:
    -----------
    target_mzs : List[float]
        The m/z values to extract.
    tic_data : Dict[str, Any]
        Kept for backward compatibility; not used by this function.
    ms_type : str
        Scan filter applied to every trace, e.g. 'MS !d' or 'MS2'.
    peak_detection : bool
        No longer supported; must be False.
    smooth : bool
        No longer supported; must be False.
    plot : bool
        If True, plot every non-empty EIC.
    ax : matplotlib axes, optional
        Axes to draw on when plotting; a new figure is created if omitted.
    legend : bool
        If True (and plotting), add a legend that can be scrolled with the
        mouse wheel.

    Returns:
    --------
    Tuple[Dict[float, EIC_Data], axes.Axes]
        A dictionary {target_mz: EIC_Data(scans, time, eic)} holding only the
        traces that contain data, and the axes used for plotting (None when
        plot is False).

    Raises:
    -------
    Exception
        If peak_detection or smooth is True.
    """
    if peak_detection or smooth:
        raise Exception(
            "Peak detection and smoothing are no longer implemented in this function"
        )

    options = MassOptions()
    options.ToleranceUnits = ToleranceUnits.ppm
    options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm

    all_chroma_settings = []
    for target_mz in target_mzs:
        settings = ChromatogramTraceSettings(TraceType.MassRange)
        settings.Filter = ms_type
        # Zero-width range: the tolerance in `options` widens it around the target.
        settings.MassRanges = [Range(target_mz, target_mz)]
        all_chroma_settings.append(IChromatogramSettings(settings))

    data = self.iRawDataPlus.GetChromatogramData(
        all_chroma_settings, self.start_scan, self.end_scan, options
    )
    traces = ChromatogramSignal.FromChromatogramData(data)

    fig = None
    if plot:
        from matplotlib.transforms import Bbox
        import matplotlib.pyplot as plt

        if ax is None:
            fig, ax = plt.subplots()
        else:
            # Use the figure that owns the supplied axes rather than whatever
            # figure pyplot currently considers active.
            fig = ax.figure

    chroma = {}
    # Traces come back in request order, so index i matches target_mzs[i].
    for i, trace in enumerate(traces):
        if trace.Length > 0:
            rt, eic, scans = self.get_rt_time_from_trace(trace)
            chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
            if plot:
                ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))

    if not plot:
        return chroma, None

    ax.set_xlabel("Time (min)")
    ax.set_ylabel("a.u.")
    ax.set_title(ms_type + " EIC")
    ax.tick_params(axis="both", which="major", labelsize=12)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)

    if legend:
        legend_box = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1))
        fig.subplots_adjust(right=0.76)

        # Vertical shift applied to the legend per scroll step.
        scroll_step = {"down": 30, "up": -30}

        def on_scroll(evt):
            # Scroll the legend only while the pointer is over it.
            if legend_box.contains(evt):
                bbox = legend_box.get_bbox_to_anchor()
                bbox = Bbox.from_bounds(
                    bbox.x0, bbox.y0 + scroll_step[evt.button], bbox.width, bbox.height
                )
                tr = legend_box.axes.transAxes.inverted()
                legend_box.set_bbox_to_anchor(bbox.transformed(tr))
                fig.canvas.draw_idle()

        fig.canvas.mpl_connect("scroll_event", on_scroll)

    return chroma, ax
ms_type: str ('MS', 'MS2') start_scan: int default -1 will select the lowest available end_scan: int default -1 will select the highest available
returns:
chroma: dict{target_mz: EIC_Data(
Scans: [int]
original thermo scan numbers
Time: [floats]
list of retention times
TIC: [floats]
total ion chromatogram
Apexes: [int]
original thermo apex scan number after peak picking
)
def get_tic(
    self,
    ms_type="MS !d",
    peak_detection=False,  # This wont work right now
    smooth=False,  # This wont work right now
    plot=False,
    ax=None,
    trace_type="TIC",
) -> Tuple[TIC_Data, axes.Axes]:
    """
    Build the total ion (TIC) or base peak (BPC) chromatogram.

    Parameters:
    -----------
    ms_type : str
        Scan filter, e.g. 'MS !d' or 'MS2'. Use 'all' (or None) to apply no
        filter and get all scans.
    peak_detection : bool
        If True, store the output of `self.centroid_detector` as apexes.
    smooth : bool
        If True, pass the intensities through `self.smooth_tic`.
    plot : bool
        If True, plot the chromatogram.
    ax : matplotlib axis object
        Axes to draw on; the current pyplot axes are used if omitted.
    trace_type : str
        'TIC' or 'BPC'.

    Returns:
    --------
    Tuple[TIC_Data, axes.Axes]
        The chromatogram data (scans, time, tic or bpc, apexes) and the axes
        used for plotting (None when plot is False). For 'BPC' the
        intensities are stored in `bpc` and `tic` is left empty.
        Returns (None, None) when the trace holds no data points.

    Raises:
    -------
    ValueError
        If trace_type is neither 'TIC' nor 'BPC'.
    """
    if trace_type == "TIC":
        trace_kind = TraceType.TIC
    elif trace_type == "BPC":
        trace_kind = TraceType.BasePeak
    else:
        raise ValueError(f"{trace_type} undefined")

    settings = ChromatogramTraceSettings(trace_kind)
    settings.Filter = None if ms_type == "all" else ms_type

    raw_chromatogram = self.iRawDataPlus.GetChromatogramData(
        [IChromatogramSettings(settings)], self.start_scan, self.end_scan
    )
    signal = ChromatogramSignal.FromChromatogramData(raw_chromatogram)

    data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[])

    if not signal[0].Length > 0:
        return None, None

    first_trace = signal[0]
    data.time.extend(first_trace.Times)
    data.tic.extend(first_trace.Intensities)
    data.scans.extend(first_trace.Scans)

    data.tic = self.smooth_tic(data.tic) if smooth else np.array(data.tic)

    if peak_detection:
        data.apexes = list(self.centroid_detector(data.time, data.tic))

    if plot:
        if not ax:
            import matplotlib.pyplot as plt

            ax = plt.gca()

        ax.plot(data.time, data.tic, label=trace_type)
        ax.set_xlabel("Time (min)")
        ax.set_ylabel("a.u.")
        if peak_detection:
            for peak_indexes in data.apexes:
                # The apex position is the second entry of each detected peak.
                apex_index = peak_indexes[1]
                ax.plot(
                    data.time[apex_index],
                    data.tic[apex_index],
                    marker="x",
                    linewidth=0,
                )

    # Base peak intensities are reported in `bpc`, leaving `tic` empty.
    if trace_type == "BPC":
        data.bpc = data.tic
        data.tic = []

    return data, (ax if plot else None)
ms_type: str ('MS !d', 'MS2', None) if you use None you get all scans. peak_detection: bool smooth: bool plot: bool ax: matplotlib axis object trace_type: str ('TIC','BPC')
returns: chroma: dict { Scan: [int] original thermo scan numbers Time: [floats] list of retention times TIC: [floats] total ion chromatogram Apexes: [int] original thermo apex scan number after peak picking }
def get_average_mass_spectrum(
    self,
    spectrum_mode: str = "profile",
    auto_process: bool = True,
    ppm_tolerance: float = 5.0,
    ms_type: str = "MS1",
) -> MassSpecProfile | MassSpecCentroid:
    """
    Average mass spectra over the selected scans.

    When `self.scans` is a tuple, the scan range start_scan..end_scan is
    averaged with Thermo's AverageScansInScanRange method; when it is a list,
    those scans are averaged with Thermo's AverageScans method.

    Parameters:
    -----------
    spectrum_mode : str
        'profile' or 'centroid' mass spectrum.
    auto_process : bool
        If true performs peak picking, and noise threshold calculation after
        creation of the mass spectrum object (profile mode only; centroid
        spectra are always created with auto_process=False).
    ppm_tolerance : float
        Mass tolerance, in ppm, used when averaging the scans.
    ms_type : str
        String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10.
        Converted to a Thermo MSOrderType by `self.set_msordertype`.

    Returns:
    --------
    MassSpecProfile | MassSpecCentroid
        The averaged mass spectrum.

    Raises:
    -------
    ValueError
        If no averaged data is found, if spectrum_mode is invalid, if
        centroid data is requested but unavailable, or if `self.scans` is
        neither a tuple nor a list.
    """

    def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool):
        data_dict = {
            Labels.mz: list(averageScan.SegmentedScan.Positions),
            Labels.abundance: list(averageScan.SegmentedScan.Intensities),
        }
        return MassSpecProfile(data_dict, d_params, auto_process=auto_process)

    def get_centroid_mass_spec(averageScan, d_params: dict):
        # NOTE(review): this attribute is spelled `centroidScan` while the
        # other members used here (`SegmentedScan`, `HasCentroidStream`) are
        # capitalised - confirm the property name against the Thermo API.
        noise = list(averageScan.centroidScan.Noises)
        baselines = list(averageScan.centroidScan.Baselines)
        rp = list(averageScan.centroidScan.Resolutions)
        magnitude = list(averageScan.centroidScan.Intensities)
        mz = list(averageScan.centroidScan.Masses)

        # Noise above baseline, divided by 3, is used as the noise std estimate.
        array_noise_std = (np.array(noise) - np.array(baselines)) / 3
        l_signal_to_noise = np.array(magnitude) / array_noise_std

        d_params["baseline_noise"] = np.average(array_noise_std)
        d_params["baseline_noise_std"] = np.std(array_noise_std)

        data_dict = {
            Labels.mz: mz,
            Labels.abundance: magnitude,
            Labels.rp: rp,
            Labels.s2n: list(l_signal_to_noise),
        }
        return MassSpecCentroid(data_dict, d_params, auto_process=False)

    def build_mass_spec(averageScan, d_params: dict):
        # Turn an averaged scan into the mass spectrum type requested.
        if not averageScan:
            raise ValueError("No data found for the selected scans")
        if spectrum_mode == "profile":
            return get_profile_mass_spec(averageScan, d_params, auto_process)
        if spectrum_mode == "centroid":
            if not averageScan.HasCentroidStream:
                raise ValueError(
                    "No Centroind data available for the selected scans"
                )
            return get_centroid_mass_spec(averageScan, d_params)
        raise ValueError("spectrum_mode must be 'profile' or centroid")

    d_params = self.set_metadata(
        firstScanNumber=self.start_scan, lastScanNumber=self.end_scan
    )

    # Mass options used when averaging the scans
    options = MassOptions()
    options.ToleranceUnits = ToleranceUnits.ppm
    options.Tolerance = ppm_tolerance

    # Start from the scan filter of the first scan and force it to the
    # requested MS order; it is only consumed by the scan-range branch below.
    scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan)
    scanFilter = self.set_msordertype(scanFilter, ms_type)

    if isinstance(self.scans, tuple):
        averageScan = Extensions.AverageScansInScanRange(
            self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options
        )
        return build_mass_spec(averageScan, d_params)

    if isinstance(self.scans, list):
        d_params = self.set_metadata(scans_list=self.scans)

        # NOTE(review): `List[int]()` followed by `.Add` presumes `List` is the
        # .NET generic list here - confirm against the file's imports.
        scans = List[int]()
        for scan in self.scans:
            scans.Add(scan)

        averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
        return build_mass_spec(averageScan, d_params)

    raise ValueError("scans must be a list intergers or a tuple if integers")
Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method or a scan list using Thermo's AverageScans method spectrum_mode: str centroid or profile mass spectrum auto_process: bool If true performs peak picking, and noise threshold calculation after creation of mass spectrum object ms_type: str String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10. Internal function converts to Thermo MSOrderType class.
def set_metadata(
    self,
    firstScanNumber=0,
    lastScanNumber=0,
    scans_list=False,
    label=Labels.thermo_profile,
):
    """
    Collect metadata to be ingested in the mass spectrum object

    Starts from `default_parameters(self.file_path)` and fills in the label,
    scan number(s), polarity, analyzer, acquisition time and instrument label.

    Parameters:
    -----------
    firstScanNumber : int
        First scan of the range; used when scans_list is not given.
    lastScanNumber : int
        Last scan of the range; used when scans_list is not given.
    scans_list : list[int] or False
        Explicit scans. When given, it is stored as the scan number and the
        polarity is taken from its first scan.
    label : str
        Label stored in the parameters.

    Returns:
    --------
    dict
        The parameter dictionary.
    """
    d_params = default_parameters(self.file_path)

    # assumes scans is full scan or reduced profile scan
    d_params["label"] = label

    if scans_list:
        scan_entry = scans_list
        polarity_scan = scans_list[0]
    else:
        scan_entry = f"{firstScanNumber}-{lastScanNumber}"
        polarity_scan = firstScanNumber

    d_params["scan_number"] = scan_entry
    d_params["polarity"] = self.get_polarity_mode(polarity_scan)
    d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model
    d_params["acquisition_time"] = self.get_creation_time()
    d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name

    return d_params
Collect metadata to be ingested in the mass spectrum object
scans_list: list[int] or false lastScanNumber: int firstScanNumber: int
def get_instrument_methods(self, parse_strings: bool = True):
    """
    This function will extract the instrument methods embedded in the raw file

    For each embedded method, the plaintext string of the method is read and,
    if requested, parsed into a nested dictionary. If parsing fails with an
    error, the plain string is returned for that method instead.

    This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.

    Parameters:
    -----------
    parse_strings: bool
        If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.

    Returns:
    --------
    List[Dict[str, Any]] or List
        A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.

    Raises:
    -------
    ValueError
        If the raw file has no instrument methods attached.
    """
    if not self.iRawDataPlus.HasInstrumentMethod:
        raise ValueError(
            "Raw Data file does not have any instrument methods attached"
        )

    def parse_instrument_method(data):
        # Parse the "\r\n"-separated method text into nested dictionaries:
        # section -> (sub-section -> (scan type ->)) key -> value.
        lines = data.split("\r\n")
        method = {}
        current_section = None
        sub_section = None

        for line in lines:
            if not line.strip():  # Skip empty lines
                continue
            if line.startswith(("----", "Experiment", "Scan Event")) or line.endswith(
                ("Settings", "Summary")
            ):
                # Section header: start a new top-level dictionary.
                current_section = line.replace("-", "").strip()
                method[current_section] = {}
                sub_section = None
            elif line.startswith("\t"):
                if "\t\t" in line:
                    indent_level = line.count("\t")
                    key_value = line.strip()

                    if indent_level == 2:
                        if sub_section:
                            key, value = (
                                key_value.split("=", 1)
                                if "=" in key_value
                                else (key_value, None)
                            )
                            method[current_section][sub_section][
                                key.strip()
                            ] = value.strip() if value else None
                    elif indent_level == 3:
                        # First word is the scan type, the rest is "key = value".
                        scan_type, key_value = (
                            key_value.split(" ", 1)
                            if " " in key_value
                            else (key_value, None)
                        )
                        method.setdefault(current_section, {}).setdefault(
                            sub_section, {}
                        ).setdefault(scan_type, {})

                        if key_value:
                            key, value = (
                                key_value.split("=", 1)
                                if "=" in key_value
                                else (key_value, None)
                            )
                            method[current_section][sub_section][scan_type][
                                key.strip()
                            ] = value.strip() if value else None
                else:
                    key_value = line.strip()
                    if "=" in key_value:
                        key, value = key_value.split("=", 1)
                        method.setdefault(current_section, {})[key.strip()] = (
                            value.strip()
                        )
                    else:
                        # A single-tab line without "=" names a sub-section.
                        sub_section = key_value
            else:
                if ":" in line:
                    key, value = line.split(":", 1)
                    method[current_section][key.strip()] = value.strip()
                else:
                    method[current_section][line] = {}

        return method

    instrument_methods = []
    for i in range(self.iRawDataPlus.InstrumentMethodsCount):
        instrument_method = self.iRawDataPlus.GetInstrumentMethod(i)
        if parse_strings:
            try:
                instrument_method = parse_instrument_method(instrument_method)
            except Exception:
                # Best effort: keep the raw text when the layout is not
                # understood. KeyboardInterrupt/SystemExit still propagate.
                pass
        instrument_methods.append(instrument_method)
    return instrument_methods
This function will extract the instrument methods embedded in the raw file
First it will check if there are any instrument methods; if there are none, it raises a ValueError. Then it will get the total number of instrument methods. For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary. If this fails, it will return just the string object.
This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.
Parameters:
parse_strings: bool If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.
Returns:
List[Dict[str, Any]] or List A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.
def get_tune_method(self):
    """
    This code will extract the tune method from the raw file
    It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
    It attempts to parse out section headers and sub-sections, but may not work for all instrument types.

    A label is treated as a section header when it contains "===" or when its
    value is blank and the label does not end with ":". Entries following a
    section header are stored under that section; entries before the first
    section header are stored at the top level.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the tune method information

    Raises:
    -------
    ValueError
        If no tune methods are found in the raw file

    """
    tune_data_count = self.iRawDataPlus.GetTuneDataCount()
    if tune_data_count == 0:
        raise ValueError("No tune methods found in the raw data file")
    if tune_data_count > 1:
        warnings.warn(
            "Multiple tune methods found in the raw data file, returning the 1st"
        )

    header = self.iRawDataPlus.GetTuneData(0)

    header_dic = {}
    current_section = None

    for i in range(header.Length):
        label = header.Labels[i]
        value = header.Values[i]

        is_section_header = "===" in label or (
            (value == "" or value is None) and not label.endswith(":")
        )
        if is_section_header:
            # Clean the decoration ('=' and ':') off the section name
            section_name = label.replace("=", "").replace(":", "").strip()
            header_dic[section_name] = {}
            current_section = section_name
        elif current_section:
            header_dic[current_section][label] = value
        else:
            header_dic[label] = value
    return header_dic
This code will extract the tune method from the raw file It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. It attempts to parse out section headers and sub-sections, but may not work for all instrument types. It will also not return Labels (keys) where the value is blank
Returns:
Dict[str, Any] A dictionary containing the tune method information
Raises:
ValueError If no tune methods are found in the raw file
def get_status_log(self, retention_time: float = 0):
    """
    This code will extract the status logs from the raw file
    It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
    It attempts to parse out section headers and sub-sections, but may not work for all instrument types.

    A label is treated as a section header when it contains "===" or when its
    value is blank and the label does not end with ":". Entries following a
    section header are stored under that section; entries before the first
    section header are stored at the top level.

    Parameters:
    -----------
    retention_time: float
        The retention time in minutes to extract the status log data from.
        Will use the closest retention time found. Default 0.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the status log information

    Raises:
    -------
    ValueError
        If no status logs are found in the raw file

    """
    status_log_count = self.iRawDataPlus.GetStatusLogEntriesCount()
    if status_log_count == 0:
        raise ValueError("No status logs found in the raw data file")

    header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time)

    header_dic = {}
    current_section = None

    for i in range(header.Length):
        label = header.Labels[i]
        value = header.Values[i]

        is_section_header = "===" in label or (
            (value == "" or value is None) and not label.endswith(":")
        )
        if is_section_header:
            # Clean the decoration ('=' and ':') off the section name
            section_name = label.replace("=", "").replace(":", "").strip()
            header_dic[section_name] = {}
            current_section = section_name
        elif current_section:
            header_dic[current_section][label] = value
        else:
            header_dic[label] = value
    return header_dic
This code will extract the status logs from the raw file It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. It attempts to parse out section headers and sub-sections, but may not work for all instrument types. It will also not return Labels (keys) where the value is blank
Parameters:
retention_time: float The retention time in minutes to extract the status log data from. Will use the closest retention time found. Default 0.
Returns:
Dict[str, Any] A dictionary containing the status log information
Raises:
ValueError If no status logs are found in the raw file
def get_error_logs(self):
    """
    This code will extract the error logs from the raw file

    Returns:
    --------
    Dict[int, Dict[str, Any]]
        A dictionary keyed by the error log index. Each value is a dictionary
        with the retention time under "rt" and the message under "message".

    Raises:
    -------
    ValueError
        If no error logs are found in the raw file
    """
    error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount
    if error_log_count == 0:
        raise ValueError("No error logs found in the raw data file")

    error_logs = {}
    for i in range(error_log_count):
        error_log_item = self.iRawDataPlus.GetErrorLogItem(i)
        # Use the index `i` as the unique ID key
        error_logs[i] = {
            "rt": error_log_item.RetentionTime,
            "message": error_log_item.Message,
        }
    return error_logs
This code will extract the error logs from the raw file
Returns:
Dict[int, Dict[str, Any]] A dictionary keyed by the error log index; each value is a dictionary holding the retention time ('rt') and the message ('message') of that entry
Raises:
ValueError If no error logs are found in the raw file
def get_sample_information(self):
    """
    This code will extract the sample information from the raw file

    Every field is copied as-is, except BarcodeStatus and SampleType, which
    are converted with `str()`, and UserText, which is stored as a list under
    a nested "UserText" key.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the sample information
        Note that UserText field may not be handled properly and may need further processing
    """
    sample = self.iRawDataPlus.SampleInformation

    # Fields in output order; the ones in `as_text` are stringified.
    field_names = (
        "Comment",
        "SampleId",
        "SampleName",
        "Vial",
        "InjectionVolume",
        "Barcode",
        "BarcodeStatus",
        "CalibrationLevel",
        "DilutionFactor",
        "InstrumentMethodFile",
        "RawFileName",
        "CalibrationFile",
        "IstdAmount",
        "RowNumber",
        "Path",
        "ProcessingMethodFile",
        "SampleType",
        "SampleWeight",
    )
    as_text = ("BarcodeStatus", "SampleType")

    sample_info = {}
    for field_name in field_names:
        field_value = getattr(sample, field_name)
        sample_info[field_name] = (
            str(field_value) if field_name in as_text else field_value
        )

    # This may not work - needs debugging
    sample_info["UserText"] = {"UserText": list(sample.UserText)}
    return sample_info
This code will extract the sample information from the raw file
Returns:
Dict[str, Any] A dictionary containing the sample information Note that UserText field may not be handled properly and may need further processing
def get_instrument_data(self):
    """
    This code will extract the instrument data from the raw file

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the instrument data. ChannelLabels is stored
        as a list under a nested "ChannelLabels" key.
    """
    instrument = self.iRawDataPlus.GetInstrumentData()

    return {
        "Name": instrument.Name,
        "Model": instrument.Model,
        "SerialNumber": instrument.SerialNumber,
        "SoftwareVersion": instrument.SoftwareVersion,
        "HardwareVersion": instrument.HardwareVersion,
        "ChannelLabels": {"ChannelLabels": list(instrument.ChannelLabels)},
        "Flags": instrument.Flags,
        "AxisLabelY": instrument.AxisLabelY,
        "AxisLabelX": instrument.AxisLabelX,
    }
This code will extract the instrument data from the raw file
Returns:
Dict[str, Any] A dictionary containing the instrument data
def get_centroid_msms_data(self, scan):
    """
    Build a centroid mass spectrum from the centroid stream of a single scan.

    The spectrum is processed with the "relative_abundance" noise threshold
    method and a minimum relative abundance of 1.

    Parameters:
    -----------
    scan : int
        The scan number.

    Returns:
    --------
    MassSpecCentroid
        The processed centroid mass spectrum.

    .. deprecated:: 2.0
        Deprecated as of CoreMS 2.0 and will be removed in a future version.
        Please use `get_average_mass_spectrum()` instead for similar functionality.
    """

    warnings.warn(
        "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
        "Please use `get_average_mass_spectrum()` instead.",
        DeprecationWarning,
        # Attribute the warning to the caller so default filters can show it.
        stacklevel=2,
    )

    d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid)

    centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False)

    noise = list(centroidStream.Noises)
    baselines = list(centroidStream.Baselines)
    rp = list(centroidStream.Resolutions)
    magnitude = list(centroidStream.Intensities)
    mz = list(centroidStream.Masses)

    # Noise above baseline, divided by 3, is used as the noise std estimate.
    array_noise_std = (np.array(noise) - np.array(baselines)) / 3
    l_signal_to_noise = np.array(magnitude) / array_noise_std

    d_params["baseline_noise"] = np.average(array_noise_std)
    d_params["baseline_noise_std"] = np.std(array_noise_std)

    data_dict = {
        Labels.mz: mz,
        Labels.abundance: magnitude,
        Labels.rp: rp,
        Labels.s2n: list(l_signal_to_noise),
    }

    mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
    mass_spec.settings.noise_threshold_method = "relative_abundance"
    mass_spec.settings.noise_threshold_min_relative_abundance = 1
    mass_spec.process_mass_spec()
    return mass_spec
Deprecated since version 2.0:
This function will be removed in CoreMS 2.0. Please use get_average_mass_spectrum()
instead for similar functionality.
def get_average_mass_spectrum_by_scanlist(
    self,
    scans_list: List[int],
    auto_process: bool = True,
    ppm_tolerance: float = 5.0,
) -> MassSpecProfile:
    """
    Averages selected scans mass spectra using Thermo's AverageScans method

    Parameters:
    -----------
    scans_list : list[int]
        The scans to average.
    auto_process : bool
        If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
    ppm_tolerance : float
        Mass tolerance, in ppm, used when averaging the scans.

    Returns:
    --------
    MassSpecProfile
        The averaged profile mass spectrum.

    .. deprecated:: 2.0
        Deprecated as of CoreMS 2.0 and will be removed in a future version.
        Please use `get_average_mass_spectrum()` instead for similar functionality.
    """

    warnings.warn(
        "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
        "Please use `get_average_mass_spectrum()` instead.",
        DeprecationWarning,
        # Attribute the warning to the caller so default filters can show it.
        stacklevel=2,
    )

    # assumes scans is full scan or reduced profile scan
    d_params = self.set_metadata(scans_list=scans_list)

    # NOTE(review): `List[int]()` followed by `.Add` presumes `List` is the
    # .NET generic list here - confirm against the file's imports.
    scans = List[int]()
    for scan in scans_list:
        scans.Add(scan)

    # Mass options used when averaging the scans
    options = MassOptions()
    options.ToleranceUnits = ToleranceUnits.ppm
    options.Tolerance = ppm_tolerance

    averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)

    data_dict = {
        Labels.mz: list(averageScan.SegmentedScan.Positions),
        Labels.abundance: list(averageScan.SegmentedScan.Intensities),
    }

    return MassSpecProfile(data_dict, d_params, auto_process=auto_process)
Averages selected scans mass spectra using Thermo's AverageScans method scans_list: list[int] auto_process: bool If true performs peak picking, and noise threshold calculation after creation of mass spectrum object Returns: MassSpecProfile
Deprecated since version 2.0.
This function will be removed in CoreMS 2.0. Please use get_average_mass_spectrum()
instead for similar functionality.
class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, SpectraParserInterface):
    """A class for parsing Thermo RAW mass spectrometry data files and instantiating MassSpectraBase or LCMSBase objects

    Parameters
    ----------
    file_location : str or Path
        The path to the RAW file to be parsed.
    analyzer : str, optional
        The type of mass analyzer used in the instrument. Default is "Unknown".
    instrument_label : str, optional
        The name of the instrument used to acquire the data. Default is "Unknown".
    sample_name : str, optional
        The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.

    Attributes
    ----------
    file_location : Path
        The path to the RAW file being parsed.
    analyzer : str
        The type of mass analyzer used in the instrument.
    instrument_label : str
        The name of the instrument used to acquire the data.
    sample_name : str
        The name of the sample being analyzed.

    Methods
    -------
    * run(spectra="all", scan_df=None).
        Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
    * get_mass_spectrum_from_scan(scan_number, spectrum_mode, auto_process=True)
        Parses the RAW file and returns a MassSpecBase object from a single scan.
    * get_mass_spectra_obj().
        Parses the RAW file and instantiates a MassSpectraBase object.
    * get_lcms_obj().
        Parses the RAW file and instantiates an LCMSBase object.
    * get_icr_transient_times().
        Return a list of transient time targets for all scans, or selected scans range

    Inherits from ThermoBaseClass and SpectraParserInterface
    """

    def __init__(
        self,
        file_location,
        analyzer="Unknown",
        instrument_label="Unknown",
        sample_name=None,
    ):
        super().__init__(file_location)
        if isinstance(file_location, str):
            # if obj is a string it defaults to create a Path obj, pass the S3Path if needed
            file_location = Path(file_location)
        if not file_location.exists():
            # FileExistsError is kept (rather than FileNotFoundError) because
            # existing callers may catch it.
            raise FileExistsError("File does not exist: " + str(file_location))

        self.file_location = file_location
        self.analyzer = analyzer
        self.instrument_label = instrument_label

        # Fall back to the file stem when no (or an empty) sample name is given.
        self.sample_name = sample_name if sample_name else file_location.stem

    def load(self):
        """No-op; present for the parser interface."""
        pass

    def get_scan_df(self):
        """Build the per-scan metadata table for the RAW file.

        Returns
        -------
        pandas.DataFrame
            One row per scan with the columns scan, scan_time, tic, ms_level,
            scan_text, scan_window_lower, scan_window_upper, polarity,
            precursor_mz and ms_format. The scan window bounds are kept as the
            strings extracted from the filter text; precursor_mz is float.
        """
        # This automatically brings in all the data
        self.chromatogram_settings.scans = (-1, -1)

        # Get scan df info; starting with TIC data
        tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
        scan_df = pd.DataFrame.from_dict(
            {
                "scan": tic_data.scans,
                "scan_time": tic_data.time,
                "tic": tic_data.tic,
            }
        )
        # Placeholder keeps ms_level in its established column position.
        scan_df["ms_level"] = None

        # get scan text
        scan_filter_df = pd.DataFrame.from_dict(
            self.get_all_filters()[0], orient="index"
        )
        scan_filter_df.reset_index(inplace=True)
        scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)

        scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
        scan_text = scan_df.scan_text
        scan_df["scan_window_lower"] = scan_text.str.extract(
            r"\[(\d+\.\d+)-\d+\.\d+\]", expand=False
        )
        scan_df["scan_window_upper"] = scan_text.str.extract(
            r"\[\d+\.\d+-(\d+\.\d+)\]", expand=False
        )
        scan_df["polarity"] = np.where(
            scan_text.str.contains(" - "), "negative", "positive"
        )
        scan_df["precursor_mz"] = scan_text.str.extract(
            r"(\d+\.\d+)@", expand=False
        ).astype(float)

        # Assign each scan as centroid or profile and add ms_level.
        # Values are collected in one pass and assigned as whole columns, which
        # avoids a full-column boolean mask per scan.
        ms_levels = []
        ms_formats = []
        for scan_number in scan_df.scan.to_list():
            ms_levels.append(self.get_ms_level_for_scan_num(scan_number))
            if self.iRawDataPlus.IsCentroidScanFromScanNumber(scan_number):
                ms_formats.append("centroid")
            else:
                ms_formats.append("profile")
        # dtype=object matches the columns the None-initialised version produced.
        scan_df["ms_level"] = pd.Series(ms_levels, index=scan_df.index, dtype=object)
        scan_df["ms_format"] = pd.Series(ms_formats, index=scan_df.index, dtype=object)

        return scan_df

    def get_ms_raw(self, spectra, scan_df):
        """Read the non-zero data points of the selected scans.

        Parameters
        ----------
        spectra : str
            Which scans to read: "all", "ms1" or "ms2".
        scan_df : pandas.DataFrame
            Scan table with at least the columns "scan" and "ms_level".

        Returns
        -------
        dict
            Maps "ms<level>" to a float32 DataFrame with the columns scan, mz
            and intensity. Levels without any positive-intensity point are
            absent.

        Raises
        ------
        ValueError
            If `spectra` is not one of the accepted values.
        """
        if spectra == "all":
            scan_df_forspec = scan_df
        elif spectra == "ms1":
            scan_df_forspec = scan_df[scan_df.ms_level == 1]
        elif spectra == "ms2":
            scan_df_forspec = scan_df[scan_df.ms_level == 2]
        else:
            raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")

        columns = ["scan", "mz", "intensity"]
        # set at float32
        dtype = np.float32

        # Per-level list of (n, 3) sub-arrays, concatenated once at the end so
        # every scan is read from the file a single time.
        chunks = defaultdict(list)

        for scan_number, level in zip(
            scan_df_forspec.scan.to_list(), scan_df_forspec.ms_level.to_list()
        ):
            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan_number, scanStatistics
            )
            abun = np.array(list(profileStream.Intensities))
            mz = np.array(list(profileStream.Positions))

            # Dimension check on the raw arrays; after filtering both arrays
            # always have the same length, so it must happen here.
            if len(mz) != len(abun):
                warnings.warn("m/z and intensity array dimension mismatch")
                continue

            # Keep only points with abundance > 0
            keep = abun > 0
            n = int(np.count_nonzero(keep))

            # No measurements
            if n == 0:
                continue

            arr = np.empty((n, len(columns)), dtype=dtype)
            arr[:, 0] = scan_number
            arr[:, 1] = mz[keep]
            arr[:, 2] = abun[keep]
            chunks[level].append(arr)

        # Construct one dataframe per MS level, keys prefixed with 'ms'
        return {
            f"ms{level}": pd.DataFrame(np.concatenate(parts), columns=columns)
            for level, parts in chunks.items()
        }

    def run(self, spectra="all", scan_df=None):
        """
        Extracts mass spectra data from a raw file.

        Parameters
        ----------
        spectra : str, optional
            Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2.
        scan_df : pandas.DataFrame, optional
            Scan dataframe. If not provided, the scan dataframe is created from the RAW file.

        Returns
        -------
        tuple
            A tuple containing two elements:
            - A dictionary containing mass spectra data, separated by MS level (None when spectra is "none").
            - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level,
                scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
        """
        # Prepare scan_df
        if scan_df is None:
            scan_df = self.get_scan_df()

        # Prepare mass spectra data
        if spectra == "none":
            return None, scan_df
        return self.get_ms_raw(spectra=spectra, scan_df=scan_df), scan_df

    def get_mass_spectrum_from_scan(
        self, scan_number, spectrum_mode, auto_process=True
    ):
        """Instantiate a mass spectrum object from a single scan of the RAW file.

        Parameters
        ----------
        scan_number : int
            The scan number to extract the mass spectrum from.
        spectrum_mode : str
            The type of mass spectrum to extract. Must be 'profile' or 'centroid'.
        auto_process : bool, optional
            If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.

        Returns
        -------
        MassSpecProfile | MassSpecCentroid
            The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.

        Raises
        ------
        ValueError
            If `spectrum_mode` is neither 'profile' nor 'centroid'.
        """
        # Reject unsupported modes before touching the file.
        if spectrum_mode not in ("profile", "centroid"):
            raise ValueError(
                f"spectrum_mode must be 'profile' or 'centroid', got {spectrum_mode!r}"
            )

        if spectrum_mode == "profile":
            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan_number, scanStatistics
            )
            abun = list(profileStream.Intensities)
            mz = list(profileStream.Positions)
            data_dict = {
                Labels.mz: mz,
                Labels.abundance: abun,
            }
            d_params = self.set_metadata(
                firstScanNumber=scan_number,
                lastScanNumber=scan_number,
                scans_list=False,
                label=Labels.thermo_profile,
            )
            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)

        # spectrum_mode == "centroid"
        centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
        if centroid_scan.Masses is not None:
            mz = list(centroid_scan.Masses)
            abun = list(centroid_scan.Intensities)
            rp = list(centroid_scan.Resolutions)
            noise = np.array(list(centroid_scan.Noises))
            baselines = np.array(list(centroid_scan.Baselines))
            # Noise standard deviation is taken as (noise - baseline) / 3
            array_noise_std = (noise - baselines) / 3
            l_signal_to_noise = np.array(abun) / array_noise_std
            data_dict = {
                Labels.mz: mz,
                Labels.abundance: abun,
                Labels.rp: rp,
                Labels.s2n: list(l_signal_to_noise),
            }
        else:
            # For CID MS2, the centroid data are stored in the profile data location,
            # they do not have any associated rp or baseline data, but they should be
            # treated as centroid data
            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan_number, scanStatistics
            )
            abun = list(profileStream.Intensities)
            mz = list(profileStream.Positions)
            data_dict = {
                Labels.mz: mz,
                Labels.abundance: abun,
                Labels.rp: [np.nan] * len(mz),
                Labels.s2n: [np.nan] * len(mz),
            }
        d_params = self.set_metadata(
            firstScanNumber=scan_number,
            lastScanNumber=scan_number,
            scans_list=False,
            label=Labels.thermo_centroid,
        )
        return MassSpecCentroid(data_dict, d_params, auto_process=auto_process)

    def get_mass_spectra_obj(self):
        """Instantiate a MassSpectraBase object from the binary data file.

        Returns
        -------
        MassSpectraBase
            The MassSpectra object containing the parsed mass spectra. The object is instantiated with the file location, analyzer, instrument, sample name, and scan dataframe.
        """
        _, scan_df = self.run(spectra="none")
        mass_spectra_obj = MassSpectraBase(
            self.file_location,
            self.analyzer,
            self.instrument_label,
            self.sample_name,
            self,
        )
        mass_spectra_obj.scan_df = scan_df.set_index("scan", drop=False)

        return mass_spectra_obj

    def get_lcms_obj(self, spectra="all"):
        """Instantiates a LCMSBase object from the RAW file.

        Parameters
        ----------
        spectra : str, optional
            Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2".

        Returns
        -------
        LCMSBase
            LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.

        Raises
        ------
        ValueError
            If the scans do not all share one polarity.
        """
        _, scan_df = self.run(spectra="none")  # first run it to just get scan info

        # Check if polarity is mixed before doing the expensive spectra parse
        if len(set(scan_df.polarity)) > 1:
            raise ValueError("Mixed polarities detected in scan data")

        res, scan_df = self.run(
            scan_df=scan_df, spectra=spectra
        )  # second run to parse data
        lcms_obj = LCMSBase(
            self.file_location,
            self.analyzer,
            self.instrument_label,
            self.sample_name,
            self,
        )
        if spectra != "none":
            for key, ms_df in res.items():
                key_int = int(key.replace("ms", ""))
                ms_df = ms_df[ms_df.intensity > 0]
                ms_df = ms_df.sort_values(by=["scan", "mz"]).reset_index(drop=True)
                lcms_obj._ms_unprocessed[key_int] = ms_df
        lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
        # Positional access: does not rely on the index containing the label 0.
        lcms_obj.polarity = scan_df.polarity.iloc[0]
        lcms_obj._scans_number_list = list(scan_df.scan)
        lcms_obj._retention_time_list = list(scan_df.scan_time)
        lcms_obj._tic_list = list(scan_df.tic)

        return lcms_obj

    def get_icr_transient_times(self):
        """Return a list of transient time targets for all scans, or selected scans range

        Returns
        -------
        list
            One entry per scan: the transient time looked up from the scan
            header's "FT Resolution:" value, or None when that value is not in
            the lookup table.

        Notes
        --------
        Resolving Power and Transient time targets based on 7T FT-ICR MS system
        """

        res_trans_time = {
            "50": 0.384,  # legacy key, kept so existing lookups are unchanged
            "50000": 0.384,
            "100000": 0.768,
            "200000": 1.536,
            "400000": 3.072,
            "750000": 6.144,
            "1000000": 12.288,
        }

        transient_time_list = []

        # NOTE(review): range() stops before self.end_scan, so the scan numbered
        # end_scan is not reported -- confirm whether end_scan is inclusive.
        for scan in range(self.start_scan, self.end_scan):
            scan_header = self.get_scan_header(scan)
            rp_target = scan_header["FT Resolution:"]
            transient_time_list.append(res_trans_time.get(rp_target))

        return transient_time_list
A class for parsing Thermo RAW mass spectrometry data files and instantiating MassSpectraBase or LCMSBase objects
Parameters
- file_location (str or Path): The path to the RAW file to be parsed.
- analyzer (str, optional): The type of mass analyzer used in the instrument. Default is "Unknown".
- instrument_label (str, optional): The name of the instrument used to acquire the data. Default is "Unknown".
- sample_name (str, optional): The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.
Attributes
- file_location (Path): The path to the RAW file being parsed.
- analyzer (str): The type of mass analyzer used in the instrument.
- instrument_label (str): The name of the instrument used to acquire the data.
- sample_name (str): The name of the sample being analyzed.
Methods
- run(spectra="all", scan_df=None). Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
- get_mass_spectrum_from_scan(scan_number, spectrum_mode, auto_process=True) Parses the RAW file and returns a MassSpecBase object from a single scan.
- get_mass_spectra_obj(). Parses the RAW file and instantiates a MassSpectraBase object.
- get_lcms_obj(). Parses the RAW file and instantiates an LCMSBase object.
- get_icr_transient_times(). Return a list for transient time targets for all scans, or selected scans range
Inherits from ThermoBaseClass and SpectraParserInterface
def __init__(
    self,
    file_location,
    analyzer="Unknown",
    instrument_label="Unknown",
    sample_name=None,
):
    """Set up the reader for one Thermo RAW file.

    Parameters
    ----------
    file_location : str, pathlib.Path or s3path.S3Path
        Thermo RAW file path. A plain string is converted to a pathlib.Path.
    analyzer : str, optional
        The type of mass analyzer used in the instrument. Default is "Unknown".
    instrument_label : str, optional
        The name of the instrument used to acquire the data. Default is "Unknown".
    sample_name : str, optional
        The name of the sample. When missing or empty, the stem of the file path is used.

    Raises
    ------
    FileExistsError
        If the path does not exist.
    """
    super().__init__(file_location)

    # Strings become pathlib.Path objects; pass an S3Path explicitly if needed.
    location = Path(file_location) if isinstance(file_location, str) else file_location
    if not location.exists():
        raise FileExistsError("File does not exist: " + str(location))

    self.file_location = location
    self.analyzer = analyzer
    self.instrument_label = instrument_label
    self.sample_name = sample_name if sample_name else location.stem
file_location: str, pathlib.Path or s3path.S3Path. Thermo RAW file path.
def get_scan_df(self):
    """Build the per-scan metadata table for the RAW file.

    Returns
    -------
    pandas.DataFrame
        One row per scan with the columns scan, scan_time, tic, ms_level,
        scan_text, scan_window_lower, scan_window_upper, polarity,
        precursor_mz and ms_format. The scan window bounds are kept as the
        strings extracted from the filter text; precursor_mz is float.
    """
    # This automatically brings in all the data
    self.chromatogram_settings.scans = (-1, -1)

    # Get scan df info; starting with TIC data
    tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
    scan_df = pd.DataFrame.from_dict(
        {
            "scan": tic_data.scans,
            "scan_time": tic_data.time,
            "tic": tic_data.tic,
        }
    )
    # Placeholder keeps ms_level in its established column position.
    scan_df["ms_level"] = None

    # get scan text
    scan_filter_df = pd.DataFrame.from_dict(
        self.get_all_filters()[0], orient="index"
    )
    scan_filter_df.reset_index(inplace=True)
    scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)

    scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
    scan_text = scan_df.scan_text
    scan_df["scan_window_lower"] = scan_text.str.extract(
        r"\[(\d+\.\d+)-\d+\.\d+\]", expand=False
    )
    scan_df["scan_window_upper"] = scan_text.str.extract(
        r"\[\d+\.\d+-(\d+\.\d+)\]", expand=False
    )
    scan_df["polarity"] = np.where(
        scan_text.str.contains(" - "), "negative", "positive"
    )
    scan_df["precursor_mz"] = scan_text.str.extract(
        r"(\d+\.\d+)@", expand=False
    ).astype(float)

    # Assign each scan as centroid or profile and add ms_level.
    # Values are collected in one pass and assigned as whole columns, which
    # avoids a full-column boolean mask per scan.
    ms_levels = []
    ms_formats = []
    for scan_number in scan_df.scan.to_list():
        ms_levels.append(self.get_ms_level_for_scan_num(scan_number))
        if self.iRawDataPlus.IsCentroidScanFromScanNumber(scan_number):
            ms_formats.append("centroid")
        else:
            ms_formats.append("profile")
    # dtype=object matches the columns the None-initialised version produced.
    scan_df["ms_level"] = pd.Series(ms_levels, index=scan_df.index, dtype=object)
    scan_df["ms_format"] = pd.Series(ms_formats, index=scan_df.index, dtype=object)

    return scan_df
Return scan data as a pandas DataFrame.
def get_ms_raw(self, spectra, scan_df):
    """Read the non-zero data points of the selected scans.

    Parameters
    ----------
    spectra : str
        Which scans to read: "all", "ms1" or "ms2".
    scan_df : pandas.DataFrame
        Scan table with at least the columns "scan" and "ms_level".

    Returns
    -------
    dict
        Maps "ms<level>" to a float32 DataFrame with the columns scan, mz
        and intensity. Levels without any positive-intensity point are
        absent.

    Raises
    ------
    ValueError
        If `spectra` is not one of the accepted values.
    """
    if spectra == "all":
        scan_df_forspec = scan_df
    elif spectra == "ms1":
        scan_df_forspec = scan_df[scan_df.ms_level == 1]
    elif spectra == "ms2":
        scan_df_forspec = scan_df[scan_df.ms_level == 2]
    else:
        raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")

    columns = ["scan", "mz", "intensity"]
    # set at float32
    dtype = np.float32

    # Per-level list of (n, 3) sub-arrays, concatenated once at the end so
    # every scan is read from the file a single time.
    chunks = defaultdict(list)

    for scan_number, level in zip(
        scan_df_forspec.scan.to_list(), scan_df_forspec.ms_level.to_list()
    ):
        scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
        profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
            scan_number, scanStatistics
        )
        abun = np.array(list(profileStream.Intensities))
        mz = np.array(list(profileStream.Positions))

        # Dimension check on the raw arrays; after filtering both arrays
        # always have the same length, so it must happen here.
        if len(mz) != len(abun):
            warnings.warn("m/z and intensity array dimension mismatch")
            continue

        # Keep only points with abundance > 0
        keep = abun > 0
        n = int(np.count_nonzero(keep))

        # No measurements
        if n == 0:
            continue

        arr = np.empty((n, len(columns)), dtype=dtype)
        arr[:, 0] = scan_number
        arr[:, 1] = mz[keep]
        arr[:, 2] = abun[keep]
        chunks[level].append(arr)

    # Construct one dataframe per MS level, keys prefixed with 'ms'
    return {
        f"ms{level}": pd.DataFrame(np.concatenate(parts), columns=columns)
        for level, parts in chunks.items()
    }
Return a dictionary of pandas DataFrames holding the mass spectra data, keyed by MS level.
def run(self, spectra="all", scan_df=None):
    """
    Extracts mass spectra data from a raw file.

    Parameters
    ----------
    spectra : str, optional
        Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2.
    scan_df : pandas.DataFrame, optional
        Scan dataframe. If not provided, the scan dataframe is created from the RAW file.

    Returns
    -------
    tuple
        A tuple containing two elements:
        - A dictionary containing mass spectra data, separated by MS level (None when spectra is "none").
        - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level,
            scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
    """
    # Build the scan table only when the caller did not supply one.
    scan_table = self.get_scan_df() if scan_df is None else scan_df

    # "none" means scan metadata only; skip reading the spectra.
    if spectra == "none":
        return None, scan_table

    spectra_by_level = self.get_ms_raw(spectra=spectra, scan_df=scan_table)
    return spectra_by_level, scan_table
Extracts mass spectra data from a raw file.
Parameters
- spectra (str, optional): Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2.
- scan_df (pandas.DataFrame, optional): Scan dataframe. If not provided, the scan dataframe is created from the RAW file.
Returns
- tuple: A tuple containing two elements:
- A dictionary containing mass spectra data, separated by MS level.
- A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level, scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
def get_mass_spectrum_from_scan(
    self, scan_number, spectrum_mode, auto_process=True
):
    """Instantiate a mass spectrum object from a single scan of the RAW file.

    Parameters
    ----------
    scan_number : int
        The scan number to extract the mass spectrum from.
    spectrum_mode : str
        The type of mass spectrum to extract. Must be 'profile' or 'centroid'.
    auto_process : bool, optional
        If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.

    Returns
    -------
    MassSpecProfile | MassSpecCentroid
        The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.

    Raises
    ------
    ValueError
        If `spectrum_mode` is neither 'profile' nor 'centroid'.
    """
    # Reject unsupported modes before touching the file.
    if spectrum_mode not in ("profile", "centroid"):
        raise ValueError(
            f"spectrum_mode must be 'profile' or 'centroid', got {spectrum_mode!r}"
        )

    if spectrum_mode == "profile":
        scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
        profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
            scan_number, scanStatistics
        )
        abun = list(profileStream.Intensities)
        mz = list(profileStream.Positions)
        data_dict = {
            Labels.mz: mz,
            Labels.abundance: abun,
        }
        d_params = self.set_metadata(
            firstScanNumber=scan_number,
            lastScanNumber=scan_number,
            scans_list=False,
            label=Labels.thermo_profile,
        )
        return MassSpecProfile(data_dict, d_params, auto_process=auto_process)

    # spectrum_mode == "centroid"
    centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
    if centroid_scan.Masses is not None:
        mz = list(centroid_scan.Masses)
        abun = list(centroid_scan.Intensities)
        rp = list(centroid_scan.Resolutions)
        noise = np.array(list(centroid_scan.Noises))
        baselines = np.array(list(centroid_scan.Baselines))
        # Noise standard deviation is taken as (noise - baseline) / 3
        array_noise_std = (noise - baselines) / 3
        l_signal_to_noise = np.array(abun) / array_noise_std
        data_dict = {
            Labels.mz: mz,
            Labels.abundance: abun,
            Labels.rp: rp,
            Labels.s2n: list(l_signal_to_noise),
        }
    else:
        # For CID MS2, the centroid data are stored in the profile data location,
        # they do not have any associated rp or baseline data, but they should be
        # treated as centroid data
        scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
        profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
            scan_number, scanStatistics
        )
        abun = list(profileStream.Intensities)
        mz = list(profileStream.Positions)
        data_dict = {
            Labels.mz: mz,
            Labels.abundance: abun,
            Labels.rp: [np.nan] * len(mz),
            Labels.s2n: [np.nan] * len(mz),
        }
    d_params = self.set_metadata(
        firstScanNumber=scan_number,
        lastScanNumber=scan_number,
        scans_list=False,
        label=Labels.thermo_centroid,
    )
    return MassSpecCentroid(data_dict, d_params, auto_process=auto_process)
Instantiate a MassSpecBase object from a single scan number from the binary file; both profile and centroid modes are supported.
Parameters
- scan_number (int): The scan number to extract the mass spectrum from.
- polarity (int): The polarity of the scan. 1 for positive mode, -1 for negative mode.
- spectrum_mode (str): The type of mass spectrum to extract. Must be 'profile' or 'centroid'.
- auto_process (bool, optional): If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.
Returns
- MassSpecProfile | MassSpecCentroid: The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.
def get_mass_spectra_obj(self):
    """Instantiate a MassSpectraBase object from the binary data file.

    Returns
    -------
    MassSpectraBase
        The MassSpectra object containing the parsed mass spectra. The object is instantiated with the file location, analyzer, instrument, sample name, and scan dataframe.
    """
    # Only the scan metadata is needed here, so no spectra are parsed.
    scan_table = self.run(spectra="none")[1]

    spectra_obj = MassSpectraBase(
        self.file_location,
        self.analyzer,
        self.instrument_label,
        self.sample_name,
        self,
    )
    # Index by scan number while keeping "scan" available as a column.
    spectra_obj.scan_df = scan_table.set_index("scan", drop=False)

    return spectra_obj
Instantiate a MassSpectraBase object from the binary data file.
Returns
- MassSpectraBase: The MassSpectra object containing the parsed mass spectra. The object is instantiated with the file location, analyzer, instrument, sample name, and scan dataframe.
def get_lcms_obj(self, spectra="all"):
    """Instantiates a LCMSBase object from the RAW file.

    Parameters
    ----------
    spectra : str, optional
        Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2".

    Returns
    -------
    LCMSBase
        LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.

    Raises
    ------
    ValueError
        If the scans do not all share one polarity.
    """
    _, scan_df = self.run(spectra="none")  # first run it to just get scan info

    # Check if polarity is mixed before doing the expensive spectra parse
    if len(set(scan_df.polarity)) > 1:
        raise ValueError("Mixed polarities detected in scan data")

    res, scan_df = self.run(
        scan_df=scan_df, spectra=spectra
    )  # second run to parse data
    lcms_obj = LCMSBase(
        self.file_location,
        self.analyzer,
        self.instrument_label,
        self.sample_name,
        self,
    )
    if spectra != "none":
        for key, ms_df in res.items():
            key_int = int(key.replace("ms", ""))
            ms_df = ms_df[ms_df.intensity > 0]
            ms_df = ms_df.sort_values(by=["scan", "mz"]).reset_index(drop=True)
            lcms_obj._ms_unprocessed[key_int] = ms_df
    lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
    # Positional access: does not rely on the index containing the label 0.
    lcms_obj.polarity = scan_df.polarity.iloc[0]
    lcms_obj._scans_number_list = list(scan_df.scan)
    lcms_obj._retention_time_list = list(scan_df.scan_time)
    lcms_obj._tic_list = list(scan_df.tic)

    return lcms_obj
Instantiates a LCMSBase object from the RAW file.
Parameters
- spectra (str, optional): Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2".
Returns
- LCMSBase: LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.
def get_icr_transient_times(self):
    """Return a list of transient time targets for all scans, or selected scans range

    Returns
    -------
    list
        One entry per scan: the transient time looked up from the scan
        header's "FT Resolution:" value, or None when that value is not in
        the lookup table.

    Notes
    --------
    Resolving Power and Transient time targets based on 7T FT-ICR MS system
    """

    res_trans_time = {
        "50": 0.384,  # legacy key, kept so existing lookups are unchanged
        "50000": 0.384,
        "100000": 0.768,
        "200000": 1.536,
        "400000": 3.072,
        "750000": 6.144,
        "1000000": 12.288,
    }

    transient_time_list = []

    # NOTE(review): range() stops before self.end_scan, so the scan numbered
    # end_scan is not reported -- confirm whether end_scan is inclusive.
    for scan in range(self.start_scan, self.end_scan):
        scan_header = self.get_scan_header(scan)
        rp_target = scan_header["FT Resolution:"]
        transient_time_list.append(res_trans_time.get(rp_target))

    return transient_time_list
Return a list of transient time targets for all scans, or selected scans range
Notes
Resolving Power and Transient time targets based on 7T FT-ICR MS system
Inherited Members
- ThermoBaseClass
- iRawDataPlus
- res
- file_path
- iFileHeader
- parameters
- chromatogram_settings
- scans
- start_scan
- end_scan
- set_msordertype
- get_creation_time
- remove_temp_file
- close_file
- get_polarity_mode
- get_filter_for_scan_num
- get_ms_level_for_scan_num
- check_full_scan
- get_all_filters
- get_scan_header
- get_rt_time_from_trace
- get_eics
- get_tic
- get_average_mass_spectrum
- set_metadata
- get_instrument_methods
- get_tune_method
- get_status_log
- get_error_logs
- get_sample_information
- get_instrument_data
- get_centroid_msms_data
- get_average_mass_spectrum_by_scanlist