corems.mass_spectrum.calc.Calibration

Created on Wed May 13 02:16:09 2020

@author: Will Kew

  1# -*- coding: utf-8 -*-
  2"""
  3Created on Wed May 13 02:16:09 2020
  4
  5@author: Will Kew
  6"""
  7
  8# import modules
  9import csv
 10import warnings
 11from io import BytesIO, StringIO
 12from pathlib import Path
 13
 14import numpy as np
 15import pandas as pd
 16from s3path import S3Path
 17
 18# import scipy modules for calibration
 19from scipy.optimize import minimize
 20
 21
 22class MzDomainCalibration:
 23    """MzDomainCalibration class for recalibrating mass spectra
 24
 25    Parameters
 26    ----------
 27    mass_spectrum : CoreMS MassSpectrum Object
 28        The mass spectrum to be calibrated.
 29    ref_masslist : str
 30        The path to a reference mass list.
 31    mzsegment : tuple of floats, optional
 32        The mz range to recalibrate, or None. Used for calibration of specific parts of the mz domain at a time.
 33        Future work - allow multiple mzsegments to be passed.
 34
 35    Attributes
 36    ----------
 37    mass_spectrum : CoreMS MassSpectrum Object
 38        The mass spectrum to be calibrated.
 39    mzsegment : tuple of floats or None
 40        The mz range to recalibrate, or None.
 41    ref_mass_list_path : str or Path
 42        The path to the reference mass list.
 43
 44    Methods
 45    -------
 46    * run().
 47        Main function to run this class.
 48    * load_ref_mass_list().
 49        Load reference mass list (Bruker format).
 50    * gen_ref_mass_list_from_assigned(min_conf=0.7).
 51        Generate reference mass list from assigned masses.
 52    * find_calibration_points(df_ref, calib_ppm_error_threshold=(-1, 1), calib_snr_threshold=5).
 53        Find calibration points in the mass spectrum based on the reference mass list.
 54    * robust_calib(param, cal_peaks_mz, cal_refs_mz, order=1).
 55        Recalibration function.
 56    * recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=1, diagnostic=False).
 57        Main recalibration function which uses a robust linear regression.
 58
 59
 60    """
 61
 62    def __init__(self, mass_spectrum, ref_masslist, mzsegment=None):
 63        self.mass_spectrum = mass_spectrum
 64        self.mzsegment = mzsegment
 65
 66        # define reference mass list - bruker .ref format
 67        self.ref_mass_list_path = ref_masslist
 68        if self.mass_spectrum.percentile_assigned(mute_output=True)[0] != 0:
 69            warnings.warn(
 70                "Warning: calibrating spectra which have already been assigned may yield erroneous results"
 71            )
 72        self.mass_spectrum.mz_cal = self.mass_spectrum.mz_exp
 73        self.mass_spectrum.mz_cal_profile = self.mass_spectrum._mz_exp
 74
 75        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
 76            print(
 77                "MS Obj loaded - " + str(len(mass_spectrum.mspeaks)) + " peaks found."
 78            )
 79     
 80    def load_ref_mass_list(self):
 81        """
 82        Load reference mass list (Bruker format).
 83
 84        Loads in a reference mass list from a .ref file. Some versions of
 85        Bruker's software produce .ref files with a different format where
 86        the header lines (starting with '#' or '##') and delimiters may vary.
 87        The file may be located locally or on S3 and will be handled accordingly.
 88        
 89        Returns
 90        -------
 91        df_ref : Pandas DataFrame
 92            Reference mass list object.
 93        """
 94        # Get a Path-like object from the input path string or S3Path
 95        refmasslist = (Path(self.ref_mass_list_path)
 96                    if isinstance(self.ref_mass_list_path, str)
 97                    else self.ref_mass_list_path)
 98        
 99        # Make sure the file exists
