corems.mass_spectra.calc.SignalProcessing

  1import numpy as np
  2
  3from scipy.signal import savgol_filter
  4from scipy.signal.windows import boxcar
  5from scipy import interpolate
  6from matplotlib import pyplot as plt
  7from numpy import abs
  8from numpy import array, polyfit, asarray
  9
 10
 11def peak_detector(tic, max_tic):  # TODO remove max_tic argument?
 12    """
 13    Find peaks by detecting minima in the first derivative of the data
 14    Used in LC/GC data processing
 15
 16    Parameters
 17    ----------
 18    tic : array
 19        array of data points to find the peaks
 20    max_tic : float
 21        maximum value of the data points
 22
 23    Returns
 24    -------
 25    tuple
 26        tuple of indexes of the start, apex and final points of the peak
 27
 28    """
 29    dy = derivate(tic)
 30
 31    indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0]
 32
 33    for index in indexes:
 34        start_index = find_minima(index, tic, right=False)
 35        final_index = find_minima(index, tic)
 36
 37        yield (start_index, index, final_index)
 38
 39
 40def find_nearest_scan(data, nodes):
 41    """
 42    Find nearest data point in a list of nodes (derivated data)
 43    in LC/GC this is 'scan', in MS this is 'm/z' data point
 44
 45    Parameters
 46    ----------
 47    data : float
 48        data point to find the nearest node
 49    nodes : array
 50        array of nodes to search for the nearest node
 51
 52    Returns
 53    -------
 54    float
 55        nearest node to the data point
 56    """
 57
 58    array_data = asarray(nodes)
 59
 60    scan_index = (abs(array_data - data)).argmin()
 61
 62    return nodes[scan_index]
 63
 64
 65def check_corrected_abundance(
 66    closest_left,
 67    closest_right,
 68    apex_index,
 69    signal,
 70    max_signal,
 71    signal_threshold,
 72    abun_norm,
 73):
 74    """
 75    Check the corrected abundance of the peak
 76
 77    Parameters
 78    ----------
 79    closest_left : int
 80        index of the closest left node
 81    closest_right : int
 82        index of the closest right node
 83    apex_index : int
 84        index of the apex node
 85    signal : array
 86        array of data points to find the peaks
 87    max_signal : float
 88        maximum value of the data points
 89    signal_threshold : float
 90        threshold for the signal
 91    abun_norm : float
 92        abundance normalization factor
 93
 94    Returns
 95    -------
 96    float
 97        corrected abundance of the peak
 98
 99
100    """
101    x = [closest_left, closest_right]
102    y = [signal[closest_left], signal[closest_right]]
103
104    pol = polyfit(x, y, 1)  # TODO replace with faster method in this file
105
106    corrected_peak_height = signal[apex_index] - pol(apex_index)
107
108    if (corrected_peak_height / max_signal) * abun_norm > signal_threshold:
109        return corrected_peak_height
110    else:
111        return False
112
113
114def peak_picking_first_derivative(
115    domain,
116    signal,
117    max_height,
118    max_prominence,
119    max_signal,
120    min_peak_datapoints,
121    peak_derivative_threshold,
122    signal_threshold=0.1,
123    correct_baseline=True,
124    plot_res=False,
125    abun_norm=100,
126    check_abundance=False,
127    apex_indexes=[],
128):
129    """
130    Find peaks by detecting minima in the first derivative of the data
131    Used in LC/GC and MS data processing
132    Optional baseline correction, then peak apex detection via 1st derivative.
133    For each apex the peak datapoints surrounding the apex are determined.
134    Some basic thresholding is applied (signal, number of datapoints, etc).
135
136    Parameters
137    ----------
138    domain : array
139        array of data points to find the peaks
140    signal : array
141        array of data points to find the peaks
142    max_height : float
143        maximum height of the peak
144    max_prominence : float
145        maximum prominence of the peak
146    max_signal : float
147        maximum signal of the peak
148    min_peak_datapoints : int
149        minimum number of data points in the peak
150    peak_derivative_threshold : float
151        threshold for the peak derivative
152    signal_threshold : float
153        threshold for the signal
154    correct_baseline : bool
155        flag to correct the baseline
156    plot_res : bool
157        flag to plot the results
158    abun_norm : float
159        abundance normalization factor
160    check_abundance : bool
161        flag to check the abundance
162
163
164    Returns
165    -------
166    tuple
167        tuple of indexes of the start, apex and final points of the peak
168
169
170    """
171    if correct_baseline:
172        signal = signal - baseline_detector(signal, domain, max_height, max_prominence)
173
174    domain = np.array(domain)
175    signal = np.array(signal)
176
177    dy = derivate(signal)
178    if len(apex_indexes) == 0:
179        # Find apexes
180        apex_indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0]
181    else:
182        apex_indexes = np.array(apex_indexes)
183
184    if apex_indexes.size and apex_indexes is not None:
185        apex_indexes = apex_indexes[
186            signal[apex_indexes] / max_signal >= signal_threshold
187        ]
188
189    signal = signal / max(signal)
190    start_peak = []
191    end_peak = []
192
193    pos_dy_threshold = peak_derivative_threshold  # max(dy) * peak_derivative_threshold
194    neg_dy_threshold = -peak_derivative_threshold  # min(dy) * peak_derivative_threshold
195    len_dy = len(dy)
196    # take apex_index and move left to find start
197    for index in apex_indexes:
198        # catch for starting position
199
200        if index == 0:
201            index_start = index
202        else:
203            index_start = index - 1
204
205        # catch for ending position
206        if (index + 1) >= dy.shape[0]:
207            index_end = index - 1
208        else:
209            index_end = index + 1
210
211        # while dy[index_start-1] > 0 and index_start != 0:
212        while dy[index_start - 1] > pos_dy_threshold and index_start > 0:
213            index_start = index_start - 1
214        start_peak.append(index_start)
215
216        # while dy[index_end] < 0 and index_end != (len(dy) - 1):
217        while dy[index_end] < neg_dy_threshold and index_end != (len_dy - 1):
218            index_end = index_end + 1
219        end_peak.append(index_end)
220
221    start_peak = array(start_peak)
222    end_peak = array(end_peak)
223
224    for apex_index in apex_indexes:
225        # index_gt_apex = np.where(end_peak >= apex_index)[0]
226        # index_lt_apex = np.where(start_peak <= apex_index)[0]
227        index_gt_apex = np.arange(np.searchsorted(end_peak, apex_index), len(end_peak))
228        index_lt_apex = np.arange(
229            0, np.searchsorted(start_peak, apex_index, side="right")
230        )
231
232        if not index_gt_apex.size == 0 and not index_lt_apex.size == 0:
233            closest_right = find_nearest_scan(apex_index, end_peak[index_gt_apex])
234            closest_left = find_nearest_scan(apex_index, start_peak[index_lt_apex])
235            if check_abundance:
236                corrected_peak_height = check_corrected_abundance(
237                    closest_left,
238                    closest_right,
239                    apex_index,
240                    signal,
241                    max_signal,
242                    signal_threshold,
243                    abun_norm,
244                )
245            else:
246                corrected_peak_height = signal[apex_index]
247
248            if (closest_right - closest_left) >= min_peak_datapoints:
249                if plot_res:
250                    plt.plot(
251                        domain[closest_left : closest_right + 1],
252                        dy[closest_left : closest_right + 1],
253                        c="red",
254                    )
255                    plt.plot(
256                        domain[closest_left : closest_right + 1],
257                        signal[closest_left : closest_right + 1],
258                        c="black",
259                    )
260                    plt.title(str((corrected_peak_height / max_signal) * 100))
261                    plt.show()
262
263                yield (closest_left, apex_index, closest_right)
264
265
266def find_minima(index, tic, right=True):
267    """
268    Find the index of the local minima in the given time-of-flight (TOF) intensity array.
269
270    Parameters:
271    -----------
272    index: int
273        The starting index to search for the minima.
274    tic: list
275        TIC data points
276    right : bool, optional
277        Determines the direction of the search. If True, search to the right of the index. If False, search to the left of the index. Default is True.
278
279    Returns:
280    --------
281    int
282        The index of the local minima in the TIC  array.
283    """
284
285    j = index
286    # apex_abundance = tic[index]
287    tic_len = len(tic)
288
289    if right:
290        minima = tic[j] >= tic[j + 1]
291    else:
292        minima = tic[j] >= tic[j - 1]
293
294    while minima:
295        if j == 1 or j == tic_len - 2:
296            break
297
298        if right:
299            j += 1
300
301            minima = tic[j] >= tic[j + 1]
302
303        else:
304            j -= 1
305            minima = tic[j] >= tic[j - 1]
306
307    if right:
308        return j
309    else:
310        return j
311
312
313def derivate(data_array):
314    """
315    Calculate derivative of the data points.
316    Replaces nan with infinity
317
318    Parameters
319    ----------
320    data_array : array
321        array of data points
322
323    Returns
324    -------
325    array
326        array of the derivative of the data points
327    """
328    data_array = np.array(data_array)
329
330    dy = data_array[1:] - data_array[:-1]
331
332    # replaces nan for infinity
333    indices_nan = np.where(np.isnan(data_array))[0]
334
335    if indices_nan.size:
336        data_array[indices_nan] = np.inf
337        dy[np.where(np.isnan(dy))[0]] = np.inf
338
339    return dy
340
341
342def minima_detector(tic, max_tic, peak_height_max_percent, peak_max_prominence_percent):
343    """
344    Minima detector for the TIC data points.
345
346    Parameters
347    ----------
348    tic : array
349        array of data points to find the peaks
350    max_tic : float
351        maximum value of the data points
352    peak_height_max_percent : float
353        maximum height of the peak
354    peak_max_prominence_percent : float
355        maximum prominence of the peak
356
357    Returns
358    -------
359    generator
360        generator of the indexes of the minima in the TIC array
361
362    """
363    peak_height_diff = lambda hi, li: ((tic[hi] - tic[li]) / max_tic) * 100
364
365    for start_index, index, final_index in peak_detector(tic, max_tic):
366        # abundance max threshold
367        if (tic[index] / max_tic) * 100 < peak_height_max_percent:
368            # calculates prominence and filter
369            if (
370                peak_height_diff(index, start_index)
371                and peak_height_diff(index, final_index) < peak_max_prominence_percent
372            ):
373                yield from (start_index, final_index)
374
375
376def baseline_detector(
377    tic, rt, peak_height_max_percent, peak_max_prominence_percent, do_interpolation=True
378):
379    """
380    Baseline detector for the TIC data points.
381    For LC/GC data processing
382
383    Parameters
384    ----------
385    tic : array
386        array of data points to find the peaks
387    rt : array
388        array of retention time data points
389    peak_height_max_percent : float
390        maximum height of the peak
391    peak_max_prominence_percent : float
392        maximum prominence of the peak
393    do_interpolation : bool, optional
394        flag to interpolate the data points. Default is True
395
396    Returns
397    -------
398    array
399        array of the baseline corrected data points
400
401    """
402    rt = np.array(rt)
403
404    max_tic = max(tic)
405
406    indexes = sorted(
407        list(
408            set(
409                i
410                for i in minima_detector(
411                    tic, max_tic, peak_height_max_percent, peak_max_prominence_percent
412                )
413            )
414        )
415    )
416
417    y = -tic
418
419    x1 = rt[indexes]
420
421    y1 = y[indexes]
422
423    if len(x1) <= 5:
424        return tic
425
426    if not do_interpolation:
427        y0 = np.zeros(tic.shape)
428        y0[indexes] = y[indexes]
429
430        return y0
431
432    else:
433        f1 = interpolate.interp1d(x1, y1, kind="quadratic", fill_value="extrapolate")
434
435        ynew1 = f1(list(rt))
436
437        # from matplotlib import pyplot as plt
438        # if self.deconv_rt_list and  self.deconv_mz == 51:
439
440        #   plt.plot(rt, tic-(-1* ynew1), color='green')
441
442        # plt.plot(rt, -1* ynew1, c='black')
443
444        # s = self.smooth(s, 10, 'blackman')
445
446        # plt.plot(self.retention_time, -s)
447
448        # plt.show()
449
450        return -1 * ynew1
451
452
453def peak_detector_generator(
454    tic, stds, method, rt, max_height, min_height, max_prominence, min_datapoints
455):
456    """
457    Peak detector generator for the TIC data points.
458
459    Parameters
460    ----------
461    tic : array
462        array of data points to find the peaks
463    stds : float
464        standard deviation
465    method : str
466        method to detect the peaks
467        Available methods: 'manual_relative_abundance', 'auto_relative_abundance', 'second_derivative'
468    rt : array
469        array of retention time data points
470    max_height : float
471        maximum height of the peak
472    min_height : float
473        minimum height of the peak
474    max_prominence : float
475        maximum prominence of the peak
476    min_datapoints : int
477        minimum number of data points in the peak
478
479    Returns
480    -------
481    generator
482        generator of the indexes of the peaks in the TIC array
483
484    """
485    max_tic = max(tic)
486
487    if method == "manual_relative_abundance":
488        tic = tic - baseline_detector(tic, rt, max_height, max_prominence)
489
490        norm_tic = (tic / max_tic) * 100
491
492        remove_indexes = np.where(norm_tic < min_height)[0]
493
494        # if self.deconv_rt_list and  self.deconv_mz == 51:
495        #    plt.plot(self.deconv_rt_list, tic, label=self.deconv_mz)
496
497    elif method == "auto_relative_abundance":
498        tic = tic - baseline_detector(tic, rt, max_height, max_prominence)
499
500        baseline = baseline_detector(tic, rt, max_height, max_prominence)
501
502        peak_detect_threshold = np.nanmean(baseline) + (stds * np.std(baseline))
503
504        remove_indexes = np.where(tic < peak_detect_threshold)[0]
505
506    elif method == "second_derivative":
507        remove_indexes = second_derivative_threshold(
508            tic, stds, rt, max_height, max_prominence
509        )
510
511    else:
512        NotImplemented(method)
513
514    peak_height_diff = lambda hi, li: ((tic[hi] - tic[li]) / max_tic) * 100
515
516    dy = derivate(tic)
517
518    include_indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0]
519
520    final_indexes = sorted(set(include_indexes) - set(remove_indexes))
521
522    # from matplotlib import pyplot as plt
523
524    # plt.plot(self.retention_time, tic, color='black')
525    # plt.scatter(self.retention_time[remove_indexes], tic[remove_indexes], color='red')
526    # plt.scatter(self.retention_time[include_indexes], tic[include_indexes], color='blue')
527    # plt.scatter(self.retention_time[final_indexes], tic[final_indexes], color='blue')
528
529    # plt.show()
530
531    for index in final_indexes:
532        start_index = find_minima(index, tic, right=False)
533        final_index = find_minima(index, tic)
534
535        if final_index - start_index > min_datapoints:
536            # if min( peak_height_diff(index,start_index), peak_height_diff(index,final_index) )> self.chromatogram_settings.peak_min_prominence_percent :
537
538            yield (start_index, index, final_index)
539
540
541def smooth_signal(x, window_len, window, pol_order, implemented_smooth_method):
542    """
543    Smooth the data using a window with requested size.
544
545    This method is based on the convolution of a scaled window with the signal.
546    The signal is prepared by introducing reflected copies of the signal
547    (with the window size) in both ends so that transient parts are minimized
548    in the begining and end part of the output signal.
549
550    Parameters
551    ----------
552    x: array
553        the input signal
554    window_len: int
555        the dimension of the smoothing window; should be an odd integer
556    window: str
557        the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
558    pol_order: int
559        the order of the polynomial to fit the data
560    implemented_smooth_method: list
561        list of implemented smoothing methods
562
563    Returns
564    -------
565    y: array
566        the smoothed signal
567
568    Notes:
569    -----
570    See also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
571    scipy.signal.savgol_filter
572
573    """
574    x = np.array(x)
575
576    if x.ndim != 1:
577        raise ValueError("smooth only accepts 1 dimension arrays.")
578
579    if x.size < window_len:
580        raise ValueError("Input array needs to be bigger than window size")
581
582    # if window_len < 3:
583    #    return x
584
585    if not window in implemented_smooth_method:
586        raise ValueError(
587            "Window method should be 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"
588        )
589
590    s = np.r_[x[window_len - 1 : 0 : -1], x, x[-1:-window_len:-1]]
591
592    if window == "savgol":
593        return savgol_filter(x, window_len, pol_order)
594
595    elif window == "boxcar":  # moving average
596        w = boxcar(window_len)
597
598        y = np.convolve(w, s, mode="valid")
599
600    elif window == "flat":  # moving average
601        w = np.ones(window_len, "d")
602
603        y = np.convolve(w / w.sum(), s, mode="valid")
604
605    else:
606        w = eval(window + "(window_len)")
607
608        y = np.convolve(w / w.sum(), s, mode="valid")
609
610    return y[int(window_len / 2 - 1) : int(-window_len / 2)]
611
612
613def second_derivative_threshold(
614    tic, stds, rt, peak_height_max_percent, peak_max_prominence_percent
615):
616    """
617    Second derivative threshold for the TIC data points.
618    For LC/GC data processing
619
620    Parameters
621    ----------
622    tic : array
623        array of data points to find the peaks
624    stds : float
625        standard deviation
626    rt : array
627        array of retention time data points
628    peak_height_max_percent : float
629        maximum height of the peak
630
631    Returns
632    -------
633    array
634        array of the indexes of the data points to remove
635
636    """
637
638    dy = derivate(tic)
639
640    dydy = derivate(dy)
641    dydy = np.hstack((dydy, 0))
642    dydy = np.hstack((0, dydy))
643
644    baseline = baseline_detector(
645        dydy,
646        rt,
647        peak_height_max_percent,
648        peak_max_prominence_percent,
649        do_interpolation=False,
650    )
651
652    threshold_median = np.median(baseline) - (stds * np.std(baseline))
653
654    remove_indexes = np.where(dydy > threshold_median)[0]
655
656    return remove_indexes
def peak_detector(tic, max_tic):
12def peak_detector(tic, max_tic):  # TODO remove max_tic argument?
13    """
14    Find peaks by detecting minima in the first derivative of the data
15    Used in LC/GC data processing
16
17    Parameters
18    ----------
19    tic : array
20        array of data points to find the peaks
21    max_tic : float
22        maximum value of the data points
23
24    Returns
25    -------
26    tuple
27        tuple of indexes of the start, apex and final points of the peak
28
29    """
30    dy = derivate(tic)
31
32    indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0]
33
34    for index in indexes:
35        start_index = find_minima(index, tic, right=False)
36        final_index = find_minima(index, tic)
37
38        yield (start_index, index, final_index)

