corems.mass_spectrum.calc.PeakPicking

@author: Yuri E. Corilo @date: Jun 27, 2019

  1"""
  2@author: Yuri E. Corilo
  3@date: Jun 27, 2019
  4"""
  5
  6import warnings
  7from numpy import (
  8    hstack,
  9    inf,
 10    isnan,
 11    where,
 12    array,
 13    polyfit,
 14    nan,
 15    pad,
 16    zeros,
 17    searchsorted,
 18)
 19from corems.encapsulation.constant import Labels
 20from corems.mass_spectra.calc import SignalProcessing as sp
 21
 22
 23class PeakPicking:
 24    """Class for peak picking.
 25
 26    Parameters
 27    ----------
 28    None
 29
 30    Attributes
 31    ----------
 32    None
 33
 34    Methods
 35    -------
 36    * prepare_peak_picking_data().
 37        Prepare the mz, abundance, and frequence data for peak picking.
 38    * cut_mz_domain_peak_picking().
 39        Cut the m/z domain for peak picking.
 40    * extrapolate_axes_for_pp(mz=None, abund=None, freq=None).
 41        Extrapolate the m/z axis and fill the abundance axis with 0s.
 42    * do_peak_picking().
 43        Perform peak picking.
 44    * find_minima(apex_index, abundance, len_abundance, right=True).
 45        Find the minima of a peak.
 46    * linear_fit_calc(intes, massa, index_term, index_sign).
 47        Algebraic solution to a linear fit.
 48    * calculate_resolving_power(intes, massa, current_index).
 49        Calculate the resolving power of a peak.
 50    * cal_minima(mass, abun).
 51        Calculate the minima of a peak.
 52    * calc_centroid(mass, abund, freq).
 53        Calculate the centroid of a peak.
 54    * get_threshold(intes).
 55        Get the intensity threshold for peak picking.
 56    * algebraic_quadratic(list_mass, list_y).
 57        Find the apex of a peak - algebraically.
 58    * find_apex_fit_quadratic(mass, abund, freq, current_index).
 59        Find the apex of a peak.
 60    * check_prominence(abun, current_index, len_abundance, peak_height_diff).
 61        Check the prominence of a peak.
 62    * use_the_max(mass, abund, current_index, len_abundance, peak_height_diff).
 63        Use the max peak height as the centroid.
 64    * calc_centroid_legacy(mass, abund, freq).
 65        Legacy centroid calculation. Deprecated - for deletion.
 66
 67    """
 68
 69    def prepare_peak_picking_data(self):
 70        """Prepare the data for peak picking.
 71
 72        This function will prepare the m/z, abundance, and frequency data for peak picking according to the settings.
 73
 74        Returns
 75        -------
 76        mz : ndarray
 77            The m/z axis.
 78        abundance : ndarray
 79            The abundance axis.
 80        freq : ndarray or None
 81            The frequency axis, if available.
 82        """
 83        # First apply cut_mz_domain_peak_picking
 84        mz, abundance, freq = self.cut_mz_domain_peak_picking()
 85
 86        # Then extrapolate the axes for peak picking
 87        if self.settings.picking_point_extrapolate > 0:
 88            mz, abundance, freq = self.extrapolate_axes_for_pp(mz, abundance, freq)
 89        return mz, abundance, freq
 90
 91    def cut_mz_domain_peak_picking(self):
 92        """
 93        Cut the m/z domain for peak picking.
 94
 95        Simplified function
 96
 97        Returns
 98        -------
 99        mz_domain_X_low_cutoff : ndarray
100            The m/z values within the specified range.
101        mz_domain_low_Y_cutoff : ndarray
102            The abundance values within the specified range.
103        freq_domain_low_Y_cutoff : ndarray or None
104            The frequency values within the specified range, if available.
105
106        """
107        max_picking_mz = self.settings.max_picking_mz
108        min_picking_mz = self.settings.min_picking_mz
109
110        # min_start =  where(self.mz_exp_profile  > min_picking_mz)[0][0]
111        # max_final =  where(self.mz_exp_profile < max_picking_mz)[-1][-1]
112        min_start = searchsorted(a=self.mz_exp_profile, v=min_picking_mz)
113        max_final = searchsorted(a=self.mz_exp_profile, v=max_picking_mz)
114
115        if self.has_frequency:
116            if self.freq_exp_profile.any():
117                return (
118                    self.mz_exp_profile[min_start:max_final],
119                    self.abundance_profile[min_start:max_final],
120                    self.freq_exp_profile[min_start:max_final],
121                )
122
123        else:
124            return (
125                self.mz_exp_profile[min_start:max_final],
126                self.abundance_profile[min_start:max_final],
127                None,
128            )
129
130    def legacy_cut_mz_domain_peak_picking(self):
131        """
132        Cut the m/z domain for peak picking.
133        DEPRECATED
134        Returns
135        -------
136        mz_domain_X_low_cutoff : ndarray
137            The m/z values within the specified range.
138        mz_domain_low_Y_cutoff : ndarray
139            The abundance values within the specified range.
140        freq_domain_low_Y_cutoff : ndarray or None
141            The frequency values within the specified range, if available.
142
143        """
144        max_picking_mz = self.settings.max_picking_mz
145        min_picking_mz = self.settings.min_picking_mz
146
147        min_final = where(self.mz_exp_profile > min_picking_mz)[-1][-1]
148        min_start = where(self.mz_exp_profile > min_picking_mz)[0][0]
149
150        (
151            mz_domain_X_low_cutoff,
152            mz_domain_low_Y_cutoff,
153        ) = (
154            self.mz_exp_profile[min_start:min_final],
155            self.abundance_profile[min_start:min_final],
156        )
157
158        max_final = where(self.mz_exp_profile < max_picking_mz)[-1][-1]
159        max_start = where(self.mz_exp_profile < max_picking_mz)[0][0]
160
161        if self.has_frequency:
162            if self.freq_exp_profile.any():
163                freq_domain_low_Y_cutoff = self.freq_exp_profile[min_start:min_final]
164
165                return (
166                    mz_domain_X_low_cutoff[max_start:max_final],
167                    mz_domain_low_Y_cutoff[max_start:max_final],
168                    freq_domain_low_Y_cutoff[max_start:max_final],
169                )
170
171        else:
172            return (
173                mz_domain_X_low_cutoff[max_start:max_final],
174                mz_domain_low_Y_cutoff[max_start:max_final],
175                None,
176            )
177
178    @staticmethod
179    def extrapolate_axis(initial_array, pts):
180        """
181        This function will extrapolate an input array in both directions by N pts.
182
183        Parameters
184        ----------
185        initial_array : ndarray
186            The input array.
187        pts : int
188            The number of points to extrapolate.
189
190        Returns
191        -------
192        ndarray
193            The extrapolated array.
194
195        Notes
196        --------
197        This is a static method.
198        """
199        initial_array_len = len(initial_array)
200        right_delta = initial_array[-1] - initial_array[-2]
201        left_delta = initial_array[1] - initial_array[0]
202
203        # Create an array with extra space for extrapolation
204        pad_array = zeros(initial_array_len + 2 * pts)
205
206        # Copy original array into the middle of the padded array
207        pad_array[pts : pts + initial_array_len] = initial_array
208
209        # Extrapolate the right side
210        for pt in range(pts):
211            final_value = initial_array[-1]
212            value_to_add = right_delta * (pt + 1)
213            new_value = final_value + value_to_add
214            pad_array[initial_array_len + pts + pt] = new_value
215
216        # Extrapolate the left side
217        for pt in range(pts):
218            first_value = initial_array[0]
219            value_to_subtract = left_delta * (pt + 1)
220            new_value = first_value - value_to_subtract
221            pad_array[pts - pt - 1] = new_value
222
223        return pad_array
224
225    def extrapolate_axes_for_pp(self, mz=None, abund=None, freq=None):
226        """Extrapolate the m/z axis and fill the abundance axis with 0s.
227
228        Parameters
229        ----------
230        mz : ndarray or None
231            The m/z axis, if available. If None, the experimental m/z axis is used.
232        abund : ndarray or None
233            The abundance axis, if available. If None, the experimental abundance axis is used.
234        freq : ndarray or None
235            The frequency axis, if available. If None, the experimental frequency axis is used.
236
237        Returns
238        -------
239        mz : ndarray
240            The extrapolated m/z axis.
241        abund : ndarray
242            The abundance axis with 0s filled.
243        freq : ndarray or None
244            The extrapolated frequency axis, if available.
245
246        Notes
247        --------
248        This function will extrapolate the mz axis by the number of datapoints specified in the settings,
249        and fill the abundance axis with 0s.
250        This should prevent peak picking issues at the spectrum edge.
251
252        """
253        # Check if the input arrays are provided
254        if mz is None or abund is None:
255            mz, abund = self.mz_exp_profile, self.abundance_profile
256            if self.has_frequency:
257                freq = self.freq_exp_profile
258            else:
259                freq = None
260        pts = self.settings.picking_point_extrapolate
261        if pts == 0:
262            return mz, abund, freq
263
264        mz = self.extrapolate_axis(mz, pts)
265        abund = pad(abund, (pts, pts), mode="constant", constant_values=(0, 0))
266        if freq is not None:
267            freq = self.extrapolate_axis(freq, pts)
268        return mz, abund, freq
269
270    def do_peak_picking(self):
271        """Perform peak picking."""
272        mz, abundance, freq = self.prepare_peak_picking_data()
273
274        if (
275            self.label == Labels.bruker_frequency
276            or self.label == Labels.midas_frequency
277        ):
278            self.calc_centroid(mz, abundance, freq)
279
280        elif self.label == Labels.thermo_profile:
281            self.calc_centroid(mz, abundance, freq)
282
283        elif self.label == Labels.bruker_profile:
284            self.calc_centroid(mz, abundance, freq)
285
286        elif self.label == Labels.booster_profile:
287            self.calc_centroid(mz, abundance, freq)
288
289        elif self.label == Labels.simulated_profile:
290            self.calc_centroid(mz, abundance, freq)
291
292        else:
293            raise Exception("Unknow mass spectrum type", self.label)
294
295    def find_minima(self, apex_index, abundance, len_abundance, right=True):
296        """Find the minima of a peak.
297
298        Parameters
299        ----------
300        apex_index : int
301            The index of the peak apex.
302        abundance : ndarray
303            The abundance values.
304        len_abundance : int
305            The length of the abundance array.
306        right : bool, optional
307            Flag indicating whether to search for minima to the right of the apex (default is True).
308
309        Returns
310        -------
311        int
312            The index of the minima.
313
314        """
315        j = apex_index
316
317        if right:
318            minima = abundance[j] > abundance[j + 1]
319        else:
320            minima = abundance[j] > abundance[j - 1]
321
322        while minima:
323            if j == 1 or j == len_abundance - 2:
324                break
325
326            if right:
327                j += 1
328
329                minima = abundance[j] >= abundance[j + 1]
330
331            else:
332                j -= 1
333                minima = abundance[j] >= abundance[j - 1]
334
335        if right:
336            return j
337        else:
338            return j
339
340    @staticmethod
341    def linear_fit_calc(intes, massa, index_term, index_sign):
342        """
343        Algebraic solution to a linear fit - roughly 25-50x faster than numpy polyfit when passing only two vals and doing a 1st order fit
344
345        Parameters
346        ----------
347        intes : ndarray
348            The intensity values.
349        massa : ndarray
350            The mass values.
351        index_term : int
352            The index of the current term.
353        index_sign : str
354            The index sign
355
356        Returns
357        -------
358        ndarray
359            The coefficients of the linear fit.
360
361        Notes
362        --------
363        This is a static method.
364        """
365        if index_sign == "+":
366            x1, x2 = massa[index_term], massa[index_term + 1]
367            y1, y2 = intes[index_term], intes[index_term + 1]
368        elif index_sign == "-":
369            x1, x2 = massa[index_term], massa[index_term - 1]
370            y1, y2 = intes[index_term], intes[index_term - 1]
371        else:
372            warnings.warn("error in linear fit calc, unknown index sign")
373
374        # Calculate the slope (m)
375        slope = (y2 - y1) / (x2 - x1)
376
377        # Calculate the intercept (b)
378        intercept = y1 - slope * x1
379
380        # The coefficients array would be [slope, intercept]
381        coefficients = array([slope, intercept])
382        return coefficients
383
384    def calculate_resolving_power(self, intes, massa, current_index):
385        """Calculate the resolving power of a peak.
386
387        Parameters
388        ----------
389        intes : ndarray
390            The intensity values.
391        massa : ndarray
392            The mass values.
393        current_index : int
394            The index of the current peak.
395
396        Returns
397        -------
398        float
399            The resolving power of the peak.
400
401        Notes
402        --------
403        This is a conservative calculation of resolving power,
404        the peak need to be resolved at least at the half-maximum magnitude,
405        otherwise, the combined full width at half maximum is used to calculate resolving power.
406
407        """
408
409        peak_height = intes[current_index]
410        target_peak_height = peak_height / 2
411
412        peak_height_minus = peak_height
413        peak_height_plus = peak_height
414
415        # There are issues when a peak is at the high or low limit of a spectrum in finding its local minima and maxima
416        # This solution will return nan for resolving power when a peak is possibly too close to an edge to avoid the issue
417
418        if current_index < 5:
419            warnings.warn("peak at low spectrum edge, returning no resolving power")
420            return nan
421        elif abs(current_index - len(intes)) < 5:
422            warnings.warn("peak at high spectrum edge, returning no resolving power")
423            return nan
424        else:
425            pass
426
427        index_minus = current_index
428        while peak_height_minus >= target_peak_height:
429            index_minus = index_minus - 1
430            if index_minus < 0:
431                warnings.warn(
432                    "Res. calc. warning - peak index minus adjacent to spectrum edge \n \
433                        Zeroing the first 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported \n \
434                        Perhaps try to increase picking_point_extrapolate (e.g. to 3)"
435                )
436                # Pad the first 5 data points with zeros and restart the loop
437                intes[:5] = 0
438                peak_height_minus = target_peak_height
439                index_minus = current_index
440            else:
441                peak_height_minus = intes[index_minus]
442
443        if self.mspeaks_settings.legacy_centroid_polyfit:
444            x = [massa[index_minus], massa[index_minus + 1]]
445            y = [intes[index_minus], intes[index_minus + 1]]
446            coefficients = polyfit(x, y, 1)
447        else:
448            coefficients = self.linear_fit_calc(
449                intes, massa, index_minus, index_sign="+"
450            )
451
452        a = coefficients[0]
453        b = coefficients[1]
454        if self.mspeaks_settings.legacy_resolving_power:
455            y_intercept = intes[index_minus] + (
456                (intes[index_minus + 1] - intes[index_minus]) / 2
457            )
458        else:
459            y_intercept = target_peak_height
460        massa1 = (y_intercept - b) / a
461
462        index_plus = current_index
463        while peak_height_plus >= target_peak_height:
464            index_plus = index_plus + 1
465
466            try:
467                peak_height_plus = intes[index_plus]
468            except IndexError:
469                warnings.warn(
470                    "Res. calc. warning - peak index plus adjacent to spectrum edge \n \
471                        Zeroing the last 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported\
472                        Perhaps try to increase picking_point_extrapolate (e.g. to 3)"
473                )
474                # Pad the first 5 data points with zeros and restart the loop
475                intes[-5:] = 0
476                peak_height_plus = target_peak_height
477                index_plus = current_index
478
479        if self.mspeaks_settings.legacy_centroid_polyfit:
480            x = [massa[index_plus], massa[index_plus - 1]]
481            y = [intes[index_plus], intes[index_plus - 1]]
482            coefficients = polyfit(x, y, 1)
483        else:
484            coefficients = self.linear_fit_calc(
485                intes, massa, index_plus, index_sign="-"
486            )
487
488        a = coefficients[0]
489        b = coefficients[1]
490
491        if self.mspeaks_settings.legacy_resolving_power:
492            y_intercept = intes[index_plus - 1] + (
493                (intes[index_plus] - intes[index_plus - 1]) / 2
494            )
495        else:
496            y_intercept = target_peak_height
497
498        massa2 = (y_intercept - b) / a
499
500        if massa1 > massa2:
501            resolvingpower = massa[current_index] / (massa1 - massa2)
502
503        else:
504            resolvingpower = massa[current_index] / (massa2 - massa1)
505
506        return resolvingpower
507
508    def cal_minima(self, mass, abun):
509        """Calculate the minima of a peak.
510
511        Parameters
512        ----------
513        mass : ndarray
514            The mass values.
515        abun : ndarray
516            The abundance values.
517
518        Returns
519        -------
520        ndarray or None
521            The mass values at the minima, if found.
522
523        """
524        abun = -abun
525
526        dy = abun[1:] - abun[:-1]
527
528        # replaces nan for infinity
529        indices_nan = where(isnan(abun))[0]
530
531        if indices_nan.size:
532            abun[indices_nan] = inf
533            dy[where(isnan(dy))[0]] = inf
534
535        indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0]
536
537        if indexes.size:
538            return mass[indexes], abun[indexes]
539
540    def calc_centroid(self, mass, abund, freq):
541        """Calculate the centroid of a peak.
542
543        Parameters
544        ----------
545        mass : ndarray
546            The mass values.
547        abund : ndarray
548            The abundance values.
549        freq : ndarray or None
550            The frequency values, if available.
551
552        Returns
553        -------
554        None
555
556        """
557
558        max_height = self.mspeaks_settings.peak_height_max_percent
559        max_prominence = self.mspeaks_settings.peak_max_prominence_percent
560        min_peak_datapoints = self.mspeaks_settings.min_peak_datapoints
561        peak_derivative_threshold = self.mspeaks_settings.peak_derivative_threshold
562        max_abun = max(abund)
563        peak_height_diff = lambda hi, li: ((abund[hi] - abund[li]) / max_abun) * 100
564
565        domain = mass
566        signal = abund
567        len_signal = len(signal)
568
569        signal_threshold, factor = self.get_threshold(abund)
570        max_signal = factor
571
572        correct_baseline = False
573
574        include_indexes = sp.peak_picking_first_derivative(
575            domain,
576            signal,
577            max_height,
578            max_prominence,
579            max_signal,
580            min_peak_datapoints,
581            peak_derivative_threshold,
582            signal_threshold=signal_threshold,
583            correct_baseline=correct_baseline,
584            abun_norm=1,
585            plot_res=False,
586        )
587
588        for indexes_tuple in include_indexes:
589            apex_index = indexes_tuple[1]
590
591            peak_indexes = self.check_prominence(
592                abund, apex_index, len_signal, peak_height_diff
593            )
594
595            if peak_indexes:
596                mz_exp_centroid, freq_centr, intes_centr = self.find_apex_fit_quadratic(
597                    mass, abund, freq, apex_index
598                )
599
600                if mz_exp_centroid:
601                    peak_resolving_power = self.calculate_resolving_power(
602                        abund, mass, apex_index
603                    )
604                    s2n = intes_centr / self.baseline_noise_std
605                    self.add_mspeak(
606                        self.polarity,
607                        mz_exp_centroid,
608                        abund[apex_index],
609                        peak_resolving_power,
610                        s2n,
611                        indexes_tuple,
612                        exp_freq=freq_centr,
613                        ms_parent=self,
614                    )
615                # pyplot.plot(domain[start_index: final_index + 1], signal[start_index:final_index + 1], c='black')
616                # pyplot.show()
617
618    def get_threshold(self, intes):
619        """Get the intensity threshold for peak picking.
620
621        Parameters
622        ----------
623        intes : ndarray
624            The intensity values.
625
626        Returns
627        -------
628        float
629            The intensity threshold.
630        float
631            The factor to multiply the intensity threshold by.
632        """
633
634        intes = array(intes).astype(float)
635
636        noise_threshold_method = self.settings.noise_threshold_method
637
638        if noise_threshold_method == "minima":
639            if self.is_centroid:
640                warnings.warn(
641                    "Auto threshould is disabled for centroid data, returning 0"
642                )
643                factor = 1
644                abundance_threshold = 1e-20
645            # print(self.settings.noise_threshold_min_std)
646            else:
647                abundance_threshold = self.baseline_noise + (
648                    self.settings.noise_threshold_min_std * self.baseline_noise_std
649                )
650                factor = 1
651
652        elif noise_threshold_method == "signal_noise":
653            abundance_threshold = self.settings.noise_threshold_min_s2n
654            if self.is_centroid:
655                factor = 1
656            else:
657                factor = self.baseline_noise_std
658
659        elif noise_threshold_method == "relative_abundance":
660            abundance_threshold = self.settings.noise_threshold_min_relative_abundance
661            factor = intes.max() / 100
662
663        elif noise_threshold_method == "absolute_abundance":
664            abundance_threshold = self.settings.noise_threshold_absolute_abundance
665            factor = 1
666
667        elif noise_threshold_method == "log":
668            if self.is_centroid:
669                raise Exception("log noise Not tested for centroid data")
670            abundance_threshold = self.settings.noise_threshold_log_nsigma
671            factor = self.baseline_noise_std
672
673        else:
674            raise Exception(
675                "%s method was not implemented, please refer to corems.mass_spectrum.calc.NoiseCalc Class"
676                % noise_threshold_method
677            )
678
679        return abundance_threshold, factor
680
681    @staticmethod
682    def algebraic_quadratic(list_mass, list_y):
683        """
684        Find the apex of a peak - algebraically.
685        Faster than using numpy polyfit by ~28x per fit.
686
687        Parameters
688        ----------
689        list_mass : ndarray
690            list of m/z values (3 points)
691        list_y : ndarray
692            list of abundance values (3 points)
693
694        Returns
695        -------
696        a, b, c: float
697            coefficients of the quadratic equation.
698
699        Notes
700        --------
701        This is a static method.
702        """
703        x_1, x_2, x_3 = list_mass
704        y_1, y_2, y_3 = list_y
705
706        a = (
707            y_1 / ((x_1 - x_2) * (x_1 - x_3))
708            + y_2 / ((x_2 - x_1) * (x_2 - x_3))
709            + y_3 / ((x_3 - x_1) * (x_3 - x_2))
710        )
711
712        b = (
713            -y_1 * (x_2 + x_3) / ((x_1 - x_2) * (x_1 - x_3))
714            - y_2 * (x_1 + x_3) / ((x_2 - x_1) * (x_2 - x_3))
715            - y_3 * (x_1 + x_2) / ((x_3 - x_1) * (x_3 - x_2))
716        )
717
718        c = (
719            y_1 * x_2 * x_3 / ((x_1 - x_2) * (x_1 - x_3))
720            + y_2 * x_1 * x_3 / ((x_2 - x_1) * (x_2 - x_3))
721            + y_3 * x_1 * x_2 / ((x_3 - x_1) * (x_3 - x_2))
722        )
723        return a, b, c
724
725    def find_apex_fit_quadratic(self, mass, abund, freq, current_index):
726        """
727        Find the apex of a peak.
728
729        Parameters
730        ----------
731        mass : ndarray
732            The mass values.
733        abund : ndarray
734            The abundance values.
735        freq : ndarray or None
736            The frequency values, if available.
737        current_index : int
738            The index of the current peak.
739
740
741        Returns
742        -------
743        float
744            The m/z value of the peak apex.
745        float
746            The frequency value of the peak apex, if available.
747        float
748            The abundance value of the peak apex.
749
750        """
751        # calc prominence
752        # peak_indexes = self.check_prominence(abund, current_index, len_abundance, peak_height_diff )
753
754        # if not peak_indexes:
755
756        #    return None, None, None, None
757
758        # else:
759
760        # fit parabola to three most abundant datapoints
761        list_mass = [
762            mass[current_index - 1],
763            mass[current_index],
764            mass[current_index + 1],
765        ]
766        list_y = [
767            abund[current_index - 1],
768            abund[current_index],
769            abund[current_index + 1],
770        ]
771
772        if self.mspeaks_settings.legacy_centroid_polyfit:
773            z = polyfit(list_mass, list_y, 2)
774            a = z[0]
775            b = z[1]
776        else:
777            a, b, c = self.algebraic_quadratic(list_mass, list_y)
778
779        calculated = -b / (2 * a)
780
781        if calculated < 1 or int(calculated) != int(list_mass[1]):
782            mz_exp_centroid = list_mass[1]
783
784        else:
785            mz_exp_centroid = calculated
786
787        if (
788            self.label == Labels.bruker_frequency
789            or self.label == Labels.midas_frequency
790        ):
791            # fit parabola to three most abundant frequency datapoints
792            list_freq = [
793                freq[current_index - 1],
794                freq[current_index],
795                freq[current_index + 1],
796            ]
797            if self.mspeaks_settings.legacy_centroid_polyfit:
798                z = polyfit(list_mass, list_y, 2)
799                a = z[0]
800                b = z[1]
801            else:
802                a, b, c = self.algebraic_quadratic(list_mass, list_y)
803
804            calculated_freq = -b / (2 * a)
805
806            if calculated_freq < 1 or int(calculated_freq) != freq[current_index]:
807                freq_centr = list_freq[1]
808
809            else:
810                freq_centr = calculated_freq
811
812        else:
813            freq_centr = None
814
815        if self.mspeaks_settings.legacy_centroid_polyfit:
816            abundance_centroid = abund[current_index]
817        else:
818            abundance_centroid = a * mz_exp_centroid**2 + b * mz_exp_centroid + c
819
820        return mz_exp_centroid, freq_centr, abundance_centroid
821
822    def check_prominence(
823        self, abun, current_index, len_abundance, peak_height_diff
824    ) -> tuple or False:
825        """Check the prominence of a peak.
826
827        Parameters
828        ----------
829        abun : ndarray
830            The abundance values.
831        current_index : int
832            The index of the current peak.
833        len_abundance : int
834            The length of the abundance array.
835        peak_height_diff : function
836            The function to calculate the peak height difference.
837
838        Returns
839        -------
840        tuple or False
841            A tuple containing the indexes of the peak, if the prominence is above the threshold.
842            Otherwise, False.
843
844        """
845
846        final_index = self.find_minima(current_index, abun, len_abundance, right=True)
847
848        start_index = self.find_minima(current_index, abun, len_abundance, right=False)
849
850        peak_indexes = (current_index - 1, current_index, current_index + 1)
851
852        if (
853            min(
854                peak_height_diff(current_index, start_index),
855                peak_height_diff(current_index, final_index),
856            )
857            > self.mspeaks_settings.peak_min_prominence_percent
858        ):
859            return peak_indexes
860
861        else:
862            return False
863
864    def use_the_max(self, mass, abund, current_index, len_abundance, peak_height_diff):
865        """Use the max peak height as the centroid
866
867        Parameters
868        ----------
869        mass : ndarray
870            The mass values.
871        abund : ndarray
872            The abundance values.
873        current_index : int
874            The index of the current peak.
875        len_abundance : int
876            The length of the abundance array.
877        peak_height_diff : function
878            The function to calculate the peak height difference.
879
880        Returns
881        -------
882        float
883            The m/z value of the peak apex.
884        float
885            The abundance value of the peak apex.
886        tuple or None
887            A tuple containing the indexes of the peak, if the prominence is above the threshold.
888            Otherwise, None.
889        """
890
891        peak_indexes = self.check_prominence(
892            abund, current_index, len_abundance, peak_height_diff
893        )
894
895        if not peak_indexes:
896            return None, None, None
897
898        else:
899            return mass[current_index], abund[current_index], peak_indexes
900
901    def calc_centroid_legacy(self, mass, abund, freq):
902        """Legacy centroid calculation
903        Deprecated - for deletion.
904
905        """
906        warnings.warn(
907            "Legacy centroid calculation is deprecated. Please use the new centroid calculation method."
908        )
909        pass
910        if False:
911            len_abundance = len(abund)
912
913            max_abundance = max(abund)
914
915            peak_height_diff = (
916                lambda hi, li: ((abund[hi] - abund[li]) / max_abundance) * 100
917            )
918
919            abundance_threshold, factor = self.get_threshold(abund)
920            # print(abundance_threshold, factor)
921            # find indices of all peaks
922            dy = abund[1:] - abund[:-1]
923
924            # replaces nan for infi nity
925            indices_nan = where(isnan(abund))[0]
926
927            if indices_nan.size:
928                abund[indices_nan] = inf
929                dy[where(isnan(dy))[0]] = inf
930
931            indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0]
932
933            # noise threshold
934            if indexes.size and abundance_threshold is not None:
935                indexes = indexes[abund[indexes] / factor >= abundance_threshold]
936            # filter out 'peaks' within 3 points of the spectrum limits
937            # remove entries within 3 points of upper limit
938            indexes = [x for x in indexes if (len_abundance - x) > 3]
939            # remove entries within 3 points of zero
940            indexes = [x for x in indexes if x > 3]
941
942            for current_index in indexes:
943                if self.label == Labels.simulated_profile:
944                    mz_exp_centroid, intes_centr, peak_indexes = self.use_the_max(
945                        mass, abund, current_index, len_abundance, peak_height_diff
946                    )
947                    if mz_exp_centroid:
948                        peak_resolving_power = self.calculate_resolving_power(
949                            abund, mass, current_index
950                        )
951                        s2n = intes_centr / self.baseline_noise_std
952                        freq_centr = None
953                        self.add_mspeak(
954                            self.polarity,
955                            mz_exp_centroid,
956                            abund[current_index],
957                            peak_resolving_power,
958                            s2n,
959                            peak_indexes,
960                            exp_freq=freq_centr,
961                            ms_parent=self,
962                        )
963
964                else:
965                    mz_exp_centroid, freq_centr, intes_centr, peak_indexes = (
966                        self.find_apex_fit_quadratic(
967                            mass,
968                            abund,
969                            freq,
970                            current_index,
971                            len_abundance,
972                            peak_height_diff,
973                        )
974                    )
975                    if mz_exp_centroid:
976                        try:
977                            peak_resolving_power = self.calculate_resolving_power(
978                                abund, mass, current_index
979                            )
980                        except IndexError:
981                            print("index error, skipping peak")
982                            continue
983
984                        s2n = intes_centr / self.baseline_noise_std
985                        self.add_mspeak(
986                            self.polarity,
987                            mz_exp_centroid,
988                            abund[current_index],
989                            peak_resolving_power,
990                            s2n,
991                            peak_indexes,
992                            exp_freq=freq_centr,
993                            ms_parent=self,
994                        )
class PeakPicking:
 24class PeakPicking:
 25    """Class for peak picking.
 26
 27    Parameters
 28    ----------
 29    None
 30
 31    Attributes
 32    ----------
 33    None
 34
 35    Methods
 36    -------
 37    * prepare_peak_picking_data().
 38        Prepare the mz, abundance, and frequence data for peak picking.
 39    * cut_mz_domain_peak_picking().
 40        Cut the m/z domain for peak picking.
 41    * extrapolate_axes_for_pp(mz=None, abund=None, freq=None).
 42        Extrapolate the m/z axis and fill the abundance axis with 0s.
 43    * do_peak_picking().
 44        Perform peak picking.
 45    * find_minima(apex_index, abundance, len_abundance, right=True).
 46        Find the minima of a peak.
 47    * linear_fit_calc(intes, massa, index_term, index_sign).
 48        Algebraic solution to a linear fit.
 49    * calculate_resolving_power(intes, massa, current_index).
 50        Calculate the resolving power of a peak.
 51    * cal_minima(mass, abun).
 52        Calculate the minima of a peak.
 53    * calc_centroid(mass, abund, freq).
 54        Calculate the centroid of a peak.
 55    * get_threshold(intes).
 56        Get the intensity threshold for peak picking.
 57    * algebraic_quadratic(list_mass, list_y).
 58        Find the apex of a peak - algebraically.
 59    * find_apex_fit_quadratic(mass, abund, freq, current_index).
 60        Find the apex of a peak.
 61    * check_prominence(abun, current_index, len_abundance, peak_height_diff).
 62        Check the prominence of a peak.
 63    * use_the_max(mass, abund, current_index, len_abundance, peak_height_diff).
 64        Use the max peak height as the centroid.
 65    * calc_centroid_legacy(mass, abund, freq).
 66        Legacy centroid calculation. Deprecated - for deletion.
 67
 68    """
 69
 70    def prepare_peak_picking_data(self):
 71        """Prepare the data for peak picking.
 72
 73        This function will prepare the m/z, abundance, and frequency data for peak picking according to the settings.
 74
 75        Returns
 76        -------
 77        mz : ndarray
 78            The m/z axis.
 79        abundance : ndarray
 80            The abundance axis.
 81        freq : ndarray or None
 82            The frequency axis, if available.
 83        """
 84        # First apply cut_mz_domain_peak_picking
 85        mz, abundance, freq = self.cut_mz_domain_peak_picking()
 86
 87        # Then extrapolate the axes for peak picking
 88        if self.settings.picking_point_extrapolate > 0:
 89            mz, abundance, freq = self.extrapolate_axes_for_pp(mz, abundance, freq)
 90        return mz, abundance, freq
 91
 92    def cut_mz_domain_peak_picking(self):
 93        """
 94        Cut the m/z domain for peak picking.
 95
 96        Simplified function
 97
 98        Returns
 99        -------
100        mz_domain_X_low_cutoff : ndarray
101            The m/z values within the specified range.
102        mz_domain_low_Y_cutoff : ndarray
103            The abundance values within the specified range.
104        freq_domain_low_Y_cutoff : ndarray or None
105            The frequency values within the specified range, if available.
106
107        """
108        max_picking_mz = self.settings.max_picking_mz
109        min_picking_mz = self.settings.min_picking_mz
110
111        # min_start =  where(self.mz_exp_profile  > min_picking_mz)[0][0]
112        # max_final =  where(self.mz_exp_profile < max_picking_mz)[-1][-1]
113        min_start = searchsorted(a=self.mz_exp_profile, v=min_picking_mz)
114        max_final = searchsorted(a=self.mz_exp_profile, v=max_picking_mz)
115
116        if self.has_frequency:
117            if self.freq_exp_profile.any():
118                return (
119                    self.mz_exp_profile[min_start:max_final],
120                    self.abundance_profile[min_start:max_final],
121                    self.freq_exp_profile[min_start:max_final],
122                )
123
124        else:
125            return (
126                self.mz_exp_profile[min_start:max_final],
127                self.abundance_profile[min_start:max_final],
128                None,
129            )
130
131    def legacy_cut_mz_domain_peak_picking(self):
132        """
133        Cut the m/z domain for peak picking.
134        DEPRECATED
135        Returns
136        -------
137        mz_domain_X_low_cutoff : ndarray
138            The m/z values within the specified range.
139        mz_domain_low_Y_cutoff : ndarray
140            The abundance values within the specified range.
141        freq_domain_low_Y_cutoff : ndarray or None
142            The frequency values within the specified range, if available.
143
144        """
145        max_picking_mz = self.settings.max_picking_mz
146        min_picking_mz = self.settings.min_picking_mz
147
148        min_final = where(self.mz_exp_profile > min_picking_mz)[-1][-1]
149        min_start = where(self.mz_exp_profile > min_picking_mz)[0][0]
150
151        (
152            mz_domain_X_low_cutoff,
153            mz_domain_low_Y_cutoff,
154        ) = (
155            self.mz_exp_profile[min_start:min_final],
156            self.abundance_profile[min_start:min_final],
157        )
158
159        max_final = where(self.mz_exp_profile < max_picking_mz)[-1][-1]
160        max_start = where(self.mz_exp_profile < max_picking_mz)[0][0]
161
162        if self.has_frequency:
163            if self.freq_exp_profile.any():
164                freq_domain_low_Y_cutoff = self.freq_exp_profile[min_start:min_final]
165
166                return (
167                    mz_domain_X_low_cutoff[max_start:max_final],
168                    mz_domain_low_Y_cutoff[max_start:max_final],
169                    freq_domain_low_Y_cutoff[max_start:max_final],
170                )
171
172        else:
173            return (
174                mz_domain_X_low_cutoff[max_start:max_final],
175                mz_domain_low_Y_cutoff[max_start:max_final],
176                None,
177            )
178
179    @staticmethod
180    def extrapolate_axis(initial_array, pts):
181        """
182        This function will extrapolate an input array in both directions by N pts.
183
184        Parameters
185        ----------
186        initial_array : ndarray
187            The input array.
188        pts : int
189            The number of points to extrapolate.
190
191        Returns
192        -------
193        ndarray
194            The extrapolated array.
195
196        Notes
197        --------
198        This is a static method.
199        """
200        initial_array_len = len(initial_array)
201        right_delta = initial_array[-1] - initial_array[-2]
202        left_delta = initial_array[1] - initial_array[0]
203
204        # Create an array with extra space for extrapolation
205        pad_array = zeros(initial_array_len + 2 * pts)
206
207        # Copy original array into the middle of the padded array
208        pad_array[pts : pts + initial_array_len] = initial_array
209
210        # Extrapolate the right side
211        for pt in range(pts):
212            final_value = initial_array[-1]
213            value_to_add = right_delta * (pt + 1)
214            new_value = final_value + value_to_add
215            pad_array[initial_array_len + pts + pt] = new_value
216
217        # Extrapolate the left side
218        for pt in range(pts):
219            first_value = initial_array[0]
220            value_to_subtract = left_delta * (pt + 1)
221            new_value = first_value - value_to_subtract
222            pad_array[pts - pt - 1] = new_value
223
224        return pad_array
225
226    def extrapolate_axes_for_pp(self, mz=None, abund=None, freq=None):
227        """Extrapolate the m/z axis and fill the abundance axis with 0s.
228
229        Parameters
230        ----------
231        mz : ndarray or None
232            The m/z axis, if available. If None, the experimental m/z axis is used.
233        abund : ndarray or None
234            The abundance axis, if available. If None, the experimental abundance axis is used.
235        freq : ndarray or None
236            The frequency axis, if available. If None, the experimental frequency axis is used.
237
238        Returns
239        -------
240        mz : ndarray
241            The extrapolated m/z axis.
242        abund : ndarray
243            The abundance axis with 0s filled.
244        freq : ndarray or None
245            The extrapolated frequency axis, if available.
246
247        Notes
248        --------
249        This function will extrapolate the mz axis by the number of datapoints specified in the settings,
250        and fill the abundance axis with 0s.
251        This should prevent peak picking issues at the spectrum edge.
252
253        """
254        # Check if the input arrays are provided
255        if mz is None or abund is None:
256            mz, abund = self.mz_exp_profile, self.abundance_profile
257            if self.has_frequency:
258                freq = self.freq_exp_profile
259            else:
260                freq = None
261        pts = self.settings.picking_point_extrapolate
262        if pts == 0:
263            return mz, abund, freq
264
265        mz = self.extrapolate_axis(mz, pts)
266        abund = pad(abund, (pts, pts), mode="constant", constant_values=(0, 0))
267        if freq is not None:
268            freq = self.extrapolate_axis(freq, pts)
269        return mz, abund, freq
270
271    def do_peak_picking(self):
272        """Perform peak picking."""
273        mz, abundance, freq = self.prepare_peak_picking_data()
274
275        if (
276            self.label == Labels.bruker_frequency
277            or self.label == Labels.midas_frequency
278        ):
279            self.calc_centroid(mz, abundance, freq)
280
281        elif self.label == Labels.thermo_profile:
282            self.calc_centroid(mz, abundance, freq)
283
284        elif self.label == Labels.bruker_profile:
285            self.calc_centroid(mz, abundance, freq)
286
287        elif self.label == Labels.booster_profile:
288            self.calc_centroid(mz, abundance, freq)
289
290        elif self.label == Labels.simulated_profile:
291            self.calc_centroid(mz, abundance, freq)
292
293        else:
294            raise Exception("Unknow mass spectrum type", self.label)
295
296    def find_minima(self, apex_index, abundance, len_abundance, right=True):
297        """Find the minima of a peak.
298
299        Parameters
300        ----------
301        apex_index : int
302            The index of the peak apex.
303        abundance : ndarray
304            The abundance values.
305        len_abundance : int
306            The length of the abundance array.
307        right : bool, optional
308            Flag indicating whether to search for minima to the right of the apex (default is True).
309
310        Returns
311        -------
312        int
313            The index of the minima.
314
315        """
316        j = apex_index
317
318        if right:
319            minima = abundance[j] > abundance[j + 1]
320        else:
321            minima = abundance[j] > abundance[j - 1]
322
323        while minima:
324            if j == 1 or j == len_abundance - 2:
325                break
326
327            if right:
328                j += 1
329
330                minima = abundance[j] >= abundance[j + 1]
331
332            else:
333                j -= 1
334                minima = abundance[j] >= abundance[j - 1]
335
336        if right:
337            return j
338        else:
339            return j
340
341    @staticmethod
342    def linear_fit_calc(intes, massa, index_term, index_sign):
343        """
344        Algebraic solution to a linear fit - roughly 25-50x faster than numpy polyfit when passing only two vals and doing a 1st order fit
345
346        Parameters
347        ----------
348        intes : ndarray
349            The intensity values.
350        massa : ndarray
351            The mass values.
352        index_term : int
353            The index of the current term.
354        index_sign : str
355            The index sign
356
357        Returns
358        -------
359        ndarray
360            The coefficients of the linear fit.
361
362        Notes
363        --------
364        This is a static method.
365        """
366        if index_sign == "+":
367            x1, x2 = massa[index_term], massa[index_term + 1]
368            y1, y2 = intes[index_term], intes[index_term + 1]
369        elif index_sign == "-":
370            x1, x2 = massa[index_term], massa[index_term - 1]
371            y1, y2 = intes[index_term], intes[index_term - 1]
372        else:
373            warnings.warn("error in linear fit calc, unknown index sign")
374
375        # Calculate the slope (m)
376        slope = (y2 - y1) / (x2 - x1)
377
378        # Calculate the intercept (b)
379        intercept = y1 - slope * x1
380
381        # The coefficients array would be [slope, intercept]
382        coefficients = array([slope, intercept])
383        return coefficients
384
385    def calculate_resolving_power(self, intes, massa, current_index):
386        """Calculate the resolving power of a peak.
387
388        Parameters
389        ----------
390        intes : ndarray
391            The intensity values.
392        massa : ndarray
393            The mass values.
394        current_index : int
395            The index of the current peak.
396
397        Returns
398        -------
399        float
400            The resolving power of the peak.
401
402        Notes
403        --------
404        This is a conservative calculation of resolving power,
405        the peak need to be resolved at least at the half-maximum magnitude,
406        otherwise, the combined full width at half maximum is used to calculate resolving power.
407
408        """
409
410        peak_height = intes[current_index]
411        target_peak_height = peak_height / 2
412
413        peak_height_minus = peak_height
414        peak_height_plus = peak_height
415
416        # There are issues when a peak is at the high or low limit of a spectrum in finding its local minima and maxima
417        # This solution will return nan for resolving power when a peak is possibly too close to an edge to avoid the issue
418
419        if current_index < 5:
420            warnings.warn("peak at low spectrum edge, returning no resolving power")
421            return nan
422        elif abs(current_index - len(intes)) < 5:
423            warnings.warn("peak at high spectrum edge, returning no resolving power")
424            return nan
425        else:
426            pass
427
428        index_minus = current_index
429        while peak_height_minus >= target_peak_height:
430            index_minus = index_minus - 1
431            if index_minus < 0:
432                warnings.warn(
433                    "Res. calc. warning - peak index minus adjacent to spectrum edge \n \
434                        Zeroing the first 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported \n \
435                        Perhaps try to increase picking_point_extrapolate (e.g. to 3)"
436                )
437                # Pad the first 5 data points with zeros and restart the loop
438                intes[:5] = 0
439                peak_height_minus = target_peak_height
440                index_minus = current_index
441            else:
442                peak_height_minus = intes[index_minus]
443
444        if self.mspeaks_settings.legacy_centroid_polyfit:
445            x = [massa[index_minus], massa[index_minus + 1]]
446            y = [intes[index_minus], intes[index_minus + 1]]
447            coefficients = polyfit(x, y, 1)
448        else:
449            coefficients = self.linear_fit_calc(
450                intes, massa, index_minus, index_sign="+"
451            )
452
453        a = coefficients[0]
454        b = coefficients[1]
455        if self.mspeaks_settings.legacy_resolving_power:
456            y_intercept = intes[index_minus] + (
457                (intes[index_minus + 1] - intes[index_minus]) / 2
458            )
459        else:
460            y_intercept = target_peak_height
461        massa1 = (y_intercept - b) / a
462
463        index_plus = current_index
464        while peak_height_plus >= target_peak_height:
465            index_plus = index_plus + 1
466
467            try:
468                peak_height_plus = intes[index_plus]
469            except IndexError:
470                warnings.warn(
471                    "Res. calc. warning - peak index plus adjacent to spectrum edge \n \
472                        Zeroing the last 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported\
473                        Perhaps try to increase picking_point_extrapolate (e.g. to 3)"
474                )
475                # Pad the first 5 data points with zeros and restart the loop
476                intes[-5:] = 0
477                peak_height_plus = target_peak_height
478                index_plus = current_index
479
480        if self.mspeaks_settings.legacy_centroid_polyfit:
481            x = [massa[index_plus], massa[index_plus - 1]]
482            y = [intes[index_plus], intes[index_plus - 1]]
483            coefficients = polyfit(x, y, 1)
484        else:
485            coefficients = self.linear_fit_calc(
486                intes, massa, index_plus, index_sign="-"
487            )
488
489        a = coefficients[0]
490        b = coefficients[1]
491
492        if self.mspeaks_settings.legacy_resolving_power:
493            y_intercept = intes[index_plus - 1] + (
494                (intes[index_plus] - intes[index_plus - 1]) / 2
495            )
496        else:
497            y_intercept = target_peak_height
498
499        massa2 = (y_intercept - b) / a
500
501        if massa1 > massa2:
502            resolvingpower = massa[current_index] / (massa1 - massa2)
503
504        else:
505            resolvingpower = massa[current_index] / (massa2 - massa1)
506
507        return resolvingpower
508
509    def cal_minima(self, mass, abun):
510        """Calculate the minima of a peak.
511
512        Parameters
513        ----------
514        mass : ndarray
515            The mass values.
516        abun : ndarray
517            The abundance values.
518
519        Returns
520        -------
521        ndarray or None
522            The mass values at the minima, if found.
523
524        """
525        abun = -abun
526
527        dy = abun[1:] - abun[:-1]
528
529        # replaces nan for infinity
530        indices_nan = where(isnan(abun))[0]
531
532        if indices_nan.size:
533            abun[indices_nan] = inf
534            dy[where(isnan(dy))[0]] = inf
535
536        indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0]
537
538        if indexes.size:
539            return mass[indexes], abun[indexes]
540
541    def calc_centroid(self, mass, abund, freq):
542        """Calculate the centroid of a peak.
543
544        Parameters
545        ----------
546        mass : ndarray
547            The mass values.
548        abund : ndarray
549            The abundance values.
550        freq : ndarray or None
551            The frequency values, if available.
552
553        Returns
554        -------
555        None
556
557        """
558
559        max_height = self.mspeaks_settings.peak_height_max_percent
560        max_prominence = self.mspeaks_settings.peak_max_prominence_percent
561        min_peak_datapoints = self.mspeaks_settings.min_peak_datapoints
562        peak_derivative_threshold = self.mspeaks_settings.peak_derivative_threshold
563        max_abun = max(abund)
564        peak_height_diff = lambda hi, li: ((abund[hi] - abund[li]) / max_abun) * 100
565
566        domain = mass
567        signal = abund
568        len_signal = len(signal)
569
570        signal_threshold, factor = self.get_threshold(abund)
571        max_signal = factor
572
573        correct_baseline = False
574
575        include_indexes = sp.peak_picking_first_derivative(
576            domain,
577            signal,
578            max_height,
579            max_prominence,
580            max_signal,
581            min_peak_datapoints,
582            peak_derivative_threshold,
583            signal_threshold=signal_threshold,
584            correct_baseline=correct_baseline,
585            abun_norm=1,
586            plot_res=False,
587        )
588
589        for indexes_tuple in include_indexes:
590            apex_index = indexes_tuple[1]
591
592            peak_indexes = self.check_prominence(
593                abund, apex_index, len_signal, peak_height_diff
594            )
595
596            if peak_indexes:
597                mz_exp_centroid, freq_centr, intes_centr = self.find_apex_fit_quadratic(
598                    mass, abund, freq, apex_index
599                )
600
601                if mz_exp_centroid:
602                    peak_resolving_power = self.calculate_resolving_power(
603                        abund, mass, apex_index
604                    )
605                    s2n = intes_centr / self.baseline_noise_std
606                    self.add_mspeak(
607                        self.polarity,
608                        mz_exp_centroid,
609                        abund[apex_index],
610                        peak_resolving_power,
611                        s2n,
612                        indexes_tuple,
613                        exp_freq=freq_centr,
614                        ms_parent=self,
615                    )
616                # pyplot.plot(domain[start_index: final_index + 1], signal[start_index:final_index + 1], c='black')
617                # pyplot.show()
618
619    def get_threshold(self, intes):
620        """Get the intensity threshold for peak picking.
621
622        Parameters
623        ----------
624        intes : ndarray
625            The intensity values.
626
627        Returns
628        -------
629        float
630            The intensity threshold.
631        float
632            The factor to multiply the intensity threshold by.
633        """
634
635        intes = array(intes).astype(float)
636
637        noise_threshold_method = self.settings.noise_threshold_method
638
639        if noise_threshold_method == "minima":
640            if self.is_centroid:
641                warnings.warn(
642                    "Auto threshould is disabled for centroid data, returning 0"
643                )
644                factor = 1
645                abundance_threshold = 1e-20
646            # print(self.settings.noise_threshold_min_std)
647            else:
648                abundance_threshold = self.baseline_noise + (
649                    self.settings.noise_threshold_min_std * self.baseline_noise_std
650                )
651                factor = 1
652
653        elif noise_threshold_method == "signal_noise":
654            abundance_threshold = self.settings.noise_threshold_min_s2n
655            if self.is_centroid:
656                factor = 1
657            else:
658                factor = self.baseline_noise_std
659
660        elif noise_threshold_method == "relative_abundance":
661            abundance_threshold = self.settings.noise_threshold_min_relative_abundance
662            factor = intes.max() / 100
663
664        elif noise_threshold_method == "absolute_abundance":
665            abundance_threshold = self.settings.noise_threshold_absolute_abundance
666            factor = 1
667
668        elif noise_threshold_method == "log":
669            if self.is_centroid:
670                raise Exception("log noise Not tested for centroid data")
671            abundance_threshold = self.settings.noise_threshold_log_nsigma
672            factor = self.baseline_noise_std
673
674        else:
675            raise Exception(
676                "%s method was not implemented, please refer to corems.mass_spectrum.calc.NoiseCalc Class"
677                % noise_threshold_method
678            )
679
680        return abundance_threshold, factor
681
682    @staticmethod
683    def algebraic_quadratic(list_mass, list_y):
684        """
685        Find the apex of a peak - algebraically.
686        Faster than using numpy polyfit by ~28x per fit.
687
688        Parameters
689        ----------
690        list_mass : ndarray
691            list of m/z values (3 points)
692        list_y : ndarray
693            list of abundance values (3 points)
694
695        Returns
696        -------
697        a, b, c: float
698            coefficients of the quadratic equation.
699
700        Notes
701        --------
702        This is a static method.
703        """
704        x_1, x_2, x_3 = list_mass
705        y_1, y_2, y_3 = list_y
706
707        a = (
708            y_1 / ((x_1 - x_2) * (x_1 - x_3))
709            + y_2 / ((x_2 - x_1) * (x_2 - x_3))
710            + y_3 / ((x_3 - x_1) * (x_3 - x_2))
711        )
712
713        b = (
714            -y_1 * (x_2 + x_3) / ((x_1 - x_2) * (x_1 - x_3))
715            - y_2 * (x_1 + x_3) / ((x_2 - x_1) * (x_2 - x_3))
716            - y_3 * (x_1 + x_2) / ((x_3 - x_1) * (x_3 - x_2))
717        )
718
719        c = (
720            y_1 * x_2 * x_3 / ((x_1 - x_2) * (x_1 - x_3))
721            + y_2 * x_1 * x_3 / ((x_2 - x_1) * (x_2 - x_3))
722            + y_3 * x_1 * x_2 / ((x_3 - x_1) * (x_3 - x_2))
723        )
724        return a, b, c
725
726    def find_apex_fit_quadratic(self, mass, abund, freq, current_index):
727        """
728        Find the apex of a peak.
729
730        Parameters
731        ----------
732        mass : ndarray
733            The mass values.
734        abund : ndarray
735            The abundance values.
736        freq : ndarray or None
737            The frequency values, if available.
738        current_index : int
739            The index of the current peak.
740
741
742        Returns
743        -------
744        float
745            The m/z value of the peak apex.
746        float
747            The frequency value of the peak apex, if available.
748        float
749            The abundance value of the peak apex.
750
751        """
752        # calc prominence
753        # peak_indexes = self.check_prominence(abund, current_index, len_abundance, peak_height_diff )
754
755        # if not peak_indexes:
756
757        #    return None, None, None, None
758
759        # else:
760
761        # fit parabola to three most abundant datapoints
762        list_mass = [
763            mass[current_index - 1],
764            mass[current_index],
765            mass[current_index + 1],
766        ]
767        list_y = [
768            abund[current_index - 1],
769            abund[current_index],
770            abund[current_index + 1],
771        ]
772
773        if self.mspeaks_settings.legacy_centroid_polyfit:
774            z = polyfit(list_mass, list_y, 2)
775            a = z[0]
776            b = z[1]
777        else:
778            a, b, c = self.algebraic_quadratic(list_mass, list_y)
779
780        calculated = -b / (2 * a)
781
782        if calculated < 1 or int(calculated) != int(list_mass[1]):
783            mz_exp_centroid = list_mass[1]
784
785        else:
786            mz_exp_centroid = calculated
787
788        if (
789            self.label == Labels.bruker_frequency
790            or self.label == Labels.midas_frequency
791        ):
792            # fit parabola to three most abundant frequency datapoints
793            list_freq = [
794                freq[current_index - 1],
795                freq[current_index],
796                freq[current_index + 1],
797            ]
798            if self.mspeaks_settings.legacy_centroid_polyfit:
799                z = polyfit(list_mass, list_y, 2)
800                a = z[0]
801                b = z[1]
802            else:
803                a, b, c = self.algebraic_quadratic(list_mass, list_y)
804
805            calculated_freq = -b / (2 * a)
806
807            if calculated_freq < 1 or int(calculated_freq) != freq[current_index]:
808                freq_centr = list_freq[1]
809
810            else:
811                freq_centr = calculated_freq
812
813        else:
814            freq_centr = None
815
816        if self.mspeaks_settings.legacy_centroid_polyfit:
817            abundance_centroid = abund[current_index]
818        else:
819            abundance_centroid = a * mz_exp_centroid**2 + b * mz_exp_centroid + c
820
821        return mz_exp_centroid, freq_centr, abundance_centroid
822
823    def check_prominence(
824        self, abun, current_index, len_abundance, peak_height_diff
825    ) -> tuple or False:
826        """Check the prominence of a peak.
827
828        Parameters
829        ----------
830        abun : ndarray
831            The abundance values.
832        current_index : int
833            The index of the current peak.
834        len_abundance : int
835            The length of the abundance array.
836        peak_height_diff : function
837            The function to calculate the peak height difference.
838
839        Returns
840        -------
841        tuple or False
842            A tuple containing the indexes of the peak, if the prominence is above the threshold.
843            Otherwise, False.
844
845        """
846
847        final_index = self.find_minima(current_index, abun, len_abundance, right=True)
848
849        start_index = self.find_minima(current_index, abun, len_abundance, right=False)
850
851        peak_indexes = (current_index - 1, current_index, current_index + 1)
852
853        if (
854            min(
855                peak_height_diff(current_index, start_index),
856                peak_height_diff(current_index, final_index),
857            )
858            > self.mspeaks_settings.peak_min_prominence_percent
859        ):
860            return peak_indexes
861
862        else:
863            return False
864
865    def use_the_max(self, mass, abund, current_index, len_abundance, peak_height_diff):
866        """Use the max peak height as the centroid
867
868        Parameters
869        ----------
870        mass : ndarray
871            The mass values.
872        abund : ndarray
873            The abundance values.
874        current_index : int
875            The index of the current peak.
876        len_abundance : int
877            The length of the abundance array.
878        peak_height_diff : function
879            The function to calculate the peak height difference.
880
881        Returns
882        -------
883        float
884            The m/z value of the peak apex.
885        float
886            The abundance value of the peak apex.
887        tuple or None
888            A tuple containing the indexes of the peak, if the prominence is above the threshold.
889            Otherwise, None.
890        """
891
892        peak_indexes = self.check_prominence(
893            abund, current_index, len_abundance, peak_height_diff
894        )
895
896        if not peak_indexes:
897            return None, None, None
898
899        else:
900            return mass[current_index], abund[current_index], peak_indexes
901
902    def calc_centroid_legacy(self, mass, abund, freq):
903        """Legacy centroid calculation
904        Deprecated - for deletion.
905
906        """
907        warnings.warn(
908            "Legacy centroid calculation is deprecated. Please use the new centroid calculation method."
909        )
910        pass
911        if False:
912            len_abundance = len(abund)
913
914            max_abundance = max(abund)
915
916            peak_height_diff = (
917                lambda hi, li: ((abund[hi] - abund[li]) / max_abundance) * 100
918            )
919
920            abundance_threshold, factor = self.get_threshold(abund)
921            # print(abundance_threshold, factor)
922            # find indices of all peaks
923            dy = abund[1:] - abund[:-1]
924
925            # replaces nan for infi nity
926            indices_nan = where(isnan(abund))[0]
927
928            if indices_nan.size:
929                abund[indices_nan] = inf
930                dy[where(isnan(dy))[0]] = inf
931
932            indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0]
933
934            # noise threshold
935            if indexes.size and abundance_threshold is not None:
936                indexes = indexes[abund[indexes] / factor >= abundance_threshold]
937            # filter out 'peaks' within 3 points of the spectrum limits
938            # remove entries within 3 points of upper limit
939            indexes = [x for x in indexes if (len_abundance - x) > 3]
940            # remove entries within 3 points of zero
941            indexes = [x for x in indexes if x > 3]
942
943            for current_index in indexes:
944                if self.label == Labels.simulated_profile:
945                    mz_exp_centroid, intes_centr, peak_indexes = self.use_the_max(
946                        mass, abund, current_index, len_abundance, peak_height_diff
947                    )
948                    if mz_exp_centroid:
949                        peak_resolving_power = self.calculate_resolving_power(
950                            abund, mass, current_index
951                        )
952                        s2n = intes_centr / self.baseline_noise_std
953                        freq_centr = None
954                        self.add_mspeak(
955                            self.polarity,
956                            mz_exp_centroid,
957                            abund[current_index],
958                            peak_resolving_power,
959                            s2n,
960                            peak_indexes,
961                            exp_freq=freq_centr,
962                            ms_parent=self,
963                        )
964
965                else:
966                    mz_exp_centroid, freq_centr, intes_centr, peak_indexes = (
967                        self.find_apex_fit_quadratic(
968                            mass,
969                            abund,
970                            freq,
971                            current_index,
972                            len_abundance,
973                            peak_height_diff,
974                        )
975                    )
976                    if mz_exp_centroid:
977                        try:
978                            peak_resolving_power = self.calculate_resolving_power(
979                                abund, mass, current_index
980                            )
981                        except IndexError:
982                            print("index error, skipping peak")
983                            continue
984
985                        s2n = intes_centr / self.baseline_noise_std
986                        self.add_mspeak(
987                            self.polarity,
988                            mz_exp_centroid,
989                            abund[current_index],
990                            peak_resolving_power,
991                            s2n,
992                            peak_indexes,
993                            exp_freq=freq_centr,
994                            ms_parent=self,
995                        )