100        if not refmasslist.exists():
101            raise FileExistsError("File does not exist: %s" % refmasslist)
102        
103        # Read all lines from the file (handling S3 vs local differently)
104        if isinstance(refmasslist, S3Path):
105            # For S3, read the file in binary, then decode to string and split into lines.
106            content = refmasslist.open("rb").read()
107            all_lines = content.decode("utf-8").splitlines(keepends=True)
108        else:
109            # For a local file, open in text mode and read lines.
110            with refmasslist.open("r") as f:
111                all_lines = f.readlines()
112        
113        # Identify the index of the first line of the actual data.
114        # We assume header lines start with '#' (or '##') and ignore blank lines.
115        data_start_idx = 0
116        for idx, line in enumerate(all_lines):
117            if line.strip() and not line.lstrip().startswith("#"):
118                data_start_idx = idx
119                break
120        
121        # If there are not enough lines to guess the dialect, throw an error
122        if data_start_idx >= len(all_lines):
123            raise ValueError("The file does not appear to contain any data lines.")
124        
125        # Use a couple of the data lines to let csv.Sniffer detect the delimiter
126        sample_lines = "".join(all_lines[data_start_idx:data_start_idx+2])
127        try:
128            dialect = csv.Sniffer().sniff(sample_lines)
129            delimiter = dialect.delimiter
130        except csv.Error:
131            # If csv.Sniffer fails, default to a common delimiter (e.g., comma)
132            delimiter = ","
133        
134        # Join the lines from the beginning of data (this might include a blank line) 
135        joined_data = "".join(all_lines[data_start_idx:])
136        
137        # Depending on whether the file is S3 or local, wrap the data as needed for pandas
138        if isinstance(refmasslist, S3Path):
139            data_buffer = BytesIO(joined_data.encode("utf-8"))
140        else:
141            data_buffer = StringIO(joined_data)
142        
143        # Read data into a DataFrame.
144        # Adjust columns and names as needed – here we assume at least 2 columns:
145        df_ref = pd.read_csv(data_buffer,
146                            sep=delimiter,
147                            header=None,
148                            usecols=[0, 1],   # Modify if more columns are required.
149                            names=["Formula", "m/z"])
150        
151        df_ref.sort_values(by="m/z", ascending=True, inplace=True)
152        
153        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
154            print("Reference mass list loaded - {} calibration masses loaded.".format(len(df_ref)))
155        
156        return df_ref
157
158    def gen_ref_mass_list_from_assigned(self, min_conf: float = 0.7):
159        """Generate reference mass list from assigned masses
160
161        This function will generate a ref mass dataframe object from an assigned corems mass spec obj
162        using assigned masses above a certain minimum confidence threshold.
163
164        This function needs to be retested and check it is covered in the unit tests.
165
166        Parameters
167        ----------
168        min_conf : float, optional
169            minimum confidence score. The default is 0.7.
170
171        Returns
172        -------
173        df_ref : Pandas DataFrame
174            reference mass list - based on calculated masses.
175
176        """
177        # TODO this function needs to be retested and check it is covered in the unit tests
178        df = self.mass_spectrum.to_dataframe()
179        df = df[df["Confidence Score"] > min_conf]
180        df_ref = pd.DataFrame(columns=["m/z"])
181        df_ref["m/z"] = df["Calculated m/z"]
182        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
183            print(
184                "Reference mass list generated - "
185                + str(len(df_ref))
186                + " calibration masses."
187            )
188        return df_ref
189
190    def find_calibration_points(
191        self,
192        df_ref,
193        calib_ppm_error_threshold: tuple[float, float] = (-1, 1),
194        calib_snr_threshold: float = 5,
195        calibration_ref_match_method: str = "legacy",
196        calibration_ref_match_tolerance: float = 0.003,
197        calibration_ref_match_std_raw_error_limit: float = 1.5,
198    ):
199        """Function to find calibration points in the mass spectrum
200
201        Based on the reference mass list.
202
203        Parameters
204        ----------
205        df_ref : Pandas DataFrame
206            reference mass list for recalibration.
207        calib_ppm_error_threshold : tuple of floats, optional
208            ppm error for finding calibration masses in the spectrum. The default is -1,1.
209            Note: This is based on the calculation of ppm = ((mz_measure - mz_theoretical)/mz_theoretical)*1e6.
210                Some software does this the other way around and value signs must be inverted for that to work.
211        calib_snr_threshold : float, optional
212            snr threshold for finding calibration masses in the spectrum. The default is 5.
213            If SNR data is unavailable, peaks are filtered by intensity percentile using the formula:
214            percentile = max(5, 100 - calib_snr_threshold)
215
216        Returns
217        -------
218        cal_peaks_mz : list of floats
219            masses of measured ions to use in calibration routine
220        cal_refs_mz : list of floats
221            reference mz values of found calibration points.
222
223        """
224
225        # Check if SNR data is available by testing the first peak
226        use_snr = False
227        if len(self.mass_spectrum.mspeaks) > 0:
228            first_peak = self.mass_spectrum.mspeaks[0]
229            if (hasattr(first_peak, 'signal_to_noise') and 
230                first_peak.signal_to_noise is not None and 
231                not np.isnan(first_peak.signal_to_noise) and
232                first_peak.signal_to_noise > 0):
233                use_snr = True
234
235        # This approach is much more efficient and expedient than the original implementation.
236        peaks_mz = []
237        peaks_intensity = []
238        
239        if use_snr:
240            # Use SNR filtering
241            for x in self.mass_spectrum.mspeaks:
242                if x.signal_to_noise > calib_snr_threshold:
243                    if self.mzsegment:
244                        if min(self.mzsegment) <= x.mz_exp <= max(self.mzsegment):
245                            peaks_mz.append(x.mz_exp)
246                    else:
247                        peaks_mz.append(x.mz_exp)
248        else:
249            # Fallback to intensity percentile filtering
250            intensity_percentile = max(5, 100 - calib_snr_threshold)
251            warnings.warn(
252                f"SNR data unavailable for calibration. Using intensity-based filtering instead. "
253                f"SNR threshold of {calib_snr_threshold} corresponds to intensity percentile >= {intensity_percentile}%."
254            )
255            
256            # Collect all peaks and their intensities
257            all_peaks_data = []
258            for x in self.mass_spectrum.mspeaks:
259                if self.mzsegment:
260                    if min(self.mzsegment) <= x.mz_exp <= max(self.mzsegment):
261                        all_peaks_data.append((x.mz_exp, x.abundance))
262                else:
263                    all_peaks_data.append((x.mz_exp, x.abundance))
264            
265            if all_peaks_data:
266                peaks_mz_list, intensities = zip(*all_peaks_data)
267                intensity_threshold = np.percentile(intensities, intensity_percentile)
268                
269                for mz, intensity in all_peaks_data:
270                    if intensity >= intensity_threshold:
271                        peaks_mz.append(mz)
272        
273        peaks_mz = np.asarray(peaks_mz)
274
275        if calibration_ref_match_method == "legacy":
276            # This legacy approach iterates through each reference match and finds the entries within 1 mz and within the user defined PPM error threshold
277            # Then it removes ambiguities - which means the calibration threshold hasto be very tight.
278            cal_peaks_mz = []
279            cal_refs_mz = []
280            for mzref in df_ref["m/z"]:
281                tmp_peaks_mz = peaks_mz[abs(peaks_mz - mzref) < 1]
282                for mzmeas in tmp_peaks_mz:
283                    delta_mass = ((mzmeas - mzref) / mzref) * 1e6
284                    if delta_mass < max(calib_ppm_error_threshold):
285                        if delta_mass > min(calib_ppm_error_threshold):
286                            cal_peaks_mz.append(mzmeas)
287                            cal_refs_mz.append(mzref)
288
289            # To remove entries with duplicated indices (reference masses matching multiple peaks)
290            tmpdf = pd.Series(index=cal_refs_mz, data=cal_peaks_mz, dtype=float)
291            tmpdf = tmpdf[~tmpdf.index.duplicated(keep=False)]
292
293            cal_peaks_mz = list(tmpdf.values)
294            cal_refs_mz = list(tmpdf.index)
295        elif calibration_ref_match_method == "merged":
296            #warnings.warn("Using experimental new reference mass list merging")
297            # This is a new approach (August 2024) which uses Pandas 'merged_asof' to find the peaks closest in m/z between
298            # reference and measured masses. This is a quicker way to match, and seems to get more matches.
299            # It may not work as well when the data are far from correc initial mass
300            # e.g. if the correct peak is further from the reference than an incorrect peak.
301            meas_df = pd.DataFrame(columns=["meas_m/z"], data=peaks_mz)
302            tolerance = calibration_ref_match_tolerance
303            merged_df = pd.merge_asof(
304                df_ref,
305                meas_df,
306                left_on="m/z",
307                right_on="meas_m/z",
308                tolerance=tolerance,
309                direction="nearest",
310            )
311            merged_df.dropna(how="any", inplace=True)
312            merged_df["Error_ppm"] = (
313                (merged_df["meas_m/z"] - merged_df["m/z"]) / merged_df["m/z"]
314            ) * 1e6
315            median_raw_error = merged_df["Error_ppm"].median()
316            std_raw_error = merged_df["Error_ppm"].std()
317            if std_raw_error > calibration_ref_match_std_raw_error_limit:
318                std_raw_error = calibration_ref_match_std_raw_error_limit
319            self.mass_spectrum.calibration_raw_error_median = median_raw_error
320            self.mass_spectrum.calibration_raw_error_stdev = std_raw_error
321            merged_df = merged_df[
322                (merged_df["Error_ppm"] > (median_raw_error - 1.5 * std_raw_error))
323                & (merged_df["Error_ppm"] < (median_raw_error + 1.5 * std_raw_error))
324            ]
325            # merged_df= merged_df[(merged_df['Error_ppm']>min(calib_ppm_error_threshold))&(merged_df['Error_ppm']<max(calib_ppm_error_threshold))]
326            cal_peaks_mz = list(merged_df["meas_m/z"])
327            cal_refs_mz = list(merged_df["m/z"])
328        else:
329            raise ValueError(f"{calibration_ref_match_method} not allowed.")
330
331        if False:
332            min_calib_ppm_error = calib_ppm_error_threshold[0]
333            max_calib_ppm_error = calib_ppm_error_threshold[1]
334            df_raw = self.mass_spectrum.to_dataframe()
335
336            df_raw = df_raw[df_raw["S/N"] > calib_snr_threshold]
337            # optionally further subset that based on minimum S/N, RP, Peak Height
338            # to ensure only valid points are utilized
339            # in this example, only a S/N threshold is implemented.
340            imzmeas = []
341            mzrefs = []
342
343            for mzref in df_ref["m/z"]:
344                # find all peaks within a defined ppm error threshold
345                tmpdf = df_raw[
346                    ((df_raw["m/z"] - mzref) / mzref) * 1e6 < max_calib_ppm_error
347                ]
348                # Error is relative to the theoretical, so the divisor should be divisor
349
350                tmpdf = tmpdf[
351                    ((tmpdf["m/z"] - mzref) / mzref) * 1e6 > min_calib_ppm_error
352                ]
353
354                # only use the calibration point if only one peak is within the thresholds
355                # This may require some optimization of the threshold tolerances
356                if len(tmpdf) == 1:
357                    imzmeas.append(int(tmpdf.index.values))
358                    mzrefs.append(mzref)
359
360        # it is crucial the mass lists are in same order
361        # corems likes to do masses from high to low.
362        cal_refs_mz.sort(reverse=False)
363        cal_peaks_mz.sort(reverse=False)
364        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
365            print(
366                str(len(cal_peaks_mz))
367                + " calibration points matched within thresholds."
368            )
369        return cal_peaks_mz, cal_refs_mz
370
371    def robust_calib(
372        self,
373        param: list[float],
374        cal_peaks_mz: list[float],
375        cal_refs_mz: list[float],
376        order: int = 1,
377    ):
378        """Recalibration function
379
380        Computes the rms of m/z errors to minimize when calibrating.
381        This is adapted from from spike.
382
383        Parameters
384        ----------
385        param : list of floats
386            generated by minimize function from scipy optimize.
387        cal_peaks_mz : list of floats
388            masses of measured peaks to use in mass calibration.
389        cal_peaks_mz : list of floats
390            reference mz values of found calibration points.
391        order : int, optional
392            order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
393
394        Returns
395        -------
396        rmserror : float
397            root mean square mass error for calibration points.
398
399        """
400        Aterm = param[0]
401        Bterm = param[1]
402        try:
403            Cterm = param[2]
404        except IndexError:
405            pass
406
407        # get the mspeaks from the mass spectrum object which were calibration points
408        # mspeaks = [self.mass_spectrum.mspeaks[x] for x in imzmeas]
409        # get their calibrated mass values
410        # mspeakmzs = [x.mz_cal for x in mspeaks]
411        cal_peaks_mz = np.asarray(cal_peaks_mz)
412
413        # linearz
414        if order == 1:
415            ref_recal_points = (Aterm * cal_peaks_mz) + Bterm
416        # quadratic
417        elif order == 2:
418            ref_recal_points = (Aterm * (cal_peaks_mz)) + (
419                Bterm * np.power((cal_peaks_mz), 2) + Cterm
420            )
421
422        # sort both the calibration points (measured, recalibrated)
423        ref_recal_points.sort()
424        # and sort the calibration points (theoretical, predefined)
425        cal_refs_mz.sort()
426
427        # calculate the ppm error for each calibration point
428        error = ((ref_recal_points - cal_refs_mz) / cal_refs_mz) * 1e6
429        # calculate the root mean square error - this is our target to minimize
430        rmserror = np.sqrt(np.mean(error**2))
431        return rmserror
432
433    def recalibrate_mass_spectrum(
434        self,
435        cal_peaks_mz: list[float],
436        cal_refs_mz: list[float],
437        order: int = 1,
438        diagnostic: bool = False,
439    ):
440        """Main recalibration function which uses a robust linear regression
441
442        This function performs the recalibration of the mass spectrum object.
443        It iteratively applies
444
445        Parameters
446        ----------
447        cal_peaks_mz : list of float
448            masses of measured peaks to use in mass calibration.
449        cal_refs_mz : list of float
450            reference mz values of found calibration points.
451        order : int, optional
452            order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
453
454        Returns
455        -------
456        mass_spectrum : CoreMS mass spectrum object
457            Calibrated mass spectrum object
458
459
460        Notes
461        -----
462        This function is adapted, in part, from the SPIKE project [1,2] and is based on the robust linear regression method.
463
464        References
465        ----------
466        1.  Chiron L., Coutouly M-A., Starck J-P., Rolando C., Delsuc M-A.
467            SPIKE a Processing Software dedicated to Fourier Spectroscopies
468            https://arxiv.org/abs/1608.06777 (2016)
469        2.  SPIKE - https://github.com/spike-project/spike
470
471        """
472        # initialise parameters for recalibration
473        # these are the 'Aterm, Bterm, Cterm'
474        # as spectra are already freq->mz calibrated, these terms are very small
475        # may be beneficial to formally separate them from the freq->mz terms
476        if order == 1:
477            Po = [1, 0]
478        elif order == 2:
479            Po = [1, 0, 0]
480
481        if len(cal_peaks_mz) >= 2:
482            if self.mzsegment:  # If only part of the spectrum is to be recalibrated
483                mz_exp_peaks = np.array(
484                    [mspeak.mz_exp for mspeak in self.mass_spectrum]
485                )
486                # Split the array into two parts - one to recailbrate, one to keep unchanged.
487                mz_exp_peaks_tocal = mz_exp_peaks[
488                    (mz_exp_peaks >= min(self.mzsegment))
489                    & (mz_exp_peaks <= max(self.mzsegment))
490                ]
491                mz_exp_peaks_unchanged = mz_exp_peaks[
492                    ~(mz_exp_peaks >= min(self.mzsegment))
493                    | ~(mz_exp_peaks <= max(self.mzsegment))
494                ]
495                # TODO: - segmented calibration needs a way to better track the calibration args/values...
496                if not self.mass_spectrum.is_centroid:
497                    mz_exp_profile = np.array(self.mass_spectrum.mz_exp_profile)
498                    # Split the array into two parts - one to recailbrate, one to keep unchanged.
499                    mz_exp_profile_tocal = mz_exp_profile[
500                        (mz_exp_profile >= min(self.mzsegment))
501                        & (mz_exp_profile <= max(self.mzsegment))
502                    ]
503                    mz_exp_profile_unchanged = mz_exp_profile[
504                        ~(mz_exp_profile >= min(self.mzsegment))
505                        | ~(mz_exp_profile <= max(self.mzsegment))
506                    ]
507            else:  # if just recalibrating the whole spectrum
508                mz_exp_peaks_tocal = np.array(
509                    [mspeak.mz_exp for mspeak in self.mass_spectrum]
510                )
511                if not self.mass_spectrum.is_centroid:
512                    mz_exp_profile_tocal = np.array(self.mass_spectrum.mz_exp_profile)
513
514            minimize_method = self.mass_spectrum.settings.calib_minimize_method
515            res = minimize(
516                self.robust_calib,
517                Po,
518                args=(cal_peaks_mz, cal_refs_mz, order),
519                method=minimize_method,
520            )
521            if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
522                print(
523                    "minimize function completed with RMS error of: {:0.3f} ppm".format(
524                        res["fun"]
525                    )
526                )
527                print(
528                    "minimize function performed {:1d} fn evals and {:1d} iterations".format(
529                        res["nfev"], res["nit"]
530                    )
531                )
532            Pn = res.x
533
534            # mz_exp_ms = np.array([mspeak.mz_exp for mspeak in self.mass_spectrum])
535
536            if order == 1:
537                mz_domain = (Pn[0] * mz_exp_peaks_tocal) + Pn[1]
538                if not self.mass_spectrum.is_centroid:
539                    mz_profile_calc = (Pn[0] * mz_exp_profile_tocal) + Pn[1]
540
541            elif order == 2:
542                mz_domain = (Pn[0] * (mz_exp_peaks_tocal)) + (
543                    Pn[1] * np.power((mz_exp_peaks_tocal), 2) + Pn[2]
544                )
545
546                if not self.mass_spectrum.is_centroid:
547                    mz_profile_calc = (Pn[0] * (mz_exp_profile_tocal)) + (
548                        Pn[1] * np.power((mz_exp_profile_tocal), 2) + Pn[2]
549                    )
550
551            if self.mzsegment:
552                # Recombine the mass domains
553                mz_domain = np.concatenate([mz_domain, mz_exp_peaks_unchanged])
554                mz_domain.sort()
555                if not self.mass_spectrum.is_centroid:
556                    mz_profile_calc = np.concatenate(
557                        [mz_profile_calc, mz_exp_profile_unchanged]
558                    )
559                    mz_profile_calc.sort()
560                # Sort them
561                if (
562                    mz_exp_peaks[0] > mz_exp_peaks[1]
563                ):  # If originally descending mass order
564                    mz_domain = mz_domain[::-1]
565                    if not self.mass_spectrum.is_centroid:
566                        mz_profile_calc = mz_profile_calc[::-1]
567
568            self.mass_spectrum.mz_cal = mz_domain
569            if not self.mass_spectrum.is_centroid:
570                self.mass_spectrum.mz_cal_profile = mz_profile_calc
571
572            self.mass_spectrum.calibration_order = order
573            self.mass_spectrum.calibration_RMS = float(res["fun"])
574            self.mass_spectrum.calibration_points = int(len(cal_refs_mz))
575            self.mass_spectrum.calibration_ref_mzs = cal_refs_mz
576            self.mass_spectrum.calibration_meas_mzs = cal_peaks_mz
577
578            self.mass_spectrum.calibration_segment = self.mzsegment
579
580            if diagnostic:
581                return self.mass_spectrum, res
582            return self.mass_spectrum
583        else:
584            warnings.warn("Too few calibration points - aborting.")
585            return self.mass_spectrum
586
587    def run(self):
588        """Run the calibration routine
589
590        This function runs the calibration routine.
591
592        """
593        calib_snr_threshold = self.mass_spectrum.settings.calib_sn_threshold
594        max_calib_ppm_error = self.mass_spectrum.settings.max_calib_ppm_error
595        min_calib_ppm_error = self.mass_spectrum.settings.min_calib_ppm_error
596        calib_pol_order = self.mass_spectrum.settings.calib_pol_order
597        calibration_ref_match_method = (
598            self.mass_spectrum.settings.calibration_ref_match_method
599        )
600        calibration_ref_match_tolerance = (
601            self.mass_spectrum.settings.calibration_ref_match_tolerance
602        )
603        calibration_ref_match_std_raw_error_limit = (
604            self.mass_spectrum.settings.calibration_ref_match_std_raw_error_limit
605        )
606
607        # load reference mass list
608        df_ref = self.load_ref_mass_list()
609
610        # find calibration points
611        cal_peaks_mz, cal_refs_mz = self.find_calibration_points(
612            df_ref,
613            calib_ppm_error_threshold=(min_calib_ppm_error, max_calib_ppm_error),
614            calib_snr_threshold=calib_snr_threshold,
615            calibration_ref_match_method=calibration_ref_match_method,
616            calibration_ref_match_tolerance=calibration_ref_match_tolerance,
617            calibration_ref_match_std_raw_error_limit=calibration_ref_match_std_raw_error_limit,
618        )
619        if len(cal_peaks_mz) == 2:
620            self.mass_spectrum.settings.calib_pol_order = 1
621            calib_pol_order = 1
622            if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
623                print("Only 2 calibration points found, forcing a linear recalibration")
624        elif len(cal_peaks_mz) < 2:
625            warnings.warn("Too few calibration points found, function will fail")
626        self.recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=calib_pol_order)
class MzDomainCalibration:
 23class MzDomainCalibration:
 24    """MzDomainCalibration class for recalibrating mass spectra
 25
 26    Parameters
 27    ----------
 28    mass_spectrum : CoreMS MassSpectrum Object
 29        The mass spectrum to be calibrated.
 30    ref_masslist : str
 31        The path to a reference mass list.
 32    mzsegment : tuple of floats, optional
 33        The mz range to recalibrate, or None. Used for calibration of specific parts of the mz domain at a time.
 34        Future work - allow multiple mzsegments to be passed.
 35
 36    Attributes
 37    ----------
 38    mass_spectrum : CoreMS MassSpectrum Object
 39        The mass spectrum to be calibrated.
 40    mzsegment : tuple of floats or None
 41        The mz range to recalibrate, or None.
 42    ref_mass_list_path : str or Path
 43        The path to the reference mass list.
 44
 45    Methods
 46    -------
 47    * run().
 48        Main function to run this class.
 49    * load_ref_mass_list().
 50        Load reference mass list (Bruker format).
 51    * gen_ref_mass_list_from_assigned(min_conf=0.7).
 52        Generate reference mass list from assigned masses.
 53    * find_calibration_points(df_ref, calib_ppm_error_threshold=(-1, 1), calib_snr_threshold=5).
 54        Find calibration points in the mass spectrum based on the reference mass list.
 55    * robust_calib(param, cal_peaks_mz, cal_refs_mz, order=1).
 56        Recalibration function.
 57    * recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=1, diagnostic=False).
 58        Main recalibration function which uses a robust linear regression.
 59
 60
 61    """
 62
 63    def __init__(self, mass_spectrum, ref_masslist, mzsegment=None):
 64        self.mass_spectrum = mass_spectrum
 65        self.mzsegment = mzsegment
 66
 67        # define reference mass list - bruker .ref format
 68        self.ref_mass_list_path = ref_masslist
 69        if self.mass_spectrum.percentile_assigned(mute_output=True)[0] != 0:
 70            warnings.warn(
 71                "Warning: calibrating spectra which have already been assigned may yield erroneous results"
 72            )
 73        self.mass_spectrum.mz_cal = self.mass_spectrum.mz_exp
 74        self.mass_spectrum.mz_cal_profile = self.mass_spectrum._mz_exp
 75
 76        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
 77            print(
 78                "MS Obj loaded - " + str(len(mass_spectrum.mspeaks)) + " peaks found."
 79            )
 80     
 81    def load_ref_mass_list(self):
 82        """
 83        Load reference mass list (Bruker format).
 84
 85        Loads in a reference mass list from a .ref file. Some versions of
 86        Bruker's software produce .ref files with a different format where
 87        the header lines (starting with '#' or '##') and delimiters may vary.
 88        The file may be located locally or on S3 and will be handled accordingly.
 89        
 90        Returns
 91        -------
 92        df_ref : Pandas DataFrame
 93            Reference mass list object.
 94        """
 95        # Get a Path-like object from the input path string or S3Path
 96        refmasslist = (Path(self.ref_mass_list_path)
 97                    if isinstance(self.ref_mass_list_path, str)
 98                    else self.ref_mass_list_path)
 99        