Find peaks by detecting minima in the first derivative of the data Used in LC/GC data processing

Parameters
  • tic (array): array of data points to find the peaks
  • max_tic (float): maximum value of the data points
Returns
  • tuple: tuple of indexes of the start, apex and final points of the peak
def find_nearest_scan(data, nodes):
41def find_nearest_scan(data, nodes):
42    """
43    Find nearest data point in a list of nodes (derivated data)
44    in LC/GC this is 'scan', in MS this is 'm/z' data point
45
46    Parameters
47    ----------
48    data : float
49        data point to find the nearest node
50    nodes : array
51        array of nodes to search for the nearest node
52
53    Returns
54    -------
55    float
56        nearest node to the data point
57    """
58
59    array_data = asarray(nodes)
60
61    scan_index = (abs(array_data - data)).argmin()
62
63    return nodes[scan_index]

Find nearest data point in a list of nodes (derivated data) in LC/GC this is 'scan', in MS this is 'm/z' data point

Parameters
  • data (float): data point to find the nearest node
  • nodes (array): array of nodes to search for the nearest node
Returns
  • float: nearest node to the data point
def check_corrected_abundance( closest_left, closest_right, apex_index, signal, max_signal, signal_threshold, abun_norm):
 66def check_corrected_abundance(
 67    closest_left,
 68    closest_right,
 69    apex_index,
 70    signal,
 71    max_signal,
 72    signal_threshold,
 73    abun_norm,
 74):
 75    """
 76    Check the corrected abundance of the peak
 77
 78    Parameters
 79    ----------
 80    closest_left : int
 81        index of the closest left node
 82    closest_right : int
 83        index of the closest right node
 84    apex_index : int
 85        index of the apex node
 86    signal : array
 87        array of data points to find the peaks
 88    max_signal : float
 89        maximum value of the data points
 90    signal_threshold : float
 91        threshold for the signal
 92    abun_norm : float
 93        abundance normalization factor
 94
 95    Returns
 96    -------
 97    float
 98        corrected abundance of the peak
 99
100
101    """
102    x = [closest_left, closest_right]
103    y = [signal[closest_left], signal[closest_right]]
104
105    pol = polyfit(x, y, 1)  # TODO replace with faster method in this file
106
107    corrected_peak_height = signal[apex_index] - pol(apex_index)
108
109    if (corrected_peak_height / max_signal) * abun_norm > signal_threshold:
110        return corrected_peak_height
111    else:
112        return False