Class for peak picking.

Parameters
  • None
Attributes
  • None
Methods
  • prepare_peak_picking_data(). Prepare the mz, abundance, and frequence data for peak picking.
  • cut_mz_domain_peak_picking(). Cut the m/z domain for peak picking.
  • extrapolate_axes_for_pp(mz=None, abund=None, freq=None). Extrapolate the m/z axis and fill the abundance axis with 0s.
  • do_peak_picking(). Perform peak picking.
  • find_minima(apex_index, abundance, len_abundance, right=True). Find the minima of a peak.
  • linear_fit_calc(intes, massa, index_term, index_sign). Algebraic solution to a linear fit.
  • calculate_resolving_power(intes, massa, current_index). Calculate the resolving power of a peak.
  • cal_minima(mass, abun). Calculate the minima of a peak.
  • calc_centroid(mass, abund, freq). Calculate the centroid of a peak.
  • get_threshold(intes). Get the intensity threshold for peak picking.
  • algebraic_quadratic(list_mass, list_y). Find the apex of a peak - algebraically.
  • find_apex_fit_quadratic(mass, abund, freq, current_index). Find the apex of a peak.
  • check_prominence(abun, current_index, len_abundance, peak_height_diff). Check the prominence of a peak.
  • use_the_max(mass, abund, current_index, len_abundance, peak_height_diff). Use the max peak height as the centroid.
  • calc_centroid_legacy(mass, abund, freq). Legacy centroid calculation. Deprecated - for deletion.