100        # Make sure the file exists
101        if not refmasslist.exists():
102            raise FileExistsError("File does not exist: %s" % refmasslist)
103        
104        # Read all lines from the file (handling S3 vs local differently)
105        if isinstance(refmasslist, S3Path):
106            # For S3, read the file in binary, then decode to string and split into lines.
107            content = refmasslist.open("rb").read()
108            all_lines = content.decode("utf-8").splitlines(keepends=True)
109        else:
110            # For a local file, open in text mode and read lines.
111            with refmasslist.open("r") as f:
112                all_lines = f.readlines()
113        
114        # Identify the index of the first line of the actual data.
115        # We assume header lines start with '#' (or '##') and ignore blank lines.
116        data_start_idx = 0
117        for idx, line in enumerate(all_lines):
118            if line.strip() and not line.lstrip().startswith("#"):
119                data_start_idx = idx
120                break
121        
122        # If there are not enough lines to guess the dialect, throw an error
123        if data_start_idx >= len(all_lines):
124            raise ValueError("The file does not appear to contain any data lines.")
125        
126        # Use a couple of the data lines to let csv.Sniffer detect the delimiter
127        sample_lines = "".join(all_lines[data_start_idx:data_start_idx+2])
128        try:
129            dialect = csv.Sniffer().sniff(sample_lines)
130            delimiter = dialect.delimiter
131        except csv.Error:
132            # If csv.Sniffer fails, default to a common delimiter (e.g., comma)
133            delimiter = ","
134        
135        # Join the lines from the beginning of data (this might include a blank line) 
136        joined_data = "".join(all_lines[data_start_idx:])
137        
138        # Depending on whether the file is S3 or local, wrap the data as needed for pandas
139        if isinstance(refmasslist, S3Path):
140            data_buffer = BytesIO(joined_data.encode("utf-8"))
141        else:
142            data_buffer = StringIO(joined_data)
143        
144        # Read data into a DataFrame.
145        # Adjust columns and names as needed – here we assume at least 2 columns:
146        df_ref = pd.read_csv(data_buffer,
147                            sep=delimiter,
148                            header=None,
149                            usecols=[0, 1],   # Modify if more columns are required.
150                            names=["Formula", "m/z"])
151        
152        df_ref.sort_values(by="m/z", ascending=True, inplace=True)
153        
154        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
155            print("Reference mass list loaded - {} calibration masses loaded.".format(len(df_ref)))
156        
157        return df_ref
158
159    def gen_ref_mass_list_from_assigned(self, min_conf: float = 0.7):
160        """Generate reference mass list from assigned masses
161
162        This function will generate a ref mass dataframe object from an assigned corems mass spec obj
163        using assigned masses above a certain minimum confidence threshold.
164
165        This function needs to be retested and check it is covered in the unit tests.
166
167        Parameters
168        ----------
169        min_conf : float, optional
170            minimum confidence score. The default is 0.7.
171
172        Returns
173        -------
174        df_ref : Pandas DataFrame
175            reference mass list - based on calculated masses.
176
177        """
178        # TODO this function needs to be retested and check it is covered in the unit tests
179        df = self.mass_spectrum.to_dataframe()
180        df = df[df["Confidence Score"] > min_conf]
181        df_ref = pd.DataFrame(columns=["m/z"])
182        df_ref["m/z"] = df["Calculated m/z"]
183        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
184            print(
185                "Reference mass list generated - "
186                + str(len(df_ref))
187                + " calibration masses."
188            )
189        return df_ref
190
191    def find_calibration_points(
192        self,
193        df_ref,
194        calib_ppm_error_threshold: tuple[float, float] = (-1, 1),
195        calib_snr_threshold: float = 5,
196        calibration_ref_match_method: str = "legacy",
197        calibration_ref_match_tolerance: float = 0.003,
198        calibration_ref_match_std_raw_error_limit: float = 1.5,
199    ):
200        """Function to find calibration points in the mass spectrum
201
202        Based on the reference mass list.
203
204        Parameters
205        ----------
206        df_ref : Pandas DataFrame
207            reference mass list for recalibration.
208        calib_ppm_error_threshold : tuple of floats, optional
209            ppm error for finding calibration masses in the spectrum. The default is -1,1.
210            Note: This is based on the calculation of ppm = ((mz_measure - mz_theoretical)/mz_theoretical)*1e6.
211                Some software does this the other way around and value signs must be inverted for that to work.
212        calib_snr_threshold : float, optional
213            snr threshold for finding calibration masses in the spectrum. The default is 5.
214            If SNR data is unavailable, peaks are filtered by intensity percentile using the formula:
215            percentile = max(5, 100 - calib_snr_threshold)
216
217        Returns
218        -------
219        cal_peaks_mz : list of floats
220            masses of measured ions to use in calibration routine
221        cal_refs_mz : list of floats
222            reference mz values of found calibration points.
223
224        """
225
226        # Check if SNR data is available by testing the first peak
227        use_snr = False
228        if len(self.mass_spectrum.mspeaks) > 0:
229            first_peak = self.mass_spectrum.mspeaks[0]
230            if (hasattr(first_peak, 'signal_to_noise') and 
231                first_peak.signal_to_noise is not None and 
232                not np.isnan(first_peak.signal_to_noise) and
233                first_peak.signal_to_noise > 0):
234                use_snr = True
235
236        # This approach is much more efficient and expedient than the original implementation.
237        peaks_mz = []
238        peaks_intensity = []
239        
240        if use_snr:
241            # Use SNR filtering
242            for x in self.mass_spectrum.mspeaks:
243                if x.signal_to_noise > calib_snr_threshold:
244                    if self.mzsegment:
245                        if min(self.mzsegment) <= x.mz_exp <= max(self.mzsegment):
246                            peaks_mz.append(x.mz_exp)
247                    else:
248                        peaks_mz.append(x.mz_exp)
249        else:
250            # Fallback to intensity percentile filtering
251            intensity_percentile = max(5, 100 - calib_snr_threshold)
252            warnings.warn(
253                f"SNR data unavailable for calibration. Using intensity-based filtering instead. "
254                f"SNR threshold of {calib_snr_threshold} corresponds to intensity percentile >= {intensity_percentile}%."
255            )
256            
257            # Collect all peaks and their intensities
258            all_peaks_data = []
259            for x in self.mass_spectrum.mspeaks:
260                if self.mzsegment:
261                    if min(self.mzsegment) <= x.mz_exp <= max(self.mzsegment):
262                        all_peaks_data.append((x.mz_exp, x.abundance))
263                else:
264                    all_peaks_data.append((x.mz_exp, x.abundance))
265            
266            if all_peaks_data:
267                peaks_mz_list, intensities = zip(*all_peaks_data)
268                intensity_threshold = np.percentile(intensities, intensity_percentile)
269                
270                for mz, intensity in all_peaks_data:
271                    if intensity >= intensity_threshold:
272                        peaks_mz.append(mz)
273        
274        peaks_mz = np.asarray(peaks_mz)
275
276        if calibration_ref_match_method == "legacy":
277            # This legacy approach iterates through each reference match and finds the entries within 1 mz and within the user defined PPM error threshold
278            # Then it removes ambiguities - which means the calibration threshold hasto be very tight.
279            cal_peaks_mz = []
280            cal_refs_mz = []
281            for mzref in df_ref["m/z"]:
282                tmp_peaks_mz = peaks_mz[abs(peaks_mz - mzref) < 1]
283                for mzmeas in tmp_peaks_mz:
284                    delta_mass = ((mzmeas - mzref) / mzref) * 1e6
285                    if delta_mass < max(calib_ppm_error_threshold):
286                        if delta_mass > min(calib_ppm_error_threshold):
287                            cal_peaks_mz.append(mzmeas)
288                            cal_refs_mz.append(mzref)
289
290            # To remove entries with duplicated indices (reference masses matching multiple peaks)
291            tmpdf = pd.Series(index=cal_refs_mz, data=cal_peaks_mz, dtype=float)
292            tmpdf = tmpdf[~tmpdf.index.duplicated(keep=False)]
293
294            cal_peaks_mz = list(tmpdf.values)
295            cal_refs_mz = list(tmpdf.index)
296        elif calibration_ref_match_method == "merged":
297            #warnings.warn("Using experimental new reference mass list merging")
298            # This is a new approach (August 2024) which uses Pandas 'merged_asof' to find the peaks closest in m/z between
299            # reference and measured masses. This is a quicker way to match, and seems to get more matches.
300            # It may not work as well when the data are far from correc initial mass
301            # e.g. if the correct peak is further from the reference than an incorrect peak.
302            meas_df = pd.DataFrame(columns=["meas_m/z"], data=peaks_mz)
303            tolerance = calibration_ref_match_tolerance
304            merged_df = pd.merge_asof(
305                df_ref,
306                meas_df,
307                left_on="m/z",
308                right_on="meas_m/z",
309                tolerance=tolerance,
310                direction="nearest",
311            )
312            merged_df.dropna(how="any", inplace=True)
313            merged_df["Error_ppm"] = (
314                (merged_df["meas_m/z"] - merged_df["m/z"]) / merged_df["m/z"]
315            ) * 1e6
316            median_raw_error = merged_df["Error_ppm"].median()
317            std_raw_error = merged_df["Error_ppm"].std()
318            if std_raw_error > calibration_ref_match_std_raw_error_limit:
319                std_raw_error = calibration_ref_match_std_raw_error_limit
320            self.mass_spectrum.calibration_raw_error_median = median_raw_error
321            self.mass_spectrum.calibration_raw_error_stdev = std_raw_error
322            merged_df = merged_df[
323                (merged_df["Error_ppm"] > (median_raw_error - 1.5 * std_raw_error))
324                & (merged_df["Error_ppm"] < (median_raw_error + 1.5 * std_raw_error))
325            ]
326            # merged_df= merged_df[(merged_df['Error_ppm']>min(calib_ppm_error_threshold))&(merged_df['Error_ppm']<max(calib_ppm_error_threshold))]
327            cal_peaks_mz = list(merged_df["meas_m/z"])
328            cal_refs_mz = list(merged_df["m/z"])
329        else:
330            raise ValueError(f"{calibration_ref_match_method} not allowed.")
331
332        if False:
333            min_calib_ppm_error = calib_ppm_error_threshold[0]
334            max_calib_ppm_error = calib_ppm_error_threshold[1]
335            df_raw = self.mass_spectrum.to_dataframe()
336
337            df_raw = df_raw[df_raw["S/N"] > calib_snr_threshold]
338            # optionally further subset that based on minimum S/N, RP, Peak Height
339            # to ensure only valid points are utilized
340            # in this example, only a S/N threshold is implemented.
341            imzmeas = []
342            mzrefs = []
343
344            for mzref in df_ref["m/z"]:
345                # find all peaks within a defined ppm error threshold
346                tmpdf = df_raw[
347                    ((df_raw["m/z"] - mzref) / mzref) * 1e6 < max_calib_ppm_error
348                ]
349                # Error is relative to the theoretical, so the divisor should be divisor
350
351                tmpdf = tmpdf[
352                    ((tmpdf["m/z"] - mzref) / mzref) * 1e6 > min_calib_ppm_error
353                ]
354
355                # only use the calibration point if only one peak is within the thresholds
356                # This may require some optimization of the threshold tolerances
357                if len(tmpdf) == 1:
358                    imzmeas.append(int(tmpdf.index.values))
359                    mzrefs.append(mzref)
360
361        # it is crucial the mass lists are in same order
362        # corems likes to do masses from high to low.
363        cal_refs_mz.sort(reverse=False)
364        cal_peaks_mz.sort(reverse=False)
365        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
366            print(
367                str(len(cal_peaks_mz))
368                + " calibration points matched within thresholds."
369            )
370        return cal_peaks_mz, cal_refs_mz
371
372    def robust_calib(
373        self,
374        param: list[float],
375        cal_peaks_mz: list[float],
376        cal_refs_mz: list[float],
377        order: int = 1,
378    ):
379        """Recalibration function
380
381        Computes the rms of m/z errors to minimize when calibrating.
382        This is adapted from from spike.
383
384        Parameters
385        ----------
386        param : list of floats
387            generated by minimize function from scipy optimize.
388        cal_peaks_mz : list of floats
389            masses of measured peaks to use in mass calibration.
390        cal_peaks_mz : list of floats
391            reference mz values of found calibration points.
392        order : int, optional
393            order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
394
395        Returns
396        -------
397        rmserror : float
398            root mean square mass error for calibration points.
399
400        """
401        Aterm = param[0]
402        Bterm = param[1]
403        try:
404            Cterm = param[2]
405        except IndexError:
406            pass
407
408        # get the mspeaks from the mass spectrum object which were calibration points
409        # mspeaks = [self.mass_spectrum.mspeaks[x] for x in imzmeas]
410        # get their calibrated mass values
411        # mspeakmzs = [x.mz_cal for x in mspeaks]
412        cal_peaks_mz = np.asarray(cal_peaks_mz)
413
414        # linearz
415        if order == 1:
416            ref_recal_points = (Aterm * cal_peaks_mz) + Bterm
417        # quadratic
418        elif order == 2:
419            ref_recal_points = (Aterm * (cal_peaks_mz)) + (
420                Bterm * np.power((cal_peaks_mz), 2) + Cterm
421            )
422
423        # sort both the calibration points (measured, recalibrated)
424        ref_recal_points.sort()
425        # and sort the calibration points (theoretical, predefined)
426        cal_refs_mz.sort()
427
428        # calculate the ppm error for each calibration point
429        error = ((ref_recal_points - cal_refs_mz) / cal_refs_mz) * 1e6
430        # calculate the root mean square error - this is our target to minimize
431        rmserror = np.sqrt(np.mean(error**2))
432        return rmserror
433
434    def recalibrate_mass_spectrum(
435        self,
436        cal_peaks_mz: list[float],
437        cal_refs_mz: list[float],
438        order: int = 1,
439        diagnostic: bool = False,
440    ):
441        """Main recalibration function which uses a robust linear regression
442
443        This function performs the recalibration of the mass spectrum object.
444        It iteratively applies
445
446        Parameters
447        ----------
448        cal_peaks_mz : list of float
449            masses of measured peaks to use in mass calibration.
450        cal_refs_mz : list of float
451            reference mz values of found calibration points.
452        order : int, optional
453            order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
454
455        Returns
456        -------
457        mass_spectrum : CoreMS mass spectrum object
458            Calibrated mass spectrum object
459
460
461        Notes
462        -----
463        This function is adapted, in part, from the SPIKE project [1,2] and is based on the robust linear regression method.
464
465        References
466        ----------
467        1.  Chiron L., Coutouly M-A., Starck J-P., Rolando C., Delsuc M-A.
468            SPIKE a Processing Software dedicated to Fourier Spectroscopies
469            https://arxiv.org/abs/1608.06777 (2016)
470        2.  SPIKE - https://github.com/spike-project/spike
471
472        """
473        # initialise parameters for recalibration
474        # these are the 'Aterm, Bterm, Cterm'
475        # as spectra are already freq->mz calibrated, these terms are very small
476        # may be beneficial to formally separate them from the freq->mz terms
477        if order == 1:
478            Po = [1, 0]
479        elif order == 2:
480            Po = [1, 0, 0]
481
482        if len(cal_peaks_mz) >= 2:
483            if self.mzsegment:  # If only part of the spectrum is to be recalibrated
484                mz_exp_peaks = np.array(
485                    [mspeak.mz_exp for mspeak in self.mass_spectrum]
486                )
487                # Split the array into two parts - one to recailbrate, one to keep unchanged.
488                mz_exp_peaks_tocal = mz_exp_peaks[
489                    (mz_exp_peaks >= min(self.mzsegment))
490                    & (mz_exp_peaks <= max(self.mzsegment))
491                ]
492                mz_exp_peaks_unchanged = mz_exp_peaks[
493                    ~(mz_exp_peaks >= min(self.mzsegment))
494                    | ~(mz_exp_peaks <= max(self.mzsegment))
495                ]
496                # TODO: - segmented calibration needs a way to better track the calibration args/values...
497                if not self.mass_spectrum.is_centroid:
498                    mz_exp_profile = np.array(self.mass_spectrum.mz_exp_profile)
499                    # Split the array into two parts - one to recailbrate, one to keep unchanged.
500                    mz_exp_profile_tocal = mz_exp_profile[
501                        (mz_exp_profile >= min(self.mzsegment))
502                        & (mz_exp_profile <= max(self.mzsegment))
503                    ]
504                    mz_exp_profile_unchanged = mz_exp_profile[
505                        ~(mz_exp_profile >= min(self.mzsegment))
506                        | ~(mz_exp_profile <= max(self.mzsegment))
507                    ]
508            else:  # if just recalibrating the whole spectrum
509                mz_exp_peaks_tocal = np.array(
510                    [mspeak.mz_exp for mspeak in self.mass_spectrum]
511                )
512                if not self.mass_spectrum.is_centroid:
513                    mz_exp_profile_tocal = np.array(self.mass_spectrum.mz_exp_profile)
514
515            minimize_method = self.mass_spectrum.settings.calib_minimize_method
516            res = minimize(
517                self.robust_calib,
518                Po,
519                args=(cal_peaks_mz, cal_refs_mz, order),
520                method=minimize_method,
521            )
522            if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
523                print(
524                    "minimize function completed with RMS error of: {:0.3f} ppm".format(
525                        res["fun"]
526                    )
527                )
528                print(
529                    "minimize function performed {:1d} fn evals and {:1d} iterations".format(
530                        res["nfev"], res["nit"]
531                    )
532                )
533            Pn = res.x
534
535            # mz_exp_ms = np.array([mspeak.mz_exp for mspeak in self.mass_spectrum])
536
537            if order == 1:
538                mz_domain = (Pn[0] * mz_exp_peaks_tocal) + Pn[1]
539                if not self.mass_spectrum.is_centroid:
540                    mz_profile_calc = (Pn[0] * mz_exp_profile_tocal) + Pn[1]
541
542            elif order == 2:
543                mz_domain = (Pn[0] * (mz_exp_peaks_tocal)) + (
544                    Pn[1] * np.power((mz_exp_peaks_tocal), 2) + Pn[2]
545                )
546
547                if not self.mass_spectrum.is_centroid:
548                    mz_profile_calc = (Pn[0] * (mz_exp_profile_tocal)) + (
549                        Pn[1] * np.power((mz_exp_profile_tocal), 2) + Pn[2]
550                    )
551
552            if self.mzsegment:
553                # Recombine the mass domains
554                mz_domain = np.concatenate([mz_domain, mz_exp_peaks_unchanged])
555                mz_domain.sort()
556                if not self.mass_spectrum.is_centroid:
557                    mz_profile_calc = np.concatenate(
558                        [mz_profile_calc, mz_exp_profile_unchanged]
559                    )
560                    mz_profile_calc.sort()
561                # Sort them
562                if (
563                    mz_exp_peaks[0] > mz_exp_peaks[1]
564                ):  # If originally descending mass order
565                    mz_domain = mz_domain[::-1]
566                    if not self.mass_spectrum.is_centroid:
567                        mz_profile_calc = mz_profile_calc[::-1]
568
569            self.mass_spectrum.mz_cal = mz_domain
570            if not self.mass_spectrum.is_centroid:
571                self.mass_spectrum.mz_cal_profile = mz_profile_calc
572
573            self.mass_spectrum.calibration_order = order
574            self.mass_spectrum.calibration_RMS = float(res["fun"])
575            self.mass_spectrum.calibration_points = int(len(cal_refs_mz))
576            self.mass_spectrum.calibration_ref_mzs = cal_refs_mz
577            self.mass_spectrum.calibration_meas_mzs = cal_peaks_mz
578
579            self.mass_spectrum.calibration_segment = self.mzsegment
580
581            if diagnostic:
582                return self.mass_spectrum, res
583            return self.mass_spectrum
584        else:
585            warnings.warn("Too few calibration points - aborting.")
586            return self.mass_spectrum
587
588    def run(self):
589        """Run the calibration routine
590
591        This function runs the calibration routine.
592
593        """
594        calib_snr_threshold = self.mass_spectrum.settings.calib_sn_threshold
595        max_calib_ppm_error = self.mass_spectrum.settings.max_calib_ppm_error
596        min_calib_ppm_error = self.mass_spectrum.settings.min_calib_ppm_error
597        calib_pol_order = self.mass_spectrum.settings.calib_pol_order
598        calibration_ref_match_method = (
599            self.mass_spectrum.settings.calibration_ref_match_method
600        )
601        calibration_ref_match_tolerance = (
602            self.mass_spectrum.settings.calibration_ref_match_tolerance
603        )
604        calibration_ref_match_std_raw_error_limit = (
605            self.mass_spectrum.settings.calibration_ref_match_std_raw_error_limit
606        )
607
608        # load reference mass list
609        df_ref = self.load_ref_mass_list()
610
611        # find calibration points
612        cal_peaks_mz, cal_refs_mz = self.find_calibration_points(
613            df_ref,
614            calib_ppm_error_threshold=(min_calib_ppm_error, max_calib_ppm_error),
615            calib_snr_threshold=calib_snr_threshold,
616            calibration_ref_match_method=calibration_ref_match_method,
617            calibration_ref_match_tolerance=calibration_ref_match_tolerance,
618            calibration_ref_match_std_raw_error_limit=calibration_ref_match_std_raw_error_limit,
619        )
620        if len(cal_peaks_mz) == 2:
621            self.mass_spectrum.settings.calib_pol_order = 1
622            calib_pol_order = 1
623            if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
624                print("Only 2 calibration points found, forcing a linear recalibration")
625        elif len(cal_peaks_mz) < 2:
626            warnings.warn("Too few calibration points found, function will fail")
627        self.recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=calib_pol_order)