Check the corrected abundance of the peak

Parameters
  • closest_left (int): index of the closest left node
  • closest_right (int): index of the closest right node
  • apex_index (int): index of the apex node
  • signal (array): array of data points to find the peaks
  • max_signal (float): maximum value of the data points
  • signal_threshold (float): threshold for the signal
  • abun_norm (float): abundance normalization factor
Returns
  • float: corrected abundance of the peak
def peak_picking_first_derivative( domain, signal, max_height, max_prominence, max_signal, min_peak_datapoints, peak_derivative_threshold, signal_threshold=0.1, correct_baseline=True, plot_res=False, abun_norm=100, check_abundance=False, apex_indexes=[]):
115def peak_picking_first_derivative(
116    domain,
117    signal,
118    max_height,
119    max_prominence,
120    max_signal,
121    min_peak_datapoints,
122    peak_derivative_threshold,
123    signal_threshold=0.1,
124    correct_baseline=True,
125    plot_res=False,
126    abun_norm=100,
127    check_abundance=False,
128    apex_indexes=[],
129):
130    """
131    Find peaks by detecting minima in the first derivative of the data
132    Used in LC/GC and MS data processing
133    Optional baseline correction, then peak apex detection via 1st derivative.
134    For each apex the peak datapoints surrounding the apex are determined.
135    Some basic thresholding is applied (signal, number of datapoints, etc).
136
137    Parameters
138    ----------
139    domain : array
140        array of data points to find the peaks
141    signal : array
142        array of data points to find the peaks
143    max_height : float
144        maximum height of the peak
145    max_prominence : float
146        maximum prominence of the peak
147    max_signal : float
148        maximum signal of the peak
149    min_peak_datapoints : int
150        minimum number of data points in the peak
151    peak_derivative_threshold : float
152        threshold for the peak derivative
153    signal_threshold : float
154        threshold for the signal
155    correct_baseline : bool
156        flag to correct the baseline
157    plot_res : bool
158        flag to plot the results
159    abun_norm : float
160        abundance normalization factor
161    check_abundance : bool
162        flag to check the abundance
163
164
165    Returns
166    -------
167    tuple
168        tuple of indexes of the start, apex and final points of the peak
169
170
171    """
172    if correct_baseline:
173        signal = signal - baseline_detector(signal, domain, max_height, max_prominence)
174
175    domain = np.array(domain)
176    signal = np.array(signal)
177
178    dy = derivate(signal)
179    if len(apex_indexes) == 0:
180        # Find apexes
181        apex_indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0]
182    else:
183        apex_indexes = np.array(apex_indexes)
184
185    if apex_indexes.size and apex_indexes is not None:
186        apex_indexes = apex_indexes[
187            signal[apex_indexes] / max_signal >= signal_threshold
188        ]
189
190    signal = signal / max(signal)
191    start_peak = []
192    end_peak = []
193
194    pos_dy_threshold = peak_derivative_threshold  # max(dy) * peak_derivative_threshold
195    neg_dy_threshold = -peak_derivative_threshold  # min(dy) * peak_derivative_threshold
196    len_dy = len(dy)
197    # take apex_index and move left to find start
198    for index in apex_indexes:
199        # catch for starting position
200
201        if index == 0:
202            index_start = index
203        else:
204            index_start = index - 1
205
206        # catch for ending position
207        if (index + 1) >= dy.shape[0]:
208            index_end = index - 1
209        else:
210            index_end = index + 1
211
212        # while dy[index_start-1] > 0 and index_start != 0:
213        while dy[index_start - 1] > pos_dy_threshold and index_start > 0:
214            index_start = index_start - 1
215        start_peak.append(index_start)
216
217        # while dy[index_end] < 0 and index_end != (len(dy) - 1):
218        while dy[index_end] < neg_dy_threshold and index_end != (len_dy - 1):
219            index_end = index_end + 1
220        end_peak.append(index_end)
221
222    start_peak = array(start_peak)
223    end_peak = array(end_peak)
224
225    for apex_index in apex_indexes:
226        # index_gt_apex = np.where(end_peak >= apex_index)[0]
227        # index_lt_apex = np.where(start_peak <= apex_index)[0]
228        index_gt_apex = np.arange(np.searchsorted(end_peak, apex_index), len(end_peak))
229        index_lt_apex = np.arange(
230            0, np.searchsorted(start_peak, apex_index, side="right")
231        )
232
233        if not index_gt_apex.size == 0 and not index_lt_apex.size == 0:
234            closest_right = find_nearest_scan(apex_index, end_peak[index_gt_apex])
235            closest_left = find_nearest_scan(apex_index, start_peak[index_lt_apex])
236            if check_abundance:
237                corrected_peak_height = check_corrected_abundance(
238                    closest_left,
239                    closest_right,
240                    apex_index,
241                    signal,
242                    max_signal,
243                    signal_threshold,
244                    abun_norm,
245                )
246            else:
247                corrected_peak_height = signal[apex_index]
248
249            if (closest_right - closest_left) >= min_peak_datapoints:
250                if plot_res:
251                    plt.plot(
252                        domain[closest_left : closest_right + 1],
253                        dy[closest_left : closest_right + 1],
254                        c="red",
255                    )
256                    plt.plot(
257                        domain[closest_left : closest_right + 1],
258                        signal[closest_left : closest_right + 1],
259                        c="black",
260                    )
261                    plt.title(str((corrected_peak_height / max_signal) * 100))
262                    plt.show()
263
264                yield (closest_left, apex_index, closest_right)