def prepare_peak_picking_data(self):
70    def prepare_peak_picking_data(self):
71        """Prepare the data for peak picking.
72
73        This function will prepare the m/z, abundance, and frequency data for peak picking according to the settings.
74
75        Returns
76        -------
77        mz : ndarray
78            The m/z axis.
79        abundance : ndarray
80            The abundance axis.
81        freq : ndarray or None
82            The frequency axis, if available.
83        """
84        # First apply cut_mz_domain_peak_picking
85        mz, abundance, freq = self.cut_mz_domain_peak_picking()
86
87        # Then extrapolate the axes for peak picking
88        if self.settings.picking_point_extrapolate > 0:
89            mz, abundance, freq = self.extrapolate_axes_for_pp(mz, abundance, freq)
90        return mz, abundance, freq

Prepare the data for peak picking.

This function will prepare the m/z, abundance, and frequency data for peak picking according to the settings.

Returns
  • mz (ndarray): The m/z axis.
  • abundance (ndarray): The abundance axis.
  • freq (ndarray or None): The frequency axis, if available.
def cut_mz_domain_peak_picking(self):
 92    def cut_mz_domain_peak_picking(self):
 93        """
 94        Cut the m/z domain for peak picking.
 95
 96        Simplified function
 97
 98        Returns
 99        -------
100        mz_domain_X_low_cutoff : ndarray
101            The m/z values within the specified range.
102        mz_domain_low_Y_cutoff : ndarray
103            The abundance values within the specified range.
104        freq_domain_low_Y_cutoff : ndarray or None
105            The frequency values within the specified range, if available.
106
107        """
108        max_picking_mz = self.settings.max_picking_mz
109        min_picking_mz = self.settings.min_picking_mz
110
111        # min_start =  where(self.mz_exp_profile  > min_picking_mz)[0][0]
112        # max_final =  where(self.mz_exp_profile < max_picking_mz)[-1][-1]
113        min_start = searchsorted(a=self.mz_exp_profile, v=min_picking_mz)
114        max_final = searchsorted(a=self.mz_exp_profile, v=max_picking_mz)
115
116        if self.has_frequency:
117            if self.freq_exp_profile.any():
118                return (
119                    self.mz_exp_profile[min_start:max_final],
120                    self.abundance_profile[min_start:max_final],
121                    self.freq_exp_profile[min_start:max_final],
122                )
123
124        else:
125            return (
126                self.mz_exp_profile[min_start:max_final],
127                self.abundance_profile[min_start:max_final],
128                None,
129            )

