corems.mass_spectra.input.rawFileReader

   1__author__ = "Yuri E. Corilo"
   2__date__ = "Jun 09, 2021"
   3
   4
   5from warnings import warn
   6import warnings
   7from collections import defaultdict
   8
   9from matplotlib import axes
  10from corems.encapsulation.factory.processingSetting import LiquidChromatographSetting
  11
  12import numpy as np
  13import sys
  14import site
  15from pathlib import Path
  16import datetime
  17import importlib.util
  18import os
  19
  20import clr
  21import pandas as pd
  22from s3path import S3Path
  23
  24
  25from typing import Any, Dict, List, Optional, Tuple
  26from corems.encapsulation.constant import Labels
  27from corems.mass_spectra.factory.lc_class import MassSpectraBase, LCMSBase
  28from corems.mass_spectra.factory.chromat_data import EIC_Data, TIC_Data
  29from corems.mass_spectrum.factory.MassSpectrumClasses import (
  30    MassSpecProfile,
  31    MassSpecCentroid,
  32)
  33from corems.encapsulation.factory.parameters import LCMSParameters, default_parameters
  34from corems.mass_spectra.input.parserbase import SpectraParserInterface
  35
  36# Add the path of the Thermo .NET libraries to the system path
  37spec = importlib.util.find_spec("corems")
  38sys.path.append(str(Path(os.path.dirname(spec.origin)).parent) + "/ext_lib/dotnet/")
  39
  40clr.AddReference("ThermoFisher.CommonCore.RawFileReader")
  41clr.AddReference("ThermoFisher.CommonCore.Data")
  42clr.AddReference("ThermoFisher.CommonCore.MassPrecisionEstimator")
  43
  44from ThermoFisher.CommonCore.RawFileReader import RawFileReaderAdapter
  45from ThermoFisher.CommonCore.Data import ToleranceUnits, Extensions
  46from ThermoFisher.CommonCore.Data.Business import (
  47    ChromatogramTraceSettings,
  48    TraceType,
  49    MassOptions,
  50)
  51from ThermoFisher.CommonCore.Data.Business import ChromatogramSignal, Range
  52from ThermoFisher.CommonCore.Data.Business import Device
  53from ThermoFisher.CommonCore.Data.Interfaces import IChromatogramSettings
  54from ThermoFisher.CommonCore.Data.Business import MassOptions, FileHeaderReaderFactory
  55from ThermoFisher.CommonCore.Data.FilterEnums import MSOrderType
  56from System.Collections.Generic import List
  57
  58
  59class ThermoBaseClass:
  60    """Class for parsing Thermo Raw files and extracting information from them.
  61
  62    Parameters:
  63    -----------
  64    file_location : str or pathlib.Path or s3path.S3Path
  65        Thermo Raw file path or S3 path.
  66
  67    Attributes:
  68    -----------
  69    file_path : str or pathlib.Path or s3path.S3Path
  70        The file path of the Thermo Raw file.
  71    parameters : LCMSParameters
  72        The LCMS parameters for the Thermo Raw file.
  73    chromatogram_settings : LiquidChromatographSetting
  74        The chromatogram settings for the Thermo Raw file.
  75    scans : list or tuple
  76        The selected scans for the Thermo Raw file.
  77    start_scan : int
  78        The starting scan number for the Thermo Raw file.
  79    end_scan : int
  80        The ending scan number for the Thermo Raw file.
  81
  82    Methods:
  83    --------
  84    * set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter
  85        Convert the user-passed MS Type string to a Thermo MSOrderType object.
  86    * get_creation_time() -> datetime.datetime
  87        Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
  88    * remove_temp_file()
  89        Remove the temporary file if the path is from S3Path.
  90    * get_polarity_mode(scan_number: int) -> int
  91        Get the polarity mode for the given scan number.
  92    * get_filter_for_scan_num(scan_number: int) -> List[str]
  93        Get the filter for the given scan number.
  94    * check_full_scan(scan_number: int) -> bool
  95        Check if the given scan number is a full scan.
  96    * get_all_filters() -> Tuple[Dict[int, str], List[str]]
  97        Get all scan filters for the Thermo Raw file.
  98    * get_scan_header(scan: int) -> Dict[str, Any]
  99        Get the full dictionary of scan header metadata for the given scan number.
 100    * get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]
 101        Get the retention time, intensity, and scan number from the given trace.
    * get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d',
             peak_detection: bool = False, smooth: bool = False, plot: bool = False,
             ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes]
        Get the extracted ion chromatograms (EICs) for the target m/z values.
        Passing peak_detection=True or smooth=True raises an Exception.
 106
 107    """
 108
 109    def __init__(self, file_location):
 110        """file_location: srt pathlib.Path or s3path.S3Path
 111        Thermo Raw file path
 112        """
 113        # Thread.__init__(self)
 114        if isinstance(file_location, str):
 115            file_path = Path(file_location)
 116
 117        elif isinstance(file_location, S3Path):
 118            temp_dir = Path("tmp/")
 119            temp_dir.mkdir(exist_ok=True)
 120
 121            file_path = temp_dir / file_location.name
 122            with open(file_path, "wb") as fh:
 123                fh.write(file_location.read_bytes())
 124
 125        else:
 126            file_path = file_location
 127
 128        self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path))
 129
 130        if not self.iRawDataPlus.IsOpen:
 131            raise FileNotFoundError(
 132                "Unable to access the RAW file using the RawFileReader class!"
 133            )
 134
 135        # Check for any errors in the RAW file
 136        if self.iRawDataPlus.IsError:
 137            raise IOError(
 138                "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path)
 139            )
 140
 141        self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1)
 142
 143        self.file_path = file_location
 144        self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path))
 145
 146        # removing tmp file
 147
 148        self._init_settings()
 149
 150    def _init_settings(self):
 151        """
 152        Initialize the LCMSParameters object.
 153        """
 154        self._parameters = LCMSParameters()
 155
 156    @property
 157    def parameters(self) -> LCMSParameters:
 158        """
 159        Get or set the LCMSParameters object.
 160        """
 161        return self._parameters
 162
 163    @parameters.setter
 164    def parameters(self, instance_LCMSParameters: LCMSParameters):
 165        self._parameters = instance_LCMSParameters
 166
 167    @property
 168    def chromatogram_settings(self) -> LiquidChromatographSetting:
 169        """
 170        Get or set the LiquidChromatographSetting object.
 171        """
 172        return self.parameters.lc_ms
 173
 174    @chromatogram_settings.setter
 175    def chromatogram_settings(
 176        self, instance_LiquidChromatographSetting: LiquidChromatographSetting
 177    ):
 178        self.parameters.lc_ms = instance_LiquidChromatographSetting
 179
 180    @property
 181    def scans(self) -> list | tuple:
 182        """scans : list or tuple
 183        If list uses Thermo AverageScansInScanRange for selected scans, ortherwise uses Thermo AverageScans for a scan range
 184        """
 185        return self.chromatogram_settings.scans
 186
 187    @property
 188    def start_scan(self) -> int:
 189        """
 190        Get the starting scan number for the Thermo Raw file.
 191        """
 192        if self.scans[0] == -1:
 193            return self.iRawDataPlus.RunHeaderEx.FirstSpectrum
 194        else:
 195            return self.scans[0]
 196
 197    @property
 198    def end_scan(self) -> int:
 199        """
 200        Get the ending scan number for the Thermo Raw file.
 201        """
 202        if self.scans[-1] == -1:
 203            return self.iRawDataPlus.RunHeaderEx.LastSpectrum
 204        else:
 205            return self.scans[-1]
 206
 207    def set_msordertype(self, scanFilter, mstype: str = "ms1"):
 208        """
 209        Function to convert user passed string MS Type to Thermo MSOrderType object
 210        Limited to MS1 through MS10.
 211
 212        Parameters:
 213        -----------
 214        scanFilter : Thermo.ScanFilter
 215            The scan filter object.
 216        mstype : str, optional
 217            The MS Type string, by default 'ms1'
 218
 219        """
 220        mstype = mstype.upper()
 221        # Check that a valid mstype is passed
 222        if (int(mstype.split("MS")[1]) > 10) or (int(mstype.split("MS")[1]) < 1):
 223            warn("MS Type not valid, must be between MS1 and MS10")
 224
 225        msordertypedict = {
 226            "MS1": MSOrderType.Ms,
 227            "MS2": MSOrderType.Ms2,
 228            "MS3": MSOrderType.Ms3,
 229            "MS4": MSOrderType.Ms4,
 230            "MS5": MSOrderType.Ms5,
 231            "MS6": MSOrderType.Ms6,
 232            "MS7": MSOrderType.Ms7,
 233            "MS8": MSOrderType.Ms8,
 234            "MS9": MSOrderType.Ms9,
 235            "MS10": MSOrderType.Ms10,
 236        }
 237        scanFilter.MSOrder = msordertypedict[mstype]
 238        return scanFilter
 239
 240    def get_creation_time(self) -> datetime.datetime:
 241        """
 242        Extract the creation date stamp from the .RAW file
 243        Return formatted creation date stamp.
 244
 245        """
 246        credate = self.iRawDataPlus.CreationDate.get_Ticks()
 247        credate = datetime.datetime(1, 1, 1) + datetime.timedelta(
 248            microseconds=credate / 10
 249        )
 250        return credate
 251
 252    def remove_temp_file(self) -> None:
 253        """if the path is from S3Path data cannot be serialized to io.ByteStream and
 254        a temporary copy is stored at the temp dir
 255        use this function only at the end of your execution scrip
 256        some LCMS class methods depend on this file
 257        """
 258
 259        self.file_path.unlink()
 260
 261    def close_file(self) -> None:
 262        """
 263        Close the Thermo Raw file.
 264        """
 265        self.iRawDataPlus.Dispose()
 266
 267    def get_polarity_mode(self, scan_number: int) -> int:
 268        """
 269        Get the polarity mode for the given scan number.
 270
 271        Parameters:
 272        -----------
 273        scan_number : int
 274            The scan number.
 275
 276        Raises:
 277        -------
 278        Exception
 279            If the polarity mode is unknown.
 280
 281        """
 282        polarity_symbol = self.get_filter_for_scan_num(scan_number)[1]
 283
 284        if polarity_symbol == "+":
 285            return 1
 286            # return 'POSITIVE_ION_MODE'
 287
 288        elif polarity_symbol == "-":
 289            return -1
 290
 291        else:
 292            raise Exception("Polarity Mode Unknown, please set it manually")
 293
 294    def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
 295        """
 296        Returns the closest matching run time that corresponds to scan_number for the current
 297        controller. This function is only supported for MS device controllers.
 298        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
 299
 300        Parameters:
 301        -----------
 302        scan_number : int
 303            The scan number.
 304
 305        """
 306        scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
 307
 308        return str(scan_label).split()
 309
 310    def get_ms_level_for_scan_num(self, scan_number: int) -> str:
 311        """
 312        Get the MS order for the given scan number.
 313
 314        Parameters:
 315        -----------
 316        scan_number : int
 317            The scan number
 318
 319        Returns:
 320        --------
 321        int
 322            The MS order type (1 for MS, 2 for MS2, etc.)
 323        """
 324        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
 325
 326        msordertype = {
 327            MSOrderType.Ms: 1,
 328            MSOrderType.Ms2: 2,
 329            MSOrderType.Ms3: 3,
 330            MSOrderType.Ms4: 4,
 331            MSOrderType.Ms5: 5,
 332            MSOrderType.Ms6: 6,
 333            MSOrderType.Ms7: 7,
 334            MSOrderType.Ms8: 8,
 335            MSOrderType.Ms9: 9,
 336            MSOrderType.Ms10: 10,
 337        }
 338
 339        if scan_filter.MSOrder in msordertype:
 340            return msordertype[scan_filter.MSOrder]
 341        else:
 342            raise Exception("MS Order Type not found")
 343    
 344    def check_full_scan(self, scan_number: int) -> bool:
 345        # scan_filter.ScanMode 0 = FULL
 346        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
 347
 348        return scan_filter.ScanMode == MSOrderType.Ms
 349
 350    def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
 351        """
 352        Get all scan filters.
 353        This function is only supported for MS device controllers.
 354        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
 355
 356        """
 357
 358        scanrange = range(self.start_scan, self.end_scan + 1)
 359        scanfiltersdic = {}
 360        scanfilterslist = []
 361        for scan_number in scanrange:
 362            scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
 363            scanfiltersdic[scan_number] = scan_label
 364            scanfilterslist.append(scan_label)
 365        scanfilterset = list(set(scanfilterslist))
 366        return scanfiltersdic, scanfilterset
 367
 368    def get_scan_header(self, scan: int) -> Dict[str, Any]:
 369        """
 370        Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.
 371
 372        Parameters:
 373        -----------
 374        scan : int
 375            The scan number.
 376
 377        """
 378        header = self.iRawDataPlus.GetTrailerExtraInformation(scan)
 379
 380        header_dic = {}
 381        for i in range(header.Length):
 382            header_dic.update({header.Labels[i]: header.Values[i]})
 383        return header_dic
 384
 385    @staticmethod
 386    def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
 387        """trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal"""
 388        return list(trace.Times), list(trace.Intensities), list(trace.Scans)
 389
 390    def get_eics(
 391        self,
 392        target_mzs: List[float],
 393        tic_data: Dict[str, Any],
 394        ms_type="MS !d",
 395        peak_detection=False,
 396        smooth=False,
 397        plot=False,
 398        ax: Optional[axes.Axes] = None,
 399        legend=False,
 400    ) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
 401        """ms_type: str ('MS', MS2')
 402        start_scan: int default -1 will select the lowest available
 403        end_scan: int default -1 will select the highest available
 404
 405        returns:
 406
 407            chroma: dict{target_mz: EIC_Data(
 408                                        Scans: [int]
 409                                            original thermo scan numbers
 410                                        Time: [floats]
 411                                            list of retention times
 412                                        TIC: [floats]
 413                                            total ion chromatogram
 414                                        Apexes: [int]
 415                                            original thermo apex scan number after peak picking
 416                                        )
 417
 418        """
 419        # If peak_detection or smooth is True, raise exception
 420        if peak_detection or smooth:
 421            raise Exception("Peak detection and smoothing are no longer implemented in this function")
 422
 423        options = MassOptions()
 424        options.ToleranceUnits = ToleranceUnits.ppm
 425        options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm
 426
 427        all_chroma_settings = []
 428
 429        for target_mz in target_mzs:
 430            settings = ChromatogramTraceSettings(TraceType.MassRange)
 431            settings.Filter = ms_type
 432            settings.MassRanges = [Range(target_mz, target_mz)]
 433
 434            chroma_settings = IChromatogramSettings(settings)
 435
 436            all_chroma_settings.append(chroma_settings)
 437
 438        # chroma_settings2 = IChromatogramSettings(settings)
 439        # print(chroma_settings.FragmentMass)
 440        # print(chroma_settings.FragmentMass)
 441        # print(chroma_settings)
 442        # print(chroma_settings)
 443
 444        data = self.iRawDataPlus.GetChromatogramData(
 445            all_chroma_settings, self.start_scan, self.end_scan, options
 446        )
 447
 448        traces = ChromatogramSignal.FromChromatogramData(data)
 449
 450        chroma = {}
 451
 452        if plot:
 453            from matplotlib.transforms import Bbox
 454            import matplotlib.pyplot as plt
 455
 456            if not ax:
 457                # ax = plt.gca()
 458                # ax.clear()
 459                fig, ax = plt.subplots()
 460
 461            else:
 462                fig = plt.gcf()
 463
 464            # plt.show()
 465
 466        for i, trace in enumerate(traces):
 467            if trace.Length > 0:
 468                rt, eic, scans = self.get_rt_time_from_trace(trace)
 469                if smooth:
 470                    eic = self.smooth_tic(eic)
 471
 472                chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
 473                if plot:
 474                    ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))
 475
 476        if peak_detection:
 477            # max_eic = self.get_max_eic(chroma)
 478            max_signal = max(tic_data.tic)
 479
 480            for eic_data in chroma.values():
 481                eic = eic_data.eic
 482                time = eic_data.time
 483
 484                if len(eic) != len(tic_data.tic):
 485                    warn(
 486                        "The software assumes same lenth of TIC and EIC, this does not seems to be the case and the results mass spectrum selected by the scan number might not be correct"
 487                    )
 488
 489                if eic.max() > 0:
 490                    centroid_eics = self.eic_centroid_detector(time, eic, max_signal)
 491                    eic_data.apexes = [i for i in centroid_eics]
 492
 493                    if plot:
 494                        for peak_indexes in eic_data.apexes:
 495                            apex_index = peak_indexes[1]
 496                            ax.plot(
 497                                time[apex_index],
 498                                eic[apex_index],
 499                                marker="x",
 500                                linewidth=0,
 501                            )
 502
 503        if plot:
 504            ax.set_xlabel("Time (min)")
 505            ax.set_ylabel("a.u.")
 506            ax.set_title(ms_type + " EIC")
 507            ax.tick_params(axis="both", which="major", labelsize=12)
 508            ax.axes.spines["top"].set_visible(False)
 509            ax.axes.spines["right"].set_visible(False)
 510
 511            if legend:
 512                legend = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1))
 513                fig.subplots_adjust(right=0.76)
 514                # ax.set_prop_cycle(color=plt.cm.gist_rainbow(np.linspace(0, 1, len(traces))))
 515
 516                d = {"down": 30, "up": -30}
 517
 518                def func(evt):
 519                    if legend.contains(evt):
 520                        bbox = legend.get_bbox_to_anchor()
 521                        bbox = Bbox.from_bounds(
 522                            bbox.x0, bbox.y0 + d[evt.button], bbox.width, bbox.height
 523                        )
 524                        tr = legend.axes.transAxes.inverted()
 525                        legend.set_bbox_to_anchor(bbox.transformed(tr))
 526                        fig.canvas.draw_idle()
 527
 528                fig.canvas.mpl_connect("scroll_event", func)
 529            return chroma, ax
 530        else:
 531            return chroma, None
 532            rt = []
 533            tic = []
 534            scans = []
 535            for i in range(traces[0].Length):
 536                # print(trace[0].HasBasePeakData,trace[0].EndTime )
 537
 538                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
 539                rt.append(traces[0].Times[i])
 540                tic.append(traces[0].Intensities[i])
 541                scans.append(traces[0].Scans[i])
 542
 543            return traces
 544            # plot_chroma(rt, tic)
 545            # plt.show()
 546
 547    def get_tic(
 548        self,
 549        ms_type="MS !d",
 550        peak_detection=False,  # This wont work right now
 551        smooth=False,  # This wont work right now
 552        plot=False,
 553        ax=None,
 554        trace_type="TIC",
 555    ) -> Tuple[TIC_Data, axes.Axes]:
 556        """ms_type: str ('MS !d', 'MS2', None)
 557            if you use None you get all scans.
 558        peak_detection: bool
 559        smooth: bool
 560        plot: bool
 561        ax: matplotlib axis object
 562        trace_type: str ('TIC','BPC')
 563
 564        returns:
 565            chroma: dict
 566            {
 567            Scan: [int]
 568                original thermo scan numberMS
 569            Time: [floats]
 570                list of retention times
 571            TIC: [floats]
 572                total ion chromatogram
 573            Apexes: [int]
 574                original thermo apex scan number after peak picking
 575            }
 576        """
 577        if trace_type == "TIC":
 578            settings = ChromatogramTraceSettings(TraceType.TIC)
 579        elif trace_type == "BPC":
 580            settings = ChromatogramTraceSettings(TraceType.BasePeak)
 581        else:
 582            raise ValueError(f"{trace_type} undefined")
 583        if ms_type == "all":
 584            settings.Filter = None
 585        else:
 586            settings.Filter = ms_type
 587
 588        chroma_settings = IChromatogramSettings(settings)
 589
 590        data = self.iRawDataPlus.GetChromatogramData(
 591            [chroma_settings], self.start_scan, self.end_scan
 592        )
 593
 594        trace = ChromatogramSignal.FromChromatogramData(data)
 595
 596        data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[])
 597
 598        if trace[0].Length > 0:
 599            for i in range(trace[0].Length):
 600                # print(trace[0].HasBasePeakData,trace[0].EndTime )
 601
 602                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
 603                data.time.append(trace[0].Times[i])
 604                data.tic.append(trace[0].Intensities[i])
 605                data.scans.append(trace[0].Scans[i])
 606
 607                # print(trace[0].Scans[i])
 608            if smooth:
 609                data.tic = self.smooth_tic(data.tic)
 610
 611            else:
 612                data.tic = np.array(data.tic)
 613
 614            if peak_detection:
 615                centroid_peak_indexes = [
 616                    i for i in self.centroid_detector(data.time, data.tic)
 617                ]
 618
 619                data.apexes = centroid_peak_indexes
 620
 621            if plot:
 622                if not ax:
 623                    import matplotlib.pyplot as plt
 624
 625                    ax = plt.gca()
 626                    # fig, ax = plt.subplots(figsize=(6, 3))
 627
 628                ax.plot(data.time, data.tic, label=trace_type)
 629                ax.set_xlabel("Time (min)")
 630                ax.set_ylabel("a.u.")
 631                if peak_detection:
 632                    for peak_indexes in data.apexes:
 633                        apex_index = peak_indexes[1]
 634                        ax.plot(
 635                            data.time[apex_index],
 636                            data.tic[apex_index],
 637                            marker="x",
 638                            linewidth=0,
 639                        )
 640
 641                # plt.show()
 642                if trace_type == "BPC":
 643                    data.bpc = data.tic
 644                    data.tic = []
 645                return data, ax
 646            if trace_type == "BPC":
 647                data.bpc = data.tic
 648                data.tic = []
 649            return data, None
 650
 651        else:
 652            return None, None
 653
 654    def get_average_mass_spectrum(
 655        self,
 656        spectrum_mode: str = "profile",
 657        auto_process: bool = True,
 658        ppm_tolerance: float = 5.0,
 659        ms_type: str = "MS1",
 660    ) -> MassSpecProfile | MassSpecCentroid:
 661        """
 662        Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method
 663        or a scan list using Thermo's AverageScans method
 664        spectrum_mode: str
 665            centroid or profile mass spectrum
 666        auto_process: bool
 667            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
 668        ms_type: str
 669            String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10.
 670            Internal function converts to Thermo MSOrderType class.
 671
 672        """
 673
 674        def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool):
 675            mz_list = list(averageScan.SegmentedScan.Positions)
 676            abund_list = list(averageScan.SegmentedScan.Intensities)
 677
 678            data_dict = {
 679                Labels.mz: mz_list,
 680                Labels.abundance: abund_list,
 681            }
 682
 683            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)
 684
 685        def get_centroid_mass_spec(averageScan, d_params: dict):
 686            noise = list(averageScan.centroidScan.Noises)
 687
 688            baselines = list(averageScan.centroidScan.Baselines)
 689
 690            rp = list(averageScan.centroidScan.Resolutions)
 691
 692            magnitude = list(averageScan.centroidScan.Intensities)
 693
 694            mz = list(averageScan.centroidScan.Masses)
 695
 696            array_noise_std = (np.array(noise) - np.array(baselines)) / 3
 697            l_signal_to_noise = np.array(magnitude) / array_noise_std
 698
 699            d_params["baseline_noise"] = np.average(array_noise_std)
 700
 701            d_params["baseline_noise_std"] = np.std(array_noise_std)
 702
 703            data_dict = {
 704                Labels.mz: mz,
 705                Labels.abundance: magnitude,
 706                Labels.rp: rp,
 707                Labels.s2n: list(l_signal_to_noise),
 708            }
 709
 710            mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
 711
 712            return mass_spec
 713
 714        d_params = self.set_metadata(
 715            firstScanNumber=self.start_scan, lastScanNumber=self.end_scan
 716        )
 717
 718        # Create the mass options object that will be used when averaging the scans
 719        options = MassOptions()
 720        options.ToleranceUnits = ToleranceUnits.ppm
 721        options.Tolerance = ppm_tolerance
 722
 723        # Get the scan filter for the first scan.  This scan filter will be used to located
 724        # scans within the given scan range of the same type
 725        scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan)
 726
 727        # force it to only look for the MSType
 728        scanFilter = self.set_msordertype(scanFilter, ms_type)
 729
 730        if isinstance(self.scans, tuple):
 731            averageScan = Extensions.AverageScansInScanRange(
 732                self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options
 733            )
 734
 735            if averageScan:
 736                if spectrum_mode == "profile":
 737                    mass_spec = get_profile_mass_spec(
 738                        averageScan, d_params, auto_process
 739                    )
 740
 741                    return mass_spec
 742
 743                elif spectrum_mode == "centroid":
 744                    if averageScan.HasCentroidStream:
 745                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
 746
 747                        return mass_spec
 748
 749                    else:
 750                        raise ValueError(
 751                            "No Centroind data available for the selected scans"
 752                        )
 753                else:
 754                    raise ValueError("spectrum_mode must be 'profile' or centroid")
 755            else:
 756                raise ValueError("No data found for the selected scans")
 757
 758        elif isinstance(self.scans, list):
 759            d_params = self.set_metadata(scans_list=self.scans)
 760
 761            scans = List[int]()
 762            for scan in self.scans:
 763                scans.Add(scan)
 764
 765            averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
 766
 767            if averageScan:
 768                if spectrum_mode == "profile":
 769                    mass_spec = get_profile_mass_spec(
 770                        averageScan, d_params, auto_process
 771                    )
 772
 773                    return mass_spec
 774
 775                elif spectrum_mode == "centroid":
 776                    if averageScan.HasCentroidStream:
 777                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
 778
 779                        return mass_spec
 780
 781                    else:
 782                        raise ValueError(
 783                            "No Centroind data available for the selected scans"
 784                        )
 785
 786                else:
 787                    raise ValueError("spectrum_mode must be 'profile' or centroid")
 788
 789            else:
 790                raise ValueError("No data found for the selected scans")
 791
 792        else:
 793            raise ValueError("scans must be a list intergers or a tuple if integers")
 794
 795    def set_metadata(
 796        self,
 797        firstScanNumber=0,
 798        lastScanNumber=0,
 799        scans_list=False,
 800        label=Labels.thermo_profile,
 801    ):
 802        """
 803        Collect metadata to be ingested in the mass spectrum object
 804
 805        scans_list: list[int] or false
 806        lastScanNumber: int
 807        firstScanNumber: int
 808        """
 809
 810        d_params = default_parameters(self.file_path)
 811
 812        # assumes scans is full scan or reduced profile scan
 813
 814        d_params["label"] = label
 815
 816        if scans_list:
 817            d_params["scan_number"] = scans_list
 818
 819            d_params["polarity"] = self.get_polarity_mode(scans_list[0])
 820
 821        else:
 822            d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber)
 823
 824            d_params["polarity"] = self.get_polarity_mode(firstScanNumber)
 825
 826        d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model
 827
 828        d_params["acquisition_time"] = self.get_creation_time()
 829
 830        d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name
 831
 832        return d_params
 833
 834    def get_instrument_methods(self, parse_strings: bool = True):
 835        """
 836        This function will extract the instrument methods embedded in the raw file
 837
 838        First it will check if there are any instrument methods, if not returning None
 839        Then it will get the total number of instrument methods.
 840        For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary
 841        If this fails, it will return just the string object.
 842
 843        This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.
 844
 845        Parameters:
 846        -----------
 847        parse_strings: bool
 848            If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.
 849
 850        Returns:
 851        --------
 852        List[Dict[str, Any]] or List
 853            A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.
 854        """
 855
 856        if not self.iRawDataPlus.HasInstrumentMethod:
 857            raise ValueError(
 858                "Raw Data file does not have any instrument methods attached"
 859            )
 860            return None
 861        else:
 862
 863            def parse_instrument_method(data):
 864                lines = data.split("\r\n")
 865                method = {}
 866                current_section = None
 867                sub_section = None
 868
 869                for line in lines:
 870                    if not line.strip():  # Skip empty lines
 871                        continue
 872                    if (
 873                        line.startswith("----")
 874                        or line.endswith("Settings")
 875                        or line.endswith("Summary")
 876                        or line.startswith("Experiment")
 877                        or line.startswith("Scan Event")
 878                    ):
 879                        current_section = line.replace("-", "").strip()
 880                        method[current_section] = {}
 881                        sub_section = None
 882                    elif line.startswith("\t"):
 883                        if "\t\t" in line:
 884                            indent_level = line.count("\t")
 885                            key_value = line.strip()
 886
 887                            if indent_level == 2:
 888                                if sub_section:
 889                                    key, value = (
 890                                        key_value.split("=", 1)
 891                                        if "=" in key_value
 892                                        else (key_value, None)
 893                                    )
 894                                    method[current_section][sub_section][
 895                                        key.strip()
 896                                    ] = value.strip() if value else None
 897                            elif indent_level == 3:
 898                                scan_type, key_value = (
 899                                    key_value.split(" ", 1)
 900                                    if " " in key_value
 901                                    else (key_value, None)
 902                                )
 903                                method.setdefault(current_section, {}).setdefault(
 904                                    sub_section, {}
 905                                ).setdefault(scan_type, {})
 906
 907                                if key_value:
 908                                    key, value = (
 909                                        key_value.split("=", 1)
 910                                        if "=" in key_value
 911                                        else (key_value, None)
 912                                    )
 913                                    method[current_section][sub_section][scan_type][
 914                                        key.strip()
 915                                    ] = value.strip() if value else None
 916                        else:
 917                            key_value = line.strip()
 918                            if "=" in key_value:
 919                                key, value = key_value.split("=", 1)
 920                                method.setdefault(current_section, {})[key.strip()] = (
 921                                    value.strip()
 922                                )
 923                            else:
 924                                sub_section = key_value
 925                    else:
 926                        if ":" in line:
 927                            key, value = line.split(":", 1)
 928                            method[current_section][key.strip()] = value.strip()
 929                        else:
 930                            method[current_section][line] = {}
 931
 932                return method
 933
 934            count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount
 935            # TODO make this code better...
 936            instrument_methods = []
 937            for i in range(count_instrument_methods):
 938                instrument_method_string = self.iRawDataPlus.GetInstrumentMethod(i)
 939                if parse_strings:
 940                    try:
 941                        instrument_method_dict = parse_instrument_method(
 942                            instrument_method_string
 943                        )
 944                    except:  # if it fails for any reason
 945                        instrument_method_dict = instrument_method_string
 946                else:
 947                    instrument_method_dict = instrument_method_string
 948                instrument_methods.append(instrument_method_dict)
 949            return instrument_methods
 950
 951    def get_tune_method(self):
 952        """
 953        This code will extract the tune method from the raw file
 954        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
 955        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
 956        It will also not return Labels (keys) where the value is blank
 957
 958        Returns:
 959        --------
 960        Dict[str, Any]
 961            A dictionary containing the tune method information
 962
 963        Raises:
 964        -------
 965        ValueError
 966            If no tune methods are found in the raw file
 967
 968        """
 969        tunemethodcount = self.iRawDataPlus.GetTuneDataCount()
 970        if tunemethodcount == 0:
 971            raise ValueError("No tune methods found in the raw data file")
 972            return None
 973        elif tunemethodcount > 1:
 974            warnings.warn(
 975                "Multiple tune methods found in the raw data file, returning the 1st"
 976            )
 977
 978        header = self.iRawDataPlus.GetTuneData(0)
 979
 980        header_dic = {}
 981        current_section = None
 982
 983        for i in range(header.Length):
 984            label = header.Labels[i]
 985            value = header.Values[i]
 986
 987            # Check for section headers
 988            if "===" in label or (
 989                (value == "" or value is None) and not label.endswith(":")
 990            ):
 991                # This is a section header
 992                section_name = (
 993                    label.replace("=", "").replace(":", "").strip()
 994                )  # Clean the label if it contains '='
 995                header_dic[section_name] = {}
 996                current_section = section_name
 997            else:
 998                if current_section:
 999                    header_dic[current_section][label] = value
1000                else:
1001                    header_dic[label] = value
1002        return header_dic
1003
1004    def get_status_log(self, retention_time: float = 0):
1005        """
1006        This code will extract the status logs from the raw file
1007        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
1008        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
1009        It will also not return Labels (keys) where the value is blank
1010
1011        Parameters:
1012        -----------
1013        retention_time: float
1014            The retention time in minutes to extract the status log data from.
1015            Will use the closest retention time found. Default 0.
1016
1017        Returns:
1018        --------
1019        Dict[str, Any]
1020            A dictionary containing the status log information
1021
1022        Raises:
1023        -------
1024        ValueError
1025            If no status logs are found in the raw file
1026
1027        """
1028        tunemethodcount = self.iRawDataPlus.GetStatusLogEntriesCount()
1029        if tunemethodcount == 0:
1030            raise ValueError("No status logs found in the raw data file")
1031            return None
1032
1033        header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time)
1034
1035        header_dic = {}
1036        current_section = None
1037
1038        for i in range(header.Length):
1039            label = header.Labels[i]
1040            value = header.Values[i]
1041
1042            # Check for section headers
1043            if "===" in label or (
1044                (value == "" or value is None) and not label.endswith(":")
1045            ):
1046                # This is a section header
1047                section_name = (
1048                    label.replace("=", "").replace(":", "").strip()
1049                )  # Clean the label if it contains '='
1050                header_dic[section_name] = {}
1051                current_section = section_name
1052            else:
1053                if current_section:
1054                    header_dic[current_section][label] = value
1055                else:
1056                    header_dic[label] = value
1057        return header_dic
1058
1059    def get_error_logs(self):
1060        """
1061        This code will extract the error logs from the raw file
1062
1063        Returns:
1064        --------
1065        Dict[float, str]
1066            A dictionary containing the error log information with the retention time as the key
1067
1068        Raises:
1069        -------
1070        ValueError
1071            If no error logs are found in the raw file
1072        """
1073
1074        error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount
1075        if error_log_count == 0:
1076            raise ValueError("No error logs found in the raw data file")
1077            return None
1078
1079        error_logs = {}
1080
1081        for i in range(error_log_count):
1082            error_log_item = self.iRawDataPlus.GetErrorLogItem(i)
1083            rt = error_log_item.RetentionTime
1084            message = error_log_item.Message
1085            # Use the index `i` as the unique ID key
1086            error_logs[i] = {"rt": rt, "message": message}
1087        return error_logs
1088
1089    def get_sample_information(self):
1090        """
1091        This code will extract the sample information from the raw file
1092
1093        Returns:
1094        --------
1095        Dict[str, Any]
1096            A dictionary containing the sample information
1097            Note that UserText field may not be handled properly and may need further processing
1098        """
1099        sminfo = self.iRawDataPlus.SampleInformation
1100        smdict = {}
1101        smdict["Comment"] = sminfo.Comment
1102        smdict["SampleId"] = sminfo.SampleId
1103        smdict["SampleName"] = sminfo.SampleName
1104        smdict["Vial"] = sminfo.Vial
1105        smdict["InjectionVolume"] = sminfo.InjectionVolume
1106        smdict["Barcode"] = sminfo.Barcode
1107        smdict["BarcodeStatus"] = str(sminfo.BarcodeStatus)
1108        smdict["CalibrationLevel"] = sminfo.CalibrationLevel
1109        smdict["DilutionFactor"] = sminfo.DilutionFactor
1110        smdict["InstrumentMethodFile"] = sminfo.InstrumentMethodFile
1111        smdict["RawFileName"] = sminfo.RawFileName
1112        smdict["CalibrationFile"] = sminfo.CalibrationFile
1113        smdict["IstdAmount"] = sminfo.IstdAmount
1114        smdict["RowNumber"] = sminfo.RowNumber
1115        smdict["Path"] = sminfo.Path
1116        smdict["ProcessingMethodFile"] = sminfo.ProcessingMethodFile
1117        smdict["SampleType"] = str(sminfo.SampleType)
1118        smdict["SampleWeight"] = sminfo.SampleWeight
1119        smdict["UserText"] = {
1120            "UserText": [x for x in sminfo.UserText]
1121        }  # [0] #This may not work - needs debugging with
1122        return smdict
1123
1124    def get_instrument_data(self):
1125        """
1126        This code will extract the instrument data from the raw file
1127
1128        Returns:
1129        --------
1130        Dict[str, Any]
1131            A dictionary containing the instrument data
1132        """
1133        instrument_data = self.iRawDataPlus.GetInstrumentData()
1134        id_dict = {}
1135        id_dict["Name"] = instrument_data.Name
1136        id_dict["Model"] = instrument_data.Model
1137        id_dict["SerialNumber"] = instrument_data.SerialNumber
1138        id_dict["SoftwareVersion"] = instrument_data.SoftwareVersion
1139        id_dict["HardwareVersion"] = instrument_data.HardwareVersion
1140        id_dict["ChannelLabels"] = {
1141            "ChannelLabels": [x for x in instrument_data.ChannelLabels]
1142        }
1143        id_dict["Flags"] = instrument_data.Flags
1144        id_dict["AxisLabelY"] = instrument_data.AxisLabelY
1145        id_dict["AxisLabelX"] = instrument_data.AxisLabelX
1146        return id_dict
1147
1148    def get_centroid_msms_data(self, scan):
1149        """
1150        .. deprecated:: 2.0
1151            This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1152        """
1153
1154        warnings.warn(
1155            "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1156            "Please use `get_average_mass_spectrum()` instead.",
1157            DeprecationWarning,
1158        )
1159
1160        d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid)
1161
1162        centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False)
1163
1164        noise = list(centroidStream.Noises)
1165
1166        baselines = list(centroidStream.Baselines)
1167
1168        rp = list(centroidStream.Resolutions)
1169
1170        magnitude = list(centroidStream.Intensities)
1171
1172        mz = list(centroidStream.Masses)
1173
1174        # charge = scans_labels[5]
1175        array_noise_std = (np.array(noise) - np.array(baselines)) / 3
1176        l_signal_to_noise = np.array(magnitude) / array_noise_std
1177
1178        d_params["baseline_noise"] = np.average(array_noise_std)
1179
1180        d_params["baseline_noise_std"] = np.std(array_noise_std)
1181
1182        data_dict = {
1183            Labels.mz: mz,
1184            Labels.abundance: magnitude,
1185            Labels.rp: rp,
1186            Labels.s2n: list(l_signal_to_noise),
1187        }
1188
1189        mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
1190        mass_spec.settings.noise_threshold_method = "relative_abundance"
1191        mass_spec.settings.noise_threshold_min_relative_abundance = 1
1192        mass_spec.process_mass_spec()
1193        return mass_spec
1194
1195    def get_average_mass_spectrum_by_scanlist(
1196        self,
1197        scans_list: List[int],
1198        auto_process: bool = True,
1199        ppm_tolerance: float = 5.0,
1200    ) -> MassSpecProfile:
1201        """
1202        Averages selected scans mass spectra using Thermo's AverageScans method
1203        scans_list: list[int]
1204        auto_process: bool
1205            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
1206        Returns:
1207            MassSpecProfile
1208
1209         .. deprecated:: 2.0
1210        This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1211        """
1212
1213        warnings.warn(
1214            "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1215            "Please use `get_average_mass_spectrum()` instead.",
1216            DeprecationWarning,
1217        )
1218
1219        d_params = self.set_metadata(scans_list=scans_list)
1220
1221        # assumes scans is full scan or reduced profile scan
1222
1223        scans = List[int]()
1224        for scan in scans_list:
1225            scans.Add(scan)
1226
1227        # Create the mass options object that will be used when averaging the scans
1228        options = MassOptions()
1229        options.ToleranceUnits = ToleranceUnits.ppm
1230        options.Tolerance = ppm_tolerance
1231
1232        # Get the scan filter for the first scan.  This scan filter will be used to located
1233        # scans within the given scan range of the same type
1234
1235        averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
1236
1237        len_data = averageScan.SegmentedScan.Positions.Length
1238
1239        mz_list = list(averageScan.SegmentedScan.Positions)
1240        abund_list = list(averageScan.SegmentedScan.Intensities)
1241
1242        data_dict = {
1243            Labels.mz: mz_list,
1244            Labels.abundance: abund_list,
1245        }
1246
1247        mass_spec = MassSpecProfile(data_dict, d_params, auto_process=auto_process)
1248
1249        return mass_spec
1250
1251
1252class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, SpectraParserInterface):
1253    """A class for parsing Thermo RAW mass spectrometry data files and instatiating MassSpectraBase or LCMSBase objects
1254
1255    Parameters
1256    ----------
1257    file_location : str or Path
1258        The path to the RAW file to be parsed.
1259    analyzer : str, optional
1260        The type of mass analyzer used in the instrument. Default is "Unknown".
1261    instrument_label : str, optional
1262        The name of the instrument used to acquire the data. Default is "Unknown".
1263    sample_name : str, optional
1264        The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.
1265
1266    Attributes
1267    ----------
1268    file_location : Path
1269        The path to the RAW file being parsed.
1270    analyzer : str
1271        The type of mass analyzer used in the instrument.
1272    instrument_label : str
1273        The name of the instrument used to acquire the data.
1274    sample_name : str
1275        The name of the sample being analyzed.
1276
1277    Methods
1278    -------
1279    * run(spectra=True).
1280        Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
    * get_mass_spectrum_from_scan(scan_number, spectrum_mode, auto_process=True)
1282        Parses the RAW file and returns a MassSpecBase object from a single scan.
1283    * get_mass_spectra_obj().
1284        Parses the RAW file and instantiates a MassSpectraBase object.
1285    * get_lcms_obj().
1286        Parses the RAW file and instantiates an LCMSBase object.
1287    * get_icr_transient_times().
1288        Return a list for transient time targets for all scans, or selected scans range
1289
1290    Inherits from ThermoBaseClass and SpectraParserInterface
1291    """
1292
1293    def __init__(
1294        self,
1295        file_location,
1296        analyzer="Unknown",
1297        instrument_label="Unknown",
1298        sample_name=None,
1299    ):
1300        super().__init__(file_location)
1301        if isinstance(file_location, str):
1302            # if obj is a string it defaults to create a Path obj, pass the S3Path if needed
1303            file_location = Path(file_location)
1304        if not file_location.exists():
1305            raise FileExistsError("File does not exist: " + str(file_location))
1306
1307        self.file_location = file_location
1308        self.analyzer = analyzer
1309        self.instrument_label = instrument_label
1310
1311        if sample_name:
1312            self.sample_name = sample_name
1313        else:
1314            self.sample_name = file_location.stem
1315
1316    def load(self):
1317        pass
1318
1319    def get_scan_df(self):
1320        # This automatically brings in all the data
1321        self.chromatogram_settings.scans = (-1, -1)
1322
1323        # Get scan df info; starting with TIC data
1324        tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
1325        tic_data = {
1326            "scan": tic_data.scans,
1327            "scan_time": tic_data.time,
1328            "tic": tic_data.tic,
1329        }
1330        scan_df = pd.DataFrame.from_dict(tic_data)
1331        scan_df["ms_level"] = None
1332        
1333        # get scan text
1334        scan_filter_df = pd.DataFrame.from_dict(
1335            self.get_all_filters()[0], orient="index"
1336        )
1337        scan_filter_df.reset_index(inplace=True)
1338        scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)
1339
1340        scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
1341        scan_df["scan_window_lower"] = scan_df.scan_text.str.extract(
1342            r"\[(\d+\.\d+)-\d+\.\d+\]"
1343        )
1344        scan_df["scan_window_upper"] = scan_df.scan_text.str.extract(
1345            r"\[\d+\.\d+-(\d+\.\d+)\]"
1346        )
1347        scan_df["polarity"] = np.where(
1348            scan_df.scan_text.str.contains(" - "), "negative", "positive"
1349        )
1350        scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@")
1351        scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float)
1352
1353        # Assign each scan as centroid or profile and add ms_level
1354        scan_df["ms_format"] = None
1355        for i in scan_df.scan.to_list():
1356            scan_df.loc[scan_df.scan == i, "ms_level"] = self.get_ms_level_for_scan_num(i)
1357            if self.iRawDataPlus.IsCentroidScanFromScanNumber(i):
1358                scan_df.loc[scan_df.scan == i, "ms_format"] = "centroid"
1359            else:
1360                scan_df.loc[scan_df.scan == i, "ms_format"] = "profile"
1361
1362        return scan_df
1363
1364    def get_ms_raw(self, spectra, scan_df):
1365        if spectra == "all":
1366            scan_df_forspec = scan_df
1367        elif spectra == "ms1":
1368            scan_df_forspec = scan_df[scan_df.ms_level == 1]
1369        elif spectra == "ms2":
1370            scan_df_forspec = scan_df[scan_df.ms_level == 2]
1371        else:
1372            raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")
1373
1374        # Result container
1375        res = {}
1376
1377        # Row count container
1378        counter = {}
1379
1380        # Column name container
1381        cols = {}
1382
1383        # set at float32
1384        dtype = np.float32
1385
1386        # First pass: get nrows
1387        N = defaultdict(lambda: 0)
1388        for i in scan_df_forspec.scan.to_list():
1389            level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0]
1390            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i)
1391            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1392                i, scanStatistics
1393            )
1394            abun = list(profileStream.Intensities)
1395            abun = np.array(abun)[np.where(np.array(abun) > 0)[0]]
1396
1397            N[level] += len(abun)
1398
1399        # Second pass: parse
1400        for i in scan_df_forspec.scan.to_list():
1401            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i)
1402            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1403                i, scanStatistics
1404            )
1405            abun = list(profileStream.Intensities)
1406            mz = list(profileStream.Positions)
1407
1408            # Get index of abun that are > 0
1409            inx = np.where(np.array(abun) > 0)[0]
1410            mz = np.array(mz)[inx]
1411            mz = np.float32(mz)
1412            abun = np.array(abun)[inx]
1413            abun = np.float32(abun)
1414
1415            level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0]
1416
1417            # Number of rows
1418            n = len(mz)
1419
1420            # No measurements
1421            if n == 0:
1422                continue
1423
1424            # Dimension check
1425            if len(mz) != len(abun):
1426                warnings.warn("m/z and intensity array dimension mismatch")
1427                continue
1428
1429            # Scan/frame info
1430            id_dict = i
1431
1432            # Columns
1433            cols[level] = ["scan", "mz", "intensity"]
1434            m = len(cols[level])
1435
1436            # Subarray init
1437            arr = np.empty((n, m), dtype=dtype)
1438            inx = 0
1439
1440            # Populate scan/frame info
1441            arr[:, inx] = i
1442            inx += 1
1443
1444            # Populate m/z
1445            arr[:, inx] = mz
1446            inx += 1
1447
1448            # Populate intensity
1449            arr[:, inx] = abun
1450            inx += 1
1451
1452            # Initialize output container
1453            if level not in res:
1454                res[level] = np.empty((N[level], m), dtype=dtype)
1455                counter[level] = 0
1456
1457            # Insert subarray
1458            res[level][counter[level] : counter[level] + n, :] = arr
1459            counter[level] += n
1460
1461        # Construct ms1 and ms2 mz dataframes
1462        for level in res.keys():
1463            res[level] = pd.DataFrame(res[level])
1464            res[level].columns = cols[level]
1465        # rename keys in res to add 'ms' prefix
1466        res = {f"ms{key}": value for key, value in res.items()}
1467
1468        return res
1469
1470    def run(self, spectra="all", scan_df=None):
1471        """
1472        Extracts mass spectra data from a raw file.
1473
1474        Parameters
1475        ----------
1476        spectra : str, optional
1477            Which mass spectra data to include in the output. Default is all.  Other options: none, ms1, ms2.
1478        scan_df : pandas.DataFrame, optional
1479            Scan dataframe.  If not provided, the scan dataframe is created from the mzML file.
1480
1481        Returns
1482        -------
1483        tuple
1484            A tuple containing two elements:
1485            - A dictionary containing mass spectra data, separated by MS level.
1486            - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level,
1487                scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
1488        """
1489        # Prepare scan_df
1490        if scan_df is None:
1491            scan_df = self.get_scan_df()
1492
1493        # Prepare mass spectra data
1494        if spectra != "none":
1495            res = self.get_ms_raw(spectra=spectra, scan_df=scan_df)
1496        else:
1497            res = None
1498
1499        return res, scan_df
1500
1501    def get_mass_spectrum_from_scan(
1502        self, scan_number, spectrum_mode, auto_process=True
1503    ):
1504        """Instatiate a MassSpecBase object from a single scan number from the binary file, currently only supports profile mode.
1505
1506        Parameters
1507        ----------
1508        scan_number : int
1509            The scan number to extract the mass spectrum from.
1510        polarity : int
1511            The polarity of the scan.  1 for positive mode, -1 for negative mode.
1512        spectrum_mode : str
1513            The type of mass spectrum to extract.  Must be 'profile' or 'centroid'.
1514        auto_process : bool, optional
1515            If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.
1516
1517        Returns
1518        -------
1519        MassSpecProfile | MassSpecCentroid
1520            The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.
1521        """
1522
1523        if spectrum_mode == "profile":
1524            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
1525            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1526                scan_number, scanStatistics
1527            )
1528            abun = list(profileStream.Intensities)
1529            mz = list(profileStream.Positions)
1530            data_dict = {
1531                Labels.mz: mz,
1532                Labels.abundance: abun,
1533            }
1534            d_params = self.set_metadata(
1535                firstScanNumber=scan_number,
1536                lastScanNumber=scan_number,
1537                scans_list=False,
1538                label=Labels.thermo_profile,
1539            )
1540            mass_spectrum_obj = MassSpecProfile(
1541                data_dict, d_params, auto_process=auto_process
1542            )
1543
1544        elif spectrum_mode == "centroid":
1545            centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
1546            if centroid_scan.Masses is not None:
1547                mz = list(centroid_scan.Masses)
1548                abun = list(centroid_scan.Intensities)
1549                rp = list(centroid_scan.Resolutions)
1550                magnitude = list(centroid_scan.Intensities)
1551                noise = list(centroid_scan.Noises)
1552                baselines = list(centroid_scan.Baselines)
1553                array_noise_std = (np.array(noise) - np.array(baselines)) / 3
1554                l_signal_to_noise = np.array(magnitude) / array_noise_std
1555                data_dict = {
1556                    Labels.mz: mz,
1557                    Labels.abundance: abun,
1558                    Labels.rp: rp,
1559                    Labels.s2n: list(l_signal_to_noise),
1560                }
1561            else:  # For CID MS2, the centroid data are stored in the profile data location, they do not have any associated rp or baseline data, but they should be treated as centroid data
1562                scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(
1563                    scan_number
1564                )
1565                profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1566                    scan_number, scanStatistics
1567                )
1568                abun = list(profileStream.Intensities)
1569                mz = list(profileStream.Positions)
1570                data_dict = {
1571                    Labels.mz: mz,
1572                    Labels.abundance: abun,
1573                    Labels.rp: [np.nan] * len(mz),
1574                    Labels.s2n: [np.nan] * len(mz),
1575                }
1576            d_params = self.set_metadata(
1577                firstScanNumber=scan_number,
1578                lastScanNumber=scan_number,
1579                scans_list=False,
1580                label=Labels.thermo_centroid,
1581            )
1582            mass_spectrum_obj = MassSpecCentroid(
1583                data_dict, d_params, auto_process=auto_process
1584            )
1585
1586        return mass_spectrum_obj
1587
1588    def get_mass_spectra_obj(self):
1589        """Instatiate a MassSpectraBase object from the binary data file file.
1590
1591        Returns
1592        -------
1593        MassSpectraBase
1594            The MassSpectra object containing the parsed mass spectra.  The object is instatiated with the mzML file, analyzer, instrument, sample name, and scan dataframe.
1595        """
1596        _, scan_df = self.run(spectra="none")
1597        mass_spectra_obj = MassSpectraBase(
1598            self.file_location,
1599            self.analyzer,
1600            self.instrument_label,
1601            self.sample_name,
1602            self,
1603        )
1604        scan_df = scan_df.set_index("scan", drop=False)
1605        mass_spectra_obj.scan_df = scan_df
1606
1607        return mass_spectra_obj
1608
1609    def get_lcms_obj(self, spectra="all"):
1610        """Instatiates a LCMSBase object from the mzML file.
1611
1612        Parameters
1613        ----------
1614        spectra : str, optional
1615            Which mass spectra data to include in the output. Default is "all".  Other options: "none", "ms1", "ms2".
1616
1617        Returns
1618        -------
1619        LCMSBase
1620            LCMS object containing mass spectra data. The object is instatiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specifified), polarity, as well as the attributes holding the scans, retention times, and tics.
1621        """
1622        _, scan_df = self.run(spectra="none")  # first run it to just get scan info
1623        res, scan_df = self.run(
1624            scan_df=scan_df, spectra=spectra
1625        )  # second run to parse data
1626        lcms_obj = LCMSBase(
1627            self.file_location,
1628            self.analyzer,
1629            self.instrument_label,
1630            self.sample_name,
1631            self,
1632        )
1633        if spectra != "none":
1634            for key in res:
1635                key_int = int(key.replace("ms", ""))
1636                res[key] = res[key][res[key].intensity > 0]
1637                res[key] = (
1638                    res[key].sort_values(by=["scan", "mz"]).reset_index(drop=True)
1639                )
1640                lcms_obj._ms_unprocessed[key_int] = res[key]
1641        lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
1642        # Check if polarity is mixed
1643        if len(set(scan_df.polarity)) > 1:
1644            raise ValueError("Mixed polarities detected in scan data")
1645        lcms_obj.polarity = scan_df.polarity[0]
1646        lcms_obj._scans_number_list = list(scan_df.scan)
1647        lcms_obj._retention_time_list = list(scan_df.scan_time)
1648        lcms_obj._tic_list = list(scan_df.tic)
1649
1650        return lcms_obj
1651
1652    def get_icr_transient_times(self):
1653        """Return a list for transient time targets for all scans, or selected scans range
1654
1655        Notes
1656        --------
1657        Resolving Power and Transient time targets based on 7T FT-ICR MS system
1658        """
1659
1660        res_trans_time = {
1661            "50": 0.384,
1662            "100000": 0.768,
1663            "200000": 1.536,
1664            "400000": 3.072,
1665            "750000": 6.144,
1666            "1000000": 12.288,
1667        }
1668
1669        firstScanNumber = self.start_scan
1670
1671        lastScanNumber = self.end_scan
1672
1673        transient_time_list = []
1674
1675        for scan in range(firstScanNumber, lastScanNumber):
1676            scan_header = self.get_scan_header(scan)
1677
1678            rp_target = scan_header["FT Resolution:"]
1679
1680            transient_time = res_trans_time.get(rp_target)
1681
1682            transient_time_list.append(transient_time)
1683
1684            # print(transient_time, rp_target)
1685
1686        return transient_time_list
spec = ModuleSpec(name='corems', loader=<_frozen_importlib_external.SourceFileLoader object>, origin='/Users/heal742/LOCAL/corems_dev/corems/corems/__init__.py', submodule_search_locations=['/Users/heal742/LOCAL/corems_dev/corems/corems'])
class ThermoBaseClass:
  60class ThermoBaseClass:
  61    """Class for parsing Thermo Raw files and extracting information from them.
  62
  63    Parameters:
  64    -----------
  65    file_location : str or pathlib.Path or s3path.S3Path
  66        Thermo Raw file path or S3 path.
  67
  68    Attributes:
  69    -----------
  70    file_path : str or pathlib.Path or s3path.S3Path
  71        The file path of the Thermo Raw file.
  72    parameters : LCMSParameters
  73        The LCMS parameters for the Thermo Raw file.
  74    chromatogram_settings : LiquidChromatographSetting
  75        The chromatogram settings for the Thermo Raw file.
  76    scans : list or tuple
  77        The selected scans for the Thermo Raw file.
  78    start_scan : int
  79        The starting scan number for the Thermo Raw file.
  80    end_scan : int
  81        The ending scan number for the Thermo Raw file.
  82
  83    Methods:
  84    --------
  85    * set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter
  86        Convert the user-passed MS Type string to a Thermo MSOrderType object.
  87    * get_creation_time() -> datetime.datetime
  88        Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
  89    * remove_temp_file()
  90        Remove the temporary file if the path is from S3Path.
  91    * get_polarity_mode(scan_number: int) -> int
  92        Get the polarity mode for the given scan number.
  93    * get_filter_for_scan_num(scan_number: int) -> List[str]
  94        Get the filter for the given scan number.
  95    * check_full_scan(scan_number: int) -> bool
  96        Check if the given scan number is a full scan.
  97    * get_all_filters() -> Tuple[Dict[int, str], List[str]]
  98        Get all scan filters for the Thermo Raw file.
  99    * get_scan_header(scan: int) -> Dict[str, Any]
 100        Get the full dictionary of scan header metadata for the given scan number.
 101    * get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]
 102        Get the retention time, intensity, and scan number from the given trace.
    * get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d',
             peak_detection: bool = False, smooth: bool = False, plot: bool = False,
             ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes]
        Get the extracted ion chromatograms (EICs) for the target m/z values.
 107
 108    """
 109
 110    def __init__(self, file_location):
 111        """file_location: srt pathlib.Path or s3path.S3Path
 112        Thermo Raw file path
 113        """
 114        # Thread.__init__(self)
 115        if isinstance(file_location, str):
 116            file_path = Path(file_location)
 117
 118        elif isinstance(file_location, S3Path):
 119            temp_dir = Path("tmp/")
 120            temp_dir.mkdir(exist_ok=True)
 121
 122            file_path = temp_dir / file_location.name
 123            with open(file_path, "wb") as fh:
 124                fh.write(file_location.read_bytes())
 125
 126        else:
 127            file_path = file_location
 128
 129        self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path))
 130
 131        if not self.iRawDataPlus.IsOpen:
 132            raise FileNotFoundError(
 133                "Unable to access the RAW file using the RawFileReader class!"
 134            )
 135
 136        # Check for any errors in the RAW file
 137        if self.iRawDataPlus.IsError:
 138            raise IOError(
 139                "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path)
 140            )
 141
 142        self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1)
 143
 144        self.file_path = file_location
 145        self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path))
 146
 147        # removing tmp file
 148
 149        self._init_settings()
 150
 151    def _init_settings(self):
 152        """
 153        Initialize the LCMSParameters object.
 154        """
 155        self._parameters = LCMSParameters()
 156
 157    @property
 158    def parameters(self) -> LCMSParameters:
 159        """
 160        Get or set the LCMSParameters object.
 161        """
 162        return self._parameters
 163
 164    @parameters.setter
 165    def parameters(self, instance_LCMSParameters: LCMSParameters):
 166        self._parameters = instance_LCMSParameters
 167
 168    @property
 169    def chromatogram_settings(self) -> LiquidChromatographSetting:
 170        """
 171        Get or set the LiquidChromatographSetting object.
 172        """
 173        return self.parameters.lc_ms
 174
 175    @chromatogram_settings.setter
 176    def chromatogram_settings(
 177        self, instance_LiquidChromatographSetting: LiquidChromatographSetting
 178    ):
 179        self.parameters.lc_ms = instance_LiquidChromatographSetting
 180
 181    @property
 182    def scans(self) -> list | tuple:
 183        """scans : list or tuple
 184        If list uses Thermo AverageScansInScanRange for selected scans, ortherwise uses Thermo AverageScans for a scan range
 185        """
 186        return self.chromatogram_settings.scans
 187
 188    @property
 189    def start_scan(self) -> int:
 190        """
 191        Get the starting scan number for the Thermo Raw file.
 192        """
 193        if self.scans[0] == -1:
 194            return self.iRawDataPlus.RunHeaderEx.FirstSpectrum
 195        else:
 196            return self.scans[0]
 197
 198    @property
 199    def end_scan(self) -> int:
 200        """
 201        Get the ending scan number for the Thermo Raw file.
 202        """
 203        if self.scans[-1] == -1:
 204            return self.iRawDataPlus.RunHeaderEx.LastSpectrum
 205        else:
 206            return self.scans[-1]
 207
 208    def set_msordertype(self, scanFilter, mstype: str = "ms1"):
 209        """
 210        Function to convert user passed string MS Type to Thermo MSOrderType object
 211        Limited to MS1 through MS10.
 212
 213        Parameters:
 214        -----------
 215        scanFilter : Thermo.ScanFilter
 216            The scan filter object.
 217        mstype : str, optional
 218            The MS Type string, by default 'ms1'
 219
 220        """
 221        mstype = mstype.upper()
 222        # Check that a valid mstype is passed
 223        if (int(mstype.split("MS")[1]) > 10) or (int(mstype.split("MS")[1]) < 1):
 224            warn("MS Type not valid, must be between MS1 and MS10")
 225
 226        msordertypedict = {
 227            "MS1": MSOrderType.Ms,
 228            "MS2": MSOrderType.Ms2,
 229            "MS3": MSOrderType.Ms3,
 230            "MS4": MSOrderType.Ms4,
 231            "MS5": MSOrderType.Ms5,
 232            "MS6": MSOrderType.Ms6,
 233            "MS7": MSOrderType.Ms7,
 234            "MS8": MSOrderType.Ms8,
 235            "MS9": MSOrderType.Ms9,
 236            "MS10": MSOrderType.Ms10,
 237        }
 238        scanFilter.MSOrder = msordertypedict[mstype]
 239        return scanFilter
 240
 241    def get_creation_time(self) -> datetime.datetime:
 242        """
 243        Extract the creation date stamp from the .RAW file
 244        Return formatted creation date stamp.
 245
 246        """
 247        credate = self.iRawDataPlus.CreationDate.get_Ticks()
 248        credate = datetime.datetime(1, 1, 1) + datetime.timedelta(
 249            microseconds=credate / 10
 250        )
 251        return credate
 252
 253    def remove_temp_file(self) -> None:
 254        """if the path is from S3Path data cannot be serialized to io.ByteStream and
 255        a temporary copy is stored at the temp dir
 256        use this function only at the end of your execution scrip
 257        some LCMS class methods depend on this file
 258        """
 259
 260        self.file_path.unlink()
 261
 262    def close_file(self) -> None:
 263        """
 264        Close the Thermo Raw file.
 265        """
 266        self.iRawDataPlus.Dispose()
 267
 268    def get_polarity_mode(self, scan_number: int) -> int:
 269        """
 270        Get the polarity mode for the given scan number.
 271
 272        Parameters:
 273        -----------
 274        scan_number : int
 275            The scan number.
 276
 277        Raises:
 278        -------
 279        Exception
 280            If the polarity mode is unknown.
 281
 282        """
 283        polarity_symbol = self.get_filter_for_scan_num(scan_number)[1]
 284
 285        if polarity_symbol == "+":
 286            return 1
 287            # return 'POSITIVE_ION_MODE'
 288
 289        elif polarity_symbol == "-":
 290            return -1
 291
 292        else:
 293            raise Exception("Polarity Mode Unknown, please set it manually")
 294
 295    def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
 296        """
 297        Returns the closest matching run time that corresponds to scan_number for the current
 298        controller. This function is only supported for MS device controllers.
 299        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
 300
 301        Parameters:
 302        -----------
 303        scan_number : int
 304            The scan number.
 305
 306        """
 307        scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
 308
 309        return str(scan_label).split()
 310
 311    def get_ms_level_for_scan_num(self, scan_number: int) -> str:
 312        """
 313        Get the MS order for the given scan number.
 314
 315        Parameters:
 316        -----------
 317        scan_number : int
 318            The scan number
 319
 320        Returns:
 321        --------
 322        int
 323            The MS order type (1 for MS, 2 for MS2, etc.)
 324        """
 325        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
 326
 327        msordertype = {
 328            MSOrderType.Ms: 1,
 329            MSOrderType.Ms2: 2,
 330            MSOrderType.Ms3: 3,
 331            MSOrderType.Ms4: 4,
 332            MSOrderType.Ms5: 5,
 333            MSOrderType.Ms6: 6,
 334            MSOrderType.Ms7: 7,
 335            MSOrderType.Ms8: 8,
 336            MSOrderType.Ms9: 9,
 337            MSOrderType.Ms10: 10,
 338        }
 339
 340        if scan_filter.MSOrder in msordertype:
 341            return msordertype[scan_filter.MSOrder]
 342        else:
 343            raise Exception("MS Order Type not found")
 344    
 345    def check_full_scan(self, scan_number: int) -> bool:
 346        # scan_filter.ScanMode 0 = FULL
 347        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
 348
 349        return scan_filter.ScanMode == MSOrderType.Ms
 350
 351    def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
 352        """
 353        Get all scan filters.
 354        This function is only supported for MS device controllers.
 355        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
 356
 357        """
 358
 359        scanrange = range(self.start_scan, self.end_scan + 1)
 360        scanfiltersdic = {}
 361        scanfilterslist = []
 362        for scan_number in scanrange:
 363            scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
 364            scanfiltersdic[scan_number] = scan_label
 365            scanfilterslist.append(scan_label)
 366        scanfilterset = list(set(scanfilterslist))
 367        return scanfiltersdic, scanfilterset
 368
 369    def get_scan_header(self, scan: int) -> Dict[str, Any]:
 370        """
 371        Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.
 372
 373        Parameters:
 374        -----------
 375        scan : int
 376            The scan number.
 377
 378        """
 379        header = self.iRawDataPlus.GetTrailerExtraInformation(scan)
 380
 381        header_dic = {}
 382        for i in range(header.Length):
 383            header_dic.update({header.Labels[i]: header.Values[i]})
 384        return header_dic
 385
 386    @staticmethod
 387    def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
 388        """trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal"""
 389        return list(trace.Times), list(trace.Intensities), list(trace.Scans)
 390
 391    def get_eics(
 392        self,
 393        target_mzs: List[float],
 394        tic_data: Dict[str, Any],
 395        ms_type="MS !d",
 396        peak_detection=False,
 397        smooth=False,
 398        plot=False,
 399        ax: Optional[axes.Axes] = None,
 400        legend=False,
 401    ) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
 402        """ms_type: str ('MS', MS2')
 403        start_scan: int default -1 will select the lowest available
 404        end_scan: int default -1 will select the highest available
 405
 406        returns:
 407
 408            chroma: dict{target_mz: EIC_Data(
 409                                        Scans: [int]
 410                                            original thermo scan numbers
 411                                        Time: [floats]
 412                                            list of retention times
 413                                        TIC: [floats]
 414                                            total ion chromatogram
 415                                        Apexes: [int]
 416                                            original thermo apex scan number after peak picking
 417                                        )
 418
 419        """
 420        # If peak_detection or smooth is True, raise exception
 421        if peak_detection or smooth:
 422            raise Exception("Peak detection and smoothing are no longer implemented in this function")
 423
 424        options = MassOptions()
 425        options.ToleranceUnits = ToleranceUnits.ppm
 426        options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm
 427
 428        all_chroma_settings = []
 429
 430        for target_mz in target_mzs:
 431            settings = ChromatogramTraceSettings(TraceType.MassRange)
 432            settings.Filter = ms_type
 433            settings.MassRanges = [Range(target_mz, target_mz)]
 434
 435            chroma_settings = IChromatogramSettings(settings)
 436
 437            all_chroma_settings.append(chroma_settings)
 438
 439        # chroma_settings2 = IChromatogramSettings(settings)
 440        # print(chroma_settings.FragmentMass)
 441        # print(chroma_settings.FragmentMass)
 442        # print(chroma_settings)
 443        # print(chroma_settings)
 444
 445        data = self.iRawDataPlus.GetChromatogramData(
 446            all_chroma_settings, self.start_scan, self.end_scan, options
 447        )
 448
 449        traces = ChromatogramSignal.FromChromatogramData(data)
 450
 451        chroma = {}
 452
 453        if plot:
 454            from matplotlib.transforms import Bbox
 455            import matplotlib.pyplot as plt
 456
 457            if not ax:
 458                # ax = plt.gca()
 459                # ax.clear()
 460                fig, ax = plt.subplots()
 461
 462            else:
 463                fig = plt.gcf()
 464
 465            # plt.show()
 466
 467        for i, trace in enumerate(traces):
 468            if trace.Length > 0:
 469                rt, eic, scans = self.get_rt_time_from_trace(trace)
 470                if smooth:
 471                    eic = self.smooth_tic(eic)
 472
 473                chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
 474                if plot:
 475                    ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))
 476
 477        if peak_detection:
 478            # max_eic = self.get_max_eic(chroma)
 479            max_signal = max(tic_data.tic)
 480
 481            for eic_data in chroma.values():
 482                eic = eic_data.eic
 483                time = eic_data.time
 484
 485                if len(eic) != len(tic_data.tic):
 486                    warn(
 487                        "The software assumes same lenth of TIC and EIC, this does not seems to be the case and the results mass spectrum selected by the scan number might not be correct"
 488                    )
 489
 490                if eic.max() > 0:
 491                    centroid_eics = self.eic_centroid_detector(time, eic, max_signal)
 492                    eic_data.apexes = [i for i in centroid_eics]
 493
 494                    if plot:
 495                        for peak_indexes in eic_data.apexes:
 496                            apex_index = peak_indexes[1]
 497                            ax.plot(
 498                                time[apex_index],
 499                                eic[apex_index],
 500                                marker="x",
 501                                linewidth=0,
 502                            )
 503
 504        if plot:
 505            ax.set_xlabel("Time (min)")
 506            ax.set_ylabel("a.u.")
 507            ax.set_title(ms_type + " EIC")
 508            ax.tick_params(axis="both", which="major", labelsize=12)
 509            ax.axes.spines["top"].set_visible(False)
 510            ax.axes.spines["right"].set_visible(False)
 511
 512            if legend:
 513                legend = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1))
 514                fig.subplots_adjust(right=0.76)
 515                # ax.set_prop_cycle(color=plt.cm.gist_rainbow(np.linspace(0, 1, len(traces))))
 516
 517                d = {"down": 30, "up": -30}
 518
 519                def func(evt):
 520                    if legend.contains(evt):
 521                        bbox = legend.get_bbox_to_anchor()
 522                        bbox = Bbox.from_bounds(
 523                            bbox.x0, bbox.y0 + d[evt.button], bbox.width, bbox.height
 524                        )
 525                        tr = legend.axes.transAxes.inverted()
 526                        legend.set_bbox_to_anchor(bbox.transformed(tr))
 527                        fig.canvas.draw_idle()
 528
 529                fig.canvas.mpl_connect("scroll_event", func)
 530            return chroma, ax
 531        else:
 532            return chroma, None
 533            rt = []
 534            tic = []
 535            scans = []
 536            for i in range(traces[0].Length):
 537                # print(trace[0].HasBasePeakData,trace[0].EndTime )
 538
 539                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
 540                rt.append(traces[0].Times[i])
 541                tic.append(traces[0].Intensities[i])
 542                scans.append(traces[0].Scans[i])
 543
 544            return traces
 545            # plot_chroma(rt, tic)
 546            # plt.show()
 547
 548    def get_tic(
 549        self,
 550        ms_type="MS !d",
 551        peak_detection=False,  # This wont work right now
 552        smooth=False,  # This wont work right now
 553        plot=False,
 554        ax=None,
 555        trace_type="TIC",
 556    ) -> Tuple[TIC_Data, axes.Axes]:
 557        """ms_type: str ('MS !d', 'MS2', None)
 558            if you use None you get all scans.
 559        peak_detection: bool
 560        smooth: bool
 561        plot: bool
 562        ax: matplotlib axis object
 563        trace_type: str ('TIC','BPC')
 564
 565        returns:
 566            chroma: dict
 567            {
 568            Scan: [int]
 569                original thermo scan numberMS
 570            Time: [floats]
 571                list of retention times
 572            TIC: [floats]
 573                total ion chromatogram
 574            Apexes: [int]
 575                original thermo apex scan number after peak picking
 576            }
 577        """
 578        if trace_type == "TIC":
 579            settings = ChromatogramTraceSettings(TraceType.TIC)
 580        elif trace_type == "BPC":
 581            settings = ChromatogramTraceSettings(TraceType.BasePeak)
 582        else:
 583            raise ValueError(f"{trace_type} undefined")
 584        if ms_type == "all":
 585            settings.Filter = None
 586        else:
 587            settings.Filter = ms_type
 588
 589        chroma_settings = IChromatogramSettings(settings)
 590
 591        data = self.iRawDataPlus.GetChromatogramData(
 592            [chroma_settings], self.start_scan, self.end_scan
 593        )
 594
 595        trace = ChromatogramSignal.FromChromatogramData(data)
 596
 597        data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[])
 598
 599        if trace[0].Length > 0:
 600            for i in range(trace[0].Length):
 601                # print(trace[0].HasBasePeakData,trace[0].EndTime )
 602
 603                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
 604                data.time.append(trace[0].Times[i])
 605                data.tic.append(trace[0].Intensities[i])
 606                data.scans.append(trace[0].Scans[i])
 607
 608                # print(trace[0].Scans[i])
 609            if smooth:
 610                data.tic = self.smooth_tic(data.tic)
 611
 612            else:
 613                data.tic = np.array(data.tic)
 614
 615            if peak_detection:
 616                centroid_peak_indexes = [
 617                    i for i in self.centroid_detector(data.time, data.tic)
 618                ]
 619
 620                data.apexes = centroid_peak_indexes
 621
 622            if plot:
 623                if not ax:
 624                    import matplotlib.pyplot as plt
 625
 626                    ax = plt.gca()
 627                    # fig, ax = plt.subplots(figsize=(6, 3))
 628
 629                ax.plot(data.time, data.tic, label=trace_type)
 630                ax.set_xlabel("Time (min)")
 631                ax.set_ylabel("a.u.")
 632                if peak_detection:
 633                    for peak_indexes in data.apexes:
 634                        apex_index = peak_indexes[1]
 635                        ax.plot(
 636                            data.time[apex_index],
 637                            data.tic[apex_index],
 638                            marker="x",
 639                            linewidth=0,
 640                        )
 641
 642                # plt.show()
 643                if trace_type == "BPC":
 644                    data.bpc = data.tic
 645                    data.tic = []
 646                return data, ax
 647            if trace_type == "BPC":
 648                data.bpc = data.tic
 649                data.tic = []
 650            return data, None
 651
 652        else:
 653            return None, None
 654
 655    def get_average_mass_spectrum(
 656        self,
 657        spectrum_mode: str = "profile",
 658        auto_process: bool = True,
 659        ppm_tolerance: float = 5.0,
 660        ms_type: str = "MS1",
 661    ) -> MassSpecProfile | MassSpecCentroid:
 662        """
 663        Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method
 664        or a scan list using Thermo's AverageScans method
 665        spectrum_mode: str
 666            centroid or profile mass spectrum
 667        auto_process: bool
 668            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
 669        ms_type: str
 670            String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10.
 671            Internal function converts to Thermo MSOrderType class.
 672
 673        """
 674
 675        def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool):
 676            mz_list = list(averageScan.SegmentedScan.Positions)
 677            abund_list = list(averageScan.SegmentedScan.Intensities)
 678
 679            data_dict = {
 680                Labels.mz: mz_list,
 681                Labels.abundance: abund_list,
 682            }
 683
 684            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)
 685
 686        def get_centroid_mass_spec(averageScan, d_params: dict):
 687            noise = list(averageScan.centroidScan.Noises)
 688
 689            baselines = list(averageScan.centroidScan.Baselines)
 690
 691            rp = list(averageScan.centroidScan.Resolutions)
 692
 693            magnitude = list(averageScan.centroidScan.Intensities)
 694
 695            mz = list(averageScan.centroidScan.Masses)
 696
 697            array_noise_std = (np.array(noise) - np.array(baselines)) / 3
 698            l_signal_to_noise = np.array(magnitude) / array_noise_std
 699
 700            d_params["baseline_noise"] = np.average(array_noise_std)
 701
 702            d_params["baseline_noise_std"] = np.std(array_noise_std)
 703
 704            data_dict = {
 705                Labels.mz: mz,
 706                Labels.abundance: magnitude,
 707                Labels.rp: rp,
 708                Labels.s2n: list(l_signal_to_noise),
 709            }
 710
 711            mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
 712
 713            return mass_spec
 714
 715        d_params = self.set_metadata(
 716            firstScanNumber=self.start_scan, lastScanNumber=self.end_scan
 717        )
 718
 719        # Create the mass options object that will be used when averaging the scans
 720        options = MassOptions()
 721        options.ToleranceUnits = ToleranceUnits.ppm
 722        options.Tolerance = ppm_tolerance
 723
 724        # Get the scan filter for the first scan.  This scan filter will be used to located
 725        # scans within the given scan range of the same type
 726        scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan)
 727
 728        # force it to only look for the MSType
 729        scanFilter = self.set_msordertype(scanFilter, ms_type)
 730
 731        if isinstance(self.scans, tuple):
 732            averageScan = Extensions.AverageScansInScanRange(
 733                self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options
 734            )
 735
 736            if averageScan:
 737                if spectrum_mode == "profile":
 738                    mass_spec = get_profile_mass_spec(
 739                        averageScan, d_params, auto_process
 740                    )
 741
 742                    return mass_spec
 743
 744                elif spectrum_mode == "centroid":
 745                    if averageScan.HasCentroidStream:
 746                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
 747
 748                        return mass_spec
 749
 750                    else:
 751                        raise ValueError(
 752                            "No Centroind data available for the selected scans"
 753                        )
 754                else:
 755                    raise ValueError("spectrum_mode must be 'profile' or centroid")
 756            else:
 757                raise ValueError("No data found for the selected scans")
 758
 759        elif isinstance(self.scans, list):
 760            d_params = self.set_metadata(scans_list=self.scans)
 761
 762            scans = List[int]()
 763            for scan in self.scans:
 764                scans.Add(scan)
 765
 766            averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
 767
 768            if averageScan:
 769                if spectrum_mode == "profile":
 770                    mass_spec = get_profile_mass_spec(
 771                        averageScan, d_params, auto_process
 772                    )
 773
 774                    return mass_spec
 775
 776                elif spectrum_mode == "centroid":
 777                    if averageScan.HasCentroidStream:
 778                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
 779
 780                        return mass_spec
 781
 782                    else:
 783                        raise ValueError(
 784                            "No Centroind data available for the selected scans"
 785                        )
 786
 787                else:
 788                    raise ValueError("spectrum_mode must be 'profile' or centroid")
 789
 790            else:
 791                raise ValueError("No data found for the selected scans")
 792
 793        else:
 794            raise ValueError("scans must be a list intergers or a tuple if integers")
 795
 796    def set_metadata(
 797        self,
 798        firstScanNumber=0,
 799        lastScanNumber=0,
 800        scans_list=False,
 801        label=Labels.thermo_profile,
 802    ):
 803        """
 804        Collect metadata to be ingested in the mass spectrum object
 805
 806        scans_list: list[int] or false
 807        lastScanNumber: int
 808        firstScanNumber: int
 809        """
 810
 811        d_params = default_parameters(self.file_path)
 812
 813        # assumes scans is full scan or reduced profile scan
 814
 815        d_params["label"] = label
 816
 817        if scans_list:
 818            d_params["scan_number"] = scans_list
 819
 820            d_params["polarity"] = self.get_polarity_mode(scans_list[0])
 821
 822        else:
 823            d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber)
 824
 825            d_params["polarity"] = self.get_polarity_mode(firstScanNumber)
 826
 827        d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model
 828
 829        d_params["acquisition_time"] = self.get_creation_time()
 830
 831        d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name
 832
 833        return d_params
 834
 835    def get_instrument_methods(self, parse_strings: bool = True):
 836        """
 837        This function will extract the instrument methods embedded in the raw file
 838
 839        First it will check if there are any instrument methods, if not returning None
 840        Then it will get the total number of instrument methods.
 841        For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary
 842        If this fails, it will return just the string object.
 843
 844        This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.
 845
 846        Parameters:
 847        -----------
 848        parse_strings: bool
 849            If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.
 850
 851        Returns:
 852        --------
 853        List[Dict[str, Any]] or List
 854            A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.
 855        """
 856
 857        if not self.iRawDataPlus.HasInstrumentMethod:
 858            raise ValueError(
 859                "Raw Data file does not have any instrument methods attached"
 860            )
 861            return None
 862        else:
 863
 864            def parse_instrument_method(data):
 865                lines = data.split("\r\n")
 866                method = {}
 867                current_section = None
 868                sub_section = None
 869
 870                for line in lines:
 871                    if not line.strip():  # Skip empty lines
 872                        continue
 873                    if (
 874                        line.startswith("----")
 875                        or line.endswith("Settings")
 876                        or line.endswith("Summary")
 877                        or line.startswith("Experiment")
 878                        or line.startswith("Scan Event")
 879                    ):
 880                        current_section = line.replace("-", "").strip()
 881                        method[current_section] = {}
 882                        sub_section = None
 883                    elif line.startswith("\t"):
 884                        if "\t\t" in line:
 885                            indent_level = line.count("\t")
 886                            key_value = line.strip()
 887
 888                            if indent_level == 2:
 889                                if sub_section:
 890                                    key, value = (
 891                                        key_value.split("=", 1)
 892                                        if "=" in key_value
 893                                        else (key_value, None)
 894                                    )
 895                                    method[current_section][sub_section][
 896                                        key.strip()
 897                                    ] = value.strip() if value else None
 898                            elif indent_level == 3:
 899                                scan_type, key_value = (
 900                                    key_value.split(" ", 1)
 901                                    if " " in key_value
 902                                    else (key_value, None)
 903                                )
 904                                method.setdefault(current_section, {}).setdefault(
 905                                    sub_section, {}
 906                                ).setdefault(scan_type, {})
 907
 908                                if key_value:
 909                                    key, value = (
 910                                        key_value.split("=", 1)
 911                                        if "=" in key_value
 912                                        else (key_value, None)
 913                                    )
 914                                    method[current_section][sub_section][scan_type][
 915                                        key.strip()
 916                                    ] = value.strip() if value else None
 917                        else:
 918                            key_value = line.strip()
 919                            if "=" in key_value:
 920                                key, value = key_value.split("=", 1)
 921                                method.setdefault(current_section, {})[key.strip()] = (
 922                                    value.strip()
 923                                )
 924                            else:
 925                                sub_section = key_value
 926                    else:
 927                        if ":" in line:
 928                            key, value = line.split(":", 1)
 929                            method[current_section][key.strip()] = value.strip()
 930                        else:
 931                            method[current_section][line] = {}
 932
 933                return method
 934
 935            count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount
 936            # TODO make this code better...
 937            instrument_methods = []
 938            for i in range(count_instrument_methods):
 939                instrument_method_string = self.iRawDataPlus.GetInstrumentMethod(i)
 940                if parse_strings:
 941                    try:
 942                        instrument_method_dict = parse_instrument_method(
 943                            instrument_method_string
 944                        )
 945                    except:  # if it fails for any reason
 946                        instrument_method_dict = instrument_method_string
 947                else:
 948                    instrument_method_dict = instrument_method_string
 949                instrument_methods.append(instrument_method_dict)
 950            return instrument_methods
 951
 952    def get_tune_method(self):
 953        """
 954        This code will extract the tune method from the raw file
 955        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
 956        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
 957        It will also not return Labels (keys) where the value is blank
 958
 959        Returns:
 960        --------
 961        Dict[str, Any]
 962            A dictionary containing the tune method information
 963
 964        Raises:
 965        -------
 966        ValueError
 967            If no tune methods are found in the raw file
 968
 969        """
 970        tunemethodcount = self.iRawDataPlus.GetTuneDataCount()
 971        if tunemethodcount == 0:
 972            raise ValueError("No tune methods found in the raw data file")
 973            return None
 974        elif tunemethodcount > 1:
 975            warnings.warn(
 976                "Multiple tune methods found in the raw data file, returning the 1st"
 977            )
 978
 979        header = self.iRawDataPlus.GetTuneData(0)
 980
 981        header_dic = {}
 982        current_section = None
 983
 984        for i in range(header.Length):
 985            label = header.Labels[i]
 986            value = header.Values[i]
 987
 988            # Check for section headers
 989            if "===" in label or (
 990                (value == "" or value is None) and not label.endswith(":")
 991            ):
 992                # This is a section header
 993                section_name = (
 994                    label.replace("=", "").replace(":", "").strip()
 995                )  # Clean the label if it contains '='
 996                header_dic[section_name] = {}
 997                current_section = section_name
 998            else:
 999                if current_section:
1000                    header_dic[current_section][label] = value
1001                else:
1002                    header_dic[label] = value
1003        return header_dic
1004
1005    def get_status_log(self, retention_time: float = 0):
1006        """
1007        This code will extract the status logs from the raw file
1008        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
1009        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
1010        It will also not return Labels (keys) where the value is blank
1011
1012        Parameters:
1013        -----------
1014        retention_time: float
1015            The retention time in minutes to extract the status log data from.
1016            Will use the closest retention time found. Default 0.
1017
1018        Returns:
1019        --------
1020        Dict[str, Any]
1021            A dictionary containing the status log information
1022
1023        Raises:
1024        -------
1025        ValueError
1026            If no status logs are found in the raw file
1027
1028        """
1029        tunemethodcount = self.iRawDataPlus.GetStatusLogEntriesCount()
1030        if tunemethodcount == 0:
1031            raise ValueError("No status logs found in the raw data file")
1032            return None
1033
1034        header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time)
1035
1036        header_dic = {}
1037        current_section = None
1038
1039        for i in range(header.Length):
1040            label = header.Labels[i]
1041            value = header.Values[i]
1042
1043            # Check for section headers
1044            if "===" in label or (
1045                (value == "" or value is None) and not label.endswith(":")
1046            ):
1047                # This is a section header
1048                section_name = (
1049                    label.replace("=", "").replace(":", "").strip()
1050                )  # Clean the label if it contains '='
1051                header_dic[section_name] = {}
1052                current_section = section_name
1053            else:
1054                if current_section:
1055                    header_dic[current_section][label] = value
1056                else:
1057                    header_dic[label] = value
1058        return header_dic
1059
1060    def get_error_logs(self):
1061        """
1062        This code will extract the error logs from the raw file
1063
1064        Returns:
1065        --------
1066        Dict[float, str]
1067            A dictionary containing the error log information with the retention time as the key
1068
1069        Raises:
1070        -------
1071        ValueError
1072            If no error logs are found in the raw file
1073        """
1074
1075        error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount
1076        if error_log_count == 0:
1077            raise ValueError("No error logs found in the raw data file")
1078            return None
1079
1080        error_logs = {}
1081
1082        for i in range(error_log_count):
1083            error_log_item = self.iRawDataPlus.GetErrorLogItem(i)
1084            rt = error_log_item.RetentionTime
1085            message = error_log_item.Message
1086            # Use the index `i` as the unique ID key
1087            error_logs[i] = {"rt": rt, "message": message}
1088        return error_logs
1089
1090    def get_sample_information(self):
1091        """
1092        This code will extract the sample information from the raw file
1093
1094        Returns:
1095        --------
1096        Dict[str, Any]
1097            A dictionary containing the sample information
1098            Note that UserText field may not be handled properly and may need further processing
1099        """
1100        sminfo = self.iRawDataPlus.SampleInformation
1101        smdict = {}
1102        smdict["Comment"] = sminfo.Comment
1103        smdict["SampleId"] = sminfo.SampleId
1104        smdict["SampleName"] = sminfo.SampleName
1105        smdict["Vial"] = sminfo.Vial
1106        smdict["InjectionVolume"] = sminfo.InjectionVolume
1107        smdict["Barcode"] = sminfo.Barcode
1108        smdict["BarcodeStatus"] = str(sminfo.BarcodeStatus)
1109        smdict["CalibrationLevel"] = sminfo.CalibrationLevel
1110        smdict["DilutionFactor"] = sminfo.DilutionFactor
1111        smdict["InstrumentMethodFile"] = sminfo.InstrumentMethodFile
1112        smdict["RawFileName"] = sminfo.RawFileName
1113        smdict["CalibrationFile"] = sminfo.CalibrationFile
1114        smdict["IstdAmount"] = sminfo.IstdAmount
1115        smdict["RowNumber"] = sminfo.RowNumber
1116        smdict["Path"] = sminfo.Path
1117        smdict["ProcessingMethodFile"] = sminfo.ProcessingMethodFile
1118        smdict["SampleType"] = str(sminfo.SampleType)
1119        smdict["SampleWeight"] = sminfo.SampleWeight
1120        smdict["UserText"] = {
1121            "UserText": [x for x in sminfo.UserText]
1122        }  # [0] #This may not work - needs debugging with
1123        return smdict
1124
1125    def get_instrument_data(self):
1126        """
1127        This code will extract the instrument data from the raw file
1128
1129        Returns:
1130        --------
1131        Dict[str, Any]
1132            A dictionary containing the instrument data
1133        """
1134        instrument_data = self.iRawDataPlus.GetInstrumentData()
1135        id_dict = {}
1136        id_dict["Name"] = instrument_data.Name
1137        id_dict["Model"] = instrument_data.Model
1138        id_dict["SerialNumber"] = instrument_data.SerialNumber
1139        id_dict["SoftwareVersion"] = instrument_data.SoftwareVersion
1140        id_dict["HardwareVersion"] = instrument_data.HardwareVersion
1141        id_dict["ChannelLabels"] = {
1142            "ChannelLabels": [x for x in instrument_data.ChannelLabels]
1143        }
1144        id_dict["Flags"] = instrument_data.Flags
1145        id_dict["AxisLabelY"] = instrument_data.AxisLabelY
1146        id_dict["AxisLabelX"] = instrument_data.AxisLabelX
1147        return id_dict
1148
1149    def get_centroid_msms_data(self, scan):
1150        """
1151        .. deprecated:: 2.0
1152            This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1153        """
1154
1155        warnings.warn(
1156            "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1157            "Please use `get_average_mass_spectrum()` instead.",
1158            DeprecationWarning,
1159        )
1160
1161        d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid)
1162
1163        centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False)
1164
1165        noise = list(centroidStream.Noises)
1166
1167        baselines = list(centroidStream.Baselines)
1168
1169        rp = list(centroidStream.Resolutions)
1170
1171        magnitude = list(centroidStream.Intensities)
1172
1173        mz = list(centroidStream.Masses)
1174
1175        # charge = scans_labels[5]
1176        array_noise_std = (np.array(noise) - np.array(baselines)) / 3
1177        l_signal_to_noise = np.array(magnitude) / array_noise_std
1178
1179        d_params["baseline_noise"] = np.average(array_noise_std)
1180
1181        d_params["baseline_noise_std"] = np.std(array_noise_std)
1182
1183        data_dict = {
1184            Labels.mz: mz,
1185            Labels.abundance: magnitude,
1186            Labels.rp: rp,
1187            Labels.s2n: list(l_signal_to_noise),
1188        }
1189
1190        mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
1191        mass_spec.settings.noise_threshold_method = "relative_abundance"
1192        mass_spec.settings.noise_threshold_min_relative_abundance = 1
1193        mass_spec.process_mass_spec()
1194        return mass_spec
1195
1196    def get_average_mass_spectrum_by_scanlist(
1197        self,
1198        scans_list: List[int],
1199        auto_process: bool = True,
1200        ppm_tolerance: float = 5.0,
1201    ) -> MassSpecProfile:
1202        """
1203        Averages selected scans mass spectra using Thermo's AverageScans method
1204        scans_list: list[int]
1205        auto_process: bool
1206            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
1207        Returns:
1208            MassSpecProfile
1209
1210         .. deprecated:: 2.0
1211        This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1212        """
1213
1214        warnings.warn(
1215            "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1216            "Please use `get_average_mass_spectrum()` instead.",
1217            DeprecationWarning,
1218        )
1219
1220        d_params = self.set_metadata(scans_list=scans_list)
1221
1222        # assumes scans is full scan or reduced profile scan
1223
1224        scans = List[int]()
1225        for scan in scans_list:
1226            scans.Add(scan)
1227
1228        # Create the mass options object that will be used when averaging the scans
1229        options = MassOptions()
1230        options.ToleranceUnits = ToleranceUnits.ppm
1231        options.Tolerance = ppm_tolerance
1232
1233        # Get the scan filter for the first scan.  This scan filter will be used to located
1234        # scans within the given scan range of the same type
1235
1236        averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
1237
1238        len_data = averageScan.SegmentedScan.Positions.Length
1239
1240        mz_list = list(averageScan.SegmentedScan.Positions)
1241        abund_list = list(averageScan.SegmentedScan.Intensities)
1242
1243        data_dict = {
1244            Labels.mz: mz_list,
1245            Labels.abundance: abund_list,
1246        }
1247
1248        mass_spec = MassSpecProfile(data_dict, d_params, auto_process=auto_process)
1249
1250        return mass_spec

Class for parsing Thermo Raw files and extracting information from them.

Parameters:

file_location : str or pathlib.Path or s3path.S3Path Thermo Raw file path or S3 path.

Attributes:

file_path : str or pathlib.Path or s3path.S3Path The file path of the Thermo Raw file. parameters : LCMSParameters The LCMS parameters for the Thermo Raw file. chromatogram_settings : LiquidChromatographSetting The chromatogram settings for the Thermo Raw file. scans : list or tuple The selected scans for the Thermo Raw file. start_scan : int The starting scan number for the Thermo Raw file. end_scan : int The ending scan number for the Thermo Raw file.

Methods:

  • set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter Convert the user-passed MS Type string to a Thermo MSOrderType object.
  • get_creation_time() -> datetime.datetime Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
  • remove_temp_file() Remove the temporary file if the path is from S3Path.
  • get_polarity_mode(scan_number: int) -> int Get the polarity mode for the given scan number.
  • get_filter_for_scan_num(scan_number: int) -> List[str] Get the filter for the given scan number.
  • check_full_scan(scan_number: int) -> bool Check if the given scan number is a full scan.
  • get_all_filters() -> Tuple[Dict[int, str], List[str]] Get all scan filters for the Thermo Raw file.
  • get_scan_header(scan: int) -> Dict[str, Any] Get the full dictionary of scan header metadata for the given scan number.
  • get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]] Get the retention time, intensity, and scan number from the given trace.
  • get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d', peak_detection: bool = False, smooth: bool = False, plot: bool = False, ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes] Get the extracted ion chromatograms (EICs) for the target m/z values.
ThermoBaseClass(file_location)
110    def __init__(self, file_location):
111        """file_location: srt pathlib.Path or s3path.S3Path
112        Thermo Raw file path
113        """
114        # Thread.__init__(self)
115        if isinstance(file_location, str):
116            file_path = Path(file_location)
117
118        elif isinstance(file_location, S3Path):
119            temp_dir = Path("tmp/")
120            temp_dir.mkdir(exist_ok=True)
121
122            file_path = temp_dir / file_location.name
123            with open(file_path, "wb") as fh:
124                fh.write(file_location.read_bytes())
125
126        else:
127            file_path = file_location
128
129        self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path))
130
131        if not self.iRawDataPlus.IsOpen:
132            raise FileNotFoundError(
133                "Unable to access the RAW file using the RawFileReader class!"
134            )
135
136        # Check for any errors in the RAW file
137        if self.iRawDataPlus.IsError:
138            raise IOError(
139                "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path)
140            )
141
142        self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1)
143
144        self.file_path = file_location
145        self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path))
146
147        # removing tmp file
148
149        self._init_settings()

file_location: str, pathlib.Path or s3path.S3Path. Thermo Raw file path.

iRawDataPlus
res
file_path
iFileHeader

Get or set the LCMSParameters object.

Get or set the LiquidChromatographSetting object.

scans: list | tuple

scans : list or tuple. If a list, the listed scans are averaged with Thermo AverageScans; if a tuple, it is treated as a scan range and averaged with Thermo AverageScansInScanRange.

start_scan: int

Get the starting scan number for the Thermo Raw file.

end_scan: int

Get the ending scan number for the Thermo Raw file.

def set_msordertype(self, scanFilter, mstype: str = 'ms1'):
208    def set_msordertype(self, scanFilter, mstype: str = "ms1"):
209        """
210        Function to convert user passed string MS Type to Thermo MSOrderType object
211        Limited to MS1 through MS10.
212
213        Parameters:
214        -----------
215        scanFilter : Thermo.ScanFilter
216            The scan filter object.
217        mstype : str, optional
218            The MS Type string, by default 'ms1'
219
220        """
221        mstype = mstype.upper()
222        # Check that a valid mstype is passed
223        if (int(mstype.split("MS")[1]) > 10) or (int(mstype.split("MS")[1]) < 1):
224            warn("MS Type not valid, must be between MS1 and MS10")
225
226        msordertypedict = {
227            "MS1": MSOrderType.Ms,
228            "MS2": MSOrderType.Ms2,
229            "MS3": MSOrderType.Ms3,
230            "MS4": MSOrderType.Ms4,
231            "MS5": MSOrderType.Ms5,
232            "MS6": MSOrderType.Ms6,
233            "MS7": MSOrderType.Ms7,
234            "MS8": MSOrderType.Ms8,
235            "MS9": MSOrderType.Ms9,
236            "MS10": MSOrderType.Ms10,
237        }
238        scanFilter.MSOrder = msordertypedict[mstype]
239        return scanFilter

Function to convert user passed string MS Type to Thermo MSOrderType object Limited to MS1 through MS10.

Parameters:

scanFilter : Thermo.ScanFilter The scan filter object. mstype : str, optional The MS Type string, by default 'ms1'

def get_creation_time(self) -> datetime.datetime:
241    def get_creation_time(self) -> datetime.datetime:
242        """
243        Extract the creation date stamp from the .RAW file
244        Return formatted creation date stamp.
245
246        """
247        credate = self.iRawDataPlus.CreationDate.get_Ticks()
248        credate = datetime.datetime(1, 1, 1) + datetime.timedelta(
249            microseconds=credate / 10
250        )
251        return credate

Extract the creation date stamp from the .RAW file Return formatted creation date stamp.

def remove_temp_file(self) -> None:
253    def remove_temp_file(self) -> None:
254        """if the path is from S3Path data cannot be serialized to io.ByteStream and
255        a temporary copy is stored at the temp dir
256        use this function only at the end of your execution scrip
257        some LCMS class methods depend on this file
258        """
259
260        self.file_path.unlink()

If the path is an S3Path, the data cannot be serialized to io.ByteStream, so a temporary copy is stored in the temp dir. Use this function only at the end of your execution script, because some LCMS class methods depend on this file.

def close_file(self) -> None:
262    def close_file(self) -> None:
263        """
264        Close the Thermo Raw file.
265        """
266        self.iRawDataPlus.Dispose()

Close the Thermo Raw file.

def get_polarity_mode(self, scan_number: int) -> int:
268    def get_polarity_mode(self, scan_number: int) -> int:
269        """
270        Get the polarity mode for the given scan number.
271
272        Parameters:
273        -----------
274        scan_number : int
275            The scan number.
276
277        Raises:
278        -------
279        Exception
280            If the polarity mode is unknown.
281
282        """
283        polarity_symbol = self.get_filter_for_scan_num(scan_number)[1]
284
285        if polarity_symbol == "+":
286            return 1
287            # return 'POSITIVE_ION_MODE'
288
289        elif polarity_symbol == "-":
290            return -1
291
292        else:
293            raise Exception("Polarity Mode Unknown, please set it manually")

Get the polarity mode for the given scan number.

Parameters:

scan_number : int The scan number.

Raises:

Exception If the polarity mode is unknown.

def get_filter_for_scan_num(self, scan_number: int) -> System.Collections.Generic.List[String]:
295    def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
296        """
297        Returns the closest matching run time that corresponds to scan_number for the current
298        controller. This function is only supported for MS device controllers.
299        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
300
301        Parameters:
302        -----------
303        scan_number : int
304            The scan number.
305
306        """
307        scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
308
309        return str(scan_label).split()

Returns the scan event string for scan_number on the current controller, split on whitespace into a list of tokens. This function is only supported for MS device controllers. e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

Parameters:

scan_number : int The scan number.

def get_ms_level_for_scan_num(self, scan_number: int) -> str:
311    def get_ms_level_for_scan_num(self, scan_number: int) -> str:
312        """
313        Get the MS order for the given scan number.
314
315        Parameters:
316        -----------
317        scan_number : int
318            The scan number
319
320        Returns:
321        --------
322        int
323            The MS order type (1 for MS, 2 for MS2, etc.)
324        """
325        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
326
327        msordertype = {
328            MSOrderType.Ms: 1,
329            MSOrderType.Ms2: 2,
330            MSOrderType.Ms3: 3,
331            MSOrderType.Ms4: 4,
332            MSOrderType.Ms5: 5,
333            MSOrderType.Ms6: 6,
334            MSOrderType.Ms7: 7,
335            MSOrderType.Ms8: 8,
336            MSOrderType.Ms9: 9,
337            MSOrderType.Ms10: 10,
338        }
339
340        if scan_filter.MSOrder in msordertype:
341            return msordertype[scan_filter.MSOrder]
342        else:
343            raise Exception("MS Order Type not found")

Get the MS order for the given scan number.

Parameters:

scan_number : int The scan number

Returns:

int The MS order type (1 for MS, 2 for MS2, etc.)

def check_full_scan(self, scan_number: int) -> bool:
345    def check_full_scan(self, scan_number: int) -> bool:
346        # scan_filter.ScanMode 0 = FULL
347        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
348
349        return scan_filter.ScanMode == MSOrderType.Ms
def get_all_filters(self) -> Tuple[Dict[int, str], System.Collections.Generic.List[String]]:
351    def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
352        """
353        Get all scan filters.
354        This function is only supported for MS device controllers.
355        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
356
357        """
358
359        scanrange = range(self.start_scan, self.end_scan + 1)
360        scanfiltersdic = {}
361        scanfilterslist = []
362        for scan_number in scanrange:
363            scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
364            scanfiltersdic[scan_number] = scan_label
365            scanfilterslist.append(scan_label)
366        scanfilterset = list(set(scanfilterslist))
367        return scanfiltersdic, scanfilterset

Get all scan filters. This function is only supported for MS device controllers. e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

def get_scan_header(self, scan: int) -> Dict[str, Any]:
369    def get_scan_header(self, scan: int) -> Dict[str, Any]:
370        """
371        Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.
372
373        Parameters:
374        -----------
375        scan : int
376            The scan number.
377
378        """
379        header = self.iRawDataPlus.GetTrailerExtraInformation(scan)
380
381        header_dic = {}
382        for i in range(header.Length):
383            header_dic.update({header.Labels[i]: header.Values[i]})
384        return header_dic

Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.

Parameters:

scan : int The scan number.

@staticmethod
def get_rt_time_from_trace( trace) -> Tuple[System.Collections.Generic.List[Double], System.Collections.Generic.List[Double], System.Collections.Generic.List[Int32]]:
386    @staticmethod
387    def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
388        """trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal"""
389        return list(trace.Times), list(trace.Intensities), list(trace.Scans)

trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal

def get_eics( self, target_mzs: System.Collections.Generic.List[Double], tic_data: Dict[str, Any], ms_type='MS !d', peak_detection=False, smooth=False, plot=False, ax: Optional[matplotlib.axes._axes.Axes] = None, legend=False) -> Tuple[Dict[float, corems.mass_spectra.factory.chromat_data.EIC_Data], matplotlib.axes._axes.Axes]:
391    def get_eics(
392        self,
393        target_mzs: List[float],
394        tic_data: Dict[str, Any],
395        ms_type="MS !d",
396        peak_detection=False,
397        smooth=False,
398        plot=False,
399        ax: Optional[axes.Axes] = None,
400        legend=False,
401    ) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
402        """ms_type: str ('MS', MS2')
403        start_scan: int default -1 will select the lowest available
404        end_scan: int default -1 will select the highest available
405
406        returns:
407
408            chroma: dict{target_mz: EIC_Data(
409                                        Scans: [int]
410                                            original thermo scan numbers
411                                        Time: [floats]
412                                            list of retention times
413                                        TIC: [floats]
414                                            total ion chromatogram
415                                        Apexes: [int]
416                                            original thermo apex scan number after peak picking
417                                        )
418
419        """
420        # If peak_detection or smooth is True, raise exception
421        if peak_detection or smooth:
422            raise Exception("Peak detection and smoothing are no longer implemented in this function")
423
424        options = MassOptions()
425        options.ToleranceUnits = ToleranceUnits.ppm
426        options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm
427
428        all_chroma_settings = []
429
430        for target_mz in target_mzs:
431            settings = ChromatogramTraceSettings(TraceType.MassRange)
432            settings.Filter = ms_type
433            settings.MassRanges = [Range(target_mz, target_mz)]
434
435            chroma_settings = IChromatogramSettings(settings)
436
437            all_chroma_settings.append(chroma_settings)
438
439        # chroma_settings2 = IChromatogramSettings(settings)
440        # print(chroma_settings.FragmentMass)
441        # print(chroma_settings.FragmentMass)
442        # print(chroma_settings)
443        # print(chroma_settings)
444
445        data = self.iRawDataPlus.GetChromatogramData(
446            all_chroma_settings, self.start_scan, self.end_scan, options
447        )
448
449        traces = ChromatogramSignal.FromChromatogramData(data)
450
451        chroma = {}
452
453        if plot:
454            from matplotlib.transforms import Bbox
455            import matplotlib.pyplot as plt
456
457            if not ax:
458                # ax = plt.gca()
459                # ax.clear()
460                fig, ax = plt.subplots()
461
462            else:
463                fig = plt.gcf()
464
465            # plt.show()
466
467        for i, trace in enumerate(traces):
468            if trace.Length > 0:
469                rt, eic, scans = self.get_rt_time_from_trace(trace)
470                if smooth:
471                    eic = self.smooth_tic(eic)
472
473                chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
474                if plot:
475                    ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))
476
477        if peak_detection:
478            # max_eic = self.get_max_eic(chroma)
479            max_signal = max(tic_data.tic)
480
481            for eic_data in chroma.values():
482                eic = eic_data.eic
483                time = eic_data.time
484
485                if len(eic) != len(tic_data.tic):
486                    warn(
487                        "The software assumes same lenth of TIC and EIC, this does not seems to be the case and the results mass spectrum selected by the scan number might not be correct"
488                    )
489
490                if eic.max() > 0:
491                    centroid_eics = self.eic_centroid_detector(time, eic, max_signal)
492                    eic_data.apexes = [i for i in centroid_eics]
493
494                    if plot:
495                        for peak_indexes in eic_data.apexes:
496                            apex_index = peak_indexes[1]
497                            ax.plot(
498                                time[apex_index],
499                                eic[apex_index],
500                                marker="x",
501                                linewidth=0,
502                            )
503
504        if plot:
505            ax.set_xlabel("Time (min)")
506            ax.set_ylabel("a.u.")
507            ax.set_title(ms_type + " EIC")
508            ax.tick_params(axis="both", which="major", labelsize=12)
509            ax.axes.spines["top"].set_visible(False)
510            ax.axes.spines["right"].set_visible(False)
511
512            if legend:
513                legend = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1))
514                fig.subplots_adjust(right=0.76)
515                # ax.set_prop_cycle(color=plt.cm.gist_rainbow(np.linspace(0, 1, len(traces))))
516
517                d = {"down": 30, "up": -30}
518
519                def func(evt):
520                    if legend.contains(evt):
521                        bbox = legend.get_bbox_to_anchor()
522                        bbox = Bbox.from_bounds(
523                            bbox.x0, bbox.y0 + d[evt.button], bbox.width, bbox.height
524                        )
525                        tr = legend.axes.transAxes.inverted()
526                        legend.set_bbox_to_anchor(bbox.transformed(tr))
527                        fig.canvas.draw_idle()
528
529                fig.canvas.mpl_connect("scroll_event", func)
530            return chroma, ax
531        else:
532            return chroma, None
533            rt = []
534            tic = []
535            scans = []
536            for i in range(traces[0].Length):
537                # print(trace[0].HasBasePeakData,trace[0].EndTime )
538
539                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
540                rt.append(traces[0].Times[i])
541                tic.append(traces[0].Intensities[i])
542                scans.append(traces[0].Scans[i])
543
544            return traces
545            # plot_chroma(rt, tic)
546            # plt.show()

ms_type: str ('MS', 'MS2'); start_scan: int, default -1 will select the lowest available; end_scan: int, default -1 will select the highest available

returns:

chroma: dict{target_mz: EIC_Data(
                            Scans: [int]
                                original thermo scan numbers
                            Time: [floats]
                                list of retention times
                            TIC: [floats]
                                total ion chromatogram
                            Apexes: [int]
                                original thermo apex scan number after peak picking
                            )
def get_tic( self, ms_type='MS !d', peak_detection=False, smooth=False, plot=False, ax=None, trace_type='TIC') -> Tuple[corems.mass_spectra.factory.chromat_data.TIC_Data, matplotlib.axes._axes.Axes]:
548    def get_tic(
549        self,
550        ms_type="MS !d",
551        peak_detection=False,  # This wont work right now
552        smooth=False,  # This wont work right now
553        plot=False,
554        ax=None,
555        trace_type="TIC",
556    ) -> Tuple[TIC_Data, axes.Axes]:
557        """ms_type: str ('MS !d', 'MS2', None)
558            if you use None you get all scans.
559        peak_detection: bool
560        smooth: bool
561        plot: bool
562        ax: matplotlib axis object
563        trace_type: str ('TIC','BPC')
564
565        returns:
566            chroma: dict
567            {
568            Scan: [int]
569                original thermo scan numberMS
570            Time: [floats]
571                list of retention times
572            TIC: [floats]
573                total ion chromatogram
574            Apexes: [int]
575                original thermo apex scan number after peak picking
576            }
577        """
578        if trace_type == "TIC":
579            settings = ChromatogramTraceSettings(TraceType.TIC)
580        elif trace_type == "BPC":
581            settings = ChromatogramTraceSettings(TraceType.BasePeak)
582        else:
583            raise ValueError(f"{trace_type} undefined")
584        if ms_type == "all":
585            settings.Filter = None
586        else:
587            settings.Filter = ms_type
588
589        chroma_settings = IChromatogramSettings(settings)
590
591        data = self.iRawDataPlus.GetChromatogramData(
592            [chroma_settings], self.start_scan, self.end_scan
593        )
594
595        trace = ChromatogramSignal.FromChromatogramData(data)
596
597        data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[])
598
599        if trace[0].Length > 0:
600            for i in range(trace[0].Length):
601                # print(trace[0].HasBasePeakData,trace[0].EndTime )
602
603                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
604                data.time.append(trace[0].Times[i])
605                data.tic.append(trace[0].Intensities[i])
606                data.scans.append(trace[0].Scans[i])
607
608                # print(trace[0].Scans[i])
609            if smooth:
610                data.tic = self.smooth_tic(data.tic)
611
612            else:
613                data.tic = np.array(data.tic)
614
615            if peak_detection:
616                centroid_peak_indexes = [
617                    i for i in self.centroid_detector(data.time, data.tic)
618                ]
619
620                data.apexes = centroid_peak_indexes
621
622            if plot:
623                if not ax:
624                    import matplotlib.pyplot as plt
625
626                    ax = plt.gca()
627                    # fig, ax = plt.subplots(figsize=(6, 3))
628
629                ax.plot(data.time, data.tic, label=trace_type)
630                ax.set_xlabel("Time (min)")
631                ax.set_ylabel("a.u.")
632                if peak_detection:
633                    for peak_indexes in data.apexes:
634                        apex_index = peak_indexes[1]
635                        ax.plot(
636                            data.time[apex_index],
637                            data.tic[apex_index],
638                            marker="x",
639                            linewidth=0,
640                        )
641
642                # plt.show()
643                if trace_type == "BPC":
644                    data.bpc = data.tic
645                    data.tic = []
646                return data, ax
647            if trace_type == "BPC":
648                data.bpc = data.tic
649                data.tic = []
650            return data, None
651
652        else:
653            return None, None

ms_type: str ('MS !d', 'MS2', None) if you use None you get all scans. peak_detection: bool smooth: bool plot: bool ax: matplotlib axis object trace_type: str ('TIC','BPC')

returns: chroma: dict { Scan: [int] original thermo scan numbers; Time: [floats] list of retention times; TIC: [floats] total ion chromatogram; Apexes: [int] original thermo apex scan number after peak picking }

def get_average_mass_spectrum( self, spectrum_mode: str = 'profile', auto_process: bool = True, ppm_tolerance: float = 5.0, ms_type: str = 'MS1') -> corems.mass_spectrum.factory.MassSpectrumClasses.MassSpecProfile | corems.mass_spectrum.factory.MassSpectrumClasses.MassSpecCentroid:
655    def get_average_mass_spectrum(
656        self,
657        spectrum_mode: str = "profile",
658        auto_process: bool = True,
659        ppm_tolerance: float = 5.0,
660        ms_type: str = "MS1",
661    ) -> MassSpecProfile | MassSpecCentroid:
662        """
663        Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method
664        or a scan list using Thermo's AverageScans method
665        spectrum_mode: str
666            centroid or profile mass spectrum
667        auto_process: bool
668            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
669        ms_type: str
670            String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10.
671            Internal function converts to Thermo MSOrderType class.
672
673        """
674
675        def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool):
676            mz_list = list(averageScan.SegmentedScan.Positions)
677            abund_list = list(averageScan.SegmentedScan.Intensities)
678
679            data_dict = {
680                Labels.mz: mz_list,
681                Labels.abundance: abund_list,
682            }
683
684            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)
685
686        def get_centroid_mass_spec(averageScan, d_params: dict):
687            noise = list(averageScan.centroidScan.Noises)
688
689            baselines = list(averageScan.centroidScan.Baselines)
690
691            rp = list(averageScan.centroidScan.Resolutions)
692
693            magnitude = list(averageScan.centroidScan.Intensities)
694
695            mz = list(averageScan.centroidScan.Masses)
696
697            array_noise_std = (np.array(noise) - np.array(baselines)) / 3
698            l_signal_to_noise = np.array(magnitude) / array_noise_std
699
700            d_params["baseline_noise"] = np.average(array_noise_std)
701
702            d_params["baseline_noise_std"] = np.std(array_noise_std)
703
704            data_dict = {
705                Labels.mz: mz,
706                Labels.abundance: magnitude,
707                Labels.rp: rp,
708                Labels.s2n: list(l_signal_to_noise),
709            }
710
711            mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
712
713            return mass_spec
714
715        d_params = self.set_metadata(
716            firstScanNumber=self.start_scan, lastScanNumber=self.end_scan
717        )
718
719        # Create the mass options object that will be used when averaging the scans
720        options = MassOptions()
721        options.ToleranceUnits = ToleranceUnits.ppm
722        options.Tolerance = ppm_tolerance
723
724        # Get the scan filter for the first scan.  This scan filter will be used to located
725        # scans within the given scan range of the same type
726        scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan)
727
728        # force it to only look for the MSType
729        scanFilter = self.set_msordertype(scanFilter, ms_type)
730
731        if isinstance(self.scans, tuple):
732            averageScan = Extensions.AverageScansInScanRange(
733                self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options
734            )
735
736            if averageScan:
737                if spectrum_mode == "profile":
738                    mass_spec = get_profile_mass_spec(
739                        averageScan, d_params, auto_process
740                    )
741
742                    return mass_spec
743
744                elif spectrum_mode == "centroid":
745                    if averageScan.HasCentroidStream:
746                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
747
748                        return mass_spec
749
750                    else:
751                        raise ValueError(
752                            "No Centroind data available for the selected scans"
753                        )
754                else:
755                    raise ValueError("spectrum_mode must be 'profile' or centroid")
756            else:
757                raise ValueError("No data found for the selected scans")
758
759        elif isinstance(self.scans, list):
760            d_params = self.set_metadata(scans_list=self.scans)
761
762            scans = List[int]()
763            for scan in self.scans:
764                scans.Add(scan)
765
766            averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
767
768            if averageScan:
769                if spectrum_mode == "profile":
770                    mass_spec = get_profile_mass_spec(
771                        averageScan, d_params, auto_process
772                    )
773
774                    return mass_spec
775
776                elif spectrum_mode == "centroid":
777                    if averageScan.HasCentroidStream:
778                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
779
780                        return mass_spec
781
782                    else:
783                        raise ValueError(
784                            "No Centroind data available for the selected scans"
785                        )
786
787                else:
788                    raise ValueError("spectrum_mode must be 'profile' or centroid")
789
790            else:
791                raise ValueError("No data found for the selected scans")
792
793        else:
794            raise ValueError("scans must be a list intergers or a tuple if integers")

Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method or a scan list using Thermo's AverageScans method spectrum_mode: str centroid or profile mass spectrum auto_process: bool If true performs peak picking, and noise threshold calculation after creation of mass spectrum object ms_type: str String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10. Internal function converts to Thermo MSOrderType class.

def set_metadata( self, firstScanNumber=0, lastScanNumber=0, scans_list=False, label='Thermo_Profile'):
796    def set_metadata(
797        self,
798        firstScanNumber=0,
799        lastScanNumber=0,
800        scans_list=False,
801        label=Labels.thermo_profile,
802    ):
803        """
804        Collect metadata to be ingested in the mass spectrum object
805
806        scans_list: list[int] or false
807        lastScanNumber: int
808        firstScanNumber: int
809        """
810
811        d_params = default_parameters(self.file_path)
812
813        # assumes scans is full scan or reduced profile scan
814
815        d_params["label"] = label
816
817        if scans_list:
818            d_params["scan_number"] = scans_list
819
820            d_params["polarity"] = self.get_polarity_mode(scans_list[0])
821
822        else:
823            d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber)
824
825            d_params["polarity"] = self.get_polarity_mode(firstScanNumber)
826
827        d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model
828
829        d_params["acquisition_time"] = self.get_creation_time()
830
831        d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name
832
833        return d_params

Collect metadata to be ingested in the mass spectrum object

scans_list: list[int] or false lastScanNumber: int firstScanNumber: int

def get_instrument_methods(self, parse_strings: bool = True):
835    def get_instrument_methods(self, parse_strings: bool = True):
836        """
837        This function will extract the instrument methods embedded in the raw file
838
839        First it will check if there are any instrument methods, if not returning None
840        Then it will get the total number of instrument methods.
841        For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary
842        If this fails, it will return just the string object.
843
844        This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.
845
846        Parameters:
847        -----------
848        parse_strings: bool
849            If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.
850
851        Returns:
852        --------
853        List[Dict[str, Any]] or List
854            A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.
855        """
856
857        if not self.iRawDataPlus.HasInstrumentMethod:
858            raise ValueError(
859                "Raw Data file does not have any instrument methods attached"
860            )
861            return None
862        else:
863
864            def parse_instrument_method(data):
865                lines = data.split("\r\n")
866                method = {}
867                current_section = None
868                sub_section = None
869
870                for line in lines:
871                    if not line.strip():  # Skip empty lines
872                        continue
873                    if (
874                        line.startswith("----")
875                        or line.endswith("Settings")
876                        or line.endswith("Summary")
877                        or line.startswith("Experiment")
878                        or line.startswith("Scan Event")
879                    ):
880                        current_section = line.replace("-", "").strip()
881                        method[current_section] = {}
882                        sub_section = None
883                    elif line.startswith("\t"):
884                        if "\t\t" in line:
885                            indent_level = line.count("\t")
886                            key_value = line.strip()
887
888                            if indent_level == 2:
889                                if sub_section:
890                                    key, value = (
891                                        key_value.split("=", 1)
892                                        if "=" in key_value
893                                        else (key_value, None)
894                                    )
895                                    method[current_section][sub_section][
896                                        key.strip()
897                                    ] = value.strip() if value else None
898                            elif indent_level == 3:
899                                scan_type, key_value = (
900                                    key_value.split(" ", 1)
901                                    if " " in key_value
902                                    else (key_value, None)
903                                )
904                                method.setdefault(current_section, {}).setdefault(
905                                    sub_section, {}
906                                ).setdefault(scan_type, {})
907
908                                if key_value:
909                                    key, value = (
910                                        key_value.split("=", 1)
911                                        if "=" in key_value
912                                        else (key_value, None)
913                                    )
914                                    method[current_section][sub_section][scan_type][
915                                        key.strip()
916                                    ] = value.strip() if value else None
917                        else:
918                            key_value = line.strip()
919                            if "=" in key_value:
920                                key, value = key_value.split("=", 1)
921                                method.setdefault(current_section, {})[key.strip()] = (
922                                    value.strip()
923                                )
924                            else:
925                                sub_section = key_value
926                    else:
927                        if ":" in line:
928                            key, value = line.split(":", 1)
929                            method[current_section][key.strip()] = value.strip()
930                        else:
931                            method[current_section][line] = {}
932
933                return method
934
935            count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount
936            # TODO make this code better...
937            instrument_methods = []
938            for i in range(count_instrument_methods):
939                instrument_method_string = self.iRawDataPlus.GetInstrumentMethod(i)
940                if parse_strings:
941                    try:
942                        instrument_method_dict = parse_instrument_method(
943                            instrument_method_string
944                        )
945                    except:  # if it fails for any reason
946                        instrument_method_dict = instrument_method_string
947                else:
948                    instrument_method_dict = instrument_method_string
949                instrument_methods.append(instrument_method_dict)
950            return instrument_methods

This function will extract the instrument methods embedded in the raw file

First it will check if there are any instrument methods, raising a ValueError if there are none. Then it will get the total number of instrument methods. For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary. If this fails, it will return just the string object.

This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.

Parameters:

parse_strings: bool If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.

Returns:

List[Dict[str, Any]] or List A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.

def get_tune_method(self):
 952    def get_tune_method(self):
 953        """
 954        This code will extract the tune method from the raw file
 955        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
 956        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
 957        It will also not return Labels (keys) where the value is blank
 958
 959        Returns:
 960        --------
 961        Dict[str, Any]
 962            A dictionary containing the tune method information
 963
 964        Raises:
 965        -------
 966        ValueError
 967            If no tune methods are found in the raw file
 968
 969        """
 970        tunemethodcount = self.iRawDataPlus.GetTuneDataCount()
 971        if tunemethodcount == 0:
 972            raise ValueError("No tune methods found in the raw data file")
 973            return None
 974        elif tunemethodcount > 1:
 975            warnings.warn(
 976                "Multiple tune methods found in the raw data file, returning the 1st"
 977            )
 978
 979        header = self.iRawDataPlus.GetTuneData(0)
 980
 981        header_dic = {}
 982        current_section = None
 983
 984        for i in range(header.Length):
 985            label = header.Labels[i]
 986            value = header.Values[i]
 987
 988            # Check for section headers
 989            if "===" in label or (
 990                (value == "" or value is None) and not label.endswith(":")
 991            ):
 992                # This is a section header
 993                section_name = (
 994                    label.replace("=", "").replace(":", "").strip()
 995                )  # Clean the label if it contains '='
 996                header_dic[section_name] = {}
 997                current_section = section_name
 998            else:
 999                if current_section:
1000                    header_dic[current_section][label] = value
1001                else:
1002                    header_dic[label] = value
1003        return header_dic

This code will extract the tune method from the raw file It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. It attempts to parse out section headers and sub-sections, but may not work for all instrument types. It will also not return Labels (keys) where the value is blank

Returns:

Dict[str, Any] A dictionary containing the tune method information

Raises:

ValueError If no tune methods are found in the raw file

def get_status_log(self, retention_time: float = 0):
1005    def get_status_log(self, retention_time: float = 0):
1006        """
1007        This code will extract the status logs from the raw file
1008        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
1009        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
1010        It will also not return Labels (keys) where the value is blank
1011
1012        Parameters:
1013        -----------
1014        retention_time: float
1015            The retention time in minutes to extract the status log data from.
1016            Will use the closest retention time found. Default 0.
1017
1018        Returns:
1019        --------
1020        Dict[str, Any]
1021            A dictionary containing the status log information
1022
1023        Raises:
1024        -------
1025        ValueError
1026            If no status logs are found in the raw file
1027
1028        """
1029        tunemethodcount = self.iRawDataPlus.GetStatusLogEntriesCount()
1030        if tunemethodcount == 0:
1031            raise ValueError("No status logs found in the raw data file")
1032            return None
1033
1034        header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time)
1035
1036        header_dic = {}
1037        current_section = None
1038
1039        for i in range(header.Length):
1040            label = header.Labels[i]
1041            value = header.Values[i]
1042
1043            # Check for section headers
1044            if "===" in label or (
1045                (value == "" or value is None) and not label.endswith(":")
1046            ):
1047                # This is a section header
1048                section_name = (
1049                    label.replace("=", "").replace(":", "").strip()
1050                )  # Clean the label if it contains '='
1051                header_dic[section_name] = {}
1052                current_section = section_name
1053            else:
1054                if current_section:
1055                    header_dic[current_section][label] = value
1056                else:
1057                    header_dic[label] = value
1058        return header_dic

This code will extract the status logs from the raw file It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. It attempts to parse out section headers and sub-sections, but may not work for all instrument types. It will also not return Labels (keys) where the value is blank

Parameters:

retention_time: float The retention time in minutes to extract the status log data from. Will use the closest retention time found. Default 0.

Returns:

Dict[str, Any] A dictionary containing the status log information

Raises:

ValueError If no status logs are found in the raw file

def get_error_logs(self):
1060    def get_error_logs(self):
1061        """
1062        This code will extract the error logs from the raw file
1063
1064        Returns:
1065        --------
1066        Dict[float, str]
1067            A dictionary containing the error log information with the retention time as the key
1068
1069        Raises:
1070        -------
1071        ValueError
1072            If no error logs are found in the raw file
1073        """
1074
1075        error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount
1076        if error_log_count == 0:
1077            raise ValueError("No error logs found in the raw data file")
1078            return None
1079
1080        error_logs = {}
1081
1082        for i in range(error_log_count):
1083            error_log_item = self.iRawDataPlus.GetErrorLogItem(i)
1084            rt = error_log_item.RetentionTime
1085            message = error_log_item.Message
1086            # Use the index `i` as the unique ID key
1087            error_logs[i] = {"rt": rt, "message": message}
1088        return error_logs

This code will extract the error logs from the raw file

Returns:

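Dict[int, Dict[str, Any]] A dictionary containing the error log information, keyed by the index of each error log item; each value holds the item's retention time under "rt" and its text under "message"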

Raises:

ValueError If no error logs are found in the raw file

def get_sample_information(self):
1090    def get_sample_information(self):
1091        """
1092        This code will extract the sample information from the raw file
1093
1094        Returns:
1095        --------
1096        Dict[str, Any]
1097            A dictionary containing the sample information
1098            Note that UserText field may not be handled properly and may need further processing
1099        """
1100        sminfo = self.iRawDataPlus.SampleInformation
1101        smdict = {}
1102        smdict["Comment"] = sminfo.Comment
1103        smdict["SampleId"] = sminfo.SampleId
1104        smdict["SampleName"] = sminfo.SampleName
1105        smdict["Vial"] = sminfo.Vial
1106        smdict["InjectionVolume"] = sminfo.InjectionVolume
1107        smdict["Barcode"] = sminfo.Barcode
1108        smdict["BarcodeStatus"] = str(sminfo.BarcodeStatus)
1109        smdict["CalibrationLevel"] = sminfo.CalibrationLevel
1110        smdict["DilutionFactor"] = sminfo.DilutionFactor
1111        smdict["InstrumentMethodFile"] = sminfo.InstrumentMethodFile
1112        smdict["RawFileName"] = sminfo.RawFileName
1113        smdict["CalibrationFile"] = sminfo.CalibrationFile
1114        smdict["IstdAmount"] = sminfo.IstdAmount
1115        smdict["RowNumber"] = sminfo.RowNumber
1116        smdict["Path"] = sminfo.Path
1117        smdict["ProcessingMethodFile"] = sminfo.ProcessingMethodFile
1118        smdict["SampleType"] = str(sminfo.SampleType)
1119        smdict["SampleWeight"] = sminfo.SampleWeight
1120        smdict["UserText"] = {
1121            "UserText": [x for x in sminfo.UserText]
1122        }  # [0] #This may not work - needs debugging with
1123        return smdict

This code will extract the sample information from the raw file

Returns:

Dict[str, Any] A dictionary containing the sample information Note that UserText field may not be handled properly and may need further processing

def get_instrument_data(self):
1125    def get_instrument_data(self):
1126        """
1127        This code will extract the instrument data from the raw file
1128
1129        Returns:
1130        --------
1131        Dict[str, Any]
1132            A dictionary containing the instrument data
1133        """
1134        instrument_data = self.iRawDataPlus.GetInstrumentData()
1135        id_dict = {}
1136        id_dict["Name"] = instrument_data.Name
1137        id_dict["Model"] = instrument_data.Model
1138        id_dict["SerialNumber"] = instrument_data.SerialNumber
1139        id_dict["SoftwareVersion"] = instrument_data.SoftwareVersion
1140        id_dict["HardwareVersion"] = instrument_data.HardwareVersion
1141        id_dict["ChannelLabels"] = {
1142            "ChannelLabels": [x for x in instrument_data.ChannelLabels]
1143        }
1144        id_dict["Flags"] = instrument_data.Flags
1145        id_dict["AxisLabelY"] = instrument_data.AxisLabelY
1146        id_dict["AxisLabelX"] = instrument_data.AxisLabelX
1147        return id_dict

This code will extract the instrument data from the raw file

Returns:

Dict[str, Any] A dictionary containing the instrument data

def get_centroid_msms_data(self, scan):
1149    def get_centroid_msms_data(self, scan):
1150        """
1151        .. deprecated:: 2.0
1152            This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1153        """
1154
1155        warnings.warn(
1156            "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1157            "Please use `get_average_mass_spectrum()` instead.",
1158            DeprecationWarning,
1159        )
1160
1161        d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid)
1162
1163        centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False)
1164
1165        noise = list(centroidStream.Noises)
1166
1167        baselines = list(centroidStream.Baselines)
1168
1169        rp = list(centroidStream.Resolutions)
1170
1171        magnitude = list(centroidStream.Intensities)
1172
1173        mz = list(centroidStream.Masses)
1174
1175        # charge = scans_labels[5]
1176        array_noise_std = (np.array(noise) - np.array(baselines)) / 3
1177        l_signal_to_noise = np.array(magnitude) / array_noise_std
1178
1179        d_params["baseline_noise"] = np.average(array_noise_std)
1180
1181        d_params["baseline_noise_std"] = np.std(array_noise_std)
1182
1183        data_dict = {
1184            Labels.mz: mz,
1185            Labels.abundance: magnitude,
1186            Labels.rp: rp,
1187            Labels.s2n: list(l_signal_to_noise),
1188        }
1189
1190        mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
1191        mass_spec.settings.noise_threshold_method = "relative_abundance"
1192        mass_spec.settings.noise_threshold_min_relative_abundance = 1
1193        mass_spec.process_mass_spec()
1194        return mass_spec

Deprecated since version 2.0: This function will be removed in a future version of CoreMS. Please use get_average_mass_spectrum() instead for similar functionality.

def get_average_mass_spectrum_by_scanlist( self, scans_list: System.Collections.Generic.List[Int32], auto_process: bool = True, ppm_tolerance: float = 5.0) -> corems.mass_spectrum.factory.MassSpectrumClasses.MassSpecProfile:
1196    def get_average_mass_spectrum_by_scanlist(
1197        self,
1198        scans_list: List[int],
1199        auto_process: bool = True,
1200        ppm_tolerance: float = 5.0,
1201    ) -> MassSpecProfile:
1202        """
1203        Averages selected scans mass spectra using Thermo's AverageScans method
1204        scans_list: list[int]
1205        auto_process: bool
1206            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
1207        Returns:
1208            MassSpecProfile
1209
1210         .. deprecated:: 2.0
1211        This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1212        """
1213
1214        warnings.warn(
1215            "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1216            "Please use `get_average_mass_spectrum()` instead.",
1217            DeprecationWarning,
1218        )
1219
1220        d_params = self.set_metadata(scans_list=scans_list)
1221
1222        # assumes scans is full scan or reduced profile scan
1223
1224        scans = List[int]()
1225        for scan in scans_list:
1226            scans.Add(scan)
1227
1228        # Create the mass options object that will be used when averaging the scans
1229        options = MassOptions()
1230        options.ToleranceUnits = ToleranceUnits.ppm
1231        options.Tolerance = ppm_tolerance
1232
1233        # Get the scan filter for the first scan.  This scan filter will be used to located
1234        # scans within the given scan range of the same type
1235
1236        averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
1237
1238        len_data = averageScan.SegmentedScan.Positions.Length
1239
1240        mz_list = list(averageScan.SegmentedScan.Positions)
1241        abund_list = list(averageScan.SegmentedScan.Intensities)
1242
1243        data_dict = {
1244            Labels.mz: mz_list,
1245            Labels.abundance: abund_list,
1246        }
1247
1248        mass_spec = MassSpecProfile(data_dict, d_params, auto_process=auto_process)
1249
1250        return mass_spec

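Averages the mass spectra of the selected scans using Thermo's AverageScans method. Parameters: scans_list (list[int]) – the scan numbers to average; auto_process (bool) – if true, performs peak picking and noise threshold calculation after creation of the mass spectrum object; ppm_tolerance (float) – tolerance, in ppm, used when averaging (default 5.0). Returns: MassSpecProfile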

Deprecated since version 2.0.

This function will be removed in a future version of CoreMS. Please use get_average_mass_spectrum() instead for similar functionality.

class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, corems.mass_spectra.input.parserbase.SpectraParserInterface):
class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, SpectraParserInterface):
    """A class for parsing Thermo RAW mass spectrometry data files and instantiating MassSpectraBase or LCMSBase objects

    Parameters
    ----------
    file_location : str or Path
        The path to the RAW file to be parsed.
    analyzer : str, optional
        The type of mass analyzer used in the instrument. Default is "Unknown".
    instrument_label : str, optional
        The name of the instrument used to acquire the data. Default is "Unknown".
    sample_name : str, optional
        The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.

    Attributes
    ----------
    file_location : Path
        The path to the RAW file being parsed.
    analyzer : str
        The type of mass analyzer used in the instrument.
    instrument_label : str
        The name of the instrument used to acquire the data.
    sample_name : str
        The name of the sample being analyzed.

    Methods
    -------
    * run(spectra="all", scan_df=None).
        Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
    * get_mass_spectrum_from_scan(scan_number, spectrum_mode, auto_process=True)
        Parses the RAW file and returns a MassSpecProfile or MassSpecCentroid object from a single scan.
    * get_mass_spectra_obj().
        Parses the RAW file and instantiates a MassSpectraBase object.
    * get_lcms_obj(spectra="all").
        Parses the RAW file and instantiates an LCMSBase object.
    * get_icr_transient_times().
        Return a list for transient time targets for all scans, or selected scans range

    Inherits from ThermoBaseClass and SpectraParserInterface
    """

    def __init__(
        self,
        file_location,
        analyzer="Unknown",
        instrument_label="Unknown",
        sample_name=None,
    ):
        super().__init__(file_location)
        if isinstance(file_location, str):
            # if obj is a string it defaults to create a Path obj, pass the S3Path if needed
            file_location = Path(file_location)
        if not file_location.exists():
            # NOTE(review): FileNotFoundError would describe this better; the
            # exception type is kept so existing callers keep working.
            raise FileExistsError("File does not exist: " + str(file_location))

        self.file_location = file_location
        self.analyzer = analyzer
        self.instrument_label = instrument_label

        if sample_name:
            self.sample_name = sample_name
        else:
            self.sample_name = file_location.stem

    def load(self):
        """Load mass spectra data. Intentionally does nothing for this parser."""
        pass

    def get_scan_df(self):
        """Return scan data as a pandas DataFrame.

        Returns
        -------
        pandas.DataFrame
            One row per scan with the columns ``scan``, ``scan_time``, ``tic``,
            ``ms_level``, ``scan_text``, ``scan_window_lower``,
            ``scan_window_upper``, ``polarity``, ``precursor_mz`` and
            ``ms_format``. The scan window bounds are the strings extracted
            from the scan text; ``precursor_mz`` is converted to float.
        """
        # This automatically brings in all the data
        self.chromatogram_settings.scans = (-1, -1)

        # Get scan df info; starting with TIC data
        tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
        tic_data = {
            "scan": tic_data.scans,
            "scan_time": tic_data.time,
            "tic": tic_data.tic,
        }
        scan_df = pd.DataFrame.from_dict(tic_data)
        # Placeholder so that ms_level keeps its column position; filled below.
        scan_df["ms_level"] = None

        # get scan text
        scan_filter_df = pd.DataFrame.from_dict(
            self.get_all_filters()[0], orient="index"
        )
        scan_filter_df.reset_index(inplace=True)
        scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)

        scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
        scan_df["scan_window_lower"] = scan_df.scan_text.str.extract(
            r"\[(\d+\.\d+)-\d+\.\d+\]"
        )
        scan_df["scan_window_upper"] = scan_df.scan_text.str.extract(
            r"\[\d+\.\d+-(\d+\.\d+)\]"
        )
        scan_df["polarity"] = np.where(
            scan_df.scan_text.str.contains(" - "), "negative", "positive"
        )
        scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@")
        scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float)

        # Assign each scan as centroid or profile and add ms_level. Values are
        # gathered row by row and assigned as whole columns, instead of one
        # boolean-mask assignment per scan (which is quadratic in scan count).
        ms_levels = []
        ms_formats = []
        for scan_number in scan_df.scan.to_list():
            ms_levels.append(self.get_ms_level_for_scan_num(scan_number))
            if self.iRawDataPlus.IsCentroidScanFromScanNumber(scan_number):
                ms_formats.append("centroid")
            else:
                ms_formats.append("profile")
        # Kept as an object column, as produced by filling the None placeholder.
        scan_df["ms_level"] = pd.Series(ms_levels, index=scan_df.index, dtype=object)
        scan_df["ms_format"] = ms_formats

        return scan_df

    def get_ms_raw(self, spectra, scan_df):
        """Return the non-zero data points of the selected scans, grouped by MS level.

        Parameters
        ----------
        spectra : str
            Which scans to read: "all", "ms1" or "ms2".
        scan_df : pandas.DataFrame
            Scan dataframe with at least the columns ``scan`` and ``ms_level``.

        Returns
        -------
        dict
            Maps "ms<level>" to a float32 DataFrame with the columns ``scan``,
            ``mz`` and ``intensity``. Only points with intensity > 0 are kept;
            levels without any such point are absent.

        Raises
        ------
        ValueError
            If ``spectra`` is not "all", "ms1" or "ms2".
        """
        if spectra == "all":
            scan_df_forspec = scan_df
        elif spectra == "ms1":
            scan_df_forspec = scan_df[scan_df.ms_level == 1]
        elif spectra == "ms2":
            scan_df_forspec = scan_df[scan_df.ms_level == 2]
        else:
            raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")

        columns = ["scan", "mz", "intensity"]

        # set at float32
        dtype = np.float32

        # Per-level list of (n, 3) sub-arrays. Each scan is read from the file
        # once and the pieces are concatenated at the end, rather than reading
        # every scan twice to pre-size the output.
        chunks = defaultdict(list)
        scan_levels = zip(
            scan_df_forspec.scan.to_list(), scan_df_forspec.ms_level.to_list()
        )
        for scan_number, level in scan_levels:
            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan_number, scanStatistics
            )
            abun = np.array(list(profileStream.Intensities))
            mz = np.array(list(profileStream.Positions))

            # Get index of abun that are > 0
            inx = np.where(abun > 0)[0]

            # No measurements
            if len(inx) == 0:
                continue

            arr = np.empty((len(inx), len(columns)), dtype=dtype)
            arr[:, 0] = scan_number
            arr[:, 1] = mz[inx]
            arr[:, 2] = abun[inx]
            chunks[level].append(arr)

        # Construct one dataframe per MS level, keyed with an 'ms' prefix
        return {
            f"ms{level}": pd.DataFrame(np.concatenate(parts), columns=columns)
            for level, parts in chunks.items()
        }

    def run(self, spectra="all", scan_df=None):
        """
        Extracts mass spectra data from a raw file.

        Parameters
        ----------
        spectra : str, optional
            Which mass spectra data to include in the output. Default is all.  Other options: none, ms1, ms2.
        scan_df : pandas.DataFrame, optional
            Scan dataframe.  If not provided, the scan dataframe is created from the raw file via `get_scan_df()`.

        Returns
        -------
        tuple
            A tuple containing two elements:
            - A dictionary containing mass spectra data, separated by MS level (None when spectra is "none").
            - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level,
                scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
        """
        # Prepare scan_df
        if scan_df is None:
            scan_df = self.get_scan_df()

        # Prepare mass spectra data
        if spectra != "none":
            res = self.get_ms_raw(spectra=spectra, scan_df=scan_df)
        else:
            res = None

        return res, scan_df

    def get_mass_spectrum_from_scan(
        self, scan_number, spectrum_mode, auto_process=True
    ):
        """Instantiate a mass spectrum object from a single scan number from the binary file.

        Parameters
        ----------
        scan_number : int
            The scan number to extract the mass spectrum from.
        spectrum_mode : str
            The type of mass spectrum to extract.  Must be 'profile' or 'centroid'.
        auto_process : bool, optional
            If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.

        Returns
        -------
        MassSpecProfile | MassSpecCentroid
            The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.

        Raises
        ------
        ValueError
            If spectrum_mode is neither 'profile' nor 'centroid'.
        """
        # Fail early with a clear message; otherwise no branch below would run
        # and the return statement would hit an unbound local variable.
        if spectrum_mode not in ("profile", "centroid"):
            raise ValueError("spectrum_mode must be 'profile' or 'centroid'")

        if spectrum_mode == "profile":
            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan_number, scanStatistics
            )
            abun = list(profileStream.Intensities)
            mz = list(profileStream.Positions)
            data_dict = {
                Labels.mz: mz,
                Labels.abundance: abun,
            }
            d_params = self.set_metadata(
                firstScanNumber=scan_number,
                lastScanNumber=scan_number,
                scans_list=False,
                label=Labels.thermo_profile,
            )
            mass_spectrum_obj = MassSpecProfile(
                data_dict, d_params, auto_process=auto_process
            )

        else:
            centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
            if centroid_scan.Masses is not None:
                mz = list(centroid_scan.Masses)
                abun = list(centroid_scan.Intensities)
                rp = list(centroid_scan.Resolutions)
                noise = list(centroid_scan.Noises)
                baselines = list(centroid_scan.Baselines)
                # Per-peak noise estimate: (noise - baseline) / 3
                array_noise_std = (np.array(noise) - np.array(baselines)) / 3
                l_signal_to_noise = np.array(abun) / array_noise_std
                data_dict = {
                    Labels.mz: mz,
                    Labels.abundance: abun,
                    Labels.rp: rp,
                    Labels.s2n: list(l_signal_to_noise),
                }
            else:  # For CID MS2, the centroid data are stored in the profile data location, they do not have any associated rp or baseline data, but they should be treated as centroid data
                scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(
                    scan_number
                )
                profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                    scan_number, scanStatistics
                )
                abun = list(profileStream.Intensities)
                mz = list(profileStream.Positions)
                data_dict = {
                    Labels.mz: mz,
                    Labels.abundance: abun,
                    Labels.rp: [np.nan] * len(mz),
                    Labels.s2n: [np.nan] * len(mz),
                }
            d_params = self.set_metadata(
                firstScanNumber=scan_number,
                lastScanNumber=scan_number,
                scans_list=False,
                label=Labels.thermo_centroid,
            )
            mass_spectrum_obj = MassSpecCentroid(
                data_dict, d_params, auto_process=auto_process
            )

        return mass_spectrum_obj

    def get_mass_spectra_obj(self):
        """Instantiate a MassSpectraBase object from the binary data file.

        Returns
        -------
        MassSpectraBase
            The MassSpectra object containing the parsed mass spectra.  The object is instantiated with the file location, analyzer, instrument, sample name, and this parser; the scan dataframe (indexed by scan) is attached as `scan_df`.
        """
        _, scan_df = self.run(spectra="none")
        mass_spectra_obj = MassSpectraBase(
            self.file_location,
            self.analyzer,
            self.instrument_label,
            self.sample_name,
            self,
        )
        scan_df = scan_df.set_index("scan", drop=False)
        mass_spectra_obj.scan_df = scan_df

        return mass_spectra_obj

    def get_lcms_obj(self, spectra="all"):
        """Instantiates a LCMSBase object from the raw file.

        Parameters
        ----------
        spectra : str, optional
            Which mass spectra data to include in the output. Default is "all".  Other options: "none", "ms1", "ms2".

        Returns
        -------
        LCMSBase
            LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.

        Raises
        ------
        ValueError
            If the scan data contain more than one polarity.
        """
        _, scan_df = self.run(spectra="none")  # first run it to just get scan info
        res, scan_df = self.run(
            scan_df=scan_df, spectra=spectra
        )  # second run to parse data
        lcms_obj = LCMSBase(
            self.file_location,
            self.analyzer,
            self.instrument_label,
            self.sample_name,
            self,
        )
        if spectra != "none":
            for key in res:
                key_int = int(key.replace("ms", ""))
                res[key] = res[key][res[key].intensity > 0]
                res[key] = (
                    res[key].sort_values(by=["scan", "mz"]).reset_index(drop=True)
                )
                lcms_obj._ms_unprocessed[key_int] = res[key]
        lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
        # Check if polarity is mixed
        if len(set(scan_df.polarity)) > 1:
            raise ValueError("Mixed polarities detected in scan data")
        # Positional access: take the first row whatever the index labels are.
        lcms_obj.polarity = scan_df.polarity.iloc[0]
        lcms_obj._scans_number_list = list(scan_df.scan)
        lcms_obj._retention_time_list = list(scan_df.scan_time)
        lcms_obj._tic_list = list(scan_df.tic)

        return lcms_obj

    def get_icr_transient_times(self):
        """Return a list for transient time targets for all scans, or selected scans range

        Returns
        -------
        list
            One entry per scan: the transient time looked up from the scan
            header's "FT Resolution:" value, or None when that value is not in
            the lookup table.

        Notes
        --------
        Resolving Power and Transient time targets based on 7T FT-ICR MS system
        """

        res_trans_time = {
            # NOTE(review): "50" is the only key that breaks the doubling
            # pattern of the table and looks like a truncated "50000"; both
            # spellings are accepted so existing behaviour is unchanged.
            "50": 0.384,
            "50000": 0.384,
            "100000": 0.768,
            "200000": 1.536,
            "400000": 3.072,
            "750000": 6.144,
            "1000000": 12.288,
        }

        firstScanNumber = self.start_scan

        lastScanNumber = self.end_scan

        transient_time_list = []

        # NOTE(review): range() stops before lastScanNumber; confirm whether
        # end_scan is meant to be inclusive.
        for scan in range(firstScanNumber, lastScanNumber):
            scan_header = self.get_scan_header(scan)

            rp_target = scan_header["FT Resolution:"]

            transient_time_list.append(res_trans_time.get(rp_target))

        return transient_time_list

A class for parsing Thermo RAW mass spectrometry data files and instantiating MassSpectraBase or LCMSBase objects

Parameters
  • file_location (str or Path): The path to the RAW file to be parsed.
  • analyzer (str, optional): The type of mass analyzer used in the instrument. Default is "Unknown".
  • instrument_label (str, optional): The name of the instrument used to acquire the data. Default is "Unknown".
  • sample_name (str, optional): The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.
Attributes
  • file_location (Path): The path to the RAW file being parsed.
  • analyzer (str): The type of mass analyzer used in the instrument.
  • instrument_label (str): The name of the instrument used to acquire the data.
  • sample_name (str): The name of the sample being analyzed.
Methods
  • run(spectra="all", scan_df=None). Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
  • get_mass_spectrum_from_scan(scan_number, spectrum_mode, auto_process=True) Parses the RAW file and returns a MassSpecProfile or MassSpecCentroid object from a single scan.
  • get_mass_spectra_obj(). Parses the RAW file and instantiates a MassSpectraBase object.
  • get_lcms_obj(). Parses the RAW file and instantiates an LCMSBase object.
  • get_icr_transient_times(). Return a list for transient time targets for all scans, or selected scans range

Inherits from ThermoBaseClass and SpectraParserInterface

ImportMassSpectraThermoMSFileReader( file_location, analyzer='Unknown', instrument_label='Unknown', sample_name=None)
1294    def __init__(
1295        self,
1296        file_location,
1297        analyzer="Unknown",
1298        instrument_label="Unknown",
1299        sample_name=None,
1300    ):
1301        super().__init__(file_location)
1302        if isinstance(file_location, str):
1303            # if obj is a string it defaults to create a Path obj, pass the S3Path if needed
1304            file_location = Path(file_location)
1305        if not file_location.exists():
1306            raise FileExistsError("File does not exist: " + str(file_location))
1307
1308        self.file_location = file_location
1309        self.analyzer = analyzer
1310        self.instrument_label = instrument_label
1311
1312        if sample_name:
1313            self.sample_name = sample_name
1314        else:
1315            self.sample_name = file_location.stem

file_location: str, pathlib.Path or s3path.S3Path. Thermo Raw file path

file_location
analyzer
instrument_label
def load(self):
1317    def load(self):
1318        pass

Load mass spectra data.

def get_scan_df(self):
1320    def get_scan_df(self):
1321        # This automatically brings in all the data
1322        self.chromatogram_settings.scans = (-1, -1)
1323
1324        # Get scan df info; starting with TIC data
1325        tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
1326        tic_data = {
1327            "scan": tic_data.scans,
1328            "scan_time": tic_data.time,
1329            "tic": tic_data.tic,
1330        }
1331        scan_df = pd.DataFrame.from_dict(tic_data)
1332        scan_df["ms_level"] = None
1333        
1334        # get scan text
1335        scan_filter_df = pd.DataFrame.from_dict(
1336            self.get_all_filters()[0], orient="index"
1337        )
1338        scan_filter_df.reset_index(inplace=True)
1339        scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)
1340
1341        scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
1342        scan_df["scan_window_lower"] = scan_df.scan_text.str.extract(
1343            r"\[(\d+\.\d+)-\d+\.\d+\]"
1344        )
1345        scan_df["scan_window_upper"] = scan_df.scan_text.str.extract(
1346            r"\[\d+\.\d+-(\d+\.\d+)\]"
1347        )
1348        scan_df["polarity"] = np.where(
1349            scan_df.scan_text.str.contains(" - "), "negative", "positive"
1350        )
1351        scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@")
1352        scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float)
1353
1354        # Assign each scan as centroid or profile and add ms_level
1355        scan_df["ms_format"] = None
1356        for i in scan_df.scan.to_list():
1357            scan_df.loc[scan_df.scan == i, "ms_level"] = self.get_ms_level_for_scan_num(i)
1358            if self.iRawDataPlus.IsCentroidScanFromScanNumber(i):
1359                scan_df.loc[scan_df.scan == i, "ms_format"] = "centroid"
1360            else:
1361                scan_df.loc[scan_df.scan == i, "ms_format"] = "profile"
1362
1363        return scan_df

Return scan data as a pandas DataFrame.

def get_ms_raw(self, spectra, scan_df):
1365    def get_ms_raw(self, spectra, scan_df):
1366        if spectra == "all":
1367            scan_df_forspec = scan_df
1368        elif spectra == "ms1":
1369            scan_df_forspec = scan_df[scan_df.ms_level == 1]
1370        elif spectra == "ms2":
1371            scan_df_forspec = scan_df[scan_df.ms_level == 2]
1372        else:
1373            raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")
1374
1375        # Result container
1376        res = {}
1377
1378        # Row count container
1379        counter = {}
1380
1381        # Column name container
1382        cols = {}
1383
1384        # set at float32
1385        dtype = np.float32
1386
1387        # First pass: get nrows
1388        N = defaultdict(lambda: 0)
1389        for i in scan_df_forspec.scan.to_list():
1390            level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0]
1391            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i)
1392            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1393                i, scanStatistics
1394            )
1395            abun = list(profileStream.Intensities)
1396            abun = np.array(abun)[np.where(np.array(abun) > 0)[0]]
1397
1398            N[level] += len(abun)
1399
1400        # Second pass: parse
1401        for i in scan_df_forspec.scan.to_list():
1402            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i)
1403            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1404                i, scanStatistics
1405            )
1406            abun = list(profileStream.Intensities)
1407            mz = list(profileStream.Positions)
1408
1409            # Get index of abun that are > 0
1410            inx = np.where(np.array(abun) > 0)[0]
1411            mz = np.array(mz)[inx]
1412            mz = np.float32(mz)
1413            abun = np.array(abun)[inx]
1414            abun = np.float32(abun)
1415
1416            level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0]
1417
1418            # Number of rows
1419            n = len(mz)
1420
1421            # No measurements
1422            if n == 0:
1423                continue
1424
1425            # Dimension check
1426            if len(mz) != len(abun):
1427                warnings.warn("m/z and intensity array dimension mismatch")
1428                continue
1429
1430            # Scan/frame info
1431            id_dict = i
1432
1433            # Columns
1434            cols[level] = ["scan", "mz", "intensity"]
1435            m = len(cols[level])
1436
1437            # Subarray init
1438            arr = np.empty((n, m), dtype=dtype)
1439            inx = 0
1440
1441            # Populate scan/frame info
1442            arr[:, inx] = i
1443            inx += 1
1444
1445            # Populate m/z
1446            arr[:, inx] = mz
1447            inx += 1
1448
1449            # Populate intensity
1450            arr[:, inx] = abun
1451            inx += 1
1452
1453            # Initialize output container
1454            if level not in res:
1455                res[level] = np.empty((N[level], m), dtype=dtype)
1456                counter[level] = 0
1457
1458            # Insert subarray
1459            res[level][counter[level] : counter[level] + n, :] = arr
1460            counter[level] += n
1461
1462        # Construct ms1 and ms2 mz dataframes
1463        for level in res.keys():
1464            res[level] = pd.DataFrame(res[level])
1465            res[level].columns = cols[level]
1466        # rename keys in res to add 'ms' prefix
1467        res = {f"ms{key}": value for key, value in res.items()}
1468
1469        return res

Return a dictionary of mass spectra data as a pandas DataFrame.

def run(self, spectra='all', scan_df=None):
1471    def run(self, spectra="all", scan_df=None):
1472        """
1473        Extracts mass spectra data from a raw file.
1474
1475        Parameters
1476        ----------
1477        spectra : str, optional
1478            Which mass spectra data to include in the output. Default is all.  Other options: none, ms1, ms2.
1479        scan_df : pandas.DataFrame, optional
1480            Scan dataframe.  If not provided, the scan dataframe is created from the mzML file.
1481
1482        Returns
1483        -------
1484        tuple
1485            A tuple containing two elements:
1486            - A dictionary containing mass spectra data, separated by MS level.
1487            - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level,
1488                scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
1489        """
1490        # Prepare scan_df
1491        if scan_df is None:
1492            scan_df = self.get_scan_df()
1493
1494        # Prepare mass spectra data
1495        if spectra != "none":
1496            res = self.get_ms_raw(spectra=spectra, scan_df=scan_df)
1497        else:
1498            res = None
1499
1500        return res, scan_df

Extracts mass spectra data from a raw file.

Parameters
  • spectra (str, optional): Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2.
  • scan_df (pandas.DataFrame, optional): Scan dataframe. If not provided, the scan dataframe is created from the raw file.
Returns
  • tuple: A tuple containing two elements:
    • A dictionary containing mass spectra data, separated by MS level.
    • A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level, scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
def get_mass_spectrum_from_scan(self, scan_number, spectrum_mode, auto_process=True):
1502    def get_mass_spectrum_from_scan(
1503        self, scan_number, spectrum_mode, auto_process=True
1504    ):
1505        """Instatiate a MassSpecBase object from a single scan number from the binary file, currently only supports profile mode.
1506
1507        Parameters
1508        ----------
1509        scan_number : int
1510            The scan number to extract the mass spectrum from.
1511        polarity : int
1512            The polarity of the scan.  1 for positive mode, -1 for negative mode.
1513        spectrum_mode : str
1514            The type of mass spectrum to extract.  Must be 'profile' or 'centroid'.
1515        auto_process : bool, optional
1516            If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.
1517
1518        Returns
1519        -------
1520        MassSpecProfile | MassSpecCentroid
1521            The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.
1522        """
1523
1524        if spectrum_mode == "profile":
1525            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
1526            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1527                scan_number, scanStatistics
1528            )
1529            abun = list(profileStream.Intensities)
1530            mz = list(profileStream.Positions)
1531            data_dict = {
1532                Labels.mz: mz,
1533                Labels.abundance: abun,
1534            }
1535            d_params = self.set_metadata(
1536                firstScanNumber=scan_number,
1537                lastScanNumber=scan_number,
1538                scans_list=False,
1539                label=Labels.thermo_profile,
1540            )
1541            mass_spectrum_obj = MassSpecProfile(
1542                data_dict, d_params, auto_process=auto_process
1543            )
1544
1545        elif spectrum_mode == "centroid":
1546            centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
1547            if centroid_scan.Masses is not None:
1548                mz = list(centroid_scan.Masses)
1549                abun = list(centroid_scan.Intensities)
1550                rp = list(centroid_scan.Resolutions)
1551                magnitude = list(centroid_scan.Intensities)
1552                noise = list(centroid_scan.Noises)
1553                baselines = list(centroid_scan.Baselines)
1554                array_noise_std = (np.array(noise) - np.array(baselines)) / 3
1555                l_signal_to_noise = np.array(magnitude) / array_noise_std
1556                data_dict = {
1557                    Labels.mz: mz,
1558                    Labels.abundance: abun,
1559                    Labels.rp: rp,
1560                    Labels.s2n: list(l_signal_to_noise),
1561                }
1562            else:  # For CID MS2, the centroid data are stored in the profile data location, they do not have any associated rp or baseline data, but they should be treated as centroid data
1563                scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(
1564                    scan_number
1565                )
1566                profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1567                    scan_number, scanStatistics
1568                )
1569                abun = list(profileStream.Intensities)
1570                mz = list(profileStream.Positions)
1571                data_dict = {
1572                    Labels.mz: mz,
1573                    Labels.abundance: abun,
1574                    Labels.rp: [np.nan] * len(mz),
1575                    Labels.s2n: [np.nan] * len(mz),
1576                }
1577            d_params = self.set_metadata(
1578                firstScanNumber=scan_number,
1579                lastScanNumber=scan_number,
1580                scans_list=False,
1581                label=Labels.thermo_centroid,
1582            )
1583            mass_spectrum_obj = MassSpecCentroid(
1584                data_dict, d_params, auto_process=auto_process
1585            )
1586
1587        return mass_spectrum_obj

Instantiate a mass spectrum object from a single scan number of the binary file; both profile and centroid modes are supported.

Parameters
  • scan_number (int): The scan number to extract the mass spectrum from.
  • polarity (int): The polarity of the scan. 1 for positive mode, -1 for negative mode.
  • spectrum_mode (str): The type of mass spectrum to extract. Must be 'profile' or 'centroid'.
  • auto_process (bool, optional): If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.
Returns
  • MassSpecProfile | MassSpecCentroid: The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.
def get_mass_spectra_obj(self):
1589    def get_mass_spectra_obj(self):
1590        """Instatiate a MassSpectraBase object from the binary data file file.
1591
1592        Returns
1593        -------
1594        MassSpectraBase
1595            The MassSpectra object containing the parsed mass spectra.  The object is instatiated with the mzML file, analyzer, instrument, sample name, and scan dataframe.
1596        """
1597        _, scan_df = self.run(spectra="none")
1598        mass_spectra_obj = MassSpectraBase(
1599            self.file_location,
1600            self.analyzer,
1601            self.instrument_label,
1602            self.sample_name,
1603            self,
1604        )
1605        scan_df = scan_df.set_index("scan", drop=False)
1606        mass_spectra_obj.scan_df = scan_df
1607
1608        return mass_spectra_obj

Instantiate a MassSpectraBase object from the binary data file.

Returns
  • MassSpectraBase: The MassSpectra object containing the parsed mass spectra. The object is instantiated with the file location, analyzer, instrument, sample name, and scan dataframe.
def get_lcms_obj(self, spectra='all'):
1610    def get_lcms_obj(self, spectra="all"):
1611        """Instatiates a LCMSBase object from the mzML file.
1612
1613        Parameters
1614        ----------
1615        spectra : str, optional
1616            Which mass spectra data to include in the output. Default is "all".  Other options: "none", "ms1", "ms2".
1617
1618        Returns
1619        -------
1620        LCMSBase
1621            LCMS object containing mass spectra data. The object is instatiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specifified), polarity, as well as the attributes holding the scans, retention times, and tics.
1622        """
1623        _, scan_df = self.run(spectra="none")  # first run it to just get scan info
1624        res, scan_df = self.run(
1625            scan_df=scan_df, spectra=spectra
1626        )  # second run to parse data
1627        lcms_obj = LCMSBase(
1628            self.file_location,
1629            self.analyzer,
1630            self.instrument_label,
1631            self.sample_name,
1632            self,
1633        )
1634        if spectra != "none":
1635            for key in res:
1636                key_int = int(key.replace("ms", ""))
1637                res[key] = res[key][res[key].intensity > 0]
1638                res[key] = (
1639                    res[key].sort_values(by=["scan", "mz"]).reset_index(drop=True)
1640                )
1641                lcms_obj._ms_unprocessed[key_int] = res[key]
1642        lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
1643        # Check if polarity is mixed
1644        if len(set(scan_df.polarity)) > 1:
1645            raise ValueError("Mixed polarities detected in scan data")
1646        lcms_obj.polarity = scan_df.polarity[0]
1647        lcms_obj._scans_number_list = list(scan_df.scan)
1648        lcms_obj._retention_time_list = list(scan_df.scan_time)
1649        lcms_obj._tic_list = list(scan_df.tic)
1650
1651        return lcms_obj

Instantiates an LCMSBase object from the raw file.

Parameters
  • spectra (str, optional): Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2".
Returns
  • LCMSBase: LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.
def get_icr_transient_times(self):
1653    def get_icr_transient_times(self):
1654        """Return a list for transient time targets for all scans, or selected scans range
1655
1656        Notes
1657        --------
1658        Resolving Power and Transient time targets based on 7T FT-ICR MS system
1659        """
1660
1661        res_trans_time = {
1662            "50": 0.384,
1663            "100000": 0.768,
1664            "200000": 1.536,
1665            "400000": 3.072,
1666            "750000": 6.144,
1667            "1000000": 12.288,
1668        }
1669
1670        firstScanNumber = self.start_scan
1671
1672        lastScanNumber = self.end_scan
1673
1674        transient_time_list = []
1675
1676        for scan in range(firstScanNumber, lastScanNumber):
1677            scan_header = self.get_scan_header(scan)
1678
1679            rp_target = scan_header["FT Resolution:"]
1680
1681            transient_time = res_trans_time.get(rp_target)
1682
1683            transient_time_list.append(transient_time)
1684
1685            # print(transient_time, rp_target)
1686
1687        return transient_time_list

Return a list of transient time targets for all scans, or for the selected scan range.

Notes

Resolving Power and Transient time targets based on 7T FT-ICR MS system