Find peaks by detecting minima in the first derivative of the data Used in LC/GC and MS data processing Optional baseline correction, then peak apex detection via 1st derivative. For each apex the peak datapoints surrounding the apex are determined. Some basic thresholding is applied (signal, number of datapoints, etc).

Parameters
  • domain (array): array of data points to find the peaks
  • signal (array): array of data points to find the peaks
  • max_height (float): maximum height of the peak
  • max_prominence (float): maximum prominence of the peak
  • max_signal (float): maximum signal of the peak
  • min_peak_datapoints (int): minimum number of data points in the peak
  • peak_derivative_threshold (float): threshold for the peak derivative
  • signal_threshold (float): threshold for the signal
  • correct_baseline (bool): flag to correct the baseline
  • plot_res (bool): flag to plot the results
  • abun_norm (float): abundance normalization factor
  • check_abundance (bool): flag to check the abundance
Returns
  • tuple: tuple of indexes of the start, apex and final points of the peak
def find_minima(index, tic, right=True):
267def find_minima(index, tic, right=True):
268    """
269    Find the index of the local minima in the given time-of-flight (TOF) intensity array.
270
271    Parameters:
272    -----------
273    index: int
274        The starting index to search for the minima.
275    tic: list
276        TIC data points
277    right : bool, optional
278        Determines the direction of the search. If True, search to the right of the index. If False, search to the left of the index. Default is True.
279
280    Returns:
281    --------
282    int
283        The index of the local minima in the TIC  array.
284    """
285
286    j = index
287    # apex_abundance = tic[index]
288    tic_len = len(tic)
289
290    if right:
291        minima = tic[j] >= tic[j + 1]
292    else:
293        minima = tic[j] >= tic[j - 1]
294
295    while minima:
296        if j == 1 or j == tic_len - 2:
297            break
298
299        if right:
300            j += 1
301
302            minima = tic[j] >= tic[j + 1]
303
304        else:
305            j -= 1
306            minima = tic[j] >= tic[j - 1]
307
308    if right:
309        return j
310    else:
311        return j