Cut the m/z domain for peak picking.

Simplified function

Returns
  • mz_domain_X_low_cutoff (ndarray): The m/z values within the specified range.
  • mz_domain_low_Y_cutoff (ndarray): The abundance values within the specified range.
  • freq_domain_low_Y_cutoff (ndarray or None): The frequency values within the specified range, if available.
def legacy_cut_mz_domain_peak_picking(self):
131    def legacy_cut_mz_domain_peak_picking(self):
132        """
133        Cut the m/z domain for peak picking.
134        DEPRECATED
135        Returns
136        -------
137        mz_domain_X_low_cutoff : ndarray
138            The m/z values within the specified range.
139        mz_domain_low_Y_cutoff : ndarray
140            The abundance values within the specified range.
141        freq_domain_low_Y_cutoff : ndarray or None
142            The frequency values within the specified range, if available.
143
144        """
145        max_picking_mz = self.settings.max_picking_mz
146        min_picking_mz = self.settings.min_picking_mz
147
148        min_final = where(self.mz_exp_profile > min_picking_mz)[-1][-1]
149        min_start = where(self.mz_exp_profile > min_picking_mz)[0][0]
150
151        (
152            mz_domain_X_low_cutoff,
153            mz_domain_low_Y_cutoff,
154        ) = (
155            self.mz_exp_profile[min_start:min_final],
156            self.abundance_profile[min_start:min_final],
157        )
158
159        max_final = where(self.mz_exp_profile < max_picking_mz)[-1][-1]
160        max_start = where(self.mz_exp_profile < max_picking_mz)[0][0]
161
162        if self.has_frequency:
163            if self.freq_exp_profile.any():
164                freq_domain_low_Y_cutoff = self.freq_exp_profile[min_start:min_final]
165
166                return (
167                    mz_domain_X_low_cutoff[max_start:max_final],
168                    mz_domain_low_Y_cutoff[max_start:max_final],
169                    freq_domain_low_Y_cutoff[max_start:max_final],
170                )
171
172        else:
173            return (
174                mz_domain_X_low_cutoff[max_start:max_final],
175                mz_domain_low_Y_cutoff[max_start:max_final],
176                None,
177            )

