corems.mass_spectra.input.rawFileReader

   1__author__ = "Yuri E. Corilo"
   2__date__ = "Jun 09, 2021"
   3
   4
   5from warnings import warn
   6import warnings
   7from collections import defaultdict
   8
   9from matplotlib import axes
  10from corems.encapsulation.factory.processingSetting import LiquidChromatographSetting
  11
  12import numpy as np
  13import sys
  14import site
  15from pathlib import Path
  16import datetime
  17import importlib.util
  18import os
  19
  20import clr
  21import pandas as pd
  22from s3path import S3Path
  23
  24
  25from typing import Any, Dict, List, Optional, Tuple
  26from corems.encapsulation.constant import Labels
  27from corems.mass_spectra.factory.lc_class import MassSpectraBase, LCMSBase
  28from corems.mass_spectra.factory.chromat_data import EIC_Data, TIC_Data
  29from corems.mass_spectrum.factory.MassSpectrumClasses import (
  30    MassSpecProfile,
  31    MassSpecCentroid,
  32)
  33from corems.encapsulation.factory.parameters import LCMSParameters, default_parameters
  34from corems.mass_spectra.input.parserbase import SpectraParserInterface
  35
# Add the path of the Thermo .NET libraries to the system path.
# The folder is "ext_lib/dotnet/" inside the parent directory of the installed
# corems package (located through the package's import spec).
spec = importlib.util.find_spec("corems")
sys.path.append(str(Path(os.path.dirname(spec.origin)).parent) + "/ext_lib/dotnet/")

# Register the Thermo assemblies with the CLR bridge. These calls run before the
# `from ThermoFisher...` imports that follow them in this module.
# NOTE(review): presumably the sys.path entry above is what lets these names
# resolve - confirm against the pythonnet assembly lookup rules.
clr.AddReference("ThermoFisher.CommonCore.RawFileReader")
clr.AddReference("ThermoFisher.CommonCore.Data")
clr.AddReference("ThermoFisher.CommonCore.MassPrecisionEstimator")
  43
  44from ThermoFisher.CommonCore.RawFileReader import RawFileReaderAdapter
  45from ThermoFisher.CommonCore.Data import ToleranceUnits, Extensions
  46from ThermoFisher.CommonCore.Data.Business import (
  47    ChromatogramTraceSettings,
  48    TraceType,
  49    MassOptions,
  50)
  51from ThermoFisher.CommonCore.Data.Business import ChromatogramSignal, Range
  52from ThermoFisher.CommonCore.Data.Business import Device
  53from ThermoFisher.CommonCore.Data.Interfaces import IChromatogramSettings
  54from ThermoFisher.CommonCore.Data.Business import MassOptions, FileHeaderReaderFactory
  55from ThermoFisher.CommonCore.Data.FilterEnums import MSOrderType
  56from System.Collections.Generic import List
  57
  58
  59class ThermoBaseClass:
  60    """Class for parsing Thermo Raw files and extracting information from them.
  61
  62    Parameters:
  63    -----------
  64    file_location : str or pathlib.Path or s3path.S3Path
  65        Thermo Raw file path or S3 path.
  66
  67    Attributes:
  68    -----------
  69    file_path : str or pathlib.Path or s3path.S3Path
  70        The file path of the Thermo Raw file.
  71    parameters : LCMSParameters
  72        The LCMS parameters for the Thermo Raw file.
  73    chromatogram_settings : LiquidChromatographSetting
  74        The chromatogram settings for the Thermo Raw file.
  75    scans : list or tuple
  76        The selected scans for the Thermo Raw file.
  77    start_scan : int
  78        The starting scan number for the Thermo Raw file.
  79    end_scan : int
  80        The ending scan number for the Thermo Raw file.
  81
  82    Methods:
  83    --------
  84    * set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter
  85        Convert the user-passed MS Type string to a Thermo MSOrderType object.
  86    * get_instrument_info() -> dict
  87        Get the instrument information from the Thermo Raw file.
  88    * get_creation_time() -> datetime.datetime
  89        Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
  90    * remove_temp_file()
  91        Remove the temporary file if the path is from S3Path.
  92    * get_polarity_mode(scan_number: int) -> int
  93        Get the polarity mode for the given scan number.
  94    * get_filter_for_scan_num(scan_number: int) -> List[str]
  95        Get the filter for the given scan number.
  96    * check_full_scan(scan_number: int) -> bool
  97        Check if the given scan number is a full scan.
  98    * get_all_filters() -> Tuple[Dict[int, str], List[str]]
  99        Get all scan filters for the Thermo Raw file.
 100    * get_scan_header(scan: int) -> Dict[str, Any]
 101        Get the full dictionary of scan header metadata for the given scan number.
 102    * get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]
 103        Get the retention time, intensity, and scan number from the given trace.
    * get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d',
             peak_detection: bool = False, smooth: bool = False, plot: bool = False,
             ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes]
        Get the extracted ion chromatograms (EICs) for the target m/z values.
        Peak detection and smoothing are no longer implemented; passing True for either raises an exception.
 108
 109    """
 110
 111    def __init__(self, file_location):
 112        """file_location: srt pathlib.Path or s3path.S3Path
 113        Thermo Raw file path
 114        """
 115        # Thread.__init__(self)
 116        if isinstance(file_location, str):
 117            file_path = Path(file_location)
 118
 119        elif isinstance(file_location, S3Path):
 120            temp_dir = Path("tmp/")
 121            temp_dir.mkdir(exist_ok=True)
 122
 123            file_path = temp_dir / file_location.name
 124            with open(file_path, "wb") as fh:
 125                fh.write(file_location.read_bytes())
 126
 127        else:
 128            file_path = file_location
 129
 130        self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path))
 131
 132        if not self.iRawDataPlus.IsOpen:
 133            raise FileNotFoundError(
 134                "Unable to access the RAW file using the RawFileReader class!"
 135            )
 136
 137        # Check for any errors in the RAW file
 138        if self.iRawDataPlus.IsError:
 139            raise IOError(
 140                "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path)
 141            )
 142
 143        self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1)
 144
 145        self.file_path = file_location
 146        self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path))
 147
 148        # removing tmp file
 149
 150        self._init_settings()
 151
 152    def _init_settings(self):
 153        """
 154        Initialize the LCMSParameters object.
 155        """
 156        self._parameters = LCMSParameters()
 157
 158    @property
 159    def parameters(self) -> LCMSParameters:
 160        """
 161        Get or set the LCMSParameters object.
 162        """
 163        return self._parameters
 164
 165    @parameters.setter
 166    def parameters(self, instance_LCMSParameters: LCMSParameters):
 167        self._parameters = instance_LCMSParameters
 168
 169    @property
 170    def chromatogram_settings(self) -> LiquidChromatographSetting:
 171        """
 172        Get or set the LiquidChromatographSetting object.
 173        """
 174        return self.parameters.lc_ms
 175
 176    @chromatogram_settings.setter
 177    def chromatogram_settings(
 178        self, instance_LiquidChromatographSetting: LiquidChromatographSetting
 179    ):
 180        self.parameters.lc_ms = instance_LiquidChromatographSetting
 181
 182    @property
 183    def scans(self) -> list | tuple:
 184        """scans : list or tuple
 185        If list uses Thermo AverageScansInScanRange for selected scans, ortherwise uses Thermo AverageScans for a scan range
 186        """
 187        return self.chromatogram_settings.scans
 188
 189    @property
 190    def start_scan(self) -> int:
 191        """
 192        Get the starting scan number for the Thermo Raw file.
 193        """
 194        if self.scans[0] == -1:
 195            return self.iRawDataPlus.RunHeaderEx.FirstSpectrum
 196        else:
 197            return self.scans[0]
 198
 199    @property
 200    def end_scan(self) -> int:
 201        """
 202        Get the ending scan number for the Thermo Raw file.
 203        """
 204        if self.scans[-1] == -1:
 205            return self.iRawDataPlus.RunHeaderEx.LastSpectrum
 206        else:
 207            return self.scans[-1]
 208
 209    def set_msordertype(self, scanFilter, mstype: str = "ms1"):
 210        """
 211        Function to convert user passed string MS Type to Thermo MSOrderType object
 212        Limited to MS1 through MS10.
 213
 214        Parameters:
 215        -----------
 216        scanFilter : Thermo.ScanFilter
 217            The scan filter object.
 218        mstype : str, optional
 219            The MS Type string, by default 'ms1'
 220
 221        """
 222        mstype = mstype.upper()
 223        # Check that a valid mstype is passed
 224        if (int(mstype.split("MS")[1]) > 10) or (int(mstype.split("MS")[1]) < 1):
 225            warn("MS Type not valid, must be between MS1 and MS10")
 226
 227        msordertypedict = {
 228            "MS1": MSOrderType.Ms,
 229            "MS2": MSOrderType.Ms2,
 230            "MS3": MSOrderType.Ms3,
 231            "MS4": MSOrderType.Ms4,
 232            "MS5": MSOrderType.Ms5,
 233            "MS6": MSOrderType.Ms6,
 234            "MS7": MSOrderType.Ms7,
 235            "MS8": MSOrderType.Ms8,
 236            "MS9": MSOrderType.Ms9,
 237            "MS10": MSOrderType.Ms10,
 238        }
 239        scanFilter.MSOrder = msordertypedict[mstype]
 240        return scanFilter
 241
 242    def get_instrument_info(self) -> dict:
 243        """
 244        Get the instrument information from the Thermo Raw file.
 245
 246        Returns:
 247        --------
 248        dict
 249            A dictionary with the keys 'model', and 'serial_number'.
 250        """
 251        instrumentData = self.iRawDataPlus.GetInstrumentData()
 252        return {
 253            "model": instrumentData.Model,
 254            "serial_number": instrumentData.SerialNumber
 255        }
 256    
 257    def get_creation_time(self) -> datetime.datetime:
 258        """
 259        Extract the creation date stamp from the .RAW file
 260        Return formatted creation date stamp.
 261
 262        """
 263        credate = self.iRawDataPlus.CreationDate.get_Ticks()
 264        credate = datetime.datetime(1, 1, 1) + datetime.timedelta(
 265            microseconds=credate / 10
 266        )
 267        return credate
 268
 269    def remove_temp_file(self) -> None:
 270        """if the path is from S3Path data cannot be serialized to io.ByteStream and
 271        a temporary copy is stored at the temp dir
 272        use this function only at the end of your execution scrip
 273        some LCMS class methods depend on this file
 274        """
 275
 276        self.file_path.unlink()
 277
 278    def close_file(self) -> None:
 279        """
 280        Close the Thermo Raw file.
 281        """
 282        self.iRawDataPlus.Dispose()
 283
 284    def get_polarity_mode(self, scan_number: int) -> int:
 285        """
 286        Get the polarity mode for the given scan number.
 287
 288        Parameters:
 289        -----------
 290        scan_number : int
 291            The scan number.
 292
 293        Raises:
 294        -------
 295        Exception
 296            If the polarity mode is unknown.
 297
 298        """
 299        polarity_symbol = self.get_filter_for_scan_num(scan_number)[1]
 300
 301        if polarity_symbol == "+":
 302            return 1
 303            # return 'POSITIVE_ION_MODE'
 304
 305        elif polarity_symbol == "-":
 306            return -1
 307
 308        else:
 309            raise Exception("Polarity Mode Unknown, please set it manually")
 310
 311    def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
 312        """
 313        Returns the closest matching run time that corresponds to scan_number for the current
 314        controller. This function is only supported for MS device controllers.
 315        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
 316
 317        Parameters:
 318        -----------
 319        scan_number : int
 320            The scan number.
 321
 322        """
 323        scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
 324
 325        return str(scan_label).split()
 326
 327    def get_ms_level_for_scan_num(self, scan_number: int) -> str:
 328        """
 329        Get the MS order for the given scan number.
 330
 331        Parameters:
 332        -----------
 333        scan_number : int
 334            The scan number
 335
 336        Returns:
 337        --------
 338        int
 339            The MS order type (1 for MS, 2 for MS2, etc.)
 340        """
 341        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
 342
 343        msordertype = {
 344            MSOrderType.Ms: 1,
 345            MSOrderType.Ms2: 2,
 346            MSOrderType.Ms3: 3,
 347            MSOrderType.Ms4: 4,
 348            MSOrderType.Ms5: 5,
 349            MSOrderType.Ms6: 6,
 350            MSOrderType.Ms7: 7,
 351            MSOrderType.Ms8: 8,
 352            MSOrderType.Ms9: 9,
 353            MSOrderType.Ms10: 10,
 354        }
 355
 356        if scan_filter.MSOrder in msordertype:
 357            return msordertype[scan_filter.MSOrder]
 358        else:
 359            raise Exception("MS Order Type not found")
 360    
 361    def check_full_scan(self, scan_number: int) -> bool:
 362        # scan_filter.ScanMode 0 = FULL
 363        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
 364
 365        return scan_filter.ScanMode == MSOrderType.Ms
 366
 367    def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
 368        """
 369        Get all scan filters.
 370        This function is only supported for MS device controllers.
 371        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
 372
 373        """
 374
 375        scanrange = range(self.start_scan, self.end_scan + 1)
 376        scanfiltersdic = {}
 377        scanfilterslist = []
 378        for scan_number in scanrange:
 379            scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
 380            scanfiltersdic[scan_number] = scan_label
 381            scanfilterslist.append(scan_label)
 382        scanfilterset = list(set(scanfilterslist))
 383        return scanfiltersdic, scanfilterset
 384
 385    def get_scan_header(self, scan: int) -> Dict[str, Any]:
 386        """
 387        Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.
 388
 389        Parameters:
 390        -----------
 391        scan : int
 392            The scan number.
 393
 394        """
 395        header = self.iRawDataPlus.GetTrailerExtraInformation(scan)
 396
 397        header_dic = {}
 398        for i in range(header.Length):
 399            header_dic.update({header.Labels[i]: header.Values[i]})
 400        return header_dic
 401
 402    @staticmethod
 403    def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
 404        """trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal"""
 405        return list(trace.Times), list(trace.Intensities), list(trace.Scans)
 406
 407    def get_eics(
 408        self,
 409        target_mzs: List[float],
 410        tic_data: Dict[str, Any],
 411        ms_type="MS !d",
 412        peak_detection=False,
 413        smooth=False,
 414        plot=False,
 415        ax: Optional[axes.Axes] = None,
 416        legend=False,
 417    ) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
 418        """ms_type: str ('MS', MS2')
 419        start_scan: int default -1 will select the lowest available
 420        end_scan: int default -1 will select the highest available
 421
 422        returns:
 423
 424            chroma: dict{target_mz: EIC_Data(
 425                                        Scans: [int]
 426                                            original thermo scan numbers
 427                                        Time: [floats]
 428                                            list of retention times
 429                                        TIC: [floats]
 430                                            total ion chromatogram
 431                                        Apexes: [int]
 432                                            original thermo apex scan number after peak picking
 433                                        )
 434
 435        """
 436        # If peak_detection or smooth is True, raise exception
 437        if peak_detection or smooth:
 438            raise Exception("Peak detection and smoothing are no longer implemented in this function")
 439
 440        options = MassOptions()
 441        options.ToleranceUnits = ToleranceUnits.ppm
 442        options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm
 443
 444        all_chroma_settings = []
 445
 446        for target_mz in target_mzs:
 447            settings = ChromatogramTraceSettings(TraceType.MassRange)
 448            settings.Filter = ms_type
 449            settings.MassRanges = [Range(target_mz, target_mz)]
 450
 451            chroma_settings = IChromatogramSettings(settings)
 452
 453            all_chroma_settings.append(chroma_settings)
 454
 455        # chroma_settings2 = IChromatogramSettings(settings)
 456        # print(chroma_settings.FragmentMass)
 457        # print(chroma_settings.FragmentMass)
 458        # print(chroma_settings)
 459        # print(chroma_settings)
 460
 461        data = self.iRawDataPlus.GetChromatogramData(
 462            all_chroma_settings, self.start_scan, self.end_scan, options
 463        )
 464
 465        traces = ChromatogramSignal.FromChromatogramData(data)
 466
 467        chroma = {}
 468
 469        if plot:
 470            from matplotlib.transforms import Bbox
 471            import matplotlib.pyplot as plt
 472
 473            if not ax:
 474                # ax = plt.gca()
 475                # ax.clear()
 476                fig, ax = plt.subplots()
 477
 478            else:
 479                fig = plt.gcf()
 480
 481            # plt.show()
 482
 483        for i, trace in enumerate(traces):
 484            if trace.Length > 0:
 485                rt, eic, scans = self.get_rt_time_from_trace(trace)
 486                if smooth:
 487                    eic = self.smooth_tic(eic)
 488
 489                chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
 490                if plot:
 491                    ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))
 492
 493        if peak_detection:
 494            # max_eic = self.get_max_eic(chroma)
 495            max_signal = max(tic_data.tic)
 496
 497            for eic_data in chroma.values():
 498                eic = eic_data.eic
 499                time = eic_data.time
 500
 501                if len(eic) != len(tic_data.tic):
 502                    warn(
 503                        "The software assumes same lenth of TIC and EIC, this does not seems to be the case and the results mass spectrum selected by the scan number might not be correct"
 504                    )
 505
 506                if eic.max() > 0:
 507                    centroid_eics = self.eic_centroid_detector(time, eic, max_signal)
 508                    eic_data.apexes = [i for i in centroid_eics]
 509
 510                    if plot:
 511                        for peak_indexes in eic_data.apexes:
 512                            apex_index = peak_indexes[1]
 513                            ax.plot(
 514                                time[apex_index],
 515                                eic[apex_index],
 516                                marker="x",
 517                                linewidth=0,
 518                            )
 519
 520        if plot:
 521            ax.set_xlabel("Time (min)")
 522            ax.set_ylabel("a.u.")
 523            ax.set_title(ms_type + " EIC")
 524            ax.tick_params(axis="both", which="major", labelsize=12)
 525            ax.axes.spines["top"].set_visible(False)
 526            ax.axes.spines["right"].set_visible(False)
 527
 528            if legend:
 529                legend = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1))
 530                fig.subplots_adjust(right=0.76)
 531                # ax.set_prop_cycle(color=plt.cm.gist_rainbow(np.linspace(0, 1, len(traces))))
 532
 533                d = {"down": 30, "up": -30}
 534
 535                def func(evt):
 536                    if legend.contains(evt):
 537                        bbox = legend.get_bbox_to_anchor()
 538                        bbox = Bbox.from_bounds(
 539                            bbox.x0, bbox.y0 + d[evt.button], bbox.width, bbox.height
 540                        )
 541                        tr = legend.axes.transAxes.inverted()
 542                        legend.set_bbox_to_anchor(bbox.transformed(tr))
 543                        fig.canvas.draw_idle()
 544
 545                fig.canvas.mpl_connect("scroll_event", func)
 546            return chroma, ax
 547        else:
 548            return chroma, None
 549            rt = []
 550            tic = []
 551            scans = []
 552            for i in range(traces[0].Length):
 553                # print(trace[0].HasBasePeakData,trace[0].EndTime )
 554
 555                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
 556                rt.append(traces[0].Times[i])
 557                tic.append(traces[0].Intensities[i])
 558                scans.append(traces[0].Scans[i])
 559
 560            return traces
 561            # plot_chroma(rt, tic)
 562            # plt.show()
 563
 564    def get_tic(
 565        self,
 566        ms_type="MS !d",
 567        peak_detection=False,  # This wont work right now
 568        smooth=False,  # This wont work right now
 569        plot=False,
 570        ax=None,
 571        trace_type="TIC",
 572    ) -> Tuple[TIC_Data, axes.Axes]:
 573        """ms_type: str ('MS !d', 'MS2', None)
 574            if you use None you get all scans.
 575        peak_detection: bool
 576        smooth: bool
 577        plot: bool
 578        ax: matplotlib axis object
 579        trace_type: str ('TIC','BPC')
 580
 581        returns:
 582            chroma: dict
 583            {
 584            Scan: [int]
 585                original thermo scan numberMS
 586            Time: [floats]
 587                list of retention times
 588            TIC: [floats]
 589                total ion chromatogram
 590            Apexes: [int]
 591                original thermo apex scan number after peak picking
 592            }
 593        """
 594        if trace_type == "TIC":
 595            settings = ChromatogramTraceSettings(TraceType.TIC)
 596        elif trace_type == "BPC":
 597            settings = ChromatogramTraceSettings(TraceType.BasePeak)
 598        else:
 599            raise ValueError(f"{trace_type} undefined")
 600        if ms_type == "all":
 601            settings.Filter = None
 602        else:
 603            settings.Filter = ms_type
 604
 605        chroma_settings = IChromatogramSettings(settings)
 606
 607        data = self.iRawDataPlus.GetChromatogramData(
 608            [chroma_settings], self.start_scan, self.end_scan
 609        )
 610
 611        trace = ChromatogramSignal.FromChromatogramData(data)
 612
 613        data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[])
 614
 615        if trace[0].Length > 0:
 616            for i in range(trace[0].Length):
 617                # print(trace[0].HasBasePeakData,trace[0].EndTime )
 618
 619                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
 620                data.time.append(trace[0].Times[i])
 621                data.tic.append(trace[0].Intensities[i])
 622                data.scans.append(trace[0].Scans[i])
 623
 624                # print(trace[0].Scans[i])
 625            if smooth:
 626                data.tic = self.smooth_tic(data.tic)
 627
 628            else:
 629                data.tic = np.array(data.tic)
 630
 631            if peak_detection:
 632                centroid_peak_indexes = [
 633                    i for i in self.centroid_detector(data.time, data.tic)
 634                ]
 635
 636                data.apexes = centroid_peak_indexes
 637
 638            if plot:
 639                if not ax:
 640                    import matplotlib.pyplot as plt
 641
 642                    ax = plt.gca()
 643                    # fig, ax = plt.subplots(figsize=(6, 3))
 644
 645                ax.plot(data.time, data.tic, label=trace_type)
 646                ax.set_xlabel("Time (min)")
 647                ax.set_ylabel("a.u.")
 648                if peak_detection:
 649                    for peak_indexes in data.apexes:
 650                        apex_index = peak_indexes[1]
 651                        ax.plot(
 652                            data.time[apex_index],
 653                            data.tic[apex_index],
 654                            marker="x",
 655                            linewidth=0,
 656                        )
 657
 658                # plt.show()
 659                if trace_type == "BPC":
 660                    data.bpc = data.tic
 661                    data.tic = []
 662                return data, ax
 663            if trace_type == "BPC":
 664                data.bpc = data.tic
 665                data.tic = []
 666            return data, None
 667
 668        else:
 669            return None, None
 670
 671    def get_average_mass_spectrum(
 672        self,
 673        spectrum_mode: str = "profile",
 674        auto_process: bool = True,
 675        ppm_tolerance: float = 5.0,
 676        ms_type: str = "MS1",
 677    ) -> MassSpecProfile | MassSpecCentroid:
 678        """
 679        Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method
 680        or a scan list using Thermo's AverageScans method
 681        spectrum_mode: str
 682            centroid or profile mass spectrum
 683        auto_process: bool
 684            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
 685        ms_type: str
 686            String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10.
 687            Internal function converts to Thermo MSOrderType class.
 688
 689        """
 690
 691        def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool):
 692            mz_list = list(averageScan.SegmentedScan.Positions)
 693            abund_list = list(averageScan.SegmentedScan.Intensities)
 694
 695            data_dict = {
 696                Labels.mz: mz_list,
 697                Labels.abundance: abund_list,
 698            }
 699
 700            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)
 701
 702        def get_centroid_mass_spec(averageScan, d_params: dict):
 703            noise = list(averageScan.centroidScan.Noises)
 704
 705            baselines = list(averageScan.centroidScan.Baselines)
 706
 707            rp = list(averageScan.centroidScan.Resolutions)
 708
 709            magnitude = list(averageScan.centroidScan.Intensities)
 710
 711            mz = list(averageScan.centroidScan.Masses)
 712
 713            array_noise_std = (np.array(noise) - np.array(baselines)) / 3
 714            l_signal_to_noise = np.array(magnitude) / array_noise_std
 715
 716            d_params["baseline_noise"] = np.average(array_noise_std)
 717
 718            d_params["baseline_noise_std"] = np.std(array_noise_std)
 719
 720            data_dict = {
 721                Labels.mz: mz,
 722                Labels.abundance: magnitude,
 723                Labels.rp: rp,
 724                Labels.s2n: list(l_signal_to_noise),
 725            }
 726
 727            mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
 728
 729            return mass_spec
 730
 731        d_params = self.set_metadata(
 732            firstScanNumber=self.start_scan, lastScanNumber=self.end_scan
 733        )
 734
 735        # Create the mass options object that will be used when averaging the scans
 736        options = MassOptions()
 737        options.ToleranceUnits = ToleranceUnits.ppm
 738        options.Tolerance = ppm_tolerance
 739
 740        # Get the scan filter for the first scan.  This scan filter will be used to located
 741        # scans within the given scan range of the same type
 742        scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan)
 743
 744        # force it to only look for the MSType
 745        scanFilter = self.set_msordertype(scanFilter, ms_type)
 746
 747        if isinstance(self.scans, tuple):
 748            averageScan = Extensions.AverageScansInScanRange(
 749                self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options
 750            )
 751
 752            if averageScan:
 753                if spectrum_mode == "profile":
 754                    mass_spec = get_profile_mass_spec(
 755                        averageScan, d_params, auto_process
 756                    )
 757
 758                    return mass_spec
 759
 760                elif spectrum_mode == "centroid":
 761                    if averageScan.HasCentroidStream:
 762                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
 763
 764                        return mass_spec
 765
 766                    else:
 767                        raise ValueError(
 768                            "No Centroind data available for the selected scans"
 769                        )
 770                else:
 771                    raise ValueError("spectrum_mode must be 'profile' or centroid")
 772            else:
 773                raise ValueError("No data found for the selected scans")
 774
 775        elif isinstance(self.scans, list):
 776            d_params = self.set_metadata(scans_list=self.scans)
 777
 778            scans = List[int]()
 779            for scan in self.scans:
 780                scans.Add(scan)
 781
 782            averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
 783
 784            if averageScan:
 785                if spectrum_mode == "profile":
 786                    mass_spec = get_profile_mass_spec(
 787                        averageScan, d_params, auto_process
 788                    )
 789
 790                    return mass_spec
 791
 792                elif spectrum_mode == "centroid":
 793                    if averageScan.HasCentroidStream:
 794                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
 795
 796                        return mass_spec
 797
 798                    else:
 799                        raise ValueError(
 800                            "No Centroind data available for the selected scans"
 801                        )
 802
 803                else:
 804                    raise ValueError("spectrum_mode must be 'profile' or centroid")
 805
 806            else:
 807                raise ValueError("No data found for the selected scans")
 808
 809        else:
 810            raise ValueError("scans must be a list intergers or a tuple if integers")
 811
 812    def set_metadata(
 813        self,
 814        firstScanNumber=0,
 815        lastScanNumber=0,
 816        scans_list=False,
 817        label=Labels.thermo_profile,
 818    ):
 819        """
 820        Collect metadata to be ingested in the mass spectrum object
 821
 822        scans_list: list[int] or false
 823        lastScanNumber: int
 824        firstScanNumber: int
 825        """
 826
 827        d_params = default_parameters(self.file_path)
 828
 829        # assumes scans is full scan or reduced profile scan
 830
 831        d_params["label"] = label
 832
 833        if scans_list:
 834            d_params["scan_number"] = scans_list
 835
 836            d_params["polarity"] = self.get_polarity_mode(scans_list[0])
 837
 838        else:
 839            d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber)
 840
 841            d_params["polarity"] = self.get_polarity_mode(firstScanNumber)
 842
 843        d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model
 844
 845        d_params["acquisition_time"] = self.get_creation_time()
 846
 847        d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name
 848
 849        return d_params
 850
 851    def get_instrument_methods(self, parse_strings: bool = True):
 852        """
 853        This function will extract the instrument methods embedded in the raw file
 854
 855        First it will check if there are any instrument methods, if not returning None
 856        Then it will get the total number of instrument methods.
 857        For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary
 858        If this fails, it will return just the string object.
 859
 860        This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.
 861
 862        Parameters:
 863        -----------
 864        parse_strings: bool
 865            If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.
 866
 867        Returns:
 868        --------
 869        List[Dict[str, Any]] or List
 870            A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.
 871        """
 872
 873        if not self.iRawDataPlus.HasInstrumentMethod:
 874            raise ValueError(
 875                "Raw Data file does not have any instrument methods attached"
 876            )
 877            return None
 878        else:
 879
 880            def parse_instrument_method(data):
 881                lines = data.split("\r\n")
 882                method = {}
 883                current_section = None
 884                sub_section = None
 885
 886                for line in lines:
 887                    if not line.strip():  # Skip empty lines
 888                        continue
 889                    if (
 890                        line.startswith("----")
 891                        or line.endswith("Settings")
 892                        or line.endswith("Summary")
 893                        or line.startswith("Experiment")
 894                        or line.startswith("Scan Event")
 895                    ):
 896                        current_section = line.replace("-", "").strip()
 897                        method[current_section] = {}
 898                        sub_section = None
 899                    elif line.startswith("\t"):
 900                        if "\t\t" in line:
 901                            indent_level = line.count("\t")
 902                            key_value = line.strip()
 903
 904                            if indent_level == 2:
 905                                if sub_section:
 906                                    key, value = (
 907                                        key_value.split("=", 1)
 908                                        if "=" in key_value
 909                                        else (key_value, None)
 910                                    )
 911                                    method[current_section][sub_section][
 912                                        key.strip()
 913                                    ] = value.strip() if value else None
 914                            elif indent_level == 3:
 915                                scan_type, key_value = (
 916                                    key_value.split(" ", 1)
 917                                    if " " in key_value
 918                                    else (key_value, None)
 919                                )
 920                                method.setdefault(current_section, {}).setdefault(
 921                                    sub_section, {}
 922                                ).setdefault(scan_type, {})
 923
 924                                if key_value:
 925                                    key, value = (
 926                                        key_value.split("=", 1)
 927                                        if "=" in key_value
 928                                        else (key_value, None)
 929                                    )
 930                                    method[current_section][sub_section][scan_type][
 931                                        key.strip()
 932                                    ] = value.strip() if value else None
 933                        else:
 934                            key_value = line.strip()
 935                            if "=" in key_value:
 936                                key, value = key_value.split("=", 1)
 937                                method.setdefault(current_section, {})[key.strip()] = (
 938                                    value.strip()
 939                                )
 940                            else:
 941                                sub_section = key_value
 942                    else:
 943                        if ":" in line:
 944                            key, value = line.split(":", 1)
 945                            method[current_section][key.strip()] = value.strip()
 946                        else:
 947                            method[current_section][line] = {}
 948
 949                return method
 950
 951            count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount
 952            # TODO make this code better...
 953            instrument_methods = []
 954            for i in range(count_instrument_methods):
 955                instrument_method_string = self.iRawDataPlus.GetInstrumentMethod(i)
 956                if parse_strings:
 957                    try:
 958                        instrument_method_dict = parse_instrument_method(
 959                            instrument_method_string
 960                        )
 961                    except:  # if it fails for any reason
 962                        instrument_method_dict = instrument_method_string
 963                else:
 964                    instrument_method_dict = instrument_method_string
 965                instrument_methods.append(instrument_method_dict)
 966            return instrument_methods
 967
 968    def get_tune_method(self):
 969        """
 970        This code will extract the tune method from the raw file
 971        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
 972        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
 973        It will also not return Labels (keys) where the value is blank
 974
 975        Returns:
 976        --------
 977        Dict[str, Any]
 978            A dictionary containing the tune method information
 979
 980        Raises:
 981        -------
 982        ValueError
 983            If no tune methods are found in the raw file
 984
 985        """
 986        tunemethodcount = self.iRawDataPlus.GetTuneDataCount()
 987        if tunemethodcount == 0:
 988            raise ValueError("No tune methods found in the raw data file")
 989            return None
 990        elif tunemethodcount > 1:
 991            warnings.warn(
 992                "Multiple tune methods found in the raw data file, returning the 1st"
 993            )
 994
 995        header = self.iRawDataPlus.GetTuneData(0)
 996
 997        header_dic = {}
 998        current_section = None
 999
1000        for i in range(header.Length):
1001            label = header.Labels[i]
1002            value = header.Values[i]
1003
1004            # Check for section headers
1005            if "===" in label or (
1006                (value == "" or value is None) and not label.endswith(":")
1007            ):
1008                # This is a section header
1009                section_name = (
1010                    label.replace("=", "").replace(":", "").strip()
1011                )  # Clean the label if it contains '='
1012                header_dic[section_name] = {}
1013                current_section = section_name
1014            else:
1015                if current_section:
1016                    header_dic[current_section][label] = value
1017                else:
1018                    header_dic[label] = value
1019        return header_dic
1020
1021    def get_status_log(self, retention_time: float = 0):
1022        """
1023        This code will extract the status logs from the raw file
1024        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
1025        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
1026        It will also not return Labels (keys) where the value is blank
1027
1028        Parameters:
1029        -----------
1030        retention_time: float
1031            The retention time in minutes to extract the status log data from.
1032            Will use the closest retention time found. Default 0.
1033
1034        Returns:
1035        --------
1036        Dict[str, Any]
1037            A dictionary containing the status log information
1038
1039        Raises:
1040        -------
1041        ValueError
1042            If no status logs are found in the raw file
1043
1044        """
1045        tunemethodcount = self.iRawDataPlus.GetStatusLogEntriesCount()
1046        if tunemethodcount == 0:
1047            raise ValueError("No status logs found in the raw data file")
1048            return None
1049
1050        header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time)
1051
1052        header_dic = {}
1053        current_section = None
1054
1055        for i in range(header.Length):
1056            label = header.Labels[i]
1057            value = header.Values[i]
1058
1059            # Check for section headers
1060            if "===" in label or (
1061                (value == "" or value is None) and not label.endswith(":")
1062            ):
1063                # This is a section header
1064                section_name = (
1065                    label.replace("=", "").replace(":", "").strip()
1066                )  # Clean the label if it contains '='
1067                header_dic[section_name] = {}
1068                current_section = section_name
1069            else:
1070                if current_section:
1071                    header_dic[current_section][label] = value
1072                else:
1073                    header_dic[label] = value
1074        return header_dic
1075
1076    def get_error_logs(self):
1077        """
1078        This code will extract the error logs from the raw file
1079
1080        Returns:
1081        --------
1082        Dict[float, str]
1083            A dictionary containing the error log information with the retention time as the key
1084
1085        Raises:
1086        -------
1087        ValueError
1088            If no error logs are found in the raw file
1089        """
1090
1091        error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount
1092        if error_log_count == 0:
1093            raise ValueError("No error logs found in the raw data file")
1094            return None
1095
1096        error_logs = {}
1097
1098        for i in range(error_log_count):
1099            error_log_item = self.iRawDataPlus.GetErrorLogItem(i)
1100            rt = error_log_item.RetentionTime
1101            message = error_log_item.Message
1102            # Use the index `i` as the unique ID key
1103            error_logs[i] = {"rt": rt, "message": message}
1104        return error_logs
1105
1106    def get_sample_information(self):
1107        """
1108        This code will extract the sample information from the raw file
1109
1110        Returns:
1111        --------
1112        Dict[str, Any]
1113            A dictionary containing the sample information
1114            Note that UserText field may not be handled properly and may need further processing
1115        """
1116        sminfo = self.iRawDataPlus.SampleInformation
1117        smdict = {}
1118        smdict["Comment"] = sminfo.Comment
1119        smdict["SampleId"] = sminfo.SampleId
1120        smdict["SampleName"] = sminfo.SampleName
1121        smdict["Vial"] = sminfo.Vial
1122        smdict["InjectionVolume"] = sminfo.InjectionVolume
1123        smdict["Barcode"] = sminfo.Barcode
1124        smdict["BarcodeStatus"] = str(sminfo.BarcodeStatus)
1125        smdict["CalibrationLevel"] = sminfo.CalibrationLevel
1126        smdict["DilutionFactor"] = sminfo.DilutionFactor
1127        smdict["InstrumentMethodFile"] = sminfo.InstrumentMethodFile
1128        smdict["RawFileName"] = sminfo.RawFileName
1129        smdict["CalibrationFile"] = sminfo.CalibrationFile
1130        smdict["IstdAmount"] = sminfo.IstdAmount
1131        smdict["RowNumber"] = sminfo.RowNumber
1132        smdict["Path"] = sminfo.Path
1133        smdict["ProcessingMethodFile"] = sminfo.ProcessingMethodFile
1134        smdict["SampleType"] = str(sminfo.SampleType)
1135        smdict["SampleWeight"] = sminfo.SampleWeight
1136        smdict["UserText"] = {
1137            "UserText": [x for x in sminfo.UserText]
1138        }  # [0] #This may not work - needs debugging with
1139        return smdict
1140
1141    def get_instrument_data(self):
1142        """
1143        This code will extract the instrument data from the raw file
1144
1145        Returns:
1146        --------
1147        Dict[str, Any]
1148            A dictionary containing the instrument data
1149        """
1150        instrument_data = self.iRawDataPlus.GetInstrumentData()
1151        id_dict = {}
1152        id_dict["Name"] = instrument_data.Name
1153        id_dict["Model"] = instrument_data.Model
1154        id_dict["SerialNumber"] = instrument_data.SerialNumber
1155        id_dict["SoftwareVersion"] = instrument_data.SoftwareVersion
1156        id_dict["HardwareVersion"] = instrument_data.HardwareVersion
1157        id_dict["ChannelLabels"] = {
1158            "ChannelLabels": [x for x in instrument_data.ChannelLabels]
1159        }
1160        id_dict["Flags"] = instrument_data.Flags
1161        id_dict["AxisLabelY"] = instrument_data.AxisLabelY
1162        id_dict["AxisLabelX"] = instrument_data.AxisLabelX
1163        return id_dict
1164
1165    def get_centroid_msms_data(self, scan):
1166        """
1167        .. deprecated:: 2.0
1168            This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1169        """
1170
1171        warnings.warn(
1172            "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1173            "Please use `get_average_mass_spectrum()` instead.",
1174            DeprecationWarning,
1175        )
1176
1177        d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid)
1178
1179        centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False)
1180
1181        noise = list(centroidStream.Noises)
1182
1183        baselines = list(centroidStream.Baselines)
1184
1185        rp = list(centroidStream.Resolutions)
1186
1187        magnitude = list(centroidStream.Intensities)
1188
1189        mz = list(centroidStream.Masses)
1190
1191        # charge = scans_labels[5]
1192        array_noise_std = (np.array(noise) - np.array(baselines)) / 3
1193        l_signal_to_noise = np.array(magnitude) / array_noise_std
1194
1195        d_params["baseline_noise"] = np.average(array_noise_std)
1196
1197        d_params["baseline_noise_std"] = np.std(array_noise_std)
1198
1199        data_dict = {
1200            Labels.mz: mz,
1201            Labels.abundance: magnitude,
1202            Labels.rp: rp,
1203            Labels.s2n: list(l_signal_to_noise),
1204        }
1205
1206        mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
1207        mass_spec.settings.noise_threshold_method = "relative_abundance"
1208        mass_spec.settings.noise_threshold_min_relative_abundance = 1
1209        mass_spec.process_mass_spec()
1210        return mass_spec
1211
1212    def get_average_mass_spectrum_by_scanlist(
1213        self,
1214        scans_list: List[int],
1215        auto_process: bool = True,
1216        ppm_tolerance: float = 5.0,
1217    ) -> MassSpecProfile:
1218        """
1219        Averages selected scans mass spectra using Thermo's AverageScans method
1220        scans_list: list[int]
1221        auto_process: bool
1222            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
1223        Returns:
1224            MassSpecProfile
1225
1226         .. deprecated:: 2.0
1227        This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1228        """
1229
1230        warnings.warn(
1231            "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1232            "Please use `get_average_mass_spectrum()` instead.",
1233            DeprecationWarning,
1234        )
1235
1236        d_params = self.set_metadata(scans_list=scans_list)
1237
1238        # assumes scans is full scan or reduced profile scan
1239
1240        scans = List[int]()
1241        for scan in scans_list:
1242            scans.Add(scan)
1243
1244        # Create the mass options object that will be used when averaging the scans
1245        options = MassOptions()
1246        options.ToleranceUnits = ToleranceUnits.ppm
1247        options.Tolerance = ppm_tolerance
1248
1249        # Get the scan filter for the first scan.  This scan filter will be used to located
1250        # scans within the given scan range of the same type
1251
1252        averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
1253
1254        len_data = averageScan.SegmentedScan.Positions.Length
1255
1256        mz_list = list(averageScan.SegmentedScan.Positions)
1257        abund_list = list(averageScan.SegmentedScan.Intensities)
1258
1259        data_dict = {
1260            Labels.mz: mz_list,
1261            Labels.abundance: abund_list,
1262        }
1263
1264        mass_spec = MassSpecProfile(data_dict, d_params, auto_process=auto_process)
1265
1266        return mass_spec
1267
1268
1269class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, SpectraParserInterface):
1270    """A class for parsing Thermo RAW mass spectrometry data files and instatiating MassSpectraBase or LCMSBase objects
1271
1272    Parameters
1273    ----------
1274    file_location : str or Path
1275        The path to the RAW file to be parsed.
1276    analyzer : str, optional
1277        The type of mass analyzer used in the instrument. Default is "Unknown".
1278    instrument_label : str, optional
1279        The name of the instrument used to acquire the data. Default is "Unknown".
1280    sample_name : str, optional
1281        The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.
1282
1283    Attributes
1284    ----------
1285    file_location : Path
1286        The path to the RAW file being parsed.
1287    analyzer : str
1288        The type of mass analyzer used in the instrument.
1289    instrument_label : str
1290        The name of the instrument used to acquire the data.
1291    sample_name : str
1292        The name of the sample being analyzed.
1293
1294    Methods
1295    -------
1296    * run(spectra=True).
1297        Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
1298    * get_mass_spectrum_from_scan(scan_number, polarity, auto_process=True)
1299        Parses the RAW file and returns a MassSpecBase object from a single scan.
1300    * get_mass_spectra_obj().
1301        Parses the RAW file and instantiates a MassSpectraBase object.
1302    * get_lcms_obj().
1303        Parses the RAW file and instantiates an LCMSBase object.
1304    * get_icr_transient_times().
1305        Return a list for transient time targets for all scans, or selected scans range
1306
1307    Inherits from ThermoBaseClass and SpectraParserInterface
1308    """
1309
1310    def __init__(
1311        self,
1312        file_location,
1313        analyzer="Unknown",
1314        instrument_label="Unknown",
1315        sample_name=None,
1316    ):
1317        super().__init__(file_location)
1318        if isinstance(file_location, str):
1319            # if obj is a string it defaults to create a Path obj, pass the S3Path if needed
1320            file_location = Path(file_location)
1321        if not file_location.exists():
1322            raise FileExistsError("File does not exist: " + str(file_location))
1323
1324        self.file_location = file_location
1325        self.analyzer = analyzer
1326        self.instrument_label = instrument_label
1327
1328        if sample_name:
1329            self.sample_name = sample_name
1330        else:
1331            self.sample_name = file_location.stem
1332
1333    def load(self):
1334        pass
1335
1336    def get_scan_df(self):
1337        # This automatically brings in all the data
1338        self.chromatogram_settings.scans = (-1, -1)
1339
1340        # Get scan df info; starting with TIC data
1341        tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
1342        tic_data = {
1343            "scan": tic_data.scans,
1344            "scan_time": tic_data.time,
1345            "tic": tic_data.tic,
1346        }
1347        scan_df = pd.DataFrame.from_dict(tic_data)
1348        scan_df["ms_level"] = None
1349        
1350        # get scan text
1351        scan_filter_df = pd.DataFrame.from_dict(
1352            self.get_all_filters()[0], orient="index"
1353        )
1354        scan_filter_df.reset_index(inplace=True)
1355        scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)
1356
1357        scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
1358        scan_df["scan_window_lower"] = scan_df.scan_text.str.extract(
1359            r"\[(\d+\.\d+)-\d+\.\d+\]"
1360        )
1361        scan_df["scan_window_upper"] = scan_df.scan_text.str.extract(
1362            r"\[\d+\.\d+-(\d+\.\d+)\]"
1363        )
1364        scan_df["polarity"] = np.where(
1365            scan_df.scan_text.str.contains(" - "), "negative", "positive"
1366        )
1367        scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@")
1368        scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float)
1369
1370        # Assign each scan as centroid or profile and add ms_level
1371        scan_df["ms_format"] = None
1372        for i in scan_df.scan.to_list():
1373            scan_df.loc[scan_df.scan == i, "ms_level"] = self.get_ms_level_for_scan_num(i)
1374            if self.iRawDataPlus.IsCentroidScanFromScanNumber(i):
1375                scan_df.loc[scan_df.scan == i, "ms_format"] = "centroid"
1376            else:
1377                scan_df.loc[scan_df.scan == i, "ms_format"] = "profile"
1378
1379        return scan_df
1380
1381    def get_ms_raw(self, spectra, scan_df):
1382        if spectra == "all":
1383            scan_df_forspec = scan_df
1384        elif spectra == "ms1":
1385            scan_df_forspec = scan_df[scan_df.ms_level == 1]
1386        elif spectra == "ms2":
1387            scan_df_forspec = scan_df[scan_df.ms_level == 2]
1388        else:
1389            raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")
1390
1391        # Result container
1392        res = {}
1393
1394        # Row count container
1395        counter = {}
1396
1397        # Column name container
1398        cols = {}
1399
1400        # set at float32
1401        dtype = np.float32
1402
1403        # First pass: get nrows
1404        N = defaultdict(lambda: 0)
1405        for i in scan_df_forspec.scan.to_list():
1406            level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0]
1407            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i)
1408            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1409                i, scanStatistics
1410            )
1411            abun = list(profileStream.Intensities)
1412            abun = np.array(abun)[np.where(np.array(abun) > 0)[0]]
1413
1414            N[level] += len(abun)
1415
1416        # Second pass: parse
1417        for i in scan_df_forspec.scan.to_list():
1418            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i)
1419            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1420                i, scanStatistics
1421            )
1422            abun = list(profileStream.Intensities)
1423            mz = list(profileStream.Positions)
1424
1425            # Get index of abun that are > 0
1426            inx = np.where(np.array(abun) > 0)[0]
1427            mz = np.array(mz)[inx]
1428            mz = np.float32(mz)
1429            abun = np.array(abun)[inx]
1430            abun = np.float32(abun)
1431
1432            level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0]
1433
1434            # Number of rows
1435            n = len(mz)
1436
1437            # No measurements
1438            if n == 0:
1439                continue
1440
1441            # Dimension check
1442            if len(mz) != len(abun):
1443                warnings.warn("m/z and intensity array dimension mismatch")
1444                continue
1445
1446            # Scan/frame info
1447            id_dict = i
1448
1449            # Columns
1450            cols[level] = ["scan", "mz", "intensity"]
1451            m = len(cols[level])
1452
1453            # Subarray init
1454            arr = np.empty((n, m), dtype=dtype)
1455            inx = 0
1456
1457            # Populate scan/frame info
1458            arr[:, inx] = i
1459            inx += 1
1460
1461            # Populate m/z
1462            arr[:, inx] = mz
1463            inx += 1
1464
1465            # Populate intensity
1466            arr[:, inx] = abun
1467            inx += 1
1468
1469            # Initialize output container
1470            if level not in res:
1471                res[level] = np.empty((N[level], m), dtype=dtype)
1472                counter[level] = 0
1473
1474            # Insert subarray
1475            res[level][counter[level] : counter[level] + n, :] = arr
1476            counter[level] += n
1477
1478        # Construct ms1 and ms2 mz dataframes
1479        for level in res.keys():
1480            res[level] = pd.DataFrame(res[level])
1481            res[level].columns = cols[level]
1482        # rename keys in res to add 'ms' prefix
1483        res = {f"ms{key}": value for key, value in res.items()}
1484
1485        return res
1486
1487    def run(self, spectra="all", scan_df=None):
1488        """
1489        Extracts mass spectra data from a raw file.
1490
1491        Parameters
1492        ----------
1493        spectra : str, optional
1494            Which mass spectra data to include in the output. Default is all.  Other options: none, ms1, ms2.
1495        scan_df : pandas.DataFrame, optional
1496            Scan dataframe.  If not provided, the scan dataframe is created from the mzML file.
1497
1498        Returns
1499        -------
1500        tuple
1501            A tuple containing two elements:
1502            - A dictionary containing mass spectra data, separated by MS level.
1503            - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level,
1504                scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
1505        """
1506        # Prepare scan_df
1507        if scan_df is None:
1508            scan_df = self.get_scan_df()
1509
1510        # Prepare mass spectra data
1511        if spectra != "none":
1512            res = self.get_ms_raw(spectra=spectra, scan_df=scan_df)
1513        else:
1514            res = None
1515
1516        return res, scan_df
1517
1518    def get_mass_spectrum_from_scan(
1519        self, scan_number, spectrum_mode, auto_process=True
1520    ):
1521        """Instatiate a MassSpecBase object from a single scan number from the binary file, currently only supports profile mode.
1522
1523        Parameters
1524        ----------
1525        scan_number : int
1526            The scan number to extract the mass spectrum from.
1527        polarity : int
1528            The polarity of the scan.  1 for positive mode, -1 for negative mode.
1529        spectrum_mode : str
1530            The type of mass spectrum to extract.  Must be 'profile' or 'centroid'.
1531        auto_process : bool, optional
1532            If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.
1533
1534        Returns
1535        -------
1536        MassSpecProfile | MassSpecCentroid
1537            The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.
1538        """
1539
1540        if spectrum_mode == "profile":
1541            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
1542            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1543                scan_number, scanStatistics
1544            )
1545            abun = list(profileStream.Intensities)
1546            mz = list(profileStream.Positions)
1547            data_dict = {
1548                Labels.mz: mz,
1549                Labels.abundance: abun,
1550            }
1551            d_params = self.set_metadata(
1552                firstScanNumber=scan_number,
1553                lastScanNumber=scan_number,
1554                scans_list=False,
1555                label=Labels.thermo_profile,
1556            )
1557            mass_spectrum_obj = MassSpecProfile(
1558                data_dict, d_params, auto_process=auto_process
1559            )
1560
1561        elif spectrum_mode == "centroid":
1562            centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
1563            if centroid_scan.Masses is not None:
1564                mz = list(centroid_scan.Masses)
1565                abun = list(centroid_scan.Intensities)
1566                rp = list(centroid_scan.Resolutions)
1567                magnitude = list(centroid_scan.Intensities)
1568                noise = list(centroid_scan.Noises)
1569                baselines = list(centroid_scan.Baselines)
1570                array_noise_std = (np.array(noise) - np.array(baselines)) / 3
1571                l_signal_to_noise = np.array(magnitude) / array_noise_std
1572                data_dict = {
1573                    Labels.mz: mz,
1574                    Labels.abundance: abun,
1575                    Labels.rp: rp,
1576                    Labels.s2n: list(l_signal_to_noise),
1577                }
1578            else:  # For CID MS2, the centroid data are stored in the profile data location, they do not have any associated rp or baseline data, but they should be treated as centroid data
1579                scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(
1580                    scan_number
1581                )
1582                profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1583                    scan_number, scanStatistics
1584                )
1585                abun = list(profileStream.Intensities)
1586                mz = list(profileStream.Positions)
1587                data_dict = {
1588                    Labels.mz: mz,
1589                    Labels.abundance: abun,
1590                    Labels.rp: [np.nan] * len(mz),
1591                    Labels.s2n: [np.nan] * len(mz),
1592                }
1593            d_params = self.set_metadata(
1594                firstScanNumber=scan_number,
1595                lastScanNumber=scan_number,
1596                scans_list=False,
1597                label=Labels.thermo_centroid,
1598            )
1599            mass_spectrum_obj = MassSpecCentroid(
1600                data_dict, d_params, auto_process=auto_process
1601            )
1602
1603        return mass_spectrum_obj
1604
1605    def get_mass_spectra_obj(self):
1606        """Instatiate a MassSpectraBase object from the binary data file file.
1607
1608        Returns
1609        -------
1610        MassSpectraBase
1611            The MassSpectra object containing the parsed mass spectra.  The object is instatiated with the mzML file, analyzer, instrument, sample name, and scan dataframe.
1612        """
1613        _, scan_df = self.run(spectra="none")
1614        mass_spectra_obj = MassSpectraBase(
1615            self.file_location,
1616            self.analyzer,
1617            self.instrument_label,
1618            self.sample_name,
1619            self,
1620        )
1621        scan_df = scan_df.set_index("scan", drop=False)
1622        mass_spectra_obj.scan_df = scan_df
1623
1624        return mass_spectra_obj
1625
1626    def get_lcms_obj(self, spectra="all"):
1627        """Instatiates a LCMSBase object from the mzML file.
1628
1629        Parameters
1630        ----------
1631        spectra : str, optional
1632            Which mass spectra data to include in the output. Default is "all".  Other options: "none", "ms1", "ms2".
1633
1634        Returns
1635        -------
1636        LCMSBase
1637            LCMS object containing mass spectra data. The object is instatiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specifified), polarity, as well as the attributes holding the scans, retention times, and tics.
1638        """
1639        _, scan_df = self.run(spectra="none")  # first run it to just get scan info
1640        res, scan_df = self.run(
1641            scan_df=scan_df, spectra=spectra
1642        )  # second run to parse data
1643        lcms_obj = LCMSBase(
1644            self.file_location,
1645            self.analyzer,
1646            self.instrument_label,
1647            self.sample_name,
1648            self,
1649        )
1650        if spectra != "none":
1651            for key in res:
1652                key_int = int(key.replace("ms", ""))
1653                res[key] = res[key][res[key].intensity > 0]
1654                res[key] = (
1655                    res[key].sort_values(by=["scan", "mz"]).reset_index(drop=True)
1656                )
1657                lcms_obj._ms_unprocessed[key_int] = res[key]
1658        lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
1659        # Check if polarity is mixed
1660        if len(set(scan_df.polarity)) > 1:
1661            raise ValueError("Mixed polarities detected in scan data")
1662        lcms_obj.polarity = scan_df.polarity[0]
1663        lcms_obj._scans_number_list = list(scan_df.scan)
1664        lcms_obj._retention_time_list = list(scan_df.scan_time)
1665        lcms_obj._tic_list = list(scan_df.tic)
1666
1667        return lcms_obj
1668
1669    def get_icr_transient_times(self):
1670        """Return a list for transient time targets for all scans, or selected scans range
1671
1672        Notes
1673        --------
1674        Resolving Power and Transient time targets based on 7T FT-ICR MS system
1675        """
1676
1677        res_trans_time = {
1678            "50": 0.384,
1679            "100000": 0.768,
1680            "200000": 1.536,
1681            "400000": 3.072,
1682            "750000": 6.144,
1683            "1000000": 12.288,
1684        }
1685
1686        firstScanNumber = self.start_scan
1687
1688        lastScanNumber = self.end_scan
1689
1690        transient_time_list = []
1691
1692        for scan in range(firstScanNumber, lastScanNumber):
1693            scan_header = self.get_scan_header(scan)
1694
1695            rp_target = scan_header["FT Resolution:"]
1696
1697            transient_time = res_trans_time.get(rp_target)
1698
1699            transient_time_list.append(transient_time)
1700
1701            # print(transient_time, rp_target)
1702
1703        return transient_time_list
spec = ModuleSpec(name='corems', loader=<_frozen_importlib_external.SourceFileLoader object>, origin='/Users/heal742/LOCAL/corems_dev/corems/corems/__init__.py', submodule_search_locations=['/Users/heal742/LOCAL/corems_dev/corems/corems'])
class ThermoBaseClass:
  60class ThermoBaseClass:
  61    """Class for parsing Thermo Raw files and extracting information from them.
  62
  63    Parameters:
  64    -----------
  65    file_location : str or pathlib.Path or s3path.S3Path
  66        Thermo Raw file path or S3 path.
  67
  68    Attributes:
  69    -----------
  70    file_path : str or pathlib.Path or s3path.S3Path
  71        The file path of the Thermo Raw file.
  72    parameters : LCMSParameters
  73        The LCMS parameters for the Thermo Raw file.
  74    chromatogram_settings : LiquidChromatographSetting
  75        The chromatogram settings for the Thermo Raw file.
  76    scans : list or tuple
  77        The selected scans for the Thermo Raw file.
  78    start_scan : int
  79        The starting scan number for the Thermo Raw file.
  80    end_scan : int
  81        The ending scan number for the Thermo Raw file.
  82
  83    Methods:
  84    --------
  85    * set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter
  86        Convert the user-passed MS Type string to a Thermo MSOrderType object.
  87    * get_instrument_info() -> dict
  88        Get the instrument information from the Thermo Raw file.
  89    * get_creation_time() -> datetime.datetime
  90        Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
  91    * remove_temp_file()
  92        Remove the temporary file if the path is from S3Path.
  93    * get_polarity_mode(scan_number: int) -> int
  94        Get the polarity mode for the given scan number.
  95    * get_filter_for_scan_num(scan_number: int) -> List[str]
  96        Get the filter for the given scan number.
  97    * check_full_scan(scan_number: int) -> bool
  98        Check if the given scan number is a full scan.
  99    * get_all_filters() -> Tuple[Dict[int, str], List[str]]
 100        Get all scan filters for the Thermo Raw file.
 101    * get_scan_header(scan: int) -> Dict[str, Any]
 102        Get the full dictionary of scan header metadata for the given scan number.
 103    * get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]
 104        Get the retention time, intensity, and scan number from the given trace.
 105    * get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d',
 106             peak_detection: bool = True, smooth: bool = True, plot: bool = False,
 107             ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes]
 108        Get the extracted ion chromatograms (EICs) for the target m/z values.
 109
 110    """
 111
 112    def __init__(self, file_location):
 113        """file_location: srt pathlib.Path or s3path.S3Path
 114        Thermo Raw file path
 115        """
 116        # Thread.__init__(self)
 117        if isinstance(file_location, str):
 118            file_path = Path(file_location)
 119
 120        elif isinstance(file_location, S3Path):
 121            temp_dir = Path("tmp/")
 122            temp_dir.mkdir(exist_ok=True)
 123
 124            file_path = temp_dir / file_location.name
 125            with open(file_path, "wb") as fh:
 126                fh.write(file_location.read_bytes())
 127
 128        else:
 129            file_path = file_location
 130
 131        self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path))
 132
 133        if not self.iRawDataPlus.IsOpen:
 134            raise FileNotFoundError(
 135                "Unable to access the RAW file using the RawFileReader class!"
 136            )
 137
 138        # Check for any errors in the RAW file
 139        if self.iRawDataPlus.IsError:
 140            raise IOError(
 141                "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path)
 142            )
 143
 144        self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1)
 145
 146        self.file_path = file_location
 147        self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path))
 148
 149        # removing tmp file
 150
 151        self._init_settings()
 152
 153    def _init_settings(self):
 154        """
 155        Initialize the LCMSParameters object.
 156        """
 157        self._parameters = LCMSParameters()
 158
 159    @property
 160    def parameters(self) -> LCMSParameters:
 161        """
 162        Get or set the LCMSParameters object.
 163        """
 164        return self._parameters
 165
 166    @parameters.setter
 167    def parameters(self, instance_LCMSParameters: LCMSParameters):
 168        self._parameters = instance_LCMSParameters
 169
 170    @property
 171    def chromatogram_settings(self) -> LiquidChromatographSetting:
 172        """
 173        Get or set the LiquidChromatographSetting object.
 174        """
 175        return self.parameters.lc_ms
 176
 177    @chromatogram_settings.setter
 178    def chromatogram_settings(
 179        self, instance_LiquidChromatographSetting: LiquidChromatographSetting
 180    ):
 181        self.parameters.lc_ms = instance_LiquidChromatographSetting
 182
 183    @property
 184    def scans(self) -> list | tuple:
 185        """scans : list or tuple
 186        If list uses Thermo AverageScansInScanRange for selected scans, ortherwise uses Thermo AverageScans for a scan range
 187        """
 188        return self.chromatogram_settings.scans
 189
 190    @property
 191    def start_scan(self) -> int:
 192        """
 193        Get the starting scan number for the Thermo Raw file.
 194        """
 195        if self.scans[0] == -1:
 196            return self.iRawDataPlus.RunHeaderEx.FirstSpectrum
 197        else:
 198            return self.scans[0]
 199
 200    @property
 201    def end_scan(self) -> int:
 202        """
 203        Get the ending scan number for the Thermo Raw file.
 204        """
 205        if self.scans[-1] == -1:
 206            return self.iRawDataPlus.RunHeaderEx.LastSpectrum
 207        else:
 208            return self.scans[-1]
 209
 210    def set_msordertype(self, scanFilter, mstype: str = "ms1"):
 211        """
 212        Function to convert user passed string MS Type to Thermo MSOrderType object
 213        Limited to MS1 through MS10.
 214
 215        Parameters:
 216        -----------
 217        scanFilter : Thermo.ScanFilter
 218            The scan filter object.
 219        mstype : str, optional
 220            The MS Type string, by default 'ms1'
 221
 222        """
 223        mstype = mstype.upper()
 224        # Check that a valid mstype is passed
 225        if (int(mstype.split("MS")[1]) > 10) or (int(mstype.split("MS")[1]) < 1):
 226            warn("MS Type not valid, must be between MS1 and MS10")
 227
 228        msordertypedict = {
 229            "MS1": MSOrderType.Ms,
 230            "MS2": MSOrderType.Ms2,
 231            "MS3": MSOrderType.Ms3,
 232            "MS4": MSOrderType.Ms4,
 233            "MS5": MSOrderType.Ms5,
 234            "MS6": MSOrderType.Ms6,
 235            "MS7": MSOrderType.Ms7,
 236            "MS8": MSOrderType.Ms8,
 237            "MS9": MSOrderType.Ms9,
 238            "MS10": MSOrderType.Ms10,
 239        }
 240        scanFilter.MSOrder = msordertypedict[mstype]
 241        return scanFilter
 242
 243    def get_instrument_info(self) -> dict:
 244        """
 245        Get the instrument information from the Thermo Raw file.
 246
 247        Returns:
 248        --------
 249        dict
 250            A dictionary with the keys 'model', and 'serial_number'.
 251        """
 252        instrumentData = self.iRawDataPlus.GetInstrumentData()
 253        return {
 254            "model": instrumentData.Model,
 255            "serial_number": instrumentData.SerialNumber
 256        }
 257    
 258    def get_creation_time(self) -> datetime.datetime:
 259        """
 260        Extract the creation date stamp from the .RAW file
 261        Return formatted creation date stamp.
 262
 263        """
 264        credate = self.iRawDataPlus.CreationDate.get_Ticks()
 265        credate = datetime.datetime(1, 1, 1) + datetime.timedelta(
 266            microseconds=credate / 10
 267        )
 268        return credate
 269
 270    def remove_temp_file(self) -> None:
 271        """if the path is from S3Path data cannot be serialized to io.ByteStream and
 272        a temporary copy is stored at the temp dir
 273        use this function only at the end of your execution scrip
 274        some LCMS class methods depend on this file
 275        """
 276
 277        self.file_path.unlink()
 278
 279    def close_file(self) -> None:
 280        """
 281        Close the Thermo Raw file.
 282        """
 283        self.iRawDataPlus.Dispose()
 284
 285    def get_polarity_mode(self, scan_number: int) -> int:
 286        """
 287        Get the polarity mode for the given scan number.
 288
 289        Parameters:
 290        -----------
 291        scan_number : int
 292            The scan number.
 293
 294        Raises:
 295        -------
 296        Exception
 297            If the polarity mode is unknown.
 298
 299        """
 300        polarity_symbol = self.get_filter_for_scan_num(scan_number)[1]
 301
 302        if polarity_symbol == "+":
 303            return 1
 304            # return 'POSITIVE_ION_MODE'
 305
 306        elif polarity_symbol == "-":
 307            return -1
 308
 309        else:
 310            raise Exception("Polarity Mode Unknown, please set it manually")
 311
 312    def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
 313        """
 314        Returns the closest matching run time that corresponds to scan_number for the current
 315        controller. This function is only supported for MS device controllers.
 316        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
 317
 318        Parameters:
 319        -----------
 320        scan_number : int
 321            The scan number.
 322
 323        """
 324        scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
 325
 326        return str(scan_label).split()
 327
 328    def get_ms_level_for_scan_num(self, scan_number: int) -> str:
 329        """
 330        Get the MS order for the given scan number.
 331
 332        Parameters:
 333        -----------
 334        scan_number : int
 335            The scan number
 336
 337        Returns:
 338        --------
 339        int
 340            The MS order type (1 for MS, 2 for MS2, etc.)
 341        """
 342        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
 343
 344        msordertype = {
 345            MSOrderType.Ms: 1,
 346            MSOrderType.Ms2: 2,
 347            MSOrderType.Ms3: 3,
 348            MSOrderType.Ms4: 4,
 349            MSOrderType.Ms5: 5,
 350            MSOrderType.Ms6: 6,
 351            MSOrderType.Ms7: 7,
 352            MSOrderType.Ms8: 8,
 353            MSOrderType.Ms9: 9,
 354            MSOrderType.Ms10: 10,
 355        }
 356
 357        if scan_filter.MSOrder in msordertype:
 358            return msordertype[scan_filter.MSOrder]
 359        else:
 360            raise Exception("MS Order Type not found")
 361    
 362    def check_full_scan(self, scan_number: int) -> bool:
 363        # scan_filter.ScanMode 0 = FULL
 364        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
 365
 366        return scan_filter.ScanMode == MSOrderType.Ms
 367
 368    def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
 369        """
 370        Get all scan filters.
 371        This function is only supported for MS device controllers.
 372        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
 373
 374        """
 375
 376        scanrange = range(self.start_scan, self.end_scan + 1)
 377        scanfiltersdic = {}
 378        scanfilterslist = []
 379        for scan_number in scanrange:
 380            scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
 381            scanfiltersdic[scan_number] = scan_label
 382            scanfilterslist.append(scan_label)
 383        scanfilterset = list(set(scanfilterslist))
 384        return scanfiltersdic, scanfilterset
 385
 386    def get_scan_header(self, scan: int) -> Dict[str, Any]:
 387        """
 388        Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.
 389
 390        Parameters:
 391        -----------
 392        scan : int
 393            The scan number.
 394
 395        """
 396        header = self.iRawDataPlus.GetTrailerExtraInformation(scan)
 397
 398        header_dic = {}
 399        for i in range(header.Length):
 400            header_dic.update({header.Labels[i]: header.Values[i]})
 401        return header_dic
 402
 403    @staticmethod
 404    def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
 405        """trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal"""
 406        return list(trace.Times), list(trace.Intensities), list(trace.Scans)
 407
 408    def get_eics(
 409        self,
 410        target_mzs: List[float],
 411        tic_data: Dict[str, Any],
 412        ms_type="MS !d",
 413        peak_detection=False,
 414        smooth=False,
 415        plot=False,
 416        ax: Optional[axes.Axes] = None,
 417        legend=False,
 418    ) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
 419        """ms_type: str ('MS', MS2')
 420        start_scan: int default -1 will select the lowest available
 421        end_scan: int default -1 will select the highest available
 422
 423        returns:
 424
 425            chroma: dict{target_mz: EIC_Data(
 426                                        Scans: [int]
 427                                            original thermo scan numbers
 428                                        Time: [floats]
 429                                            list of retention times
 430                                        TIC: [floats]
 431                                            total ion chromatogram
 432                                        Apexes: [int]
 433                                            original thermo apex scan number after peak picking
 434                                        )
 435
 436        """
 437        # If peak_detection or smooth is True, raise exception
 438        if peak_detection or smooth:
 439            raise Exception("Peak detection and smoothing are no longer implemented in this function")
 440
 441        options = MassOptions()
 442        options.ToleranceUnits = ToleranceUnits.ppm
 443        options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm
 444
 445        all_chroma_settings = []
 446
 447        for target_mz in target_mzs:
 448            settings = ChromatogramTraceSettings(TraceType.MassRange)
 449            settings.Filter = ms_type
 450            settings.MassRanges = [Range(target_mz, target_mz)]
 451
 452            chroma_settings = IChromatogramSettings(settings)
 453
 454            all_chroma_settings.append(chroma_settings)
 455
 456        # chroma_settings2 = IChromatogramSettings(settings)
 457        # print(chroma_settings.FragmentMass)
 458        # print(chroma_settings.FragmentMass)
 459        # print(chroma_settings)
 460        # print(chroma_settings)
 461
 462        data = self.iRawDataPlus.GetChromatogramData(
 463            all_chroma_settings, self.start_scan, self.end_scan, options
 464        )
 465
 466        traces = ChromatogramSignal.FromChromatogramData(data)
 467
 468        chroma = {}
 469
 470        if plot:
 471            from matplotlib.transforms import Bbox
 472            import matplotlib.pyplot as plt
 473
 474            if not ax:
 475                # ax = plt.gca()
 476                # ax.clear()
 477                fig, ax = plt.subplots()
 478
 479            else:
 480                fig = plt.gcf()
 481
 482            # plt.show()
 483
 484        for i, trace in enumerate(traces):
 485            if trace.Length > 0:
 486                rt, eic, scans = self.get_rt_time_from_trace(trace)
 487                if smooth:
 488                    eic = self.smooth_tic(eic)
 489
 490                chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
 491                if plot:
 492                    ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))
 493
 494        if peak_detection:
 495            # max_eic = self.get_max_eic(chroma)
 496            max_signal = max(tic_data.tic)
 497
 498            for eic_data in chroma.values():
 499                eic = eic_data.eic
 500                time = eic_data.time
 501
 502                if len(eic) != len(tic_data.tic):
 503                    warn(
 504                        "The software assumes same lenth of TIC and EIC, this does not seems to be the case and the results mass spectrum selected by the scan number might not be correct"
 505                    )
 506
 507                if eic.max() > 0:
 508                    centroid_eics = self.eic_centroid_detector(time, eic, max_signal)
 509                    eic_data.apexes = [i for i in centroid_eics]
 510
 511                    if plot:
 512                        for peak_indexes in eic_data.apexes:
 513                            apex_index = peak_indexes[1]
 514                            ax.plot(
 515                                time[apex_index],
 516                                eic[apex_index],
 517                                marker="x",
 518                                linewidth=0,
 519                            )
 520
 521        if plot:
 522            ax.set_xlabel("Time (min)")
 523            ax.set_ylabel("a.u.")
 524            ax.set_title(ms_type + " EIC")
 525            ax.tick_params(axis="both", which="major", labelsize=12)
 526            ax.axes.spines["top"].set_visible(False)
 527            ax.axes.spines["right"].set_visible(False)
 528
 529            if legend:
 530                legend = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1))
 531                fig.subplots_adjust(right=0.76)
 532                # ax.set_prop_cycle(color=plt.cm.gist_rainbow(np.linspace(0, 1, len(traces))))
 533
 534                d = {"down": 30, "up": -30}
 535
 536                def func(evt):
 537                    if legend.contains(evt):
 538                        bbox = legend.get_bbox_to_anchor()
 539                        bbox = Bbox.from_bounds(
 540                            bbox.x0, bbox.y0 + d[evt.button], bbox.width, bbox.height
 541                        )
 542                        tr = legend.axes.transAxes.inverted()
 543                        legend.set_bbox_to_anchor(bbox.transformed(tr))
 544                        fig.canvas.draw_idle()
 545
 546                fig.canvas.mpl_connect("scroll_event", func)
 547            return chroma, ax
 548        else:
 549            return chroma, None
 550            rt = []
 551            tic = []
 552            scans = []
 553            for i in range(traces[0].Length):
 554                # print(trace[0].HasBasePeakData,trace[0].EndTime )
 555
 556                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
 557                rt.append(traces[0].Times[i])
 558                tic.append(traces[0].Intensities[i])
 559                scans.append(traces[0].Scans[i])
 560
 561            return traces
 562            # plot_chroma(rt, tic)
 563            # plt.show()
 564
 565    def get_tic(
 566        self,
 567        ms_type="MS !d",
 568        peak_detection=False,  # This wont work right now
 569        smooth=False,  # This wont work right now
 570        plot=False,
 571        ax=None,
 572        trace_type="TIC",
 573    ) -> Tuple[TIC_Data, axes.Axes]:
 574        """ms_type: str ('MS !d', 'MS2', None)
 575            if you use None you get all scans.
 576        peak_detection: bool
 577        smooth: bool
 578        plot: bool
 579        ax: matplotlib axis object
 580        trace_type: str ('TIC','BPC')
 581
 582        returns:
 583            chroma: dict
 584            {
 585            Scan: [int]
 586                original thermo scan numberMS
 587            Time: [floats]
 588                list of retention times
 589            TIC: [floats]
 590                total ion chromatogram
 591            Apexes: [int]
 592                original thermo apex scan number after peak picking
 593            }
 594        """
 595        if trace_type == "TIC":
 596            settings = ChromatogramTraceSettings(TraceType.TIC)
 597        elif trace_type == "BPC":
 598            settings = ChromatogramTraceSettings(TraceType.BasePeak)
 599        else:
 600            raise ValueError(f"{trace_type} undefined")
 601        if ms_type == "all":
 602            settings.Filter = None
 603        else:
 604            settings.Filter = ms_type
 605
 606        chroma_settings = IChromatogramSettings(settings)
 607
 608        data = self.iRawDataPlus.GetChromatogramData(
 609            [chroma_settings], self.start_scan, self.end_scan
 610        )
 611
 612        trace = ChromatogramSignal.FromChromatogramData(data)
 613
 614        data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[])
 615
 616        if trace[0].Length > 0:
 617            for i in range(trace[0].Length):
 618                # print(trace[0].HasBasePeakData,trace[0].EndTime )
 619
 620                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
 621                data.time.append(trace[0].Times[i])
 622                data.tic.append(trace[0].Intensities[i])
 623                data.scans.append(trace[0].Scans[i])
 624
 625                # print(trace[0].Scans[i])
 626            if smooth:
 627                data.tic = self.smooth_tic(data.tic)
 628
 629            else:
 630                data.tic = np.array(data.tic)
 631
 632            if peak_detection:
 633                centroid_peak_indexes = [
 634                    i for i in self.centroid_detector(data.time, data.tic)
 635                ]
 636
 637                data.apexes = centroid_peak_indexes
 638
 639            if plot:
 640                if not ax:
 641                    import matplotlib.pyplot as plt
 642
 643                    ax = plt.gca()
 644                    # fig, ax = plt.subplots(figsize=(6, 3))
 645
 646                ax.plot(data.time, data.tic, label=trace_type)
 647                ax.set_xlabel("Time (min)")
 648                ax.set_ylabel("a.u.")
 649                if peak_detection:
 650                    for peak_indexes in data.apexes:
 651                        apex_index = peak_indexes[1]
 652                        ax.plot(
 653                            data.time[apex_index],
 654                            data.tic[apex_index],
 655                            marker="x",
 656                            linewidth=0,
 657                        )
 658
 659                # plt.show()
 660                if trace_type == "BPC":
 661                    data.bpc = data.tic
 662                    data.tic = []
 663                return data, ax
 664            if trace_type == "BPC":
 665                data.bpc = data.tic
 666                data.tic = []
 667            return data, None
 668
 669        else:
 670            return None, None
 671
 672    def get_average_mass_spectrum(
 673        self,
 674        spectrum_mode: str = "profile",
 675        auto_process: bool = True,
 676        ppm_tolerance: float = 5.0,
 677        ms_type: str = "MS1",
 678    ) -> MassSpecProfile | MassSpecCentroid:
 679        """
 680        Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method
 681        or a scan list using Thermo's AverageScans method
 682        spectrum_mode: str
 683            centroid or profile mass spectrum
 684        auto_process: bool
 685            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
 686        ms_type: str
 687            String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10.
 688            Internal function converts to Thermo MSOrderType class.
 689
 690        """
 691
 692        def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool):
 693            mz_list = list(averageScan.SegmentedScan.Positions)
 694            abund_list = list(averageScan.SegmentedScan.Intensities)
 695
 696            data_dict = {
 697                Labels.mz: mz_list,
 698                Labels.abundance: abund_list,
 699            }
 700
 701            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)
 702
 703        def get_centroid_mass_spec(averageScan, d_params: dict):
 704            noise = list(averageScan.centroidScan.Noises)
 705
 706            baselines = list(averageScan.centroidScan.Baselines)
 707
 708            rp = list(averageScan.centroidScan.Resolutions)
 709
 710            magnitude = list(averageScan.centroidScan.Intensities)
 711
 712            mz = list(averageScan.centroidScan.Masses)
 713
 714            array_noise_std = (np.array(noise) - np.array(baselines)) / 3
 715            l_signal_to_noise = np.array(magnitude) / array_noise_std
 716
 717            d_params["baseline_noise"] = np.average(array_noise_std)
 718
 719            d_params["baseline_noise_std"] = np.std(array_noise_std)
 720
 721            data_dict = {
 722                Labels.mz: mz,
 723                Labels.abundance: magnitude,
 724                Labels.rp: rp,
 725                Labels.s2n: list(l_signal_to_noise),
 726            }
 727
 728            mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
 729
 730            return mass_spec
 731
 732        d_params = self.set_metadata(
 733            firstScanNumber=self.start_scan, lastScanNumber=self.end_scan
 734        )
 735
 736        # Create the mass options object that will be used when averaging the scans
 737        options = MassOptions()
 738        options.ToleranceUnits = ToleranceUnits.ppm
 739        options.Tolerance = ppm_tolerance
 740
 741        # Get the scan filter for the first scan.  This scan filter will be used to located
 742        # scans within the given scan range of the same type
 743        scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan)
 744
 745        # force it to only look for the MSType
 746        scanFilter = self.set_msordertype(scanFilter, ms_type)
 747
 748        if isinstance(self.scans, tuple):
 749            averageScan = Extensions.AverageScansInScanRange(
 750                self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options
 751            )
 752
 753            if averageScan:
 754                if spectrum_mode == "profile":
 755                    mass_spec = get_profile_mass_spec(
 756                        averageScan, d_params, auto_process
 757                    )
 758
 759                    return mass_spec
 760
 761                elif spectrum_mode == "centroid":
 762                    if averageScan.HasCentroidStream:
 763                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
 764
 765                        return mass_spec
 766
 767                    else:
 768                        raise ValueError(
 769                            "No Centroind data available for the selected scans"
 770                        )
 771                else:
 772                    raise ValueError("spectrum_mode must be 'profile' or centroid")
 773            else:
 774                raise ValueError("No data found for the selected scans")
 775
 776        elif isinstance(self.scans, list):
 777            d_params = self.set_metadata(scans_list=self.scans)
 778
 779            scans = List[int]()
 780            for scan in self.scans:
 781                scans.Add(scan)
 782
 783            averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
 784
 785            if averageScan:
 786                if spectrum_mode == "profile":
 787                    mass_spec = get_profile_mass_spec(
 788                        averageScan, d_params, auto_process
 789                    )
 790
 791                    return mass_spec
 792
 793                elif spectrum_mode == "centroid":
 794                    if averageScan.HasCentroidStream:
 795                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
 796
 797                        return mass_spec
 798
 799                    else:
 800                        raise ValueError(
 801                            "No Centroind data available for the selected scans"
 802                        )
 803
 804                else:
 805                    raise ValueError("spectrum_mode must be 'profile' or centroid")
 806
 807            else:
 808                raise ValueError("No data found for the selected scans")
 809
 810        else:
 811            raise ValueError("scans must be a list intergers or a tuple if integers")
 812
 813    def set_metadata(
 814        self,
 815        firstScanNumber=0,
 816        lastScanNumber=0,
 817        scans_list=False,
 818        label=Labels.thermo_profile,
 819    ):
 820        """
 821        Collect metadata to be ingested in the mass spectrum object
 822
 823        scans_list: list[int] or false
 824        lastScanNumber: int
 825        firstScanNumber: int
 826        """
 827
 828        d_params = default_parameters(self.file_path)
 829
 830        # assumes scans is full scan or reduced profile scan
 831
 832        d_params["label"] = label
 833
 834        if scans_list:
 835            d_params["scan_number"] = scans_list
 836
 837            d_params["polarity"] = self.get_polarity_mode(scans_list[0])
 838
 839        else:
 840            d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber)
 841
 842            d_params["polarity"] = self.get_polarity_mode(firstScanNumber)
 843
 844        d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model
 845
 846        d_params["acquisition_time"] = self.get_creation_time()
 847
 848        d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name
 849
 850        return d_params
 851
 852    def get_instrument_methods(self, parse_strings: bool = True):
 853        """
 854        This function will extract the instrument methods embedded in the raw file
 855
 856        First it will check if there are any instrument methods, if not returning None
 857        Then it will get the total number of instrument methods.
 858        For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary
 859        If this fails, it will return just the string object.
 860
 861        This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.
 862
 863        Parameters:
 864        -----------
 865        parse_strings: bool
 866            If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.
 867
 868        Returns:
 869        --------
 870        List[Dict[str, Any]] or List
 871            A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.
 872        """
 873
 874        if not self.iRawDataPlus.HasInstrumentMethod:
 875            raise ValueError(
 876                "Raw Data file does not have any instrument methods attached"
 877            )
 878            return None
 879        else:
 880
 881            def parse_instrument_method(data):
 882                lines = data.split("\r\n")
 883                method = {}
 884                current_section = None
 885                sub_section = None
 886
 887                for line in lines:
 888                    if not line.strip():  # Skip empty lines
 889                        continue
 890                    if (
 891                        line.startswith("----")
 892                        or line.endswith("Settings")
 893                        or line.endswith("Summary")
 894                        or line.startswith("Experiment")
 895                        or line.startswith("Scan Event")
 896                    ):
 897                        current_section = line.replace("-", "").strip()
 898                        method[current_section] = {}
 899                        sub_section = None
 900                    elif line.startswith("\t"):
 901                        if "\t\t" in line:
 902                            indent_level = line.count("\t")
 903                            key_value = line.strip()
 904
 905                            if indent_level == 2:
 906                                if sub_section:
 907                                    key, value = (
 908                                        key_value.split("=", 1)
 909                                        if "=" in key_value
 910                                        else (key_value, None)
 911                                    )
 912                                    method[current_section][sub_section][
 913                                        key.strip()
 914                                    ] = value.strip() if value else None
 915                            elif indent_level == 3:
 916                                scan_type, key_value = (
 917                                    key_value.split(" ", 1)
 918                                    if " " in key_value
 919                                    else (key_value, None)
 920                                )
 921                                method.setdefault(current_section, {}).setdefault(
 922                                    sub_section, {}
 923                                ).setdefault(scan_type, {})
 924
 925                                if key_value:
 926                                    key, value = (
 927                                        key_value.split("=", 1)
 928                                        if "=" in key_value
 929                                        else (key_value, None)
 930                                    )
 931                                    method[current_section][sub_section][scan_type][
 932                                        key.strip()
 933                                    ] = value.strip() if value else None
 934                        else:
 935                            key_value = line.strip()
 936                            if "=" in key_value:
 937                                key, value = key_value.split("=", 1)
 938                                method.setdefault(current_section, {})[key.strip()] = (
 939                                    value.strip()
 940                                )
 941                            else:
 942                                sub_section = key_value
 943                    else:
 944                        if ":" in line:
 945                            key, value = line.split(":", 1)
 946                            method[current_section][key.strip()] = value.strip()
 947                        else:
 948                            method[current_section][line] = {}
 949
 950                return method
 951
 952            count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount
 953            # TODO make this code better...
 954            instrument_methods = []
 955            for i in range(count_instrument_methods):
 956                instrument_method_string = self.iRawDataPlus.GetInstrumentMethod(i)
 957                if parse_strings:
 958                    try:
 959                        instrument_method_dict = parse_instrument_method(
 960                            instrument_method_string
 961                        )
 962                    except:  # if it fails for any reason
 963                        instrument_method_dict = instrument_method_string
 964                else:
 965                    instrument_method_dict = instrument_method_string
 966                instrument_methods.append(instrument_method_dict)
 967            return instrument_methods
 968
 969    def get_tune_method(self):
 970        """
 971        This code will extract the tune method from the raw file
 972        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
 973        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
 974        It will also not return Labels (keys) where the value is blank
 975
 976        Returns:
 977        --------
 978        Dict[str, Any]
 979            A dictionary containing the tune method information
 980
 981        Raises:
 982        -------
 983        ValueError
 984            If no tune methods are found in the raw file
 985
 986        """
 987        tunemethodcount = self.iRawDataPlus.GetTuneDataCount()
 988        if tunemethodcount == 0:
 989            raise ValueError("No tune methods found in the raw data file")
 990            return None
 991        elif tunemethodcount > 1:
 992            warnings.warn(
 993                "Multiple tune methods found in the raw data file, returning the 1st"
 994            )
 995
 996        header = self.iRawDataPlus.GetTuneData(0)
 997
 998        header_dic = {}
 999        current_section = None
1000
1001        for i in range(header.Length):
1002            label = header.Labels[i]
1003            value = header.Values[i]
1004
1005            # Check for section headers
1006            if "===" in label or (
1007                (value == "" or value is None) and not label.endswith(":")
1008            ):
1009                # This is a section header
1010                section_name = (
1011                    label.replace("=", "").replace(":", "").strip()
1012                )  # Clean the label if it contains '='
1013                header_dic[section_name] = {}
1014                current_section = section_name
1015            else:
1016                if current_section:
1017                    header_dic[current_section][label] = value
1018                else:
1019                    header_dic[label] = value
1020        return header_dic
1021
1022    def get_status_log(self, retention_time: float = 0):
1023        """
1024        This code will extract the status logs from the raw file
1025        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
1026        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
1027        It will also not return Labels (keys) where the value is blank
1028
1029        Parameters:
1030        -----------
1031        retention_time: float
1032            The retention time in minutes to extract the status log data from.
1033            Will use the closest retention time found. Default 0.
1034
1035        Returns:
1036        --------
1037        Dict[str, Any]
1038            A dictionary containing the status log information
1039
1040        Raises:
1041        -------
1042        ValueError
1043            If no status logs are found in the raw file
1044
1045        """
1046        tunemethodcount = self.iRawDataPlus.GetStatusLogEntriesCount()
1047        if tunemethodcount == 0:
1048            raise ValueError("No status logs found in the raw data file")
1049            return None
1050
1051        header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time)
1052
1053        header_dic = {}
1054        current_section = None
1055
1056        for i in range(header.Length):
1057            label = header.Labels[i]
1058            value = header.Values[i]
1059
1060            # Check for section headers
1061            if "===" in label or (
1062                (value == "" or value is None) and not label.endswith(":")
1063            ):
1064                # This is a section header
1065                section_name = (
1066                    label.replace("=", "").replace(":", "").strip()
1067                )  # Clean the label if it contains '='
1068                header_dic[section_name] = {}
1069                current_section = section_name
1070            else:
1071                if current_section:
1072                    header_dic[current_section][label] = value
1073                else:
1074                    header_dic[label] = value
1075        return header_dic
1076
1077    def get_error_logs(self):
1078        """
1079        This code will extract the error logs from the raw file
1080
1081        Returns:
1082        --------
1083        Dict[float, str]
1084            A dictionary containing the error log information with the retention time as the key
1085
1086        Raises:
1087        -------
1088        ValueError
1089            If no error logs are found in the raw file
1090        """
1091
1092        error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount
1093        if error_log_count == 0:
1094            raise ValueError("No error logs found in the raw data file")
1095            return None
1096
1097        error_logs = {}
1098
1099        for i in range(error_log_count):
1100            error_log_item = self.iRawDataPlus.GetErrorLogItem(i)
1101            rt = error_log_item.RetentionTime
1102            message = error_log_item.Message
1103            # Use the index `i` as the unique ID key
1104            error_logs[i] = {"rt": rt, "message": message}
1105        return error_logs
1106
1107    def get_sample_information(self):
1108        """
1109        This code will extract the sample information from the raw file
1110
1111        Returns:
1112        --------
1113        Dict[str, Any]
1114            A dictionary containing the sample information
1115            Note that UserText field may not be handled properly and may need further processing
1116        """
1117        sminfo = self.iRawDataPlus.SampleInformation
1118        smdict = {}
1119        smdict["Comment"] = sminfo.Comment
1120        smdict["SampleId"] = sminfo.SampleId
1121        smdict["SampleName"] = sminfo.SampleName
1122        smdict["Vial"] = sminfo.Vial
1123        smdict["InjectionVolume"] = sminfo.InjectionVolume
1124        smdict["Barcode"] = sminfo.Barcode
1125        smdict["BarcodeStatus"] = str(sminfo.BarcodeStatus)
1126        smdict["CalibrationLevel"] = sminfo.CalibrationLevel
1127        smdict["DilutionFactor"] = sminfo.DilutionFactor
1128        smdict["InstrumentMethodFile"] = sminfo.InstrumentMethodFile
1129        smdict["RawFileName"] = sminfo.RawFileName
1130        smdict["CalibrationFile"] = sminfo.CalibrationFile
1131        smdict["IstdAmount"] = sminfo.IstdAmount
1132        smdict["RowNumber"] = sminfo.RowNumber
1133        smdict["Path"] = sminfo.Path
1134        smdict["ProcessingMethodFile"] = sminfo.ProcessingMethodFile
1135        smdict["SampleType"] = str(sminfo.SampleType)
1136        smdict["SampleWeight"] = sminfo.SampleWeight
1137        smdict["UserText"] = {
1138            "UserText": [x for x in sminfo.UserText]
1139        }  # [0] #This may not work - needs debugging with
1140        return smdict
1141
1142    def get_instrument_data(self):
1143        """
1144        This code will extract the instrument data from the raw file
1145
1146        Returns:
1147        --------
1148        Dict[str, Any]
1149            A dictionary containing the instrument data
1150        """
1151        instrument_data = self.iRawDataPlus.GetInstrumentData()
1152        id_dict = {}
1153        id_dict["Name"] = instrument_data.Name
1154        id_dict["Model"] = instrument_data.Model
1155        id_dict["SerialNumber"] = instrument_data.SerialNumber
1156        id_dict["SoftwareVersion"] = instrument_data.SoftwareVersion
1157        id_dict["HardwareVersion"] = instrument_data.HardwareVersion
1158        id_dict["ChannelLabels"] = {
1159            "ChannelLabels": [x for x in instrument_data.ChannelLabels]
1160        }
1161        id_dict["Flags"] = instrument_data.Flags
1162        id_dict["AxisLabelY"] = instrument_data.AxisLabelY
1163        id_dict["AxisLabelX"] = instrument_data.AxisLabelX
1164        return id_dict
1165
1166    def get_centroid_msms_data(self, scan):
1167        """
1168        .. deprecated:: 2.0
1169            This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1170        """
1171
1172        warnings.warn(
1173            "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1174            "Please use `get_average_mass_spectrum()` instead.",
1175            DeprecationWarning,
1176        )
1177
1178        d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid)
1179
1180        centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False)
1181
1182        noise = list(centroidStream.Noises)
1183
1184        baselines = list(centroidStream.Baselines)
1185
1186        rp = list(centroidStream.Resolutions)
1187
1188        magnitude = list(centroidStream.Intensities)
1189
1190        mz = list(centroidStream.Masses)
1191
1192        # charge = scans_labels[5]
1193        array_noise_std = (np.array(noise) - np.array(baselines)) / 3
1194        l_signal_to_noise = np.array(magnitude) / array_noise_std
1195
1196        d_params["baseline_noise"] = np.average(array_noise_std)
1197
1198        d_params["baseline_noise_std"] = np.std(array_noise_std)
1199
1200        data_dict = {
1201            Labels.mz: mz,
1202            Labels.abundance: magnitude,
1203            Labels.rp: rp,
1204            Labels.s2n: list(l_signal_to_noise),
1205        }
1206
1207        mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
1208        mass_spec.settings.noise_threshold_method = "relative_abundance"
1209        mass_spec.settings.noise_threshold_min_relative_abundance = 1
1210        mass_spec.process_mass_spec()
1211        return mass_spec
1212
1213    def get_average_mass_spectrum_by_scanlist(
1214        self,
1215        scans_list: List[int],
1216        auto_process: bool = True,
1217        ppm_tolerance: float = 5.0,
1218    ) -> MassSpecProfile:
1219        """
1220        Averages selected scans mass spectra using Thermo's AverageScans method
1221        scans_list: list[int]
1222        auto_process: bool
1223            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
1224        Returns:
1225            MassSpecProfile
1226
1227         .. deprecated:: 2.0
1228        This function will be removed in CoreMS 2.0. Please use `get_average_mass_spectrum()` instead for similar functionality.
1229        """
1230
1231        warnings.warn(
1232            "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
1233            "Please use `get_average_mass_spectrum()` instead.",
1234            DeprecationWarning,
1235        )
1236
1237        d_params = self.set_metadata(scans_list=scans_list)
1238
1239        # assumes scans is full scan or reduced profile scan
1240
1241        scans = List[int]()
1242        for scan in scans_list:
1243            scans.Add(scan)
1244
1245        # Create the mass options object that will be used when averaging the scans
1246        options = MassOptions()
1247        options.ToleranceUnits = ToleranceUnits.ppm
1248        options.Tolerance = ppm_tolerance
1249
1250        # Get the scan filter for the first scan.  This scan filter will be used to located
1251        # scans within the given scan range of the same type
1252
1253        averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
1254
1255        len_data = averageScan.SegmentedScan.Positions.Length
1256
1257        mz_list = list(averageScan.SegmentedScan.Positions)
1258        abund_list = list(averageScan.SegmentedScan.Intensities)
1259
1260        data_dict = {
1261            Labels.mz: mz_list,
1262            Labels.abundance: abund_list,
1263        }
1264
1265        mass_spec = MassSpecProfile(data_dict, d_params, auto_process=auto_process)
1266
1267        return mass_spec

Class for parsing Thermo Raw files and extracting information from them.

Parameters:

file_location : str or pathlib.Path or s3path.S3Path Thermo Raw file path or S3 path.

Attributes:

file_path : str or pathlib.Path or s3path.S3Path The file path of the Thermo Raw file. parameters : LCMSParameters The LCMS parameters for the Thermo Raw file. chromatogram_settings : LiquidChromatographSetting The chromatogram settings for the Thermo Raw file. scans : list or tuple The selected scans for the Thermo Raw file. start_scan : int The starting scan number for the Thermo Raw file. end_scan : int The ending scan number for the Thermo Raw file.

Methods:

  • set_msordertype(scanFilter, mstype: str = 'ms1') -> scanFilter Convert the user-passed MS Type string to a Thermo MSOrderType object.
  • get_instrument_info() -> dict Get the instrument information from the Thermo Raw file.
  • get_creation_time() -> datetime.datetime Extract the creation date stamp from the .RAW file and return it as a formatted datetime object.
  • remove_temp_file() Remove the temporary file if the path is from S3Path.
  • get_polarity_mode(scan_number: int) -> int Get the polarity mode for the given scan number.
  • get_filter_for_scan_num(scan_number: int) -> List[str] Get the filter for the given scan number.
  • check_full_scan(scan_number: int) -> bool Check if the given scan number is a full scan.
  • get_all_filters() -> Tuple[Dict[int, str], List[str]] Get all scan filters for the Thermo Raw file.
  • get_scan_header(scan: int) -> Dict[str, Any] Get the full dictionary of scan header metadata for the given scan number.
  • get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]] Get the retention time, intensity, and scan number from the given trace.
  • get_eics(target_mzs: List[float], tic_data: Dict[str, Any], ms_type: str = 'MS !d', peak_detection: bool = True, smooth: bool = True, plot: bool = False, ax: Optional[matplotlib.axes.Axes] = None, legend: bool = False) -> Tuple[Dict[float, EIC_Data], matplotlib.axes.Axes] Get the extracted ion chromatograms (EICs) for the target m/z values.
ThermoBaseClass(file_location)
112    def __init__(self, file_location):
113        """file_location: srt pathlib.Path or s3path.S3Path
114        Thermo Raw file path
115        """
116        # Thread.__init__(self)
117        if isinstance(file_location, str):
118            file_path = Path(file_location)
119
120        elif isinstance(file_location, S3Path):
121            temp_dir = Path("tmp/")
122            temp_dir.mkdir(exist_ok=True)
123
124            file_path = temp_dir / file_location.name
125            with open(file_path, "wb") as fh:
126                fh.write(file_location.read_bytes())
127
128        else:
129            file_path = file_location
130
131        self.iRawDataPlus = RawFileReaderAdapter.FileFactory(str(file_path))
132
133        if not self.iRawDataPlus.IsOpen:
134            raise FileNotFoundError(
135                "Unable to access the RAW file using the RawFileReader class!"
136            )
137
138        # Check for any errors in the RAW file
139        if self.iRawDataPlus.IsError:
140            raise IOError(
141                "Error opening ({}) - {}".format(self.iRawDataPlus.FileError, file_path)
142            )
143
144        self.res = self.iRawDataPlus.SelectInstrument(Device.MS, 1)
145
146        self.file_path = file_location
147        self.iFileHeader = FileHeaderReaderFactory.ReadFile(str(file_path))
148
149        # removing tmp file
150
151        self._init_settings()

file_location: str, pathlib.Path, or s3path.S3Path Thermo Raw file path

iRawDataPlus
res
file_path
iFileHeader

Get or set the LCMSParameters object.

Get or set the LiquidChromatographSetting object.

scans: list | tuple

scans : list or tuple If a tuple, uses Thermo AverageScansInScanRange over the scan range; if a list, uses Thermo AverageScans for the selected scans

start_scan: int

Get the starting scan number for the Thermo Raw file.

end_scan: int

Get the ending scan number for the Thermo Raw file.

def set_msordertype(self, scanFilter, mstype: str = 'ms1'):
210    def set_msordertype(self, scanFilter, mstype: str = "ms1"):
211        """
212        Function to convert user passed string MS Type to Thermo MSOrderType object
213        Limited to MS1 through MS10.
214
215        Parameters:
216        -----------
217        scanFilter : Thermo.ScanFilter
218            The scan filter object.
219        mstype : str, optional
220            The MS Type string, by default 'ms1'
221
222        """
223        mstype = mstype.upper()
224        # Check that a valid mstype is passed
225        if (int(mstype.split("MS")[1]) > 10) or (int(mstype.split("MS")[1]) < 1):
226            warn("MS Type not valid, must be between MS1 and MS10")
227
228        msordertypedict = {
229            "MS1": MSOrderType.Ms,
230            "MS2": MSOrderType.Ms2,
231            "MS3": MSOrderType.Ms3,
232            "MS4": MSOrderType.Ms4,
233            "MS5": MSOrderType.Ms5,
234            "MS6": MSOrderType.Ms6,
235            "MS7": MSOrderType.Ms7,
236            "MS8": MSOrderType.Ms8,
237            "MS9": MSOrderType.Ms9,
238            "MS10": MSOrderType.Ms10,
239        }
240        scanFilter.MSOrder = msordertypedict[mstype]
241        return scanFilter

Function to convert user passed string MS Type to Thermo MSOrderType object Limited to MS1 through MS10.

Parameters:

scanFilter : Thermo.ScanFilter The scan filter object. mstype : str, optional The MS Type string, by default 'ms1'

def get_instrument_info(self) -> dict:
243    def get_instrument_info(self) -> dict:
244        """
245        Get the instrument information from the Thermo Raw file.
246
247        Returns:
248        --------
249        dict
250            A dictionary with the keys 'model', and 'serial_number'.
251        """
252        instrumentData = self.iRawDataPlus.GetInstrumentData()
253        return {
254            "model": instrumentData.Model,
255            "serial_number": instrumentData.SerialNumber
256        }

Get the instrument information from the Thermo Raw file.

Returns:

dict A dictionary with the keys 'model', and 'serial_number'.

def get_creation_time(self) -> datetime.datetime:
258    def get_creation_time(self) -> datetime.datetime:
259        """
260        Extract the creation date stamp from the .RAW file
261        Return formatted creation date stamp.
262
263        """
264        credate = self.iRawDataPlus.CreationDate.get_Ticks()
265        credate = datetime.datetime(1, 1, 1) + datetime.timedelta(
266            microseconds=credate / 10
267        )
268        return credate

Extract the creation date stamp from the .RAW file Return formatted creation date stamp.

def remove_temp_file(self) -> None:
270    def remove_temp_file(self) -> None:
271        """if the path is from S3Path data cannot be serialized to io.ByteStream and
272        a temporary copy is stored at the temp dir
273        use this function only at the end of your execution scrip
274        some LCMS class methods depend on this file
275        """
276
277        self.file_path.unlink()

If the path is an S3Path, the data cannot be serialized to io.ByteStream, so a temporary copy is stored in the temp dir. Use this function only at the end of your execution script; some LCMS class methods depend on this file.

def close_file(self) -> None:
279    def close_file(self) -> None:
280        """
281        Close the Thermo Raw file.
282        """
283        self.iRawDataPlus.Dispose()

Close the Thermo Raw file.

def get_polarity_mode(self, scan_number: int) -> int:
285    def get_polarity_mode(self, scan_number: int) -> int:
286        """
287        Get the polarity mode for the given scan number.
288
289        Parameters:
290        -----------
291        scan_number : int
292            The scan number.
293
294        Raises:
295        -------
296        Exception
297            If the polarity mode is unknown.
298
299        """
300        polarity_symbol = self.get_filter_for_scan_num(scan_number)[1]
301
302        if polarity_symbol == "+":
303            return 1
304            # return 'POSITIVE_ION_MODE'
305
306        elif polarity_symbol == "-":
307            return -1
308
309        else:
310            raise Exception("Polarity Mode Unknown, please set it manually")

Get the polarity mode for the given scan number.

Parameters:

scan_number : int The scan number.

Raises:

Exception If the polarity mode is unknown.

def get_filter_for_scan_num(self, scan_number: int) -> System.Collections.Generic.List[String]:
312    def get_filter_for_scan_num(self, scan_number: int) -> List[str]:
313        """
314        Returns the closest matching run time that corresponds to scan_number for the current
315        controller. This function is only supported for MS device controllers.
316        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
317
318        Parameters:
319        -----------
320        scan_number : int
321            The scan number.
322
323        """
324        scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
325
326        return str(scan_label).split()

Returns the scan event string for scan_number on the current controller, split on whitespace into its tokens. This function is only supported for MS device controllers. e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

Parameters:

scan_number : int The scan number.

def get_ms_level_for_scan_num(self, scan_number: int) -> str:
328    def get_ms_level_for_scan_num(self, scan_number: int) -> str:
329        """
330        Get the MS order for the given scan number.
331
332        Parameters:
333        -----------
334        scan_number : int
335            The scan number
336
337        Returns:
338        --------
339        int
340            The MS order type (1 for MS, 2 for MS2, etc.)
341        """
342        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
343
344        msordertype = {
345            MSOrderType.Ms: 1,
346            MSOrderType.Ms2: 2,
347            MSOrderType.Ms3: 3,
348            MSOrderType.Ms4: 4,
349            MSOrderType.Ms5: 5,
350            MSOrderType.Ms6: 6,
351            MSOrderType.Ms7: 7,
352            MSOrderType.Ms8: 8,
353            MSOrderType.Ms9: 9,
354            MSOrderType.Ms10: 10,
355        }
356
357        if scan_filter.MSOrder in msordertype:
358            return msordertype[scan_filter.MSOrder]
359        else:
360            raise Exception("MS Order Type not found")

Get the MS order for the given scan number.

Parameters:

scan_number : int The scan number

Returns:

int The MS order type (1 for MS, 2 for MS2, etc.)

def check_full_scan(self, scan_number: int) -> bool:
362    def check_full_scan(self, scan_number: int) -> bool:
363        # scan_filter.ScanMode 0 = FULL
364        scan_filter = self.iRawDataPlus.GetFilterForScanNumber(scan_number)
365
366        return scan_filter.ScanMode == MSOrderType.Ms
def get_all_filters(self) -> Tuple[Dict[int, str], System.Collections.Generic.List[String]]:
368    def get_all_filters(self) -> Tuple[Dict[int, str], List[str]]:
369        """
370        Get all scan filters.
371        This function is only supported for MS device controllers.
372        e.g.  ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']
373
374        """
375
376        scanrange = range(self.start_scan, self.end_scan + 1)
377        scanfiltersdic = {}
378        scanfilterslist = []
379        for scan_number in scanrange:
380            scan_label = self.iRawDataPlus.GetScanEventStringForScanNumber(scan_number)
381            scanfiltersdic[scan_number] = scan_label
382            scanfilterslist.append(scan_label)
383        scanfilterset = list(set(scanfilterslist))
384        return scanfiltersdic, scanfilterset

Get all scan filters. This function is only supported for MS device controllers. e.g. ['FTMS', '-', 'p', 'NSI', 'Full', 'ms', '[200.00-1000.00]']

def get_scan_header(self, scan: int) -> Dict[str, Any]:
386    def get_scan_header(self, scan: int) -> Dict[str, Any]:
387        """
388        Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.
389
390        Parameters:
391        -----------
392        scan : int
393            The scan number.
394
395        """
396        header = self.iRawDataPlus.GetTrailerExtraInformation(scan)
397
398        header_dic = {}
399        for i in range(header.Length):
400            header_dic.update({header.Labels[i]: header.Values[i]})
401        return header_dic

Get full dictionary of scan header meta data, i.e. AGC status, ion injection time, etc.

Parameters:

scan : int The scan number.

@staticmethod
def get_rt_time_from_trace( trace) -> Tuple[System.Collections.Generic.List[Double], System.Collections.Generic.List[Double], System.Collections.Generic.List[Int32]]:
403    @staticmethod
404    def get_rt_time_from_trace(trace) -> Tuple[List[float], List[float], List[int]]:
405        """trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal"""
406        return list(trace.Times), list(trace.Intensities), list(trace.Scans)

trace: ThermoFisher.CommonCore.Data.Business.ChromatogramSignal

def get_eics( self, target_mzs: System.Collections.Generic.List[Double], tic_data: Dict[str, Any], ms_type='MS !d', peak_detection=False, smooth=False, plot=False, ax: Optional[matplotlib.axes._axes.Axes] = None, legend=False) -> Tuple[Dict[float, corems.mass_spectra.factory.chromat_data.EIC_Data], matplotlib.axes._axes.Axes]:
408    def get_eics(
409        self,
410        target_mzs: List[float],
411        tic_data: Dict[str, Any],
412        ms_type="MS !d",
413        peak_detection=False,
414        smooth=False,
415        plot=False,
416        ax: Optional[axes.Axes] = None,
417        legend=False,
418    ) -> Tuple[Dict[float, EIC_Data], axes.Axes]:
419        """ms_type: str ('MS', MS2')
420        start_scan: int default -1 will select the lowest available
421        end_scan: int default -1 will select the highest available
422
423        returns:
424
425            chroma: dict{target_mz: EIC_Data(
426                                        Scans: [int]
427                                            original thermo scan numbers
428                                        Time: [floats]
429                                            list of retention times
430                                        TIC: [floats]
431                                            total ion chromatogram
432                                        Apexes: [int]
433                                            original thermo apex scan number after peak picking
434                                        )
435
436        """
437        # If peak_detection or smooth is True, raise exception
438        if peak_detection or smooth:
439            raise Exception("Peak detection and smoothing are no longer implemented in this function")
440
441        options = MassOptions()
442        options.ToleranceUnits = ToleranceUnits.ppm
443        options.Tolerance = self.chromatogram_settings.eic_tolerance_ppm
444
445        all_chroma_settings = []
446
447        for target_mz in target_mzs:
448            settings = ChromatogramTraceSettings(TraceType.MassRange)
449            settings.Filter = ms_type
450            settings.MassRanges = [Range(target_mz, target_mz)]
451
452            chroma_settings = IChromatogramSettings(settings)
453
454            all_chroma_settings.append(chroma_settings)
455
456        # chroma_settings2 = IChromatogramSettings(settings)
457        # print(chroma_settings.FragmentMass)
458        # print(chroma_settings.FragmentMass)
459        # print(chroma_settings)
460        # print(chroma_settings)
461
462        data = self.iRawDataPlus.GetChromatogramData(
463            all_chroma_settings, self.start_scan, self.end_scan, options
464        )
465
466        traces = ChromatogramSignal.FromChromatogramData(data)
467
468        chroma = {}
469
470        if plot:
471            from matplotlib.transforms import Bbox
472            import matplotlib.pyplot as plt
473
474            if not ax:
475                # ax = plt.gca()
476                # ax.clear()
477                fig, ax = plt.subplots()
478
479            else:
480                fig = plt.gcf()
481
482            # plt.show()
483
484        for i, trace in enumerate(traces):
485            if trace.Length > 0:
486                rt, eic, scans = self.get_rt_time_from_trace(trace)
487                if smooth:
488                    eic = self.smooth_tic(eic)
489
490                chroma[target_mzs[i]] = EIC_Data(scans=scans, time=rt, eic=eic)
491                if plot:
492                    ax.plot(rt, eic, label="{:.5f}".format(target_mzs[i]))
493
494        if peak_detection:
495            # max_eic = self.get_max_eic(chroma)
496            max_signal = max(tic_data.tic)
497
498            for eic_data in chroma.values():
499                eic = eic_data.eic
500                time = eic_data.time
501
502                if len(eic) != len(tic_data.tic):
503                    warn(
504                        "The software assumes same lenth of TIC and EIC, this does not seems to be the case and the results mass spectrum selected by the scan number might not be correct"
505                    )
506
507                if eic.max() > 0:
508                    centroid_eics = self.eic_centroid_detector(time, eic, max_signal)
509                    eic_data.apexes = [i for i in centroid_eics]
510
511                    if plot:
512                        for peak_indexes in eic_data.apexes:
513                            apex_index = peak_indexes[1]
514                            ax.plot(
515                                time[apex_index],
516                                eic[apex_index],
517                                marker="x",
518                                linewidth=0,
519                            )
520
521        if plot:
522            ax.set_xlabel("Time (min)")
523            ax.set_ylabel("a.u.")
524            ax.set_title(ms_type + " EIC")
525            ax.tick_params(axis="both", which="major", labelsize=12)
526            ax.axes.spines["top"].set_visible(False)
527            ax.axes.spines["right"].set_visible(False)
528
529            if legend:
530                legend = ax.legend(loc="upper left", bbox_to_anchor=(1.02, 0, 0.07, 1))
531                fig.subplots_adjust(right=0.76)
532                # ax.set_prop_cycle(color=plt.cm.gist_rainbow(np.linspace(0, 1, len(traces))))
533
534                d = {"down": 30, "up": -30}
535
536                def func(evt):
537                    if legend.contains(evt):
538                        bbox = legend.get_bbox_to_anchor()
539                        bbox = Bbox.from_bounds(
540                            bbox.x0, bbox.y0 + d[evt.button], bbox.width, bbox.height
541                        )
542                        tr = legend.axes.transAxes.inverted()
543                        legend.set_bbox_to_anchor(bbox.transformed(tr))
544                        fig.canvas.draw_idle()
545
546                fig.canvas.mpl_connect("scroll_event", func)
547            return chroma, ax
548        else:
549            return chroma, None
550            rt = []
551            tic = []
552            scans = []
553            for i in range(traces[0].Length):
554                # print(trace[0].HasBasePeakData,trace[0].EndTime )
555
556                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
557                rt.append(traces[0].Times[i])
558                tic.append(traces[0].Intensities[i])
559                scans.append(traces[0].Scans[i])
560
561            return traces
562            # plot_chroma(rt, tic)
563            # plt.show()

ms_type: str ('MS', 'MS2') start_scan: int, default -1, will select the lowest available; end_scan: int, default -1, will select the highest available

returns:

chroma: dict{target_mz: EIC_Data(
                            Scans: [int]
                                original thermo scan numbers
                            Time: [floats]
                                list of retention times
                            TIC: [floats]
                                total ion chromatogram
                            Apexes: [int]
                                original thermo apex scan number after peak picking
                            )
def get_tic( self, ms_type='MS !d', peak_detection=False, smooth=False, plot=False, ax=None, trace_type='TIC') -> Tuple[corems.mass_spectra.factory.chromat_data.TIC_Data, matplotlib.axes._axes.Axes]:
565    def get_tic(
566        self,
567        ms_type="MS !d",
568        peak_detection=False,  # This wont work right now
569        smooth=False,  # This wont work right now
570        plot=False,
571        ax=None,
572        trace_type="TIC",
573    ) -> Tuple[TIC_Data, axes.Axes]:
574        """ms_type: str ('MS !d', 'MS2', None)
575            if you use None you get all scans.
576        peak_detection: bool
577        smooth: bool
578        plot: bool
579        ax: matplotlib axis object
580        trace_type: str ('TIC','BPC')
581
582        returns:
583            chroma: dict
584            {
585            Scan: [int]
586                original thermo scan numberMS
587            Time: [floats]
588                list of retention times
589            TIC: [floats]
590                total ion chromatogram
591            Apexes: [int]
592                original thermo apex scan number after peak picking
593            }
594        """
595        if trace_type == "TIC":
596            settings = ChromatogramTraceSettings(TraceType.TIC)
597        elif trace_type == "BPC":
598            settings = ChromatogramTraceSettings(TraceType.BasePeak)
599        else:
600            raise ValueError(f"{trace_type} undefined")
601        if ms_type == "all":
602            settings.Filter = None
603        else:
604            settings.Filter = ms_type
605
606        chroma_settings = IChromatogramSettings(settings)
607
608        data = self.iRawDataPlus.GetChromatogramData(
609            [chroma_settings], self.start_scan, self.end_scan
610        )
611
612        trace = ChromatogramSignal.FromChromatogramData(data)
613
614        data = TIC_Data(time=[], scans=[], tic=[], bpc=[], apexes=[])
615
616        if trace[0].Length > 0:
617            for i in range(trace[0].Length):
618                # print(trace[0].HasBasePeakData,trace[0].EndTime )
619
620                # print("  {} - {}, {}".format( i, trace[0].Times[i], trace[0].Intensities[i] ))
621                data.time.append(trace[0].Times[i])
622                data.tic.append(trace[0].Intensities[i])
623                data.scans.append(trace[0].Scans[i])
624
625                # print(trace[0].Scans[i])
626            if smooth:
627                data.tic = self.smooth_tic(data.tic)
628
629            else:
630                data.tic = np.array(data.tic)
631
632            if peak_detection:
633                centroid_peak_indexes = [
634                    i for i in self.centroid_detector(data.time, data.tic)
635                ]
636
637                data.apexes = centroid_peak_indexes
638
639            if plot:
640                if not ax:
641                    import matplotlib.pyplot as plt
642
643                    ax = plt.gca()
644                    # fig, ax = plt.subplots(figsize=(6, 3))
645
646                ax.plot(data.time, data.tic, label=trace_type)
647                ax.set_xlabel("Time (min)")
648                ax.set_ylabel("a.u.")
649                if peak_detection:
650                    for peak_indexes in data.apexes:
651                        apex_index = peak_indexes[1]
652                        ax.plot(
653                            data.time[apex_index],
654                            data.tic[apex_index],
655                            marker="x",
656                            linewidth=0,
657                        )
658
659                # plt.show()
660                if trace_type == "BPC":
661                    data.bpc = data.tic
662                    data.tic = []
663                return data, ax
664            if trace_type == "BPC":
665                data.bpc = data.tic
666                data.tic = []
667            return data, None
668
669        else:
670            return None, None

ms_type: str ('MS !d', 'MS2', None) if you use None you get all scans. peak_detection: bool smooth: bool plot: bool ax: matplotlib axis object trace_type: str ('TIC','BPC')

returns: chroma: dict { Scan: [int] original Thermo scan number; Time: [floats] list of retention times; TIC: [floats] total ion chromatogram; Apexes: [int] original Thermo apex scan number after peak picking }

def get_average_mass_spectrum( self, spectrum_mode: str = 'profile', auto_process: bool = True, ppm_tolerance: float = 5.0, ms_type: str = 'MS1') -> corems.mass_spectrum.factory.MassSpectrumClasses.MassSpecProfile | corems.mass_spectrum.factory.MassSpectrumClasses.MassSpecCentroid:
672    def get_average_mass_spectrum(
673        self,
674        spectrum_mode: str = "profile",
675        auto_process: bool = True,
676        ppm_tolerance: float = 5.0,
677        ms_type: str = "MS1",
678    ) -> MassSpecProfile | MassSpecCentroid:
679        """
680        Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method
681        or a scan list using Thermo's AverageScans method
682        spectrum_mode: str
683            centroid or profile mass spectrum
684        auto_process: bool
685            If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
686        ms_type: str
687            String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10.
688            Internal function converts to Thermo MSOrderType class.
689
690        """
691
692        def get_profile_mass_spec(averageScan, d_params: dict, auto_process: bool):
693            mz_list = list(averageScan.SegmentedScan.Positions)
694            abund_list = list(averageScan.SegmentedScan.Intensities)
695
696            data_dict = {
697                Labels.mz: mz_list,
698                Labels.abundance: abund_list,
699            }
700
701            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)
702
703        def get_centroid_mass_spec(averageScan, d_params: dict):
704            noise = list(averageScan.centroidScan.Noises)
705
706            baselines = list(averageScan.centroidScan.Baselines)
707
708            rp = list(averageScan.centroidScan.Resolutions)
709
710            magnitude = list(averageScan.centroidScan.Intensities)
711
712            mz = list(averageScan.centroidScan.Masses)
713
714            array_noise_std = (np.array(noise) - np.array(baselines)) / 3
715            l_signal_to_noise = np.array(magnitude) / array_noise_std
716
717            d_params["baseline_noise"] = np.average(array_noise_std)
718
719            d_params["baseline_noise_std"] = np.std(array_noise_std)
720
721            data_dict = {
722                Labels.mz: mz,
723                Labels.abundance: magnitude,
724                Labels.rp: rp,
725                Labels.s2n: list(l_signal_to_noise),
726            }
727
728            mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
729
730            return mass_spec
731
732        d_params = self.set_metadata(
733            firstScanNumber=self.start_scan, lastScanNumber=self.end_scan
734        )
735
736        # Create the mass options object that will be used when averaging the scans
737        options = MassOptions()
738        options.ToleranceUnits = ToleranceUnits.ppm
739        options.Tolerance = ppm_tolerance
740
741        # Get the scan filter for the first scan.  This scan filter will be used to located
742        # scans within the given scan range of the same type
743        scanFilter = self.iRawDataPlus.GetFilterForScanNumber(self.start_scan)
744
745        # force it to only look for the MSType
746        scanFilter = self.set_msordertype(scanFilter, ms_type)
747
748        if isinstance(self.scans, tuple):
749            averageScan = Extensions.AverageScansInScanRange(
750                self.iRawDataPlus, self.start_scan, self.end_scan, scanFilter, options
751            )
752
753            if averageScan:
754                if spectrum_mode == "profile":
755                    mass_spec = get_profile_mass_spec(
756                        averageScan, d_params, auto_process
757                    )
758
759                    return mass_spec
760
761                elif spectrum_mode == "centroid":
762                    if averageScan.HasCentroidStream:
763                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
764
765                        return mass_spec
766
767                    else:
768                        raise ValueError(
769                            "No Centroind data available for the selected scans"
770                        )
771                else:
772                    raise ValueError("spectrum_mode must be 'profile' or centroid")
773            else:
774                raise ValueError("No data found for the selected scans")
775
776        elif isinstance(self.scans, list):
777            d_params = self.set_metadata(scans_list=self.scans)
778
779            scans = List[int]()
780            for scan in self.scans:
781                scans.Add(scan)
782
783            averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)
784
785            if averageScan:
786                if spectrum_mode == "profile":
787                    mass_spec = get_profile_mass_spec(
788                        averageScan, d_params, auto_process
789                    )
790
791                    return mass_spec
792
793                elif spectrum_mode == "centroid":
794                    if averageScan.HasCentroidStream:
795                        mass_spec = get_centroid_mass_spec(averageScan, d_params)
796
797                        return mass_spec
798
799                    else:
800                        raise ValueError(
801                            "No Centroind data available for the selected scans"
802                        )
803
804                else:
805                    raise ValueError("spectrum_mode must be 'profile' or centroid")
806
807            else:
808                raise ValueError("No data found for the selected scans")
809
810        else:
811            raise ValueError("scans must be a list intergers or a tuple if integers")

Averages mass spectra over a scan range using Thermo's AverageScansInScanRange method or a scan list using Thermo's AverageScans method spectrum_mode: str centroid or profile mass spectrum auto_process: bool If true performs peak picking, and noise threshold calculation after creation of mass spectrum object ms_type: str String of form 'ms1' or 'ms2' or 'MS3' etc. Valid up to MS10. Internal function converts to Thermo MSOrderType class.

def set_metadata( self, firstScanNumber=0, lastScanNumber=0, scans_list=False, label='Thermo_Profile'):
813    def set_metadata(
814        self,
815        firstScanNumber=0,
816        lastScanNumber=0,
817        scans_list=False,
818        label=Labels.thermo_profile,
819    ):
820        """
821        Collect metadata to be ingested in the mass spectrum object
822
823        scans_list: list[int] or false
824        lastScanNumber: int
825        firstScanNumber: int
826        """
827
828        d_params = default_parameters(self.file_path)
829
830        # assumes scans is full scan or reduced profile scan
831
832        d_params["label"] = label
833
834        if scans_list:
835            d_params["scan_number"] = scans_list
836
837            d_params["polarity"] = self.get_polarity_mode(scans_list[0])
838
839        else:
840            d_params["scan_number"] = "{}-{}".format(firstScanNumber, lastScanNumber)
841
842            d_params["polarity"] = self.get_polarity_mode(firstScanNumber)
843
844        d_params["analyzer"] = self.iRawDataPlus.GetInstrumentData().Model
845
846        d_params["acquisition_time"] = self.get_creation_time()
847
848        d_params["instrument_label"] = self.iRawDataPlus.GetInstrumentData().Name
849
850        return d_params

Collect metadata to be ingested in the mass spectrum object

scans_list: list[int] or false lastScanNumber: int firstScanNumber: int

def get_instrument_methods(self, parse_strings: bool = True):
852    def get_instrument_methods(self, parse_strings: bool = True):
853        """
854        This function will extract the instrument methods embedded in the raw file
855
856        First it will check if there are any instrument methods, if not returning None
857        Then it will get the total number of instrument methods.
858        For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary
859        If this fails, it will return just the string object.
860
861        This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.
862
863        Parameters:
864        -----------
865        parse_strings: bool
866            If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.
867
868        Returns:
869        --------
870        List[Dict[str, Any]] or List
871            A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.
872        """
873
874        if not self.iRawDataPlus.HasInstrumentMethod:
875            raise ValueError(
876                "Raw Data file does not have any instrument methods attached"
877            )
878            return None
879        else:
880
881            def parse_instrument_method(data):
882                lines = data.split("\r\n")
883                method = {}
884                current_section = None
885                sub_section = None
886
887                for line in lines:
888                    if not line.strip():  # Skip empty lines
889                        continue
890                    if (
891                        line.startswith("----")
892                        or line.endswith("Settings")
893                        or line.endswith("Summary")
894                        or line.startswith("Experiment")
895                        or line.startswith("Scan Event")
896                    ):
897                        current_section = line.replace("-", "").strip()
898                        method[current_section] = {}
899                        sub_section = None
900                    elif line.startswith("\t"):
901                        if "\t\t" in line:
902                            indent_level = line.count("\t")
903                            key_value = line.strip()
904
905                            if indent_level == 2:
906                                if sub_section:
907                                    key, value = (
908                                        key_value.split("=", 1)
909                                        if "=" in key_value
910                                        else (key_value, None)
911                                    )
912                                    method[current_section][sub_section][
913                                        key.strip()
914                                    ] = value.strip() if value else None
915                            elif indent_level == 3:
916                                scan_type, key_value = (
917                                    key_value.split(" ", 1)
918                                    if " " in key_value
919                                    else (key_value, None)
920                                )
921                                method.setdefault(current_section, {}).setdefault(
922                                    sub_section, {}
923                                ).setdefault(scan_type, {})
924
925                                if key_value:
926                                    key, value = (
927                                        key_value.split("=", 1)
928                                        if "=" in key_value
929                                        else (key_value, None)
930                                    )
931                                    method[current_section][sub_section][scan_type][
932                                        key.strip()
933                                    ] = value.strip() if value else None
934                        else:
935                            key_value = line.strip()
936                            if "=" in key_value:
937                                key, value = key_value.split("=", 1)
938                                method.setdefault(current_section, {})[key.strip()] = (
939                                    value.strip()
940                                )
941                            else:
942                                sub_section = key_value
943                    else:
944                        if ":" in line:
945                            key, value = line.split(":", 1)
946                            method[current_section][key.strip()] = value.strip()
947                        else:
948                            method[current_section][line] = {}
949
950                return method
951
952            count_instrument_methods = self.iRawDataPlus.InstrumentMethodsCount
953            # TODO make this code better...
954            instrument_methods = []
955            for i in range(count_instrument_methods):
956                instrument_method_string = self.iRawDataPlus.GetInstrumentMethod(i)
957                if parse_strings:
958                    try:
959                        instrument_method_dict = parse_instrument_method(
960                            instrument_method_string
961                        )
962                    except:  # if it fails for any reason
963                        instrument_method_dict = instrument_method_string
964                else:
965                    instrument_method_dict = instrument_method_string
966                instrument_methods.append(instrument_method_dict)
967            return instrument_methods

This function will extract the instrument methods embedded in the raw file

First it will check if there are any instrument methods; if there are none, a ValueError is raised. Then it will get the total number of instrument methods. For each method, it will extract the plaintext string of the method and attempt to parse it into a dictionary. If this fails, it will return just the string object.

This has been tested on data from an Orbitrap ID-X with embedded MS and LC methods, but other instrument types may fail.

Parameters:

parse_strings: bool If True, will attempt to parse the instrument methods into a dictionary. If False, will return the raw string.

Returns:

List[Dict[str, Any]] or List A list of dictionaries containing the instrument methods, or a list of strings if parsing fails.

def get_tune_method(self):
 969    def get_tune_method(self):
 970        """
 971        This code will extract the tune method from the raw file
 972        It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types.
 973        It attempts to parse out section headers and sub-sections, but may not work for all instrument types.
 974        It will also not return Labels (keys) where the value is blank
 975
 976        Returns:
 977        --------
 978        Dict[str, Any]
 979            A dictionary containing the tune method information
 980
 981        Raises:
 982        -------
 983        ValueError
 984            If no tune methods are found in the raw file
 985
 986        """
 987        tunemethodcount = self.iRawDataPlus.GetTuneDataCount()
 988        if tunemethodcount == 0:
 989            raise ValueError("No tune methods found in the raw data file")
 990            return None
 991        elif tunemethodcount > 1:
 992            warnings.warn(
 993                "Multiple tune methods found in the raw data file, returning the 1st"
 994            )
 995
 996        header = self.iRawDataPlus.GetTuneData(0)
 997
 998        header_dic = {}
 999        current_section = None
1000
1001        for i in range(header.Length):
1002            label = header.Labels[i]
1003            value = header.Values[i]
1004
1005            # Check for section headers
1006            if "===" in label or (
1007                (value == "" or value is None) and not label.endswith(":")
1008            ):
1009                # This is a section header
1010                section_name = (
1011                    label.replace("=", "").replace(":", "").strip()
1012                )  # Clean the label if it contains '='
1013                header_dic[section_name] = {}
1014                current_section = section_name
1015            else:
1016                if current_section:
1017                    header_dic[current_section][label] = value
1018                else:
1019                    header_dic[label] = value
1020        return header_dic

This code will extract the tune method from the raw file It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. It attempts to parse out section headers and sub-sections, but may not work for all instrument types. It will also not return Labels (keys) where the value is blank

Returns:

Dict[str, Any] A dictionary containing the tune method information

Raises:

ValueError If no tune methods are found in the raw file

def get_status_log(self, retention_time: float = 0):
def get_status_log(self, retention_time: float = 0) -> Dict[str, Any]:
    """
    Extract the instrument status log for a given retention time.

    It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive,
    but may fail on other instrument types.
    The flat label/value list of the status log is folded into a dictionary:
    a label that contains "===" or that has a blank value and does not end with ":"
    starts a new section (an empty nested dictionary), and the entries that follow are
    stored inside that section. Entries seen before any section are stored at the top level.

    Parameters:
    -----------
    retention_time: float
        The retention time in minutes to extract the status log data from.
        Will use the closest retention time found. Default 0.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the status log information

    Raises:
    -------
    ValueError
        If no status logs are found in the raw file

    """
    status_log_count = self.iRawDataPlus.GetStatusLogEntriesCount()
    if status_log_count == 0:
        raise ValueError("No status logs found in the raw data file")

    header = self.iRawDataPlus.GetStatusLogForRetentionTime(retention_time)

    header_dic = {}
    current_section = None

    for i in range(header.Length):
        label = header.Labels[i]
        value = header.Values[i]

        is_section_header = "===" in label or (
            (value == "" or value is None) and not label.endswith(":")
        )
        if is_section_header:
            # Strip the '=' decoration and any ':' so the section key is the bare name
            section_name = label.replace("=", "").replace(":", "").strip()
            header_dic[section_name] = {}
            current_section = section_name
        elif current_section:
            header_dic[current_section][label] = value
        else:
            # No (non-empty) section seen yet: keep the entry at the top level
            header_dic[label] = value
    return header_dic

This code will extract the status logs from the raw file. It has been tested on data from a Thermo Orbitrap ID-X, Astral and Q-Exactive, but may fail on other instrument types. It attempts to parse out section headers and sub-sections, but may not work for all instrument types. Labels with a blank value that do not end in ":" are treated as section headers rather than returned as keys.

Parameters:

retention_time: float The retention time in minutes to extract the status log data from. Will use the closest retention time found. Default 0.

Returns:

Dict[str, Any] A dictionary containing the status log information

Raises:

ValueError If no status logs are found in the raw file

def get_error_logs(self):
def get_error_logs(self) -> Dict[int, Dict[str, Any]]:
    """
    Extract the error logs from the raw file.

    Returns:
    --------
    Dict[int, Dict[str, Any]]
        A dictionary keyed by the index of each error log entry. Each value is a
        dictionary with the entry's retention time under "rt" and its text under "message".

    Raises:
    -------
    ValueError
        If no error logs are found in the raw file
    """
    error_log_count = self.iRawDataPlus.RunHeaderEx.ErrorLogCount
    if error_log_count == 0:
        raise ValueError("No error logs found in the raw data file")

    error_logs = {}
    for i in range(error_log_count):
        error_log_item = self.iRawDataPlus.GetErrorLogItem(i)
        # The index is used as the key because it is unique per entry
        error_logs[i] = {
            "rt": error_log_item.RetentionTime,
            "message": error_log_item.Message,
        }
    return error_logs

This code will extract the error logs from the raw file

Returns:

Dict[int, Dict[str, Any]] A dictionary keyed by the index of each error log entry; each value holds the entry's retention time under "rt" and its text under "message"

Raises:

ValueError If no error logs are found in the raw file

def get_sample_information(self):
def get_sample_information(self):
    """
    Collect the sample information stored in the raw file into a dictionary.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the sample information.
        BarcodeStatus and SampleType are converted to strings; UserText is returned
        as a list nested under a "UserText" key.
        Note that UserText field may not be handled properly and may need further processing
    """
    info = self.iRawDataPlus.SampleInformation
    return {
        "Comment": info.Comment,
        "SampleId": info.SampleId,
        "SampleName": info.SampleName,
        "Vial": info.Vial,
        "InjectionVolume": info.InjectionVolume,
        "Barcode": info.Barcode,
        "BarcodeStatus": str(info.BarcodeStatus),
        "CalibrationLevel": info.CalibrationLevel,
        "DilutionFactor": info.DilutionFactor,
        "InstrumentMethodFile": info.InstrumentMethodFile,
        "RawFileName": info.RawFileName,
        "CalibrationFile": info.CalibrationFile,
        "IstdAmount": info.IstdAmount,
        "RowNumber": info.RowNumber,
        "Path": info.Path,
        "ProcessingMethodFile": info.ProcessingMethodFile,
        "SampleType": str(info.SampleType),
        "SampleWeight": info.SampleWeight,
        # Materialise the user text entries as a plain Python list
        "UserText": {"UserText": list(info.UserText)},
    }

This code will extract the sample information from the raw file

Returns:

Dict[str, Any] A dictionary containing the sample information. Note that the UserText field may not be handled properly and may need further processing.

def get_instrument_data(self):
def get_instrument_data(self):
    """
    Collect the instrument data stored in the raw file into a dictionary.

    Returns:
    --------
    Dict[str, Any]
        A dictionary containing the instrument data. ChannelLabels is returned
        as a list nested under a "ChannelLabels" key.
    """
    inst = self.iRawDataPlus.GetInstrumentData()
    return {
        "Name": inst.Name,
        "Model": inst.Model,
        "SerialNumber": inst.SerialNumber,
        "SoftwareVersion": inst.SoftwareVersion,
        "HardwareVersion": inst.HardwareVersion,
        # Materialise the channel labels as a plain Python list
        "ChannelLabels": {"ChannelLabels": list(inst.ChannelLabels)},
        "Flags": inst.Flags,
        "AxisLabelY": inst.AxisLabelY,
        "AxisLabelX": inst.AxisLabelX,
    }

This code will extract the instrument data from the raw file

Returns:

Dict[str, Any] A dictionary containing the instrument data

def get_centroid_msms_data(self, scan):
def get_centroid_msms_data(self, scan):
    """
    Build a processed centroid mass spectrum from the centroid stream of one scan.

    .. deprecated:: 2.0
        This function will be removed in a future version of CoreMS.
        Please use `get_average_mass_spectrum()` instead for similar functionality.

    Parameters
    ----------
    scan : int
        The scan number to read the centroid stream from.

    Returns
    -------
    MassSpecCentroid
        The centroid mass spectrum, processed with a relative-abundance noise
        threshold (minimum relative abundance of 1).
    """

    # stacklevel=2 attributes the warning to the caller rather than to this module
    warnings.warn(
        "The `get_centroid_msms_data()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
        "Please use `get_average_mass_spectrum()` instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    d_params = self.set_metadata(scans_list=[scan], label=Labels.thermo_centroid)

    centroidStream = self.iRawDataPlus.GetCentroidStream(scan, False)

    noise = list(centroidStream.Noises)
    baselines = list(centroidStream.Baselines)
    rp = list(centroidStream.Resolutions)
    magnitude = list(centroidStream.Intensities)
    mz = list(centroidStream.Masses)

    # Noise standard deviation is taken as a third of (noise - baseline) per peak
    array_noise_std = (np.array(noise) - np.array(baselines)) / 3
    l_signal_to_noise = np.array(magnitude) / array_noise_std

    d_params["baseline_noise"] = np.average(array_noise_std)
    d_params["baseline_noise_std"] = np.std(array_noise_std)

    data_dict = {
        Labels.mz: mz,
        Labels.abundance: magnitude,
        Labels.rp: rp,
        Labels.s2n: list(l_signal_to_noise),
    }

    # Process manually so the noise threshold settings below are applied first
    mass_spec = MassSpecCentroid(data_dict, d_params, auto_process=False)
    mass_spec.settings.noise_threshold_method = "relative_abundance"
    mass_spec.settings.noise_threshold_min_relative_abundance = 1
    mass_spec.process_mass_spec()
    return mass_spec

Deprecated since version 2.0: This function will be removed in a future version of CoreMS. Please use get_average_mass_spectrum() instead for similar functionality.

def get_average_mass_spectrum_by_scanlist( self, scans_list: System.Collections.Generic.List[Int32], auto_process: bool = True, ppm_tolerance: float = 5.0) -> corems.mass_spectrum.factory.MassSpectrumClasses.MassSpecProfile:
def get_average_mass_spectrum_by_scanlist(
    self,
    scans_list: List[int],
    auto_process: bool = True,
    ppm_tolerance: float = 5.0,
) -> MassSpecProfile:
    """
    Averages selected scans mass spectra using Thermo's AverageScans method

    .. deprecated:: 2.0
        This function will be removed in a future version of CoreMS.
        Please use `get_average_mass_spectrum()` instead for similar functionality.

    Parameters
    ----------
    scans_list : list[int]
        The scan numbers to average. Assumed to be full scans or reduced profile scans.
    auto_process : bool
        If true performs peak picking, and noise threshold calculation after creation of mass spectrum object
    ppm_tolerance : float
        Mass tolerance, in ppm, set on the mass options used for averaging. Default 5.0.

    Returns
    -------
    MassSpecProfile
        The averaged profile mass spectrum.
    """

    # stacklevel=2 attributes the warning to the caller rather than to this module
    warnings.warn(
        "The `get_average_mass_spectrum_by_scanlist()` is deprecated as of CoreMS 2.0 and will be removed in a future version. "
        "Please use `get_average_mass_spectrum()` instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    d_params = self.set_metadata(scans_list=scans_list)

    # NOTE(review): `List` is instantiated here, so it is expected to resolve to the
    # .NET System.Collections.Generic.List in this module, not typing.List - confirm.
    scans = List[int]()
    for scan in scans_list:
        scans.Add(scan)

    # Create the mass options object that will be used when averaging the scans
    options = MassOptions()
    options.ToleranceUnits = ToleranceUnits.ppm
    options.Tolerance = ppm_tolerance

    averageScan = Extensions.AverageScans(self.iRawDataPlus, scans, options)

    data_dict = {
        Labels.mz: list(averageScan.SegmentedScan.Positions),
        Labels.abundance: list(averageScan.SegmentedScan.Intensities),
    }

    return MassSpecProfile(data_dict, d_params, auto_process=auto_process)

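Averages selected scans mass spectra using Thermo's AverageScans method. Parameters: scans_list (list[int]): the scan numbers to average; auto_process (bool): if true, performs peak picking and noise threshold calculation after creation of the mass spectrum object; ppm_tolerance (float): mass tolerance in ppm used when averaging, default 5.0. Returns: MassSpecProfile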

Deprecated since version 2.0.

This function will be removed in a future version of CoreMS. Please use get_average_mass_spectrum() instead for similar functionality.

class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, corems.mass_spectra.input.parserbase.SpectraParserInterface):
class ImportMassSpectraThermoMSFileReader(ThermoBaseClass, SpectraParserInterface):
    """A class for parsing Thermo RAW mass spectrometry data files and instantiating MassSpectraBase or LCMSBase objects

    Parameters
    ----------
    file_location : str or Path
        The path to the RAW file to be parsed.
    analyzer : str, optional
        The type of mass analyzer used in the instrument. Default is "Unknown".
    instrument_label : str, optional
        The name of the instrument used to acquire the data. Default is "Unknown".
    sample_name : str, optional
        The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.

    Attributes
    ----------
    file_location : Path
        The path to the RAW file being parsed.
    analyzer : str
        The type of mass analyzer used in the instrument.
    instrument_label : str
        The name of the instrument used to acquire the data.
    sample_name : str
        The name of the sample being analyzed.

    Methods
    -------
    * run(spectra="all", scan_df=None).
        Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
    * get_scan_df().
        Builds the scan metadata dataframe.
    * get_ms_raw(spectra, scan_df).
        Reads the non-zero data points of the selected scans into one dataframe per MS level.
    * get_mass_spectrum_from_scan(scan_number, spectrum_mode, auto_process=True)
        Parses the RAW file and returns a MassSpecBase object from a single scan.
    * get_mass_spectra_obj().
        Parses the RAW file and instantiates a MassSpectraBase object.
    * get_lcms_obj().
        Parses the RAW file and instantiates an LCMSBase object.
    * get_icr_transient_times().
        Return a list for transient time targets for all scans, or selected scans range

    Inherits from ThermoBaseClass and SpectraParserInterface
    """

    def __init__(
        self,
        file_location,
        analyzer="Unknown",
        instrument_label="Unknown",
        sample_name=None,
    ):
        super().__init__(file_location)
        if isinstance(file_location, str):
            # if obj is a string it defaults to create a Path obj, pass the S3Path if needed
            file_location = Path(file_location)
        if not file_location.exists():
            # NOTE(review): FileNotFoundError would describe this condition better, but
            # FileExistsError is kept so existing `except FileExistsError` callers still work.
            raise FileExistsError("File does not exist: " + str(file_location))

        self.file_location = file_location
        self.analyzer = analyzer
        self.instrument_label = instrument_label

        if sample_name:
            self.sample_name = sample_name
        else:
            self.sample_name = file_location.stem

    def load(self):
        """Do nothing; this reader has no separate load step."""
        pass

    def get_scan_df(self):
        """Build the scan metadata dataframe for the RAW file.

        Returns
        -------
        pandas.DataFrame
            One row per scan with the columns scan, scan_time, tic, ms_level, scan_text,
            scan_window_lower, scan_window_upper, polarity, precursor_mz and ms_format.
            scan_window_lower/upper are the strings extracted from the scan text;
            precursor_mz is cast to float.
        """
        # This automatically brings in all the data
        self.chromatogram_settings.scans = (-1, -1)

        # Get scan df info; starting with TIC data
        tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
        scan_df = pd.DataFrame.from_dict(
            {
                "scan": tic_data.scans,
                "scan_time": tic_data.time,
                "tic": tic_data.tic,
            }
        )
        # Placeholder so ms_level keeps its position right after the TIC columns
        scan_df["ms_level"] = None

        # get scan text
        scan_filter_df = pd.DataFrame.from_dict(
            self.get_all_filters()[0], orient="index"
        )
        scan_filter_df.reset_index(inplace=True)
        scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)

        scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
        scan_df["scan_window_lower"] = scan_df.scan_text.str.extract(
            r"\[(\d+\.\d+)-\d+\.\d+\]"
        )
        scan_df["scan_window_upper"] = scan_df.scan_text.str.extract(
            r"\[\d+\.\d+-(\d+\.\d+)\]"
        )
        # A " - " token in the scan text marks a negative-mode scan
        scan_df["polarity"] = np.where(
            scan_df.scan_text.str.contains(" - "), "negative", "positive"
        )
        scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@")
        scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float)

        # Assign each scan as centroid or profile and add ms_level.
        # Values are collected in row order and assigned once, instead of a
        # boolean-mask assignment per scan (which is quadratic in the scan count).
        scan_df["ms_format"] = None
        ms_levels = []
        ms_formats = []
        for scan in scan_df.scan.to_list():
            ms_levels.append(self.get_ms_level_for_scan_num(scan))
            if self.iRawDataPlus.IsCentroidScanFromScanNumber(scan):
                ms_formats.append("centroid")
            else:
                ms_formats.append("profile")
        # dtype=object keeps the column dtype the per-row assignment used to produce
        scan_df["ms_level"] = pd.Series(ms_levels, index=scan_df.index, dtype=object)
        scan_df["ms_format"] = pd.Series(ms_formats, index=scan_df.index, dtype=object)

        return scan_df

    def get_ms_raw(self, spectra, scan_df):
        """Read the non-zero data points of the selected scans, grouped by MS level.

        Parameters
        ----------
        spectra : str
            Which scans to read: "all", "ms1" or "ms2".
        scan_df : pandas.DataFrame
            Scan dataframe with at least the columns "scan" and "ms_level".

        Returns
        -------
        dict
            Maps "ms<level>" to a float32 dataframe with the columns scan, mz and intensity.
            Levels without any data point above zero are absent.

        Raises
        ------
        ValueError
            If spectra is not one of the supported values.
        """
        if spectra == "all":
            scan_df_forspec = scan_df
        elif spectra == "ms1":
            scan_df_forspec = scan_df[scan_df.ms_level == 1]
        elif spectra == "ms2":
            scan_df_forspec = scan_df[scan_df.ms_level == 2]
        else:
            raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")

        # set at float32
        dtype = np.float32
        columns = ["scan", "mz", "intensity"]

        scan_numbers = scan_df_forspec.scan.to_list()

        # One-off scan -> ms_level lookup (first occurrence wins) instead of
        # filtering the dataframe once per scan
        level_by_scan = {}
        for scan, level in zip(scan_numbers, scan_df_forspec.ms_level.to_list()):
            level_by_scan.setdefault(scan, level)

        # First pass: count the rows per level so the output can be preallocated
        n_rows = defaultdict(int)
        for scan in scan_numbers:
            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan)
            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan, scanStatistics
            )
            abun = np.array(list(profileStream.Intensities))
            n_rows[level_by_scan[scan]] += int(np.count_nonzero(abun > 0))

        # Second pass: parse
        res = {}
        counter = {}
        for scan in scan_numbers:
            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan)
            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan, scanStatistics
            )
            abun = np.array(list(profileStream.Intensities))
            mz = np.array(list(profileStream.Positions))

            # Dimension check, done before filtering so a mismatch is actually detected
            if len(mz) != len(abun):
                warnings.warn("m/z and intensity array dimension mismatch")
                continue

            # Keep only the points with abundance > 0
            keep = np.flatnonzero(abun > 0)
            n = keep.size

            # No measurements
            if n == 0:
                continue

            level = level_by_scan[scan]

            # Initialize output container
            if level not in res:
                res[level] = np.empty((n_rows[level], len(columns)), dtype=dtype)
                counter[level] = 0

            # Insert scan number, m/z and intensity for this scan
            start = counter[level]
            stop = start + n
            res[level][start:stop, 0] = scan
            res[level][start:stop, 1] = mz[keep].astype(dtype)
            res[level][start:stop, 2] = abun[keep].astype(dtype)
            counter[level] = stop

        # Construct the per-level dataframes, adding the 'ms' prefix to the keys.
        # The slice drops preallocated rows that were never filled (skipped scans).
        return {
            f"ms{level}": pd.DataFrame(data[: counter[level]], columns=columns)
            for level, data in res.items()
        }

    def run(self, spectra="all", scan_df=None):
        """
        Extracts mass spectra data from a raw file.

        Parameters
        ----------
        spectra : str, optional
            Which mass spectra data to include in the output. Default is all.  Other options: none, ms1, ms2.
        scan_df : pandas.DataFrame, optional
            Scan dataframe.  If not provided, the scan dataframe is created with `get_scan_df()`.

        Returns
        -------
        tuple
            A tuple containing two elements:
            - A dictionary containing mass spectra data, separated by MS level (None when spectra is "none").
            - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level,
                scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
        """
        # Prepare scan_df
        if scan_df is None:
            scan_df = self.get_scan_df()

        # Prepare mass spectra data
        if spectra != "none":
            res = self.get_ms_raw(spectra=spectra, scan_df=scan_df)
        else:
            res = None

        return res, scan_df

    def get_mass_spectrum_from_scan(
        self, scan_number, spectrum_mode, auto_process=True
    ):
        """Instantiate a mass spectrum object from a single scan number of the binary file.

        Parameters
        ----------
        scan_number : int
            The scan number to extract the mass spectrum from.
        spectrum_mode : str
            The type of mass spectrum to extract.  Must be 'profile' or 'centroid'.
        auto_process : bool, optional
            If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.

        Returns
        -------
        MassSpecProfile | MassSpecCentroid
            The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.

        Raises
        ------
        ValueError
            If spectrum_mode is neither 'profile' nor 'centroid'.
        """
        # Fail early with a clear message instead of an UnboundLocalError at the return
        if spectrum_mode not in ("profile", "centroid"):
            raise ValueError(
                f"spectrum_mode must be 'profile' or 'centroid', got {spectrum_mode!r}"
            )

        if spectrum_mode == "profile":
            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan_number, scanStatistics
            )
            data_dict = {
                Labels.mz: list(profileStream.Positions),
                Labels.abundance: list(profileStream.Intensities),
            }
            d_params = self.set_metadata(
                firstScanNumber=scan_number,
                lastScanNumber=scan_number,
                scans_list=False,
                label=Labels.thermo_profile,
            )
            return MassSpecProfile(data_dict, d_params, auto_process=auto_process)

        centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
        if centroid_scan.Masses is not None:
            mz = list(centroid_scan.Masses)
            abun = list(centroid_scan.Intensities)
            rp = list(centroid_scan.Resolutions)
            noise = list(centroid_scan.Noises)
            baselines = list(centroid_scan.Baselines)
            # Noise standard deviation is taken as a third of (noise - baseline) per peak
            array_noise_std = (np.array(noise) - np.array(baselines)) / 3
            l_signal_to_noise = np.array(abun) / array_noise_std
            data_dict = {
                Labels.mz: mz,
                Labels.abundance: abun,
                Labels.rp: rp,
                Labels.s2n: list(l_signal_to_noise),
            }
        else:
            # For CID MS2, the centroid data are stored in the profile data location, they do not
            # have any associated rp or baseline data, but they should be treated as centroid data
            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
                scan_number, scanStatistics
            )
            abun = list(profileStream.Intensities)
            mz = list(profileStream.Positions)
            data_dict = {
                Labels.mz: mz,
                Labels.abundance: abun,
                Labels.rp: [np.nan] * len(mz),
                Labels.s2n: [np.nan] * len(mz),
            }
        d_params = self.set_metadata(
            firstScanNumber=scan_number,
            lastScanNumber=scan_number,
            scans_list=False,
            label=Labels.thermo_centroid,
        )
        return MassSpecCentroid(data_dict, d_params, auto_process=auto_process)

    def get_mass_spectra_obj(self):
        """Instantiate a MassSpectraBase object from the binary data file.

        Returns
        -------
        MassSpectraBase
            The MassSpectra object containing the parsed mass spectra.  The object is instantiated with the file location, analyzer, instrument, sample name, and this parser; the scan dataframe (indexed by scan) is attached as `scan_df`.
        """
        _, scan_df = self.run(spectra="none")
        mass_spectra_obj = MassSpectraBase(
            self.file_location,
            self.analyzer,
            self.instrument_label,
            self.sample_name,
            self,
        )
        mass_spectra_obj.scan_df = scan_df.set_index("scan", drop=False)

        return mass_spectra_obj

    def get_lcms_obj(self, spectra="all"):
        """Instantiate a LCMSBase object from the RAW file.

        Parameters
        ----------
        spectra : str, optional
            Which mass spectra data to include in the output. Default is "all".  Other options: "none", "ms1", "ms2".

        Returns
        -------
        LCMSBase
            LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.

        Raises
        ------
        ValueError
            If the scans do not all share the same polarity.
        """
        _, scan_df = self.run(spectra="none")  # first run it to just get scan info
        res, scan_df = self.run(
            scan_df=scan_df, spectra=spectra
        )  # second run to parse data
        lcms_obj = LCMSBase(
            self.file_location,
            self.analyzer,
            self.instrument_label,
            self.sample_name,
            self,
        )
        if spectra != "none":
            for key in res:
                key_int = int(key.replace("ms", ""))
                res[key] = res[key][res[key].intensity > 0]
                res[key] = (
                    res[key].sort_values(by=["scan", "mz"]).reset_index(drop=True)
                )
                lcms_obj._ms_unprocessed[key_int] = res[key]
        lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
        # Check if polarity is mixed
        if len(set(scan_df.polarity)) > 1:
            raise ValueError("Mixed polarities detected in scan data")
        # Positional access: does not depend on the dataframe having a 0 index label
        lcms_obj.polarity = scan_df.polarity.iloc[0]
        lcms_obj._scans_number_list = list(scan_df.scan)
        lcms_obj._retention_time_list = list(scan_df.scan_time)
        lcms_obj._tic_list = list(scan_df.tic)

        return lcms_obj

    def get_icr_transient_times(self):
        """Return a list for transient time targets for all scans, or selected scans range

        Returns
        -------
        list
            One entry per scan: the transient time for the scan's "FT Resolution:" header
            value, or None when the value is not in the lookup table.

        Notes
        --------
        Resolving Power and Transient time targets based on 7T FT-ICR MS system
        """

        res_trans_time = {
            # NOTE(review): "50" looks like a typo for "50000" (the table doubles the
            # transient time per resolution step); both keys are kept for compatibility.
            "50": 0.384,
            "50000": 0.384,
            "100000": 0.768,
            "200000": 1.536,
            "400000": 3.072,
            "750000": 6.144,
            "1000000": 12.288,
        }

        firstScanNumber = self.start_scan
        lastScanNumber = self.end_scan

        transient_time_list = []

        # NOTE(review): range() excludes lastScanNumber; confirm whether end_scan is
        # meant to be inclusive, in which case the last scan is skipped here.
        for scan in range(firstScanNumber, lastScanNumber):
            scan_header = self.get_scan_header(scan)
            rp_target = scan_header["FT Resolution:"]
            transient_time_list.append(res_trans_time.get(rp_target))

        return transient_time_list

A class for parsing Thermo RAW mass spectrometry data files and instantiating MassSpectraBase or LCMSBase objects

Parameters
  • file_location (str or Path): The path to the RAW file to be parsed.
  • analyzer (str, optional): The type of mass analyzer used in the instrument. Default is "Unknown".
  • instrument_label (str, optional): The name of the instrument used to acquire the data. Default is "Unknown".
  • sample_name (str, optional): The name of the sample being analyzed. If not provided, the stem of the file_location path will be used.
Attributes
  • file_location (Path): The path to the RAW file being parsed.
  • analyzer (str): The type of mass analyzer used in the instrument.
  • instrument_label (str): The name of the instrument used to acquire the data.
  • sample_name (str): The name of the sample being analyzed.
Methods
  • run(spectra="all", scan_df=None). Parses the RAW file and returns a dictionary of mass spectra dataframes and a scan metadata dataframe.
  • get_mass_spectrum_from_scan(scan_number, spectrum_mode, auto_process=True) Parses the RAW file and returns a MassSpecBase object from a single scan.
  • get_mass_spectra_obj(). Parses the RAW file and instantiates a MassSpectraBase object.
  • get_lcms_obj(). Parses the RAW file and instantiates an LCMSBase object.
  • get_icr_transient_times(). Return a list for transient time targets for all scans, or selected scans range

Inherits from ThermoBaseClass and SpectraParserInterface

ImportMassSpectraThermoMSFileReader( file_location, analyzer='Unknown', instrument_label='Unknown', sample_name=None)
def __init__(
    self,
    file_location,
    analyzer="Unknown",
    instrument_label="Unknown",
    sample_name=None,
):
    """Initialise the reader for one Thermo RAW file.

    Parameters
    ----------
    file_location : str or Path
        The path to the RAW file to be parsed. A string is converted to a pathlib.Path.
    analyzer : str, optional
        The type of mass analyzer used in the instrument. Default is "Unknown".
    instrument_label : str, optional
        The name of the instrument used to acquire the data. Default is "Unknown".
    sample_name : str, optional
        The name of the sample being analyzed. If not provided, the stem of the file_location path is used.

    Raises
    ------
    FileExistsError
        If file_location does not exist.
    """
    super().__init__(file_location)

    # A plain string defaults to a pathlib.Path; pass an S3Path object if that is needed
    resolved_location = (
        Path(file_location) if isinstance(file_location, str) else file_location
    )
    if not resolved_location.exists():
        raise FileExistsError("File does not exist: " + str(resolved_location))

    self.file_location = resolved_location
    self.analyzer = analyzer
    self.instrument_label = instrument_label
    # Fall back to the file stem when no (non-empty) sample name is given
    self.sample_name = sample_name if sample_name else resolved_location.stem

file_location: str, pathlib.Path or s3path.S3Path. Path to the Thermo RAW file.

file_location
analyzer
instrument_label
def load(self):
1334    def load(self):
1335        pass

Load mass spectra data.

def get_scan_df(self):
1337    def get_scan_df(self):
1338        # This automatically brings in all the data
1339        self.chromatogram_settings.scans = (-1, -1)
1340
1341        # Get scan df info; starting with TIC data
1342        tic_data, _ = self.get_tic(ms_type="all", peak_detection=False, smooth=False)
1343        tic_data = {
1344            "scan": tic_data.scans,
1345            "scan_time": tic_data.time,
1346            "tic": tic_data.tic,
1347        }
1348        scan_df = pd.DataFrame.from_dict(tic_data)
1349        scan_df["ms_level"] = None
1350        
1351        # get scan text
1352        scan_filter_df = pd.DataFrame.from_dict(
1353            self.get_all_filters()[0], orient="index"
1354        )
1355        scan_filter_df.reset_index(inplace=True)
1356        scan_filter_df.rename(columns={"index": "scan", 0: "scan_text"}, inplace=True)
1357
1358        scan_df = scan_df.merge(scan_filter_df, on="scan", how="left")
1359        scan_df["scan_window_lower"] = scan_df.scan_text.str.extract(
1360            r"\[(\d+\.\d+)-\d+\.\d+\]"
1361        )
1362        scan_df["scan_window_upper"] = scan_df.scan_text.str.extract(
1363            r"\[\d+\.\d+-(\d+\.\d+)\]"
1364        )
1365        scan_df["polarity"] = np.where(
1366            scan_df.scan_text.str.contains(" - "), "negative", "positive"
1367        )
1368        scan_df["precursor_mz"] = scan_df.scan_text.str.extract(r"(\d+\.\d+)@")
1369        scan_df["precursor_mz"] = scan_df["precursor_mz"].astype(float)
1370
1371        # Assign each scan as centroid or profile and add ms_level
1372        scan_df["ms_format"] = None
1373        for i in scan_df.scan.to_list():
1374            scan_df.loc[scan_df.scan == i, "ms_level"] = self.get_ms_level_for_scan_num(i)
1375            if self.iRawDataPlus.IsCentroidScanFromScanNumber(i):
1376                scan_df.loc[scan_df.scan == i, "ms_format"] = "centroid"
1377            else:
1378                scan_df.loc[scan_df.scan == i, "ms_format"] = "profile"
1379
1380        return scan_df

Return scan data as a pandas DataFrame.

def get_ms_raw(self, spectra, scan_df):
1382    def get_ms_raw(self, spectra, scan_df):
1383        if spectra == "all":
1384            scan_df_forspec = scan_df
1385        elif spectra == "ms1":
1386            scan_df_forspec = scan_df[scan_df.ms_level == 1]
1387        elif spectra == "ms2":
1388            scan_df_forspec = scan_df[scan_df.ms_level == 2]
1389        else:
1390            raise ValueError("spectra must be 'none', 'all', 'ms1', or 'ms2'")
1391
1392        # Result container
1393        res = {}
1394
1395        # Row count container
1396        counter = {}
1397
1398        # Column name container
1399        cols = {}
1400
1401        # set at float32
1402        dtype = np.float32
1403
1404        # First pass: get nrows
1405        N = defaultdict(lambda: 0)
1406        for i in scan_df_forspec.scan.to_list():
1407            level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0]
1408            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i)
1409            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1410                i, scanStatistics
1411            )
1412            abun = list(profileStream.Intensities)
1413            abun = np.array(abun)[np.where(np.array(abun) > 0)[0]]
1414
1415            N[level] += len(abun)
1416
1417        # Second pass: parse
1418        for i in scan_df_forspec.scan.to_list():
1419            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(i)
1420            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1421                i, scanStatistics
1422            )
1423            abun = list(profileStream.Intensities)
1424            mz = list(profileStream.Positions)
1425
1426            # Get index of abun that are > 0
1427            inx = np.where(np.array(abun) > 0)[0]
1428            mz = np.array(mz)[inx]
1429            mz = np.float32(mz)
1430            abun = np.array(abun)[inx]
1431            abun = np.float32(abun)
1432
1433            level = scan_df_forspec.loc[scan_df_forspec.scan == i, "ms_level"].values[0]
1434
1435            # Number of rows
1436            n = len(mz)
1437
1438            # No measurements
1439            if n == 0:
1440                continue
1441
1442            # Dimension check
1443            if len(mz) != len(abun):
1444                warnings.warn("m/z and intensity array dimension mismatch")
1445                continue
1446
1447            # Scan/frame info
1448            id_dict = i
1449
1450            # Columns
1451            cols[level] = ["scan", "mz", "intensity"]
1452            m = len(cols[level])
1453
1454            # Subarray init
1455            arr = np.empty((n, m), dtype=dtype)
1456            inx = 0
1457
1458            # Populate scan/frame info
1459            arr[:, inx] = i
1460            inx += 1
1461
1462            # Populate m/z
1463            arr[:, inx] = mz
1464            inx += 1
1465
1466            # Populate intensity
1467            arr[:, inx] = abun
1468            inx += 1
1469
1470            # Initialize output container
1471            if level not in res:
1472                res[level] = np.empty((N[level], m), dtype=dtype)
1473                counter[level] = 0
1474
1475            # Insert subarray
1476            res[level][counter[level] : counter[level] + n, :] = arr
1477            counter[level] += n
1478
1479        # Construct ms1 and ms2 mz dataframes
1480        for level in res.keys():
1481            res[level] = pd.DataFrame(res[level])
1482            res[level].columns = cols[level]
1483        # rename keys in res to add 'ms' prefix
1484        res = {f"ms{key}": value for key, value in res.items()}
1485
1486        return res

Return a dictionary of mass spectra data as a pandas DataFrame.

def run(self, spectra='all', scan_df=None):
1488    def run(self, spectra="all", scan_df=None):
1489        """
1490        Extracts mass spectra data from a raw file.
1491
1492        Parameters
1493        ----------
1494        spectra : str, optional
1495            Which mass spectra data to include in the output. Default is all.  Other options: none, ms1, ms2.
1496        scan_df : pandas.DataFrame, optional
1497            Scan dataframe.  If not provided, the scan dataframe is created from the mzML file.
1498
1499        Returns
1500        -------
1501        tuple
1502            A tuple containing two elements:
1503            - A dictionary containing mass spectra data, separated by MS level.
1504            - A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level,
1505                scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
1506        """
1507        # Prepare scan_df
1508        if scan_df is None:
1509            scan_df = self.get_scan_df()
1510
1511        # Prepare mass spectra data
1512        if spectra != "none":
1513            res = self.get_ms_raw(spectra=spectra, scan_df=scan_df)
1514        else:
1515            res = None
1516
1517        return res, scan_df

Extracts mass spectra data from a raw file.

Parameters
  • spectra (str, optional): Which mass spectra data to include in the output. Default is all. Other options: none, ms1, ms2.
  • scan_df (pandas.DataFrame, optional): Scan dataframe. If not provided, the scan dataframe is created from the raw file.
Returns
  • tuple: A tuple containing two elements:
    • A dictionary containing mass spectra data, separated by MS level.
    • A pandas DataFrame containing scan information, including scan number, scan time, TIC, MS level, scan text, scan window lower and upper bounds, polarity, and precursor m/z (if applicable).
def get_mass_spectrum_from_scan(self, scan_number, spectrum_mode, auto_process=True):
1519    def get_mass_spectrum_from_scan(
1520        self, scan_number, spectrum_mode, auto_process=True
1521    ):
1522        """Instatiate a MassSpecBase object from a single scan number from the binary file, currently only supports profile mode.
1523
1524        Parameters
1525        ----------
1526        scan_number : int
1527            The scan number to extract the mass spectrum from.
1528        polarity : int
1529            The polarity of the scan.  1 for positive mode, -1 for negative mode.
1530        spectrum_mode : str
1531            The type of mass spectrum to extract.  Must be 'profile' or 'centroid'.
1532        auto_process : bool, optional
1533            If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.
1534
1535        Returns
1536        -------
1537        MassSpecProfile | MassSpecCentroid
1538            The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.
1539        """
1540
1541        if spectrum_mode == "profile":
1542            scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(scan_number)
1543            profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1544                scan_number, scanStatistics
1545            )
1546            abun = list(profileStream.Intensities)
1547            mz = list(profileStream.Positions)
1548            data_dict = {
1549                Labels.mz: mz,
1550                Labels.abundance: abun,
1551            }
1552            d_params = self.set_metadata(
1553                firstScanNumber=scan_number,
1554                lastScanNumber=scan_number,
1555                scans_list=False,
1556                label=Labels.thermo_profile,
1557            )
1558            mass_spectrum_obj = MassSpecProfile(
1559                data_dict, d_params, auto_process=auto_process
1560            )
1561
1562        elif spectrum_mode == "centroid":
1563            centroid_scan = self.iRawDataPlus.GetCentroidStream(scan_number, False)
1564            if centroid_scan.Masses is not None:
1565                mz = list(centroid_scan.Masses)
1566                abun = list(centroid_scan.Intensities)
1567                rp = list(centroid_scan.Resolutions)
1568                magnitude = list(centroid_scan.Intensities)
1569                noise = list(centroid_scan.Noises)
1570                baselines = list(centroid_scan.Baselines)
1571                array_noise_std = (np.array(noise) - np.array(baselines)) / 3
1572                l_signal_to_noise = np.array(magnitude) / array_noise_std
1573                data_dict = {
1574                    Labels.mz: mz,
1575                    Labels.abundance: abun,
1576                    Labels.rp: rp,
1577                    Labels.s2n: list(l_signal_to_noise),
1578                }
1579            else:  # For CID MS2, the centroid data are stored in the profile data location, they do not have any associated rp or baseline data, but they should be treated as centroid data
1580                scanStatistics = self.iRawDataPlus.GetScanStatsForScanNumber(
1581                    scan_number
1582                )
1583                profileStream = self.iRawDataPlus.GetSegmentedScanFromScanNumber(
1584                    scan_number, scanStatistics
1585                )
1586                abun = list(profileStream.Intensities)
1587                mz = list(profileStream.Positions)
1588                data_dict = {
1589                    Labels.mz: mz,
1590                    Labels.abundance: abun,
1591                    Labels.rp: [np.nan] * len(mz),
1592                    Labels.s2n: [np.nan] * len(mz),
1593                }
1594            d_params = self.set_metadata(
1595                firstScanNumber=scan_number,
1596                lastScanNumber=scan_number,
1597                scans_list=False,
1598                label=Labels.thermo_centroid,
1599            )
1600            mass_spectrum_obj = MassSpecCentroid(
1601                data_dict, d_params, auto_process=auto_process
1602            )
1603
1604        return mass_spectrum_obj

Instantiate a mass spectrum object from a single scan number of the binary file; both profile and centroid modes are supported.

Parameters
  • scan_number (int): The scan number to extract the mass spectrum from.
  • polarity (int): The polarity of the scan. 1 for positive mode, -1 for negative mode.
  • spectrum_mode (str): The type of mass spectrum to extract. Must be 'profile' or 'centroid'.
  • auto_process (bool, optional): If True, perform peak picking and noise threshold calculation after creating the mass spectrum object. Default is True.
Returns
  • MassSpecProfile | MassSpecCentroid: The MassSpecProfile or MassSpecCentroid object containing the parsed mass spectrum.
def get_mass_spectra_obj(self):
1606    def get_mass_spectra_obj(self):
1607        """Instatiate a MassSpectraBase object from the binary data file file.
1608
1609        Returns
1610        -------
1611        MassSpectraBase
1612            The MassSpectra object containing the parsed mass spectra.  The object is instatiated with the mzML file, analyzer, instrument, sample name, and scan dataframe.
1613        """
1614        _, scan_df = self.run(spectra="none")
1615        mass_spectra_obj = MassSpectraBase(
1616            self.file_location,
1617            self.analyzer,
1618            self.instrument_label,
1619            self.sample_name,
1620            self,
1621        )
1622        scan_df = scan_df.set_index("scan", drop=False)
1623        mass_spectra_obj.scan_df = scan_df
1624
1625        return mass_spectra_obj

Instantiate a MassSpectraBase object from the binary data file.

Returns
  • MassSpectraBase: The MassSpectra object containing the parsed mass spectra. The object is instantiated with the raw file location, analyzer, instrument, sample name, and scan dataframe.
def get_lcms_obj(self, spectra='all'):
1627    def get_lcms_obj(self, spectra="all"):
1628        """Instatiates a LCMSBase object from the mzML file.
1629
1630        Parameters
1631        ----------
1632        spectra : str, optional
1633            Which mass spectra data to include in the output. Default is "all".  Other options: "none", "ms1", "ms2".
1634
1635        Returns
1636        -------
1637        LCMSBase
1638            LCMS object containing mass spectra data. The object is instatiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specifified), polarity, as well as the attributes holding the scans, retention times, and tics.
1639        """
1640        _, scan_df = self.run(spectra="none")  # first run it to just get scan info
1641        res, scan_df = self.run(
1642            scan_df=scan_df, spectra=spectra
1643        )  # second run to parse data
1644        lcms_obj = LCMSBase(
1645            self.file_location,
1646            self.analyzer,
1647            self.instrument_label,
1648            self.sample_name,
1649            self,
1650        )
1651        if spectra != "none":
1652            for key in res:
1653                key_int = int(key.replace("ms", ""))
1654                res[key] = res[key][res[key].intensity > 0]
1655                res[key] = (
1656                    res[key].sort_values(by=["scan", "mz"]).reset_index(drop=True)
1657                )
1658                lcms_obj._ms_unprocessed[key_int] = res[key]
1659        lcms_obj.scan_df = scan_df.set_index("scan", drop=False)
1660        # Check if polarity is mixed
1661        if len(set(scan_df.polarity)) > 1:
1662            raise ValueError("Mixed polarities detected in scan data")
1663        lcms_obj.polarity = scan_df.polarity[0]
1664        lcms_obj._scans_number_list = list(scan_df.scan)
1665        lcms_obj._retention_time_list = list(scan_df.scan_time)
1666        lcms_obj._tic_list = list(scan_df.tic)
1667
1668        return lcms_obj

Instantiates an LCMSBase object from the raw file.

Parameters
  • spectra (str, optional): Which mass spectra data to include in the output. Default is "all". Other options: "none", "ms1", "ms2".
Returns
  • LCMSBase: LCMS object containing mass spectra data. The object is instantiated with the file location, analyzer, instrument, sample name, scan info, mz dataframe (as specified), polarity, as well as the attributes holding the scans, retention times, and tics.
def get_icr_transient_times(self):
1670    def get_icr_transient_times(self):
1671        """Return a list for transient time targets for all scans, or selected scans range
1672
1673        Notes
1674        --------
1675        Resolving Power and Transient time targets based on 7T FT-ICR MS system
1676        """
1677
1678        res_trans_time = {
1679            "50": 0.384,
1680            "100000": 0.768,
1681            "200000": 1.536,
1682            "400000": 3.072,
1683            "750000": 6.144,
1684            "1000000": 12.288,
1685        }
1686
1687        firstScanNumber = self.start_scan
1688
1689        lastScanNumber = self.end_scan
1690
1691        transient_time_list = []
1692
1693        for scan in range(firstScanNumber, lastScanNumber):
1694            scan_header = self.get_scan_header(scan)
1695
1696            rp_target = scan_header["FT Resolution:"]
1697
1698            transient_time = res_trans_time.get(rp_target)
1699
1700            transient_time_list.append(transient_time)
1701
1702            # print(transient_time, rp_target)
1703
1704        return transient_time_list

Return a list of transient time targets for all scans, or for the selected scan range.

Notes

Resolving Power and Transient time targets based on 7T FT-ICR MS system