Find the index of the local minima in the given time-of-flight (TOF) intensity array.

Parameters:

index: int The starting index to search for the minima. tic: list TIC data points right : bool, optional Determines the direction of the search. If True, search to the right of the index. If False, search to the left of the index. Default is True.

Returns:

int The index of the local minima in the TIC array.

def derivate(data_array):
314def derivate(data_array):
315    """
316    Calculate derivative of the data points.
317    Replaces nan with infinity
318
319    Parameters
320    ----------
321    data_array : array
322        array of data points
323
324    Returns
325    -------
326    array
327        array of the derivative of the data points
328    """
329    data_array = np.array(data_array)
330
331    dy = data_array[1:] - data_array[:-1]
332
333    # replaces nan for infinity
334    indices_nan = np.where(np.isnan(data_array))[0]
335
336    if indices_nan.size:
337        data_array[indices_nan] = np.inf
338        dy[np.where(np.isnan(dy))[0]] = np.inf
339
340    return dy

Calculate derivative of the data points. Replaces nan with infinity

Parameters
  • data_array (array): array of data points
Returns
  • array: array of the derivative of the data points
def minima_detector(tic, max_tic, peak_height_max_percent, peak_max_prominence_percent):
343def minima_detector(tic, max_tic, peak_height_max_percent, peak_max_prominence_percent):
344    """
345    Minima detector for the TIC data points.
346
347    Parameters
348    ----------
349    tic : array
350        array of data points to find the peaks
351    max_tic : float
352        maximum value of the data points
353    peak_height_max_percent : float
354        maximum height of the peak
355    peak_max_prominence_percent : float
356        maximum prominence of the peak
357
358    Returns
359    -------
360    generator
361        generator of the indexes of the minima in the TIC array
362
363    """
364    peak_height_diff = lambda hi, li: ((tic[hi] - tic[li]) / max_tic) * 100
365
366    for start_index, index, final_index in peak_detector(tic, max_tic):
367        # abundance max threshold
368        if (tic[index] / max_tic) * 100 < peak_height_max_percent:
369            # calculates prominence and filter
370            if (
371                peak_height_diff(index, start_index)
372                and peak_height_diff(index, final_index) < peak_max_prominence_percent
373            ):
374                yield from (start_index, final_index)