Cut the m/z domain for peak picking. DEPRECATED

Returns
  • mz_domain_X_low_cutoff (ndarray): The m/z values within the specified range.
  • mz_domain_low_Y_cutoff (ndarray): The abundance values within the specified range.
  • freq_domain_low_Y_cutoff (ndarray or None): The frequency values within the specified range, if available.
@staticmethod
def extrapolate_axis(initial_array, pts):
179    @staticmethod
180    def extrapolate_axis(initial_array, pts):
181        """
182        This function will extrapolate an input array in both directions by N pts.
183
184        Parameters
185        ----------
186        initial_array : ndarray
187            The input array.
188        pts : int
189            The number of points to extrapolate.
190
191        Returns
192        -------
193        ndarray
194            The extrapolated array.
195
196        Notes
197        --------
198        This is a static method.
199        """
200        initial_array_len = len(initial_array)
201        right_delta = initial_array[-1] - initial_array[-2]
202        left_delta = initial_array[1] - initial_array[0]
203
204        # Create an array with extra space for extrapolation
205        pad_array = zeros(initial_array_len + 2 * pts)
206
207        # Copy original array into the middle of the padded array
208        pad_array[pts : pts + initial_array_len] = initial_array
209
210        # Extrapolate the right side
211        for pt in range(pts):
212            final_value = initial_array[-1]
213            value_to_add = right_delta * (pt + 1)
214            new_value = final_value + value_to_add
215            pad_array[initial_array_len + pts + pt] = new_value
216
217        # Extrapolate the left side
218        for pt in range(pts):
219            first_value = initial_array[0]
220            value_to_subtract = left_delta * (pt + 1)
221            new_value = first_value - value_to_subtract
222            pad_array[pts - pt - 1] = new_value
223
224        return pad_array