MzDomainCalibration class for recalibrating mass spectra

Parameters
  • mass_spectrum (CoreMS MassSpectrum Object): The mass spectrum to be calibrated.
  • ref_masslist (str): The path to a reference mass list.
  • mzsegment (tuple of floats, optional): The mz range to recalibrate, or None. Used for calibration of specific parts of the mz domain at a time. Future work - allow multiple mzsegments to be passed.
Attributes
  • mass_spectrum (CoreMS MassSpectrum Object): The mass spectrum to be calibrated.
  • mzsegment (tuple of floats or None): The mz range to recalibrate, or None.
  • ref_mass_list_path (str or Path): The path to the reference mass list.
Methods
  • run(). Main function to run this class.
  • load_ref_mass_list(). Load reference mass list (Bruker format).
  • gen_ref_mass_list_from_assigned(min_conf=0.7). Generate reference mass list from assigned masses.
  • find_calibration_points(df_ref, calib_ppm_error_threshold=(-1, 1), calib_snr_threshold=5). Find calibration points in the mass spectrum based on the reference mass list.
  • robust_calib(param, cal_peaks_mz, cal_refs_mz, order=1). Recalibration function.
  • recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=1, diagnostic=False). Main recalibration function which uses a robust linear regression.
MzDomainCalibration(mass_spectrum, ref_masslist, mzsegment=None)
63    def __init__(self, mass_spectrum, ref_masslist, mzsegment=None):
64        self.mass_spectrum = mass_spectrum
65        self.mzsegment = mzsegment
66
67        # define reference mass list - bruker .ref format
68        self.ref_mass_list_path = ref_masslist
69        if self.mass_spectrum.percentile_assigned(mute_output=True)[0] != 0:
70            warnings.warn(
71                "Warning: calibrating spectra which have already been assigned may yield erroneous results"
72            )
73        self.mass_spectrum.mz_cal = self.mass_spectrum.mz_exp
74        self.mass_spectrum.mz_cal_profile = self.mass_spectrum._mz_exp
75
76        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
77            print(
78                "MS Obj loaded - " + str(len(mass_spectrum.mspeaks)) + " peaks found."
79            )
mass_spectrum
mzsegment
ref_mass_list_path
def load_ref_mass_list(self):
 81    def load_ref_mass_list(self):
 82        """
 83        Load reference mass list (Bruker format).
 84
 85        Loads in a reference mass list from a .ref file. Some versions of
 86        Bruker's software produce .ref files with a different format where
 87        the header lines (starting with '#' or '##') and delimiters may vary.
 88        The file may be located locally or on S3 and will be handled accordingly.
 89        
 90        Returns
 91        -------
 92        df_ref : Pandas DataFrame
 93            Reference mass list object.
 94        """
 95        # Get a Path-like object from the input path string or S3Path
 96        refmasslist = (Path(self.ref_mass_list_path)
 97                    if isinstance(self.ref_mass_list_path, str)
 98                    else self.ref_mass_list_path)
 99        
