corems.mass_spectra.calc.GC_Calc

 1__author__ = "Yuri E. Corilo"
 2__date__ = "Feb 13, 2020"
 3
 4from pathlib import Path
 5import numpy as np
 6from pandas import Series
 7
 8from corems.mass_spectra.calc import SignalProcessing as sp
 9
10
class GC_Calculations:
    """Calculation helpers for a GC-MS chromatogram object.

    The methods use attributes and methods that are not defined in this
    class (``chromatogram_settings``, ``parameters``, ``process_chromatogram``
    and iteration over peaks), so it is presumably meant to be mixed into a
    class that provides them.
    """

    def calibrate_ri(self, ref_dict, cal_file_path):
        """Compute the retention index of every peak and record the calibration source.

        Parameters
        ----------
        ref_dict
            Reference data handed unchanged to each peak's ``calc_ri``.
        cal_file_path
            Location of the calibration file. A ``str`` is converted to a
            ``pathlib.Path``; any other object (e.g. an S3Path) is stored as is.
        """
        # A falsy (e.g. empty) object triggers chromatogram processing first.
        if not self:
            self.process_chromatogram()

        for peak in self:
            peak.calc_ri(ref_dict)

        self.ri_pairs_ref = ref_dict
        # Only plain strings are wrapped; callers pass an S3Path themselves if needed.
        self.cal_file_path = (
            Path(cal_file_path) if isinstance(cal_file_path, str) else cal_file_path
        )

    def smooth_tic(self, tic):
        """Return ``tic`` smoothed by ``sp.smooth_signal`` using the chromatogram settings."""
        settings = self.chromatogram_settings

        return sp.smooth_signal(
            tic,
            settings.smooth_window,
            settings.smooth_method,
            settings.savgol_pol_order,
            settings.implemented_smooth_method,
        )

    def centroid_detector(self, tic, rt):
        """Return the peak-index generator built by ``sp.peak_detector_generator``.

        ``tic`` and ``rt`` are forwarded together with the noise-threshold,
        peak-picking and baseline-detection values read from
        ``self.chromatogram_settings``.
        """
        settings = self.chromatogram_settings

        return sp.peak_detector_generator(
            tic,
            settings.std_noise_threshold,
            settings.noise_threshold_method,
            rt,
            settings.peak_height_max_percent,  # baseline detection
            settings.peak_height_min_percent,  # peak picking
            settings.peak_max_prominence_percent,  # baseline detection
            settings.min_peak_datapoints,  # peak picking
        )

    def remove_outliers(self, data):
        """Replace values outside the 1.5 x IQR fences with NaN.

        Parameters
        ----------
        data
            Re-iterable collection of numbers (it is traversed more than once).

        Returns
        -------
        pandas.Series
            The values of ``data`` in their original order, with every value
            below ``q25 - 1.5 * IQR`` or above ``q75 + 1.5 * IQR`` replaced by
            ``numpy.nan``.
        """
        # One call yields both quartiles; the module-level numpy import is reused.
        q25, q75 = np.percentile(data, [25, 75])
        iqr = q75 - q25
        # calculate the outlier cutoff
        cut_off = iqr * 1.5
        lower, upper = q25 - cut_off, q75 + cut_off

        if self.parameters.verbose_processing:
            print("Percentiles: 25th=%.3f, 75th=%.3f, IQR=%.3f" % (q25, q75, iqr))
            # The count is diagnostic only, so it is computed just for this message.
            n_outliers = sum(1 for x in data if x < lower or x > upper)
            print("Identified outliers: %d" % n_outliers)

        # remove outliers
        return Series([x if lower <= x <= upper else np.nan for x in data])