This function will extrapolate an input array in both directions by N pts.

Parameters
  • initial_array (ndarray): The input array.
  • pts (int): The number of points to extrapolate.
Returns
  • ndarray: The extrapolated array.
Notes

This is a static method.

def extrapolate_axes_for_pp(self, mz=None, abund=None, freq=None):
226    def extrapolate_axes_for_pp(self, mz=None, abund=None, freq=None):
227        """Extrapolate the m/z axis and fill the abundance axis with 0s.
228
229        Parameters
230        ----------
231        mz : ndarray or None
232            The m/z axis, if available. If None, the experimental m/z axis is used.
233        abund : ndarray or None
234            The abundance axis, if available. If None, the experimental abundance axis is used.
235        freq : ndarray or None
236            The frequency axis, if available. If None, the experimental frequency axis is used.
237
238        Returns
239        -------
240        mz : ndarray
241            The extrapolated m/z axis.
242        abund : ndarray
243            The abundance axis with 0s filled.
244        freq : ndarray or None
245            The extrapolated frequency axis, if available.
246
247        Notes
248        --------
249        This function will extrapolate the mz axis by the number of datapoints specified in the settings,
250        and fill the abundance axis with 0s.
251        This should prevent peak picking issues at the spectrum edge.
252
253        """
254        # Check if the input arrays are provided
255        if mz is None or abund is None:
256            mz, abund = self.mz_exp_profile, self.abundance_profile
257            if self.has_frequency:
258                freq = self.freq_exp_profile
259            else:
260                freq = None
261        pts = self.settings.picking_point_extrapolate
262        if pts == 0:
263            return mz, abund, freq
264
265        mz = self.extrapolate_axis(mz, pts)
266        abund = pad(abund, (pts, pts), mode="constant", constant_values=(0, 0))
267        if freq is not None:
268            freq = self.extrapolate_axis(freq, pts)
269        return mz, abund, freq

Extrapolate the m/z axis and fill the abundance axis with 0s.

Parameters
  • mz (ndarray or None): The m/z axis, if available. If None, the experimental m/z axis is used.
  • abund (ndarray or None): The abundance axis, if available. If None, the experimental abundance axis is used.
  • freq (ndarray or None): The frequency axis, if available. If None, the experimental frequency axis is used.
Returns
  • mz (ndarray): The extrapolated m/z axis.
  • abund (ndarray): The abundance axis with 0s filled.
  • freq (ndarray or None): The extrapolated frequency axis, if available.
Notes

This function will extrapolate the mz axis by the number of datapoints specified in the settings, and fill the abundance axis with 0s. This should prevent peak picking issues at the spectrum edge.

def do_peak_picking(self):
271    def do_peak_picking(self):
272        """Perform peak picking."""
273        mz, abundance, freq = self.prepare_peak_picking_data()
274
275        if (
276            self.label == Labels.bruker_frequency
277            or self.label == Labels.midas_frequency
278        ):
279            self.calc_centroid(mz, abundance, freq)
280
281        elif self.label == Labels.thermo_profile:
282            self.calc_centroid(mz, abundance, freq)
283
284        elif self.label == Labels.bruker_profile:
285            self.calc_centroid(mz, abundance, freq)
286
287        elif self.label == Labels.booster_profile:
288            self.calc_centroid(mz, abundance, freq)
289
290        elif self.label == Labels.simulated_profile:
291            self.calc_centroid(mz, abundance, freq)
292
293        else:
294            raise Exception("Unknow mass spectrum type", self.label)

Perform peak picking.