100        # Make sure the file exists
101        if not refmasslist.exists():
102            raise FileExistsError("File does not exist: %s" % refmasslist)
103        
104        # Read all lines from the file (handling S3 vs local differently)
105        if isinstance(refmasslist, S3Path):
106            # For S3, read the file in binary, then decode to string and split into lines.
107            content = refmasslist.open("rb").read()
108            all_lines = content.decode("utf-8").splitlines(keepends=True)
109        else:
110            # For a local file, open in text mode and read lines.
111            with refmasslist.open("r") as f:
112                all_lines = f.readlines()
113        
114        # Identify the index of the first line of the actual data.
115        # We assume header lines start with '#' (or '##') and ignore blank lines.
116        data_start_idx = 0
117        for idx, line in enumerate(all_lines):
118            if line.strip() and not line.lstrip().startswith("#"):
119                data_start_idx = idx
120                break
121        
122        # If there are not enough lines to guess the dialect, throw an error
123        if data_start_idx >= len(all_lines):
124            raise ValueError("The file does not appear to contain any data lines.")
125        
126        # Use a couple of the data lines to let csv.Sniffer detect the delimiter
127        sample_lines = "".join(all_lines[data_start_idx:data_start_idx+2])
128        try:
129            dialect = csv.Sniffer().sniff(sample_lines)
130            delimiter = dialect.delimiter
131        except csv.Error:
132            # If csv.Sniffer fails, default to a common delimiter (e.g., comma)
133            delimiter = ","
134        
135        # Join the lines from the beginning of data (this might include a blank line) 
136        joined_data = "".join(all_lines[data_start_idx:])
137        
138        # Depending on whether the file is S3 or local, wrap the data as needed for pandas
139        if isinstance(refmasslist, S3Path):
140            data_buffer = BytesIO(joined_data.encode("utf-8"))
141        else:
142            data_buffer = StringIO(joined_data)
143        
144        # Read data into a DataFrame.
145        # Adjust columns and names as needed – here we assume at least 2 columns:
146        df_ref = pd.read_csv(data_buffer,
147                            sep=delimiter,
148                            header=None,
149                            usecols=[0, 1],   # Modify if more columns are required.
150                            names=["Formula", "m/z"])
151        
152        df_ref.sort_values(by="m/z", ascending=True, inplace=True)
153        
154        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
155            print("Reference mass list loaded - {} calibration masses loaded.".format(len(df_ref)))
156        
157        return df_ref