Minima detector for the TIC data points.

Parameters
  • tic (array): array of data points to find the peaks
  • max_tic (float): maximum value of the data points
  • peak_height_max_percent (float): maximum height of the peak
  • peak_max_prominence_percent (float): maximum prominence of the peak
Returns
  • generator: generator of the indexes of the minima in the TIC array
def baseline_detector( tic, rt, peak_height_max_percent, peak_max_prominence_percent, do_interpolation=True):
377def baseline_detector(
378    tic, rt, peak_height_max_percent, peak_max_prominence_percent, do_interpolation=True
379):
380    """
381    Baseline detector for the TIC data points.
382    For LC/GC data processing
383
384    Parameters
385    ----------
386    tic : array
387        array of data points to find the peaks
388    rt : array
389        array of retention time data points
390    peak_height_max_percent : float
391        maximum height of the peak
392    peak_max_prominence_percent : float
393        maximum prominence of the peak
394    do_interpolation : bool, optional
395        flag to interpolate the data points. Default is True
396
397    Returns
398    -------
399    array
400        array of the baseline corrected data points
401
402    """
403    rt = np.array(rt)
404
405    max_tic = max(tic)
406
407    indexes = sorted(
408        list(
409            set(
410                i
411                for i in minima_detector(
412                    tic, max_tic, peak_height_max_percent, peak_max_prominence_percent
413                )
414            )
415        )
416    )
417
418    y = -tic
419
420    x1 = rt[indexes]
421
422    y1 = y[indexes]
423
424    if len(x1) <= 5:
425        return tic
426
427    if not do_interpolation:
428        y0 = np.zeros(tic.shape)
429        y0[indexes] = y[indexes]
430
431        return y0
432
433    else:
434        f1 = interpolate.interp1d(x1, y1, kind="quadratic", fill_value="extrapolate")
435
436        ynew1 = f1(list(rt))
437
438        # from matplotlib import pyplot as plt
439        # if self.deconv_rt_list and  self.deconv_mz == 51:
440
441        #   plt.plot(rt, tic-(-1* ynew1), color='green')
442
443        # plt.plot(rt, -1* ynew1, c='black')
444
445        # s = self.smooth(s, 10, 'blackman')
446
447        # plt.plot(self.retention_time, -s)
448
449        # plt.show()
450
451        return -1 * ynew1