class GC_Calculations:
    """Calculation helpers for a GC-MS chromatogram object.

    The methods use attributes and methods that are not defined in this
    class (``chromatogram_settings``, ``parameters``, ``process_chromatogram``
    and iteration over peaks), so it is presumably meant to be mixed into a
    class that provides them.
    """

    def calibrate_ri(self, ref_dict, cal_file_path):
        """Compute the retention index of every peak and record the calibration source.

        Parameters
        ----------
        ref_dict
            Reference data handed unchanged to each peak's ``calc_ri``.
        cal_file_path
            Location of the calibration file. A ``str`` is converted to a
            ``pathlib.Path``; any other object (e.g. an S3Path) is stored as is.
        """
        # A falsy (e.g. empty) object triggers chromatogram processing first.
        if not self:
            self.process_chromatogram()

        for peak in self:
            peak.calc_ri(ref_dict)

        self.ri_pairs_ref = ref_dict
        # Only plain strings are wrapped; callers pass an S3Path themselves if needed.
        self.cal_file_path = (
            Path(cal_file_path) if isinstance(cal_file_path, str) else cal_file_path
        )

    def smooth_tic(self, tic):
        """Return ``tic`` smoothed by ``sp.smooth_signal`` using the chromatogram settings."""
        settings = self.chromatogram_settings

        return sp.smooth_signal(
            tic,
            settings.smooth_window,
            settings.smooth_method,
            settings.savgol_pol_order,
            settings.implemented_smooth_method,
        )

    def centroid_detector(self, tic, rt):
        """Return the peak-index generator built by ``sp.peak_detector_generator``.

        ``tic`` and ``rt`` are forwarded together with the noise-threshold,
        peak-picking and baseline-detection values read from
        ``self.chromatogram_settings``.
        """
        settings = self.chromatogram_settings

        return sp.peak_detector_generator(
            tic,
            settings.std_noise_threshold,
            settings.noise_threshold_method,
            rt,
            settings.peak_height_max_percent,  # baseline detection
            settings.peak_height_min_percent,  # peak picking
            settings.peak_max_prominence_percent,  # baseline detection
            settings.min_peak_datapoints,  # peak picking
        )

    def remove_outliers(self, data):
        """Replace values outside the 1.5 x IQR fences with NaN.

        Parameters
        ----------
        data
            Re-iterable collection of numbers (it is traversed more than once).

        Returns
        -------
        pandas.Series
            The values of ``data`` in their original order, with every value
            below ``q25 - 1.5 * IQR`` or above ``q75 + 1.5 * IQR`` replaced by
            ``numpy.nan``.
        """
        # One call yields both quartiles; the module-level numpy import is reused.
        q25, q75 = np.percentile(data, [25, 75])
        iqr = q75 - q25
        # calculate the outlier cutoff
        cut_off = iqr * 1.5
        lower, upper = q25 - cut_off, q75 + cut_off

        if self.parameters.verbose_processing:
            print("Percentiles: 25th=%.3f, 75th=%.3f, IQR=%.3f" % (q25, q75, iqr))
            # The count is diagnostic only, so it is computed just for this message.
            n_outliers = sum(1 for x in data if x < lower or x > upper)
            print("Identified outliers: %d" % n_outliers)

        # remove outliers
        return Series([x if lower <= x <= upper else np.nan for x in data])
def calibrate_ri(self, ref_dict, cal_file_path):
    """Compute the retention index of every peak and record the calibration source.

    ``ref_dict`` is handed unchanged to each peak's ``calc_ri`` and then
    stored on ``self.ri_pairs_ref``. ``cal_file_path`` is stored on
    ``self.cal_file_path``: a ``str`` is converted to a ``pathlib.Path``,
    any other object (e.g. an S3Path) is kept as is.
    """
    # A falsy (e.g. empty) object triggers chromatogram processing first.
    if not self:
        self.process_chromatogram()

    for peak in self:
        peak.calc_ri(ref_dict)

    self.ri_pairs_ref = ref_dict
    # Only plain strings are wrapped; callers pass an S3Path themselves if needed.
    self.cal_file_path = (
        Path(cal_file_path) if isinstance(cal_file_path, str) else cal_file_path
    )
def smooth_tic(self, tic):
    """Return ``tic`` smoothed by ``sp.smooth_signal``.

    The window length, window type, polynomial order and the list of
    implemented methods are all read from ``self.chromatogram_settings``.
    """
    settings = self.chromatogram_settings

    return sp.smooth_signal(
        tic,
        settings.smooth_window,
        settings.smooth_method,
        settings.savgol_pol_order,
        settings.implemented_smooth_method,
    )
def centroid_detector(self, tic, rt):
    """Return the peak-index generator built by ``sp.peak_detector_generator``.

    ``tic`` and ``rt`` are forwarded together with the noise-threshold,
    peak-picking and baseline-detection values read from
    ``self.chromatogram_settings``.
    """
    settings = self.chromatogram_settings

    return sp.peak_detector_generator(
        tic,
        settings.std_noise_threshold,
        settings.noise_threshold_method,
        rt,
        settings.peak_height_max_percent,  # baseline detection
        settings.peak_height_min_percent,  # peak picking
        settings.peak_max_prominence_percent,  # baseline detection
        settings.min_peak_datapoints,  # peak picking
    )
def remove_outliers(self, data):
    """Replace values outside the 1.5 x IQR fences with NaN.

    Parameters
    ----------
    data
        Re-iterable collection of numbers (it is traversed more than once).

    Returns
    -------
    pandas.Series
        The values of ``data`` in their original order, with every value
        below ``q25 - 1.5 * IQR`` or above ``q75 + 1.5 * IQR`` replaced by
        ``numpy.nan``.
    """
    # One call yields both quartiles; the module-level numpy import is reused.
    q25, q75 = np.percentile(data, [25, 75])
    iqr = q75 - q25
    # calculate the outlier cutoff
    cut_off = iqr * 1.5
    lower, upper = q25 - cut_off, q75 + cut_off

    if self.parameters.verbose_processing:
        print("Percentiles: 25th=%.3f, 75th=%.3f, IQR=%.3f" % (q25, q75, iqr))
        # The count is diagnostic only, so it is computed just for this message.
        n_outliers = sum(1 for x in data if x < lower or x > upper)
        print("Identified outliers: %d" % n_outliers)

    # remove outliers
    return Series([x if lower <= x <= upper else np.nan for x in data])