Load reference mass list (Bruker format).

Loads in a reference mass list from a .ref file. Some versions of Bruker's software produce .ref files with a different format where the header lines (starting with '#' or '##') and delimiters may vary. The file may be located locally or on S3 and will be handled accordingly.

Returns
  • df_ref (Pandas DataFrame): Reference mass list object.
def gen_ref_mass_list_from_assigned(self, min_conf: float = 0.7):
159    def gen_ref_mass_list_from_assigned(self, min_conf: float = 0.7):
160        """Generate reference mass list from assigned masses
161
162        This function will generate a ref mass dataframe object from an assigned corems mass spec obj
163        using assigned masses above a certain minimum confidence threshold.
164
165        This function needs to be retested and check it is covered in the unit tests.
166
167        Parameters
168        ----------
169        min_conf : float, optional
170            minimum confidence score. The default is 0.7.
171
172        Returns
173        -------
174        df_ref : Pandas DataFrame
175            reference mass list - based on calculated masses.
176
177        """
178        # TODO this function needs to be retested and check it is covered in the unit tests
179        df = self.mass_spectrum.to_dataframe()
180        df = df[df["Confidence Score"] > min_conf]
181        df_ref = pd.DataFrame(columns=["m/z"])
182        df_ref["m/z"] = df["Calculated m/z"]
183        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
184            print(
185                "Reference mass list generated - "
186                + str(len(df_ref))
187                + " calibration masses."
188            )
189        return df_ref

Generate reference mass list from assigned masses

This function will generate a ref mass dataframe object from an assigned corems mass spec obj using assigned masses above a certain minimum confidence threshold.

This function needs to be retested and check it is covered in the unit tests.

Parameters
  • min_conf (float, optional): minimum confidence score. The default is 0.7.
Returns
  • df_ref (Pandas DataFrame): reference mass list - based on calculated masses.