Baseline detector for the TIC data points. For LC/GC data processing

Parameters
  • tic (array): array of data points to find the peaks
  • rt (array): array of retention time data points
  • peak_height_max_percent (float): maximum height of the peak
  • peak_max_prominence_percent (float): maximum prominence of the peak
  • do_interpolation (bool, optional): flag to interpolate the data points. Default is True
Returns
  • array: array of the baseline corrected data points
def peak_detector_generator( tic, stds, method, rt, max_height, min_height, max_prominence, min_datapoints):
454def peak_detector_generator(
455    tic, stds, method, rt, max_height, min_height, max_prominence, min_datapoints
456):
457    """
458    Peak detector generator for the TIC data points.
459
460    Parameters
461    ----------
462    tic : array
463        array of data points to find the peaks
464    stds : float
465        standard deviation
466    method : str
467        method to detect the peaks
468        Available methods: 'manual_relative_abundance', 'auto_relative_abundance', 'second_derivative'
469    rt : array
470        array of retention time data points
471    max_height : float
472        maximum height of the peak
473    min_height : float
474        minimum height of the peak
475    max_prominence : float
476        maximum prominence of the peak
477    min_datapoints : int
478        minimum number of data points in the peak
479
480    Returns
481    -------
482    generator
483        generator of the indexes of the peaks in the TIC array
484
485    """
486    max_tic = max(tic)
487
488    if method == "manual_relative_abundance":
489        tic = tic - baseline_detector(tic, rt, max_height, max_prominence)
490
491        norm_tic = (tic / max_tic) * 100
492
493        remove_indexes = np.where(norm_tic < min_height)[0]
494
495        # if self.deconv_rt_list and  self.deconv_mz == 51:
496        #    plt.plot(self.deconv_rt_list, tic, label=self.deconv_mz)
497
498    elif method == "auto_relative_abundance":
499        tic = tic - baseline_detector(tic, rt, max_height, max_prominence)
500
501        baseline = baseline_detector(tic, rt, max_height, max_prominence)
502
503        peak_detect_threshold = np.nanmean(baseline) + (stds * np.std(baseline))
504
505        remove_indexes = np.where(tic < peak_detect_threshold)[0]
506
507    elif method == "second_derivative":
508        remove_indexes = second_derivative_threshold(
509            tic, stds, rt, max_height, max_prominence
510        )
511
512    else:
513        NotImplemented(method)
514
515    peak_height_diff = lambda hi, li: ((tic[hi] - tic[li]) / max_tic) * 100
516
517    dy = derivate(tic)
518
519    include_indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0]
520
521    final_indexes = sorted(set(include_indexes) - set(remove_indexes))
522
523    # from matplotlib import pyplot as plt
524
525    # plt.plot(self.retention_time, tic, color='black')
526    # plt.scatter(self.retention_time[remove_indexes], tic[remove_indexes], color='red')
527    # plt.scatter(self.retention_time[include_indexes], tic[include_indexes], color='blue')
528    # plt.scatter(self.retention_time[final_indexes], tic[final_indexes], color='blue')
529
530    # plt.show()
531
532    for index in final_indexes:
533        start_index = find_minima(index, tic, right=False)
534        final_index = find_minima(index, tic)
535
536        if final_index - start_index > min_datapoints:
537            # if min( peak_height_diff(index,start_index), peak_height_diff(index,final_index) )> self.chromatogram_settings.peak_min_prominence_percent :
538
539            yield (start_index, index, final_index)