def find_minima(self, apex_index, abundance, len_abundance, right=True):
296    def find_minima(self, apex_index, abundance, len_abundance, right=True):
297        """Find the minima of a peak.
298
299        Parameters
300        ----------
301        apex_index : int
302            The index of the peak apex.
303        abundance : ndarray
304            The abundance values.
305        len_abundance : int
306            The length of the abundance array.
307        right : bool, optional
308            Flag indicating whether to search for minima to the right of the apex (default is True).
309
310        Returns
311        -------
312        int
313            The index of the minima.
314
315        """
316        j = apex_index
317
318        if right:
319            minima = abundance[j] > abundance[j + 1]
320        else:
321            minima = abundance[j] > abundance[j - 1]
322
323        while minima:
324            if j == 1 or j == len_abundance - 2:
325                break
326
327            if right:
328                j += 1
329
330                minima = abundance[j] >= abundance[j + 1]
331
332            else:
333                j -= 1
334                minima = abundance[j] >= abundance[j - 1]
335
336        if right:
337            return j
338        else:
339            return j

Find the minima of a peak.

Parameters
  • apex_index (int): The index of the peak apex.
  • abundance (ndarray): The abundance values.
  • len_abundance (int): The length of the abundance array.
  • right (bool, optional): Flag indicating whether to search for minima to the right of the apex (default is True).
Returns
  • int: The index of the minima.
@staticmethod
def linear_fit_calc(intes, massa, index_term, index_sign):
341    @staticmethod
342    def linear_fit_calc(intes, massa, index_term, index_sign):
343        """
344        Algebraic solution to a linear fit - roughly 25-50x faster than numpy polyfit when passing only two vals and doing a 1st order fit
345
346        Parameters
347        ----------
348        intes : ndarray
349            The intensity values.
350        massa : ndarray
351            The mass values.
352        index_term : int
353            The index of the current term.
354        index_sign : str
355            The index sign
356
357        Returns
358        -------
359        ndarray
360            The coefficients of the linear fit.
361
362        Notes
363        --------
364        This is a static method.
365        """
366        if index_sign == "+":
367            x1, x2 = massa[index_term], massa[index_term + 1]
368            y1, y2 = intes[index_term], intes[index_term + 1]
369        elif index_sign == "-":
370            x1, x2 = massa[index_term], massa[index_term - 1]
371            y1, y2 = intes[index_term], intes[index_term - 1]
372        else:
373            warnings.warn("error in linear fit calc, unknown index sign")
374
375        # Calculate the slope (m)
376        slope = (y2 - y1) / (x2 - x1)
377
378        # Calculate the intercept (b)
379        intercept = y1 - slope * x1
380
381        # The coefficients array would be [slope, intercept]
382        coefficients = array([slope, intercept])
383        return coefficients

Algebraic solution to a linear fit - roughly 25-50x faster than numpy polyfit when passing only two vals and doing a 1st order fit

Parameters
  • intes (ndarray): The intensity values.
  • massa (ndarray): The mass values.
  • index_term (int): The index of the current term.
  • index_sign (str): The index sign
Returns
  • ndarray: The coefficients of the linear fit.
Notes

This is a static method.

def calculate_resolving_power(self, intes, massa, current_index):
385    def calculate_resolving_power(self, intes, massa, current_index):
386        """Calculate the resolving power of a peak.
387
388        Parameters
389        ----------
390        intes : ndarray
391            The intensity values.
392        massa : ndarray
393            The mass values.
394        current_index : int
395            The index of the current peak.
396
397        Returns
398        -------
399        float
400            The resolving power of the peak.
401
402        Notes
403        --------
404        This is a conservative calculation of resolving power,
405        the peak need to be resolved at least at the half-maximum magnitude,
406        otherwise, the combined full width at half maximum is used to calculate resolving power.
407
408        """
409
410        peak_height = intes[current_index]
411        target_peak_height = peak_height / 2
412
413        peak_height_minus = peak_height
414        peak_height_plus = peak_height
415
416        # There are issues when a peak is at the high or low limit of a spectrum in finding its local minima and maxima
417        # This solution will return nan for resolving power when a peak is possibly too close to an edge to avoid the issue
418
419        if current_index < 5:
420            warnings.warn("peak at low spectrum edge, returning no resolving power")
421            return nan
422        elif abs(current_index - len(intes)) < 5:
423            warnings.warn("peak at high spectrum edge, returning no resolving power")
424            return nan
425        else:
426            pass
427
428        index_minus = current_index
429        while peak_height_minus >= target_peak_height:
430            index_minus = index_minus - 1
431            if index_minus < 0:
432                warnings.warn(
433                    "Res. calc. warning - peak index minus adjacent to spectrum edge \n \
434                        Zeroing the first 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported \n \
435                        Perhaps try to increase picking_point_extrapolate (e.g. to 3)"
436                )
437                # Pad the first 5 data points with zeros and restart the loop
438                intes[:5] = 0
439                peak_height_minus = target_peak_height
440                index_minus = current_index
441            else:
442                peak_height_minus = intes[index_minus]
443
444        if self.mspeaks_settings.legacy_centroid_polyfit:
445            x = [massa[index_minus], massa[index_minus + 1]]
446            y = [intes[index_minus], intes[index_minus + 1]]
447            coefficients = polyfit(x, y, 1)
448        else:
449            coefficients = self.linear_fit_calc(
450                intes, massa, index_minus, index_sign="+"
451            )
452
453        a = coefficients[0]
454        b = coefficients[1]
455        if self.mspeaks_settings.legacy_resolving_power:
456            y_intercept = intes[index_minus] + (
457                (intes[index_minus + 1] - intes[index_minus]) / 2
458            )
459        else:
460            y_intercept = target_peak_height
461        massa1 = (y_intercept - b) / a
462
463        index_plus = current_index
464        while peak_height_plus >= target_peak_height:
465            index_plus = index_plus + 1
466
467            try:
468                peak_height_plus = intes[index_plus]
469            except IndexError:
470                warnings.warn(
471                    "Res. calc. warning - peak index plus adjacent to spectrum edge \n \
472                        Zeroing the last 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported\
473                        Perhaps try to increase picking_point_extrapolate (e.g. to 3)"
474                )
475                # Pad the first 5 data points with zeros and restart the loop
476                intes[-5:] = 0
477                peak_height_plus = target_peak_height
478                index_plus = current_index
479
480        if self.mspeaks_settings.legacy_centroid_polyfit:
481            x = [massa[index_plus], massa[index_plus - 1]]
482            y = [intes[index_plus], intes[index_plus - 1]]
483            coefficients = polyfit(x, y, 1)
484        else:
485            coefficients = self.linear_fit_calc(
486                intes, massa, index_plus, index_sign="-"
487            )
488
489        a = coefficients[0]
490        b = coefficients[1]
491
492        if self.mspeaks_settings.legacy_resolving_power:
493            y_intercept = intes[index_plus - 1] + (
494                (intes[index_plus] - intes[index_plus - 1]) / 2
495            )
496        else:
497            y_intercept = target_peak_height
498
499        massa2 = (y_intercept - b) / a
500
501        if massa1 > massa2:
502            resolvingpower = massa[current_index] / (massa1 - massa2)
503
504        else:
505            resolvingpower = massa[current_index] / (massa2 - massa1)
506
507        return resolvingpower

Calculate the resolving power of a peak.

Parameters
  • intes (ndarray): The intensity values.
  • massa (ndarray): The mass values.
  • current_index (int): The index of the current peak.
Returns
  • float: The resolving power of the peak.
Notes

This is a conservative calculation of resolving power, the peak need to be resolved at least at the half-maximum magnitude, otherwise, the combined full width at half maximum is used to calculate resolving power.

def cal_minima(self, mass, abun):
509    def cal_minima(self, mass, abun):
510        """Calculate the minima of a peak.
511
512        Parameters
513        ----------
514        mass : ndarray
515            The mass values.
516        abun : ndarray
517            The abundance values.
518
519        Returns
520        -------
521        ndarray or None
522            The mass values at the minima, if found.
523
524        """
525        abun = -abun
526
527        dy = abun[1:] - abun[:-1]
528
529        # replaces nan for infinity
530        indices_nan = where(isnan(abun))[0]
531
532        if indices_nan.size:
533            abun[indices_nan] = inf
534            dy[where(isnan(dy))[0]] = inf
535
536        indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0]
537
538        if indexes.size:
539            return mass[indexes], abun[indexes]

Calculate the minima of a peak.

Parameters
  • mass (ndarray): The mass values.
  • abun (ndarray): The abundance values.
Returns
  • ndarray or None: The mass values at the minima, if found.
def calc_centroid(self, mass, abund, freq):
541    def calc_centroid(self, mass, abund, freq):
542        """Calculate the centroid of a peak.
543
544        Parameters
545        ----------
546        mass : ndarray
547            The mass values.
548        abund : ndarray
549            The abundance values.
550        freq : ndarray or None
551            The frequency values, if available.
552
553        Returns
554        -------
555        None
556
557        """
558
559        max_height = self.mspeaks_settings.peak_height_max_percent
560        max_prominence = self.mspeaks_settings.peak_max_prominence_percent
561        min_peak_datapoints = self.mspeaks_settings.min_peak_datapoints
562        peak_derivative_threshold = self.mspeaks_settings.peak_derivative_threshold
563        max_abun = max(abund)
564        peak_height_diff = lambda hi, li: ((abund[hi] - abund[li]) / max_abun) * 100
565
566        domain = mass
567        signal = abund
568        len_signal = len(signal)
569
570        signal_threshold, factor = self.get_threshold(abund)
571        max_signal = factor
572
573        correct_baseline = False
574
575        include_indexes = sp.peak_picking_first_derivative(
576            domain,
577            signal,
578            max_height,
579            max_prominence,
580            max_signal,
581            min_peak_datapoints,
582            peak_derivative_threshold,
583            signal_threshold=signal_threshold,
584            correct_baseline=correct_baseline,
585            abun_norm=1,
586            plot_res=False,
587        )
588
589        for indexes_tuple in include_indexes:
590            apex_index = indexes_tuple[1]
591
592            peak_indexes = self.check_prominence(
593                abund, apex_index, len_signal, peak_height_diff
594            )
595
596            if peak_indexes:
597                mz_exp_centroid, freq_centr, intes_centr = self.find_apex_fit_quadratic(
598                    mass, abund, freq, apex_index
599                )
600
601                if mz_exp_centroid:
602                    peak_resolving_power = self.calculate_resolving_power(
603                        abund, mass, apex_index
604                    )
605                    s2n = intes_centr / self.baseline_noise_std
606                    self.add_mspeak(
607                        self.polarity,
608                        mz_exp_centroid,
609                        abund[apex_index],
610                        peak_resolving_power,
611                        s2n,
612                        indexes_tuple,
613                        exp_freq=freq_centr,
614                        ms_parent=self,
615                    )
616                # pyplot.plot(domain[start_index: final_index + 1], signal[start_index:final_index + 1], c='black')
617                # pyplot.show()

Calculate the centroid of a peak.

Parameters
  • mass (ndarray): The mass values.
  • abund (ndarray): The abundance values.
  • freq (ndarray or None): The frequency values, if available.
Returns
  • None
def get_threshold(self, intes):
619    def get_threshold(self, intes):
620        """Get the intensity threshold for peak picking.
621
622        Parameters
623        ----------
624        intes : ndarray
625            The intensity values.
626
627        Returns
628        -------
629        float
630            The intensity threshold.
631        float
632            The factor to multiply the intensity threshold by.
633        """
634
635        intes = array(intes).astype(float)
636
637        noise_threshold_method = self.settings.noise_threshold_method
638
639        if noise_threshold_method == "minima":
640            if self.is_centroid:
641                warnings.warn(
642                    "Auto threshould is disabled for centroid data, returning 0"
643                )
644                factor = 1
645                abundance_threshold = 1e-20
646            # print(self.settings.noise_threshold_min_std)
647            else:
648                abundance_threshold = self.baseline_noise + (
649                    self.settings.noise_threshold_min_std * self.baseline_noise_std
650                )
651                factor = 1
652
653        elif noise_threshold_method == "signal_noise":
654            abundance_threshold = self.settings.noise_threshold_min_s2n
655            if self.is_centroid:
656                factor = 1
657            else:
658                factor = self.baseline_noise_std
659
660        elif noise_threshold_method == "relative_abundance":
661            abundance_threshold = self.settings.noise_threshold_min_relative_abundance
662            factor = intes.max() / 100
663
664        elif noise_threshold_method == "absolute_abundance":
665            abundance_threshold = self.settings.noise_threshold_absolute_abundance
666            factor = 1
667
668        elif noise_threshold_method == "log":
669            if self.is_centroid:
670                raise Exception("log noise Not tested for centroid data")
671            abundance_threshold = self.settings.noise_threshold_log_nsigma
672            factor = self.baseline_noise_std
673
674        else:
675            raise Exception(
676                "%s method was not implemented, please refer to corems.mass_spectrum.calc.NoiseCalc Class"
677                % noise_threshold_method
678            )
679
680        return abundance_threshold, factor