def find_calibration_points( self, df_ref, calib_ppm_error_threshold: tuple[float, float] = (-1, 1), calib_snr_threshold: float = 5, calibration_ref_match_method: str = 'legacy', calibration_ref_match_tolerance: float = 0.003, calibration_ref_match_std_raw_error_limit: float = 1.5):
191    def find_calibration_points(
192        self,
193        df_ref,
194        calib_ppm_error_threshold: tuple[float, float] = (-1, 1),
195        calib_snr_threshold: float = 5,
196        calibration_ref_match_method: str = "legacy",
197        calibration_ref_match_tolerance: float = 0.003,
198        calibration_ref_match_std_raw_error_limit: float = 1.5,
199    ):
200        """Function to find calibration points in the mass spectrum
201
202        Based on the reference mass list.
203
204        Parameters
205        ----------
206        df_ref : Pandas DataFrame
207            reference mass list for recalibration.
208        calib_ppm_error_threshold : tuple of floats, optional
209            ppm error for finding calibration masses in the spectrum. The default is -1,1.
210            Note: This is based on the calculation of ppm = ((mz_measure - mz_theoretical)/mz_theoretical)*1e6.
211                Some software does this the other way around and value signs must be inverted for that to work.
212        calib_snr_threshold : float, optional
213            snr threshold for finding calibration masses in the spectrum. The default is 5.
214            If SNR data is unavailable, peaks are filtered by intensity percentile using the formula:
215            percentile = max(5, 100 - calib_snr_threshold)
216
217        Returns
218        -------
219        cal_peaks_mz : list of floats
220            masses of measured ions to use in calibration routine
221        cal_refs_mz : list of floats
222            reference mz values of found calibration points.
223
224        """
225
226        # Check if SNR data is available by testing the first peak
227        use_snr = False
228        if len(self.mass_spectrum.mspeaks) > 0:
229            first_peak = self.mass_spectrum.mspeaks[0]
230            if (hasattr(first_peak, 'signal_to_noise') and 
231                first_peak.signal_to_noise is not None and 
232                not np.isnan(first_peak.signal_to_noise) and
233                first_peak.signal_to_noise > 0):
234                use_snr = True
235
236        # This approach is much more efficient and expedient than the original implementation.
237        peaks_mz = []
238        peaks_intensity = []
239        
240        if use_snr:
241            # Use SNR filtering
242            for x in self.mass_spectrum.mspeaks:
243                if x.signal_to_noise > calib_snr_threshold:
244                    if self.mzsegment:
245                        if min(self.mzsegment) <= x.mz_exp <= max(self.mzsegment):
246                            peaks_mz.append(x.mz_exp)
247                    else:
248                        peaks_mz.append(x.mz_exp)
249        else:
250            # Fallback to intensity percentile filtering
251            intensity_percentile = max(5, 100 - calib_snr_threshold)
252            warnings.warn(
253                f"SNR data unavailable for calibration. Using intensity-based filtering instead. "
254                f"SNR threshold of {calib_snr_threshold} corresponds to intensity percentile >= {intensity_percentile}%."
255            )
256            
257            # Collect all peaks and their intensities
258            all_peaks_data = []
259            for x in self.mass_spectrum.mspeaks:
260                if self.mzsegment:
261                    if min(self.mzsegment) <= x.mz_exp <= max(self.mzsegment):
262                        all_peaks_data.append((x.mz_exp, x.abundance))
263                else:
264                    all_peaks_data.append((x.mz_exp, x.abundance))
265            
266            if all_peaks_data:
267                peaks_mz_list, intensities = zip(*all_peaks_data)
268                intensity_threshold = np.percentile(intensities, intensity_percentile)
269                
270                for mz, intensity in all_peaks_data:
271                    if intensity >= intensity_threshold:
272                        peaks_mz.append(mz)
273        
274        peaks_mz = np.asarray(peaks_mz)
275
276        if calibration_ref_match_method == "legacy":
277            # This legacy approach iterates through each reference match and finds the entries within 1 mz and within the user defined PPM error threshold
278            # Then it removes ambiguities - which means the calibration threshold hasto be very tight.
279            cal_peaks_mz = []
280            cal_refs_mz = []
281            for mzref in df_ref["m/z"]:
282                tmp_peaks_mz = peaks_mz[abs(peaks_mz - mzref) < 1]
283                for mzmeas in tmp_peaks_mz:
284                    delta_mass = ((mzmeas - mzref) / mzref) * 1e6
285                    if delta_mass < max(calib_ppm_error_threshold):
286                        if delta_mass > min(calib_ppm_error_threshold):
287                            cal_peaks_mz.append(mzmeas)
288                            cal_refs_mz.append(mzref)
289
290            # To remove entries with duplicated indices (reference masses matching multiple peaks)
291            tmpdf = pd.Series(index=cal_refs_mz, data=cal_peaks_mz, dtype=float)
292            tmpdf = tmpdf[~tmpdf.index.duplicated(keep=False)]
293
294            cal_peaks_mz = list(tmpdf.values)
295            cal_refs_mz = list(tmpdf.index)
296        elif calibration_ref_match_method == "merged":
297            #warnings.warn("Using experimental new reference mass list merging")
298            # This is a new approach (August 2024) which uses Pandas 'merged_asof' to find the peaks closest in m/z between
299            # reference and measured masses. This is a quicker way to match, and seems to get more matches.
300            # It may not work as well when the data are far from correc initial mass
301            # e.g. if the correct peak is further from the reference than an incorrect peak.
302            meas_df = pd.DataFrame(columns=["meas_m/z"], data=peaks_mz)
303            tolerance = calibration_ref_match_tolerance
304            merged_df = pd.merge_asof(
305                df_ref,
306                meas_df,
307                left_on="m/z",
308                right_on="meas_m/z",
309                tolerance=tolerance,
310                direction="nearest",
311            )
312            merged_df.dropna(how="any", inplace=True)
313            merged_df["Error_ppm"] = (
314                (merged_df["meas_m/z"] - merged_df["m/z"]) / merged_df["m/z"]
315            ) * 1e6
316            median_raw_error = merged_df["Error_ppm"].median()
317            std_raw_error = merged_df["Error_ppm"].std()
318            if std_raw_error > calibration_ref_match_std_raw_error_limit:
319                std_raw_error = calibration_ref_match_std_raw_error_limit
320            self.mass_spectrum.calibration_raw_error_median = median_raw_error
321            self.mass_spectrum.calibration_raw_error_stdev = std_raw_error
322            merged_df = merged_df[
323                (merged_df["Error_ppm"] > (median_raw_error - 1.5 * std_raw_error))
324                & (merged_df["Error_ppm"] < (median_raw_error + 1.5 * std_raw_error))
325            ]
326            # merged_df= merged_df[(merged_df['Error_ppm']>min(calib_ppm_error_threshold))&(merged_df['Error_ppm']<max(calib_ppm_error_threshold))]
327            cal_peaks_mz = list(merged_df["meas_m/z"])
328            cal_refs_mz = list(merged_df["m/z"])
329        else:
330            raise ValueError(f"{calibration_ref_match_method} not allowed.")
331
332        if False:
333            min_calib_ppm_error = calib_ppm_error_threshold[0]
334            max_calib_ppm_error = calib_ppm_error_threshold[1]
335            df_raw = self.mass_spectrum.to_dataframe()
336
337            df_raw = df_raw[df_raw["S/N"] > calib_snr_threshold]
338            # optionally further subset that based on minimum S/N, RP, Peak Height
339            # to ensure only valid points are utilized
340            # in this example, only a S/N threshold is implemented.
341            imzmeas = []
342            mzrefs = []
343
344            for mzref in df_ref["m/z"]:
345                # find all peaks within a defined ppm error threshold
346                tmpdf = df_raw[
347                    ((df_raw["m/z"] - mzref) / mzref) * 1e6 < max_calib_ppm_error
348                ]
349                # Error is relative to the theoretical, so the divisor should be divisor
350
351                tmpdf = tmpdf[
352                    ((tmpdf["m/z"] - mzref) / mzref) * 1e6 > min_calib_ppm_error
353                ]
354
355                # only use the calibration point if only one peak is within the thresholds
356                # This may require some optimization of the threshold tolerances
357                if len(tmpdf) == 1:
358                    imzmeas.append(int(tmpdf.index.values))
359                    mzrefs.append(mzref)
360
361        # it is crucial the mass lists are in same order
362        # corems likes to do masses from high to low.
363        cal_refs_mz.sort(reverse=False)
364        cal_peaks_mz.sort(reverse=False)
365        if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
366            print(
367                str(len(cal_peaks_mz))
368                + " calibration points matched within thresholds."
369            )
370        return cal_peaks_mz, cal_refs_mz

Function to find calibration points in the mass spectrum

Based on the reference mass list.

Parameters
  • df_ref (Pandas DataFrame): reference mass list for recalibration.
  • calib_ppm_error_threshold (tuple of floats, optional): ppm error for finding calibration masses in the spectrum. The default is -1,1. Note: This is based on the calculation of ppm = ((mz_measure - mz_theoretical)/mz_theoretical)*1e6. Some software does this the other way around and value signs must be inverted for that to work.
  • calib_snr_threshold (float, optional): snr threshold for finding calibration masses in the spectrum. The default is 5. If SNR data is unavailable, peaks are filtered by intensity percentile using the formula: percentile = max(5, 100 - calib_snr_threshold)
Returns
  • cal_peaks_mz (list of floats): masses of measured ions to use in calibration routine
  • cal_refs_mz (list of floats): reference mz values of found calibration points.
def robust_calib( self, param: list[float], cal_peaks_mz: list[float], cal_refs_mz: list[float], order: int = 1):
372    def robust_calib(
373        self,
374        param: list[float],
375        cal_peaks_mz: list[float],
376        cal_refs_mz: list[float],
377        order: int = 1,
378    ):
379        """Recalibration function
380
381        Computes the rms of m/z errors to minimize when calibrating.
382        This is adapted from from spike.
383
384        Parameters
385        ----------
386        param : list of floats
387            generated by minimize function from scipy optimize.
388        cal_peaks_mz : list of floats
389            masses of measured peaks to use in mass calibration.
390        cal_peaks_mz : list of floats
391            reference mz values of found calibration points.
392        order : int, optional
393            order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
394
395        Returns
396        -------
397        rmserror : float
398            root mean square mass error for calibration points.
399
400        """
401        Aterm = param[0]
402        Bterm = param[1]
403        try:
404            Cterm = param[2]
405        except IndexError:
406            pass
407
408        # get the mspeaks from the mass spectrum object which were calibration points
409        # mspeaks = [self.mass_spectrum.mspeaks[x] for x in imzmeas]
410        # get their calibrated mass values
411        # mspeakmzs = [x.mz_cal for x in mspeaks]
412        cal_peaks_mz = np.asarray(cal_peaks_mz)
413
414        # linearz
415        if order == 1:
416            ref_recal_points = (Aterm * cal_peaks_mz) + Bterm
417        # quadratic
418        elif order == 2:
419            ref_recal_points = (Aterm * (cal_peaks_mz)) + (
420                Bterm * np.power((cal_peaks_mz), 2) + Cterm
421            )
422
423        # sort both the calibration points (measured, recalibrated)
424        ref_recal_points.sort()
425        # and sort the calibration points (theoretical, predefined)
426        cal_refs_mz.sort()
427
428        # calculate the ppm error for each calibration point
429        error = ((ref_recal_points - cal_refs_mz) / cal_refs_mz) * 1e6
430        # calculate the root mean square error - this is our target to minimize
431        rmserror = np.sqrt(np.mean(error**2))
432        return rmserror