Peak detector generator for the TIC data points.

Parameters
  • tic (array): array of data points to find the peaks
  • stds (float): standard deviation
  • method (str): method to detect the peaks Available methods: 'manual_relative_abundance', 'auto_relative_abundance', 'second_derivative'
  • rt (array): array of retention time data points
  • max_height (float): maximum height of the peak
  • min_height (float): minimum height of the peak
  • max_prominence (float): maximum prominence of the peak
  • min_datapoints (int): minimum number of data points in the peak
Returns
  • generator: generator of the indexes of the peaks in the TIC array
def smooth_signal(x, window_len, window, pol_order, implemented_smooth_method):
542def smooth_signal(x, window_len, window, pol_order, implemented_smooth_method):
543    """
544    Smooth the data using a window with requested size.
545
546    This method is based on the convolution of a scaled window with the signal.
547    The signal is prepared by introducing reflected copies of the signal
548    (with the window size) in both ends so that transient parts are minimized
549    in the begining and end part of the output signal.
550
551    Parameters
552    ----------
553    x: array
554        the input signal
555    window_len: int
556        the dimension of the smoothing window; should be an odd integer
557    window: str
558        the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
559    pol_order: int
560        the order of the polynomial to fit the data
561    implemented_smooth_method: list
562        list of implemented smoothing methods
563
564    Returns
565    -------
566    y: array
567        the smoothed signal
568
569    Notes:
570    -----
571    See also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
572    scipy.signal.savgol_filter
573
574    """
575    x = np.array(x)
576
577    if x.ndim != 1:
578        raise ValueError("smooth only accepts 1 dimension arrays.")
579
580    if x.size < window_len:
581        raise ValueError("Input array needs to be bigger than window size")
582
583    # if window_len < 3:
584    #    return x
585
586    if not window in implemented_smooth_method:
587        raise ValueError(
588            "Window method should be 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"
589        )
590
591    s = np.r_[x[window_len - 1 : 0 : -1], x, x[-1:-window_len:-1]]
592
593    if window == "savgol":
594        return savgol_filter(x, window_len, pol_order)
595
596    elif window == "boxcar":  # moving average
597        w = boxcar(window_len)
598
599        y = np.convolve(w, s, mode="valid")
600
601    elif window == "flat":  # moving average
602        w = np.ones(window_len, "d")
603
604        y = np.convolve(w / w.sum(), s, mode="valid")
605
606    else:
607        w = eval(window + "(window_len)")
608
609        y = np.convolve(w / w.sum(), s, mode="valid")
610
611    return y[int(window_len / 2 - 1) : int(-window_len / 2)]

Smooth the data using a window with requested size.

This method is based on the convolution of a scaled window with the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the begining and end part of the output signal.

Parameters
  • x (array): the input signal
  • window_len (int): the dimension of the smoothing window; should be an odd integer
  • window (str): the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
  • pol_order (int): the order of the polynomial to fit the data
  • implemented_smooth_method (list): list of implemented smoothing methods
Returns
  • y (array): the smoothed signal
  • Notes:
  • -----
  • See also (numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve):

  • scipy.signal.savgol_filter

def second_derivative_threshold(tic, stds, rt, peak_height_max_percent, peak_max_prominence_percent):
614def second_derivative_threshold(
615    tic, stds, rt, peak_height_max_percent, peak_max_prominence_percent
616):
617    """
618    Second derivative threshold for the TIC data points.
619    For LC/GC data processing
620
621    Parameters
622    ----------
623    tic : array
624        array of data points to find the peaks
625    stds : float
626        standard deviation
627    rt : array
628        array of retention time data points
629    peak_height_max_percent : float
630        maximum height of the peak
631
632    Returns
633    -------
634    array
635        array of the indexes of the data points to remove
636
637    """
638
639    dy = derivate(tic)
640
641    dydy = derivate(dy)
642    dydy = np.hstack((dydy, 0))
643    dydy = np.hstack((0, dydy))
644
645    baseline = baseline_detector(
646        dydy,
647        rt,
648        peak_height_max_percent,
649        peak_max_prominence_percent,
650        do_interpolation=False,
651    )
652
653    threshold_median = np.median(baseline) - (stds * np.std(baseline))
654
655    remove_indexes = np.where(dydy > threshold_median)[0]
656
657    return remove_indexes

Second derivative threshold for the TIC data points. For LC/GC data processing

Parameters
  • tic (array): array of data points to find the peaks
  • stds (float): standard deviation
  • rt (array): array of retention time data points
  • peak_height_max_percent (float): maximum height of the peak
Returns
  • array: array of the indexes of the data points to remove