Get the intensity threshold for peak picking.

Parameters
  • intes (ndarray): The intensity values.
Returns
  • float: The intensity threshold.
  • float: The factor to multiply the intensity threshold by.
@staticmethod
def algebraic_quadratic(list_mass, list_y):
682    @staticmethod
683    def algebraic_quadratic(list_mass, list_y):
684        """
685        Find the apex of a peak - algebraically.
686        Faster than using numpy polyfit by ~28x per fit.
687
688        Parameters
689        ----------
690        list_mass : ndarray
691            list of m/z values (3 points)
692        list_y : ndarray
693            list of abundance values (3 points)
694
695        Returns
696        -------
697        a, b, c: float
698            coefficients of the quadratic equation.
699
700        Notes
701        --------
702        This is a static method.
703        """
704        x_1, x_2, x_3 = list_mass
705        y_1, y_2, y_3 = list_y
706
707        a = (
708            y_1 / ((x_1 - x_2) * (x_1 - x_3))
709            + y_2 / ((x_2 - x_1) * (x_2 - x_3))
710            + y_3 / ((x_3 - x_1) * (x_3 - x_2))
711        )
712
713        b = (
714            -y_1 * (x_2 + x_3) / ((x_1 - x_2) * (x_1 - x_3))
715            - y_2 * (x_1 + x_3) / ((x_2 - x_1) * (x_2 - x_3))
716            - y_3 * (x_1 + x_2) / ((x_3 - x_1) * (x_3 - x_2))
717        )
718
719        c = (
720            y_1 * x_2 * x_3 / ((x_1 - x_2) * (x_1 - x_3))
721            + y_2 * x_1 * x_3 / ((x_2 - x_1) * (x_2 - x_3))
722            + y_3 * x_1 * x_2 / ((x_3 - x_1) * (x_3 - x_2))
723        )
724        return a, b, c

Find the apex of a peak - algebraically. Faster than using numpy polyfit by ~28x per fit.

Parameters
  • list_mass (ndarray): list of m/z values (3 points)
  • list_y (ndarray): list of abundance values (3 points)
Returns
  • a, b, c (float): coefficients of the quadratic equation.
Notes

This is a static method.

def find_apex_fit_quadratic(self, mass, abund, freq, current_index):
726    def find_apex_fit_quadratic(self, mass, abund, freq, current_index):
727        """
728        Find the apex of a peak.
729
730        Parameters
731        ----------
732        mass : ndarray
733            The mass values.
734        abund : ndarray
735            The abundance values.
736        freq : ndarray or None
737            The frequency values, if available.
738        current_index : int
739            The index of the current peak.
740
741
742        Returns
743        -------
744        float
745            The m/z value of the peak apex.
746        float
747            The frequency value of the peak apex, if available.
748        float
749            The abundance value of the peak apex.
750
751        """
752        # calc prominence
753        # peak_indexes = self.check_prominence(abund, current_index, len_abundance, peak_height_diff )
754
755        # if not peak_indexes:
756
757        #    return None, None, None, None
758
759        # else:
760
761        # fit parabola to three most abundant datapoints
762        list_mass = [
763            mass[current_index - 1],
764            mass[current_index],
765            mass[current_index + 1],
766        ]
767        list_y = [
768            abund[current_index - 1],
769            abund[current_index],
770            abund[current_index + 1],
771        ]
772
773        if self.mspeaks_settings.legacy_centroid_polyfit:
774            z = polyfit(list_mass, list_y, 2)
775            a = z[0]
776            b = z[1]
777        else:
778            a, b, c = self.algebraic_quadratic(list_mass, list_y)
779
780        calculated = -b / (2 * a)
781
782        if calculated < 1 or int(calculated) != int(list_mass[1]):
783            mz_exp_centroid = list_mass[1]
784
785        else:
786            mz_exp_centroid = calculated
787
788        if (
789            self.label == Labels.bruker_frequency
790            or self.label == Labels.midas_frequency
791        ):
792            # fit parabola to three most abundant frequency datapoints
793            list_freq = [
794                freq[current_index - 1],
795                freq[current_index],
796                freq[current_index + 1],
797            ]
798            if self.mspeaks_settings.legacy_centroid_polyfit:
799                z = polyfit(list_mass, list_y, 2)
800                a = z[0]
801                b = z[1]
802            else:
803                a, b, c = self.algebraic_quadratic(list_mass, list_y)
804
805            calculated_freq = -b / (2 * a)
806
807            if calculated_freq < 1 or int(calculated_freq) != freq[current_index]:
808                freq_centr = list_freq[1]
809
810            else:
811                freq_centr = calculated_freq
812
813        else:
814            freq_centr = None
815
816        if self.mspeaks_settings.legacy_centroid_polyfit:
817            abundance_centroid = abund[current_index]
818        else:
819            abundance_centroid = a * mz_exp_centroid**2 + b * mz_exp_centroid + c
820
821        return mz_exp_centroid, freq_centr, abundance_centroid

Find the apex of a peak.

Parameters
  • mass (ndarray): The mass values.
  • abund (ndarray): The abundance values.
  • freq (ndarray or None): The frequency values, if available.
  • current_index (int): The index of the current peak.
Returns
  • float: The m/z value of the peak apex.
  • float: The frequency value of the peak apex, if available.
  • float: The abundance value of the peak apex.
def check_prominence(self, abun, current_index, len_abundance, peak_height_diff) -> tuple:
823    def check_prominence(
824        self, abun, current_index, len_abundance, peak_height_diff
825    ) -> tuple or False:
826        """Check the prominence of a peak.
827
828        Parameters
829        ----------
830        abun : ndarray
831            The abundance values.
832        current_index : int
833            The index of the current peak.
834        len_abundance : int
835            The length of the abundance array.
836        peak_height_diff : function
837            The function to calculate the peak height difference.
838
839        Returns
840        -------
841        tuple or False
842            A tuple containing the indexes of the peak, if the prominence is above the threshold.
843            Otherwise, False.
844
845        """
846
847        final_index = self.find_minima(current_index, abun, len_abundance, right=True)
848
849        start_index = self.find_minima(current_index, abun, len_abundance, right=False)
850
851        peak_indexes = (current_index - 1, current_index, current_index + 1)
852
853        if (
854            min(
855                peak_height_diff(current_index, start_index),
856                peak_height_diff(current_index, final_index),
857            )
858            > self.mspeaks_settings.peak_min_prominence_percent
859        ):
860            return peak_indexes
861
862        else:
863            return False

Check the prominence of a peak.

Parameters
  • abun (ndarray): The abundance values.
  • current_index (int): The index of the current peak.
  • len_abundance (int): The length of the abundance array.
  • peak_height_diff (function): The function to calculate the peak height difference.
Returns
  • tuple or False: A tuple containing the indexes of the peak, if the prominence is above the threshold. Otherwise, False.
def use_the_max(self, mass, abund, current_index, len_abundance, peak_height_diff):
865    def use_the_max(self, mass, abund, current_index, len_abundance, peak_height_diff):
866        """Use the max peak height as the centroid
867
868        Parameters
869        ----------
870        mass : ndarray
871            The mass values.
872        abund : ndarray
873            The abundance values.
874        current_index : int
875            The index of the current peak.
876        len_abundance : int
877            The length of the abundance array.
878        peak_height_diff : function
879            The function to calculate the peak height difference.
880
881        Returns
882        -------
883        float
884            The m/z value of the peak apex.
885        float
886            The abundance value of the peak apex.
887        tuple or None
888            A tuple containing the indexes of the peak, if the prominence is above the threshold.
889            Otherwise, None.
890        """
891
892        peak_indexes = self.check_prominence(
893            abund, current_index, len_abundance, peak_height_diff
894        )
895
896        if not peak_indexes:
897            return None, None, None
898
899        else:
900            return mass[current_index], abund[current_index], peak_indexes

Use the max peak height as the centroid

Parameters
  • mass (ndarray): The mass values.
  • abund (ndarray): The abundance values.
  • current_index (int): The index of the current peak.
  • len_abundance (int): The length of the abundance array.
  • peak_height_diff (function): The function to calculate the peak height difference.
Returns
  • float: The m/z value of the peak apex.
  • float: The abundance value of the peak apex.
  • tuple or None: A tuple containing the indexes of the peak, if the prominence is above the threshold. Otherwise, None.
def calc_centroid_legacy(self, mass, abund, freq):
902    def calc_centroid_legacy(self, mass, abund, freq):
903        """Legacy centroid calculation
904        Deprecated - for deletion.
905
906        """
907        warnings.warn(
908            "Legacy centroid calculation is deprecated. Please use the new centroid calculation method."
909        )
910        pass
911        if False:
912            len_abundance = len(abund)
913
914            max_abundance = max(abund)
915
916            peak_height_diff = (
917                lambda hi, li: ((abund[hi] - abund[li]) / max_abundance) * 100
918            )
919
920            abundance_threshold, factor = self.get_threshold(abund)
921            # print(abundance_threshold, factor)
922            # find indices of all peaks
923            dy = abund[1:] - abund[:-1]
924
925            # replaces nan for infi nity
926            indices_nan = where(isnan(abund))[0]
927
928            if indices_nan.size:
929                abund[indices_nan] = inf
930                dy[where(isnan(dy))[0]] = inf
931
932            indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0]
933
934            # noise threshold
935            if indexes.size and abundance_threshold is not None:
936                indexes = indexes[abund[indexes] / factor >= abundance_threshold]
937            # filter out 'peaks' within 3 points of the spectrum limits
938            # remove entries within 3 points of upper limit
939            indexes = [x for x in indexes if (len_abundance - x) > 3]
940            # remove entries within 3 points of zero
941            indexes = [x for x in indexes if x > 3]
942
943            for current_index in indexes:
944                if self.label == Labels.simulated_profile:
945                    mz_exp_centroid, intes_centr, peak_indexes = self.use_the_max(
946                        mass, abund, current_index, len_abundance, peak_height_diff
947                    )
948                    if mz_exp_centroid:
949                        peak_resolving_power = self.calculate_resolving_power(
950                            abund, mass, current_index
951                        )
952                        s2n = intes_centr / self.baseline_noise_std
953                        freq_centr = None
954                        self.add_mspeak(
955                            self.polarity,
956                            mz_exp_centroid,
957                            abund[current_index],
958                            peak_resolving_power,
959                            s2n,
960                            peak_indexes,
961                            exp_freq=freq_centr,
962                            ms_parent=self,
963                        )
964
965                else:
966                    mz_exp_centroid, freq_centr, intes_centr, peak_indexes = (
967                        self.find_apex_fit_quadratic(
968                            mass,
969                            abund,
970                            freq,
971                            current_index,
972                            len_abundance,
973                            peak_height_diff,
974                        )
975                    )
976                    if mz_exp_centroid:
977                        try:
978                            peak_resolving_power = self.calculate_resolving_power(
979                                abund, mass, current_index
980                            )
981                        except IndexError:
982                            print("index error, skipping peak")
983                            continue
984
985                        s2n = intes_centr / self.baseline_noise_std
986                        self.add_mspeak(
987                            self.polarity,
988                            mz_exp_centroid,
989                            abund[current_index],
990                            peak_resolving_power,
991                            s2n,
992                            peak_indexes,
993                            exp_freq=freq_centr,
994                            ms_parent=self,
995                        )

Legacy centroid calculation Deprecated - for deletion.