Recalibration function

Computes the rms of m/z errors to minimize when calibrating. This is adapted from from spike.

Parameters
  • param (list of floats): generated by minimize function from scipy optimize.
  • cal_peaks_mz (list of floats): masses of measured peaks to use in mass calibration.
  • cal_peaks_mz (list of floats): reference mz values of found calibration points.
  • order (int, optional): order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
Returns
  • rmserror (float): root mean square mass error for calibration points.
def recalibrate_mass_spectrum( self, cal_peaks_mz: list[float], cal_refs_mz: list[float], order: int = 1, diagnostic: bool = False):
434    def recalibrate_mass_spectrum(
435        self,
436        cal_peaks_mz: list[float],
437        cal_refs_mz: list[float],
438        order: int = 1,
439        diagnostic: bool = False,
440    ):
441        """Main recalibration function which uses a robust linear regression
442
443        This function performs the recalibration of the mass spectrum object.
444        It iteratively applies
445
446        Parameters
447        ----------
448        cal_peaks_mz : list of float
449            masses of measured peaks to use in mass calibration.
450        cal_refs_mz : list of float
451            reference mz values of found calibration points.
452        order : int, optional
453            order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
454
455        Returns
456        -------
457        mass_spectrum : CoreMS mass spectrum object
458            Calibrated mass spectrum object
459
460
461        Notes
462        -----
463        This function is adapted, in part, from the SPIKE project [1,2] and is based on the robust linear regression method.
464
465        References
466        ----------
467        1.  Chiron L., Coutouly M-A., Starck J-P., Rolando C., Delsuc M-A.
468            SPIKE a Processing Software dedicated to Fourier Spectroscopies
469            https://arxiv.org/abs/1608.06777 (2016)
470        2.  SPIKE - https://github.com/spike-project/spike
471
472        """
473        # initialise parameters for recalibration
474        # these are the 'Aterm, Bterm, Cterm'
475        # as spectra are already freq->mz calibrated, these terms are very small
476        # may be beneficial to formally separate them from the freq->mz terms
477        if order == 1:
478            Po = [1, 0]
479        elif order == 2:
480            Po = [1, 0, 0]
481
482        if len(cal_peaks_mz) >= 2:
483            if self.mzsegment:  # If only part of the spectrum is to be recalibrated
484                mz_exp_peaks = np.array(
485                    [mspeak.mz_exp for mspeak in self.mass_spectrum]
486                )
487                # Split the array into two parts - one to recailbrate, one to keep unchanged.
488                mz_exp_peaks_tocal = mz_exp_peaks[
489                    (mz_exp_peaks >= min(self.mzsegment))
490                    & (mz_exp_peaks <= max(self.mzsegment))
491                ]
492                mz_exp_peaks_unchanged = mz_exp_peaks[
493                    ~(mz_exp_peaks >= min(self.mzsegment))
494                    | ~(mz_exp_peaks <= max(self.mzsegment))
495                ]
496                # TODO: - segmented calibration needs a way to better track the calibration args/values...
497                if not self.mass_spectrum.is_centroid:
498                    mz_exp_profile = np.array(self.mass_spectrum.mz_exp_profile)
499                    # Split the array into two parts - one to recailbrate, one to keep unchanged.
500                    mz_exp_profile_tocal = mz_exp_profile[
501                        (mz_exp_profile >= min(self.mzsegment))
502                        & (mz_exp_profile <= max(self.mzsegment))
503                    ]
504                    mz_exp_profile_unchanged = mz_exp_profile[
505                        ~(mz_exp_profile >= min(self.mzsegment))
506                        | ~(mz_exp_profile <= max(self.mzsegment))
507                    ]
508            else:  # if just recalibrating the whole spectrum
509                mz_exp_peaks_tocal = np.array(
510                    [mspeak.mz_exp for mspeak in self.mass_spectrum]
511                )
512                if not self.mass_spectrum.is_centroid:
513                    mz_exp_profile_tocal = np.array(self.mass_spectrum.mz_exp_profile)
514
515            minimize_method = self.mass_spectrum.settings.calib_minimize_method
516            res = minimize(
517                self.robust_calib,
518                Po,
519                args=(cal_peaks_mz, cal_refs_mz, order),
520                method=minimize_method,
521            )
522            if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
523                print(
524                    "minimize function completed with RMS error of: {:0.3f} ppm".format(
525                        res["fun"]
526                    )
527                )
528                print(
529                    "minimize function performed {:1d} fn evals and {:1d} iterations".format(
530                        res["nfev"], res["nit"]
531                    )
532                )
533            Pn = res.x
534
535            # mz_exp_ms = np.array([mspeak.mz_exp for mspeak in self.mass_spectrum])
536
537            if order == 1:
538                mz_domain = (Pn[0] * mz_exp_peaks_tocal) + Pn[1]
539                if not self.mass_spectrum.is_centroid:
540                    mz_profile_calc = (Pn[0] * mz_exp_profile_tocal) + Pn[1]
541
542            elif order == 2:
543                mz_domain = (Pn[0] * (mz_exp_peaks_tocal)) + (
544                    Pn[1] * np.power((mz_exp_peaks_tocal), 2) + Pn[2]
545                )
546
547                if not self.mass_spectrum.is_centroid:
548                    mz_profile_calc = (Pn[0] * (mz_exp_profile_tocal)) + (
549                        Pn[1] * np.power((mz_exp_profile_tocal), 2) + Pn[2]
550                    )
551
552            if self.mzsegment:
553                # Recombine the mass domains
554                mz_domain = np.concatenate([mz_domain, mz_exp_peaks_unchanged])
555                mz_domain.sort()
556                if not self.mass_spectrum.is_centroid:
557                    mz_profile_calc = np.concatenate(
558                        [mz_profile_calc, mz_exp_profile_unchanged]
559                    )
560                    mz_profile_calc.sort()
561                # Sort them
562                if (
563                    mz_exp_peaks[0] > mz_exp_peaks[1]
564                ):  # If originally descending mass order
565                    mz_domain = mz_domain[::-1]
566                    if not self.mass_spectrum.is_centroid:
567                        mz_profile_calc = mz_profile_calc[::-1]
568
569            self.mass_spectrum.mz_cal = mz_domain
570            if not self.mass_spectrum.is_centroid:
571                self.mass_spectrum.mz_cal_profile = mz_profile_calc
572
573            self.mass_spectrum.calibration_order = order
574            self.mass_spectrum.calibration_RMS = float(res["fun"])
575            self.mass_spectrum.calibration_points = int(len(cal_refs_mz))
576            self.mass_spectrum.calibration_ref_mzs = cal_refs_mz
577            self.mass_spectrum.calibration_meas_mzs = cal_peaks_mz
578
579            self.mass_spectrum.calibration_segment = self.mzsegment
580
581            if diagnostic:
582                return self.mass_spectrum, res
583            return self.mass_spectrum
584        else:
585            warnings.warn("Too few calibration points - aborting.")
586            return self.mass_spectrum

Main recalibration function which uses a robust linear regression

This function performs the recalibration of the mass spectrum object. It iteratively applies

Parameters
  • cal_peaks_mz (list of float): masses of measured peaks to use in mass calibration.
  • cal_refs_mz (list of float): reference mz values of found calibration points.
  • order (int, optional): order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
Returns
  • mass_spectrum (CoreMS mass spectrum object): Calibrated mass spectrum object
Notes

This function is adapted, in part, from the SPIKE project [1,2] and is based on the robust linear regression method.

References
  1. Chiron L., Coutouly M-A., Starck J-P., Rolando C., Delsuc M-A. SPIKE a Processing Software dedicated to Fourier Spectroscopies https://arxiv.org/abs/1608.06777 (2016)
  2. SPIKE - https://github.com/spike-project/spike
def run(self):
588    def run(self):
589        """Run the calibration routine
590
591        This function runs the calibration routine.
592
593        """
594        calib_snr_threshold = self.mass_spectrum.settings.calib_sn_threshold
595        max_calib_ppm_error = self.mass_spectrum.settings.max_calib_ppm_error
596        min_calib_ppm_error = self.mass_spectrum.settings.min_calib_ppm_error
597        calib_pol_order = self.mass_spectrum.settings.calib_pol_order
598        calibration_ref_match_method = (
599            self.mass_spectrum.settings.calibration_ref_match_method
600        )
601        calibration_ref_match_tolerance = (
602            self.mass_spectrum.settings.calibration_ref_match_tolerance
603        )
604        calibration_ref_match_std_raw_error_limit = (
605            self.mass_spectrum.settings.calibration_ref_match_std_raw_error_limit
606        )
607
608        # load reference mass list
609        df_ref = self.load_ref_mass_list()
610
611        # find calibration points
612        cal_peaks_mz, cal_refs_mz = self.find_calibration_points(
613            df_ref,
614            calib_ppm_error_threshold=(min_calib_ppm_error, max_calib_ppm_error),
615            calib_snr_threshold=calib_snr_threshold,
616            calibration_ref_match_method=calibration_ref_match_method,
617            calibration_ref_match_tolerance=calibration_ref_match_tolerance,
618            calibration_ref_match_std_raw_error_limit=calibration_ref_match_std_raw_error_limit,
619        )
620        if len(cal_peaks_mz) == 2:
621            self.mass_spectrum.settings.calib_pol_order = 1
622            calib_pol_order = 1
623            if self.mass_spectrum.parameters.mass_spectrum.verbose_processing:
624                print("Only 2 calibration points found, forcing a linear recalibration")
625        elif len(cal_peaks_mz) < 2:
626            warnings.warn("Too few calibration points found, function will fail")
627        self.recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=calib_pol_order)

Run the calibration routine

This function runs the calibration routine.