corems.mass_spectrum.calc.MassErrorPrediction

  1__author__ = "Yuri E. Corilo"
  2__date__ = "03/31/2020"
  3
  4from threading import Thread
  5from pandas import DataFrame
  6from numpy import hstack, inf, isnan, where, array
  7from tqdm import tqdm
  8
  9
 10class MassErrorPrediction(Thread):
 11    """Class for mass error prediction.
 12
 13    Parameters
 14    ----------
 15    mass_spectrum : list
 16        List of mass spectrum objects.
 17    mz_overlay : int, optional
 18        The mz overlay value for peak simulation. Default is 10.
 19    rp_increments : int, optional
 20        The resolving power increments for peak simulation. Default is 10000.
 21    base_line_target : float, optional
 22        The target value for the baseline resolution. Default is 0.01.
 23    max_interation : int, optional
 24        The maximum number of iterations for peak simulation. Default is 1000.
 25    interpolation : str, optional
 26        The interpolation method for missing data. Default is 'linear'.
 27
 28    Attributes
 29    ----------
 30    mass_spectrum_obj : list
 31        List of mass spectrum objects.
 32    mz_overlay : int
 33        The mz overlay value for peak simulation.
 34    rp_increments : int
 35        The resolving power increments for peak simulation.
 36    base_line_target : float
 37        The target value for the baseline resolution.
 38    max_interation : int
 39        The maximum number of iterations for peak simulation.
 40    df : DataFrame or None
 41        The calculated error distribution dataframe.
 42    interpolation : str
 43        The interpolation method for missing data.
 44
 45    Methods
 46    -------
 47    * run().
 48        Runs the mass error prediction calculation.
 49    * get_results().
 50        Returns the calculated error distribution dataframe.
 51
 52    """
 53
 54    def __init__(
 55        self,
 56        mass_spectrum,
 57        mz_overlay=10,
 58        rp_increments=10000,
 59        base_line_target: float = 0.01,
 60        max_interation=1000,
 61        interpolation="linear",
 62    ):
 63        Thread.__init__(self)
 64
 65        self.mass_spectrum_obj = mass_spectrum
 66
 67        self.mz_overlay = mz_overlay
 68
 69        self.rp_increments = rp_increments
 70
 71        self.base_line_target = base_line_target
 72
 73        self.max_interation = max_interation
 74
 75        self.df = None
 76
 77        self.interpolation = interpolation
 78
 79    def run(self):
 80        """Runs the mass error prediction calculation."""
 81        self.df = self.calc_error_dist()
 82
 83    def get_results(self):
 84        """Returns the calculated error distribution dataframe."""
 85
 86        if not self.df:
 87            self.run()
 88
 89        return self.df
 90
 91    def calc_error_dist(self):
 92        """Calculate the error distribution."""
 93        verbose = self.mass_spectrum_obj.parameters.mass_spectrum.verbose_processing
 94        results_list = []
 95
 96        indexes_without_results = list(range(len(self.mass_spectrum_obj)))
 97        # loop trough mass spectrum
 98        for peak_obj_idx, peak_obj in enumerate(tqdm(self.mass_spectrum_obj), disable=not verbose):
 99            # access ms peaks triplets ( peak_obj_idx -1, peak_obj_idx, and peak_obj_idx + 1)
100            # check lower and upper boundaries to not excesses mass spectrum range
101
102            if peak_obj_idx != 0 and peak_obj_idx != len(self.mass_spectrum_obj) - 1:
103                # current peak_obj initialted in the loop expression
104                # geting the peak on the left (previous_peak_obj) and the one in the right position (next_peak_obj)
105                next_peak_obj = self.mass_spectrum_obj[peak_obj_idx + 1]
106                previous_peak_obj = self.mass_spectrum_obj[peak_obj_idx - 1]
107
108                # check mz range defined in max_mz variable and check if peaks have same nominal mz
109                # keeping same mz for better plotting representation only, remove it for production
110                if (
111                    peak_obj.nominal_mz_exp == next_peak_obj.nominal_mz_exp
112                    and peak_obj.nominal_mz_exp == previous_peak_obj.nominal_mz_exp
113                ):
114                    # simulate peak shape
115                    sim_mz, sim_abun = peak_obj.gaussian(mz_overlay=self.mz_overlay)
116                    # update_plot(sim_mz,sim_abun, 0.5)
117
118                    # simulate peak shape
119                    next_sim_mz, next_sim_abun = next_peak_obj.gaussian(
120                        mz_overlay=self.mz_overlay
121                    )
122                    # update_plot(next_sim_mz, next_sim_abun, 0.5)
123
124                    # simulate peak shape
125                    previous_sim_mz, previous_sim_abun = previous_peak_obj.gaussian(
126                        mz_overlay=self.mz_overlay
127                    )
128                    # update_plot(previous_sim_mz,  previous_sim_abun, 0.5)
129
130                    sim_mz_domain, summed_peaks_abun = self.sum_data(
131                        (
132                            (previous_sim_mz, previous_sim_abun),
133                            (sim_mz, sim_abun),
134                            (next_sim_mz, next_sim_abun),
135                        )
136                    )
137                    # update_plot(sim_mz_domain,summed_peaks_abun, 0.5)
138
139                    # sum simulated abundances
140                    # summed_peaks_abun = (sim_abun + next_sim_abun + previous_sim_abun)
141
142                    # normalize abundances to 0-1
143                    # summed_peaks_abun = summed_peaks_abun/(max(summed_peaks_abun))
144
145                    # find appexes location (mz) and magnitude
146                    mz_centroid, abund_centroid = self.find_peak_apex(
147                        sim_mz_domain, summed_peaks_abun
148                    )
149
150                    # find valley location (mz_min_valley) and magnitude (abund_min_valley)
151                    mz_min_valley, abund_min_valley = self.find_peak_valley(
152                        sim_mz_domain, summed_peaks_abun
153                    )
154
155                    # clear delta_rp (global implementation) and store choose resolving power increments
156                    delta_rp = self.rp_increments
157
158                    # used to limited number of iterations
159                    i = 0
160                    j = 0
161
162                    # TODO: fit peak shape and decide best fit #gaussian, lorentz and voigt
163                    # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun )
164                    if len(mz_centroid) == 2:
165                        while len(mz_centroid) < 3 and i <= self.max_interation:
166                            previous_sim_mz, previous_sim_abun = (
167                                previous_peak_obj.gaussian(
168                                    delta_rp=delta_rp, mz_overlay=self.mz_overlay
169                                )
170                            )
171
172                            sim_mz, sim_abun = peak_obj.gaussian(
173                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
174                            )
175
176                            next_sim_mz, next_sim_abun = next_peak_obj.gaussian(
177                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
178                            )
179
180                            sim_mz_domain, summed_peaks_abun = self.sum_data(
181                                (
182                                    (previous_sim_mz, previous_sim_abun),
183                                    (sim_mz, sim_abun),
184                                    (next_sim_mz, next_sim_abun),
185                                )
186                            )
187
188                            # update_plot(sim_mz_domain,  summed_peaks_abun, 0.01)
189
190                            mz_centroid, abund_centroid = self.find_peak_apex(
191                                sim_mz_domain, summed_peaks_abun
192                            )
193
194                            delta_rp += self.rp_increments
195
196                            i += 1
197
198                        mz_min_valley, abund_min_valley = self.find_peak_valley(
199                            sim_mz_domain, summed_peaks_abun
200                        )
201
202                    if len(mz_centroid) == 3 and len(abund_min_valley) == 2:
203                        # increase all three peak resolving power until both valley magnitude is bellow the defined target
204                        # calculate peak shapes with the needed resolving power to have a baseline resolution for all peaks
205                        # calculate mass difference (ppm) between original centroid and the new simulated peak.
206
207                        while (
208                            abund_min_valley[0] > self.base_line_target
209                            or abund_min_valley[1] > self.base_line_target
210                            and j <= self.max_interation
211                        ):
212                            previous_sim_mz, previous_sim_abun = (
213                                previous_peak_obj.gaussian(
214                                    delta_rp=delta_rp, mz_overlay=self.mz_overlay
215                                )
216                            )
217
218                            sim_mz, sim_abun = peak_obj.gaussian(
219                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
220                            )
221
222                            next_sim_mz, next_sim_abun = next_peak_obj.gaussian(
223                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
224                            )
225
226                            sim_mz_domain, summed_peaks_abun = self.sum_data(
227                                (
228                                    (previous_sim_mz, previous_sim_abun),
229                                    (sim_mz, sim_abun),
230                                    (next_sim_mz, next_sim_abun),
231                                )
232                            )
233
234                            # update_plot(sim_mz_domain,  summed_peaks_abun, 0.001)
235
236                            # summed_peaks_abun = (sim_abun + next_sim_abun + previous_sim_abun)
237
238                            # find appexes location (mz) and magnitude
239                            mz_centroid, abund_centroid = self.find_peak_apex(
240                                sim_mz_domain, summed_peaks_abun
241                            )
242
243                            # find valley location (mz_min_valley) and magnitude (abund_min_valley)
244                            summed_peaks_abun = summed_peaks_abun / (
245                                summed_peaks_abun.max()
246                            )
247                            mz_min_valley, abund_min_valley = self.find_peak_valley(
248                                sim_mz_domain, summed_peaks_abun
249                            )
250
251                            if len(abund_min_valley) != 2:
252                                break
253
254                            delta_rp += self.rp_increments
255                            j += 1
256
257                            # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun )
258
259                        # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun )
260
261                        mass_shift_ppp = self.calc_error(
262                            mz_centroid[1], peak_obj.mz_exp, 1000000
263                        )
264                        # delta_mz = mz_centroid[1] - peak_obj.mz_exp
265                        height_shift_per = self.calc_error(
266                            abund_centroid[1], peak_obj.abundance, 100
267                        )
268                        # excitation_amplitude = str(mass_spectrum_obj.filename.stem).split("ex")[1].split("pc")[0]
269                        # ion_time = str(mass_spectrum_obj.filename.stem).split("0pt")[1].split("s")[0]
270                        peak_obj.predicted_std = mass_shift_ppp
271
272                        results_list.append(
273                            {
274                                "ms_index_position": peak_obj_idx,
275                                "predicted_std": mass_shift_ppp,
276                                "mz_exp": peak_obj.mz_exp,
277                                "nominal_mz_exp": peak_obj.nominal_mz_exp,
278                                "predicted_mz": mz_centroid[1],
279                                "s2n": peak_obj.signal_to_noise,
280                                "peak_height": peak_obj.abundance,
281                                "predicted_peak_height": abund_centroid[1],
282                                "peak_height_error": height_shift_per,
283                                "resolving_power": peak_obj.resolving_power,
284                                # "excitation_amplitude" : excitation_amplitude,
285                                # "ion_time" : ion_time
286                            }
287                        )
288
289                        indexes_without_results.remove(peak_obj_idx)
290                    # elif len(mz_centroid) == 3 and len(abund_min_valley) != 2:
291
292        for peak_obj_idx in indexes_without_results:
293            results_list.append(
294                {
295                    "ms_index_position": peak_obj_idx,
296                    "mz_exp": self.mass_spectrum_obj[peak_obj_idx].mz_exp,
297                    "nominal_mz_exp": self.mass_spectrum_obj[
298                        peak_obj_idx
299                    ].nominal_mz_exp,
300                    "s2n": self.mass_spectrum_obj[peak_obj_idx].signal_to_noise,
301                    "peak_height": self.mass_spectrum_obj[peak_obj_idx].abundance,
302                    "resolving_power": self.mass_spectrum_obj[
303                        peak_obj_idx
304                    ].resolving_power,
305                    # "excitation_amplitude" : excitation_amplitude,
306                    # "ion_time" : ion_time
307                }
308            )
309
310        df = DataFrame(results_list).sort_values("mz_exp")
311
312        df.interpolate(method="linear", limit_direction="backward", inplace=True)
313        df.interpolate(method="linear", limit_direction="forward", inplace=True)
314
315        # TODO improve interpolation for missing data
316        # f1 = interpolate.interp1d(x1, y1, kind='quadratic',fill_value="extrapolate")
317
318        for peak_obj_idx in indexes_without_results:
319            predicted_std = df.loc[peak_obj_idx].predicted_std
320
321            self.mass_spectrum_obj[peak_obj_idx].predicted_std = predicted_std
322
323        return df
324
325    def sum_data(self, tuple_mz_abun_list: tuple):
326        """Sum the abundances of the simulated peaks.
327
328        Parameters
329        ------
330        tuple_mz_abun_list : tuple
331            A tuple containing the mz and abundance lists.
332
333        Returns
334        -------
335        tuple
336            A tuple containing the summed mz and abundance lists.
337
338        """
339        all_mz = {}
340
341        for mz_list, abun_list in tuple_mz_abun_list:
342            for index, mz in enumerate(mz_list):
343                abundance = abun_list[index]
344
345                if mz in all_mz:
346                    all_mz[mz] = all_mz[mz] + abundance
347                else:
348                    all_mz[mz] = abundance
349
350        mz_all = []
351        abun_all = []
352
353        for mz in sorted(all_mz):
354            mz_all.append(mz)
355            abun_all.append(all_mz[mz])
356
357        return array(mz_all), array(abun_all)
358
359    def calc_error(self, mass_ref, mass_sim, factor):
360        """Calculate the error between two values.
361
362        Parameters
363        ----------
364        mass_ref : float
365            The reference value.
366        mass_sim : float
367            The simulated value.
368        factor : float
369            The factor to multiply the error by.
370
371        Returns
372        -------
373        float
374            The calculated error.
375
376
377        """
378        return (mass_sim - mass_ref / mass_ref) * factor
379
380    def find_peak_apex(self, mz, abund):
381        """Find the peak apex.
382
383        Parameters
384        ------
385        mz : array
386            The mz array.
387        abund : array
388            The abundance array.
389
390        Returns
391        -------
392        tuple
393            A tuple containing the peak apex mass and abundance.
394
395        """
396        dy = abund[1:] - abund[:-1]
397
398        # replaces nan for infinity'''
399        indices_nan = where(isnan(abund))[0]
400
401        if indices_nan.size:
402            abund[indices_nan] = inf
403            dy[where(isnan(dy))[0]] = inf
404
405        indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0]
406
407        if indexes.size:
408            return mz[indexes], abund[indexes]
409
410    def find_peak_valley(self, mz, abund):
411        """Find the peak valley.
412
413        Parameters
414        ------
415        mz : array
416            The mz array.
417        abund : array
418            The abundance array.
419
420        Returns
421        -------
422        tuple
423            A tuple containing the peak valley mz and abundance.
424        """
425        dy = abund[1:] - abund[:-1]
426
427        # replaces nan for infinity
428        indices_nan = where(isnan(abund))[0]
429
430        if indices_nan.size:
431            abund[indices_nan] = inf
432            dy[where(isnan(dy))[0]] = inf
433
434        indexes = where((hstack((dy, 0)) > 0) & (hstack((0, dy)) < 0))[0]
435
436        return mz[indexes], abund[indexes]
class MassErrorPrediction(threading.Thread):
 11class MassErrorPrediction(Thread):
 12    """Class for mass error prediction.
 13
 14    Parameters
 15    ----------
 16    mass_spectrum : list
 17        List of mass spectrum objects.
 18    mz_overlay : int, optional
 19        The mz overlay value for peak simulation. Default is 10.
 20    rp_increments : int, optional
 21        The resolving power increments for peak simulation. Default is 10000.
 22    base_line_target : float, optional
 23        The target value for the baseline resolution. Default is 0.01.
 24    max_interation : int, optional
 25        The maximum number of iterations for peak simulation. Default is 1000.
 26    interpolation : str, optional
 27        The interpolation method for missing data. Default is 'linear'.
 28
 29    Attributes
 30    ----------
 31    mass_spectrum_obj : list
 32        List of mass spectrum objects.
 33    mz_overlay : int
 34        The mz overlay value for peak simulation.
 35    rp_increments : int
 36        The resolving power increments for peak simulation.
 37    base_line_target : float
 38        The target value for the baseline resolution.
 39    max_interation : int
 40        The maximum number of iterations for peak simulation.
 41    df : DataFrame or None
 42        The calculated error distribution dataframe.
 43    interpolation : str
 44        The interpolation method for missing data.
 45
 46    Methods
 47    -------
 48    * run().
 49        Runs the mass error prediction calculation.
 50    * get_results().
 51        Returns the calculated error distribution dataframe.
 52
 53    """
 54
 55    def __init__(
 56        self,
 57        mass_spectrum,
 58        mz_overlay=10,
 59        rp_increments=10000,
 60        base_line_target: float = 0.01,
 61        max_interation=1000,
 62        interpolation="linear",
 63    ):
 64        Thread.__init__(self)
 65
 66        self.mass_spectrum_obj = mass_spectrum
 67
 68        self.mz_overlay = mz_overlay
 69
 70        self.rp_increments = rp_increments
 71
 72        self.base_line_target = base_line_target
 73
 74        self.max_interation = max_interation
 75
 76        self.df = None
 77
 78        self.interpolation = interpolation
 79
 80    def run(self):
 81        """Runs the mass error prediction calculation."""
 82        self.df = self.calc_error_dist()
 83
 84    def get_results(self):
 85        """Returns the calculated error distribution dataframe."""
 86
 87        if not self.df:
 88            self.run()
 89
 90        return self.df
 91
 92    def calc_error_dist(self):
 93        """Calculate the error distribution."""
 94        verbose = self.mass_spectrum_obj.parameters.mass_spectrum.verbose_processing
 95        results_list = []
 96
 97        indexes_without_results = list(range(len(self.mass_spectrum_obj)))
 98        # loop trough mass spectrum
 99        for peak_obj_idx, peak_obj in enumerate(tqdm(self.mass_spectrum_obj), disable=not verbose):
100            # access ms peaks triplets ( peak_obj_idx -1, peak_obj_idx, and peak_obj_idx + 1)
101            # check lower and upper boundaries to not excesses mass spectrum range
102
103            if peak_obj_idx != 0 and peak_obj_idx != len(self.mass_spectrum_obj) - 1:
104                # current peak_obj initialted in the loop expression
105                # geting the peak on the left (previous_peak_obj) and the one in the right position (next_peak_obj)
106                next_peak_obj = self.mass_spectrum_obj[peak_obj_idx + 1]
107                previous_peak_obj = self.mass_spectrum_obj[peak_obj_idx - 1]
108
109                # check mz range defined in max_mz variable and check if peaks have same nominal mz
110                # keeping same mz for better plotting representation only, remove it for production
111                if (
112                    peak_obj.nominal_mz_exp == next_peak_obj.nominal_mz_exp
113                    and peak_obj.nominal_mz_exp == previous_peak_obj.nominal_mz_exp
114                ):
115                    # simulate peak shape
116                    sim_mz, sim_abun = peak_obj.gaussian(mz_overlay=self.mz_overlay)
117                    # update_plot(sim_mz,sim_abun, 0.5)
118
119                    # simulate peak shape
120                    next_sim_mz, next_sim_abun = next_peak_obj.gaussian(
121                        mz_overlay=self.mz_overlay
122                    )
123                    # update_plot(next_sim_mz, next_sim_abun, 0.5)
124
125                    # simulate peak shape
126                    previous_sim_mz, previous_sim_abun = previous_peak_obj.gaussian(
127                        mz_overlay=self.mz_overlay
128                    )
129                    # update_plot(previous_sim_mz,  previous_sim_abun, 0.5)
130
131                    sim_mz_domain, summed_peaks_abun = self.sum_data(
132                        (
133                            (previous_sim_mz, previous_sim_abun),
134                            (sim_mz, sim_abun),
135                            (next_sim_mz, next_sim_abun),
136                        )
137                    )
138                    # update_plot(sim_mz_domain,summed_peaks_abun, 0.5)
139
140                    # sum simulated abundances
141                    # summed_peaks_abun = (sim_abun + next_sim_abun + previous_sim_abun)
142
143                    # normalize abundances to 0-1
144                    # summed_peaks_abun = summed_peaks_abun/(max(summed_peaks_abun))
145
146                    # find appexes location (mz) and magnitude
147                    mz_centroid, abund_centroid = self.find_peak_apex(
148                        sim_mz_domain, summed_peaks_abun
149                    )
150
151                    # find valley location (mz_min_valley) and magnitude (abund_min_valley)
152                    mz_min_valley, abund_min_valley = self.find_peak_valley(
153                        sim_mz_domain, summed_peaks_abun
154                    )
155
156                    # clear delta_rp (global implementation) and store choose resolving power increments
157                    delta_rp = self.rp_increments
158
159                    # used to limited number of iterations
160                    i = 0
161                    j = 0
162
163                    # TODO: fit peak shape and decide best fit #gaussian, lorentz and voigt
164                    # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun )
165                    if len(mz_centroid) == 2:
166                        while len(mz_centroid) < 3 and i <= self.max_interation:
167                            previous_sim_mz, previous_sim_abun = (
168                                previous_peak_obj.gaussian(
169                                    delta_rp=delta_rp, mz_overlay=self.mz_overlay
170                                )
171                            )
172
173                            sim_mz, sim_abun = peak_obj.gaussian(
174                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
175                            )
176
177                            next_sim_mz, next_sim_abun = next_peak_obj.gaussian(
178                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
179                            )
180
181                            sim_mz_domain, summed_peaks_abun = self.sum_data(
182                                (
183                                    (previous_sim_mz, previous_sim_abun),
184                                    (sim_mz, sim_abun),
185                                    (next_sim_mz, next_sim_abun),
186                                )
187                            )
188
189                            # update_plot(sim_mz_domain,  summed_peaks_abun, 0.01)
190
191                            mz_centroid, abund_centroid = self.find_peak_apex(
192                                sim_mz_domain, summed_peaks_abun
193                            )
194
195                            delta_rp += self.rp_increments
196
197                            i += 1
198
199                        mz_min_valley, abund_min_valley = self.find_peak_valley(
200                            sim_mz_domain, summed_peaks_abun
201                        )
202
203                    if len(mz_centroid) == 3 and len(abund_min_valley) == 2:
204                        # increase all three peak resolving power until both valley magnitude is bellow the defined target
205                        # calculate peak shapes with the needed resolving power to have a baseline resolution for all peaks
206                        # calculate mass difference (ppm) between original centroid and the new simulated peak.
207
208                        while (
209                            abund_min_valley[0] > self.base_line_target
210                            or abund_min_valley[1] > self.base_line_target
211                            and j <= self.max_interation
212                        ):
213                            previous_sim_mz, previous_sim_abun = (
214                                previous_peak_obj.gaussian(
215                                    delta_rp=delta_rp, mz_overlay=self.mz_overlay
216                                )
217                            )
218
219                            sim_mz, sim_abun = peak_obj.gaussian(
220                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
221                            )
222
223                            next_sim_mz, next_sim_abun = next_peak_obj.gaussian(
224                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
225                            )
226
227                            sim_mz_domain, summed_peaks_abun = self.sum_data(
228                                (
229                                    (previous_sim_mz, previous_sim_abun),
230                                    (sim_mz, sim_abun),
231                                    (next_sim_mz, next_sim_abun),
232                                )
233                            )
234
235                            # update_plot(sim_mz_domain,  summed_peaks_abun, 0.001)
236
237                            # summed_peaks_abun = (sim_abun + next_sim_abun + previous_sim_abun)
238
239                            # find appexes location (mz) and magnitude
240                            mz_centroid, abund_centroid = self.find_peak_apex(
241                                sim_mz_domain, summed_peaks_abun
242                            )
243
244                            # find valley location (mz_min_valley) and magnitude (abund_min_valley)
245                            summed_peaks_abun = summed_peaks_abun / (
246                                summed_peaks_abun.max()
247                            )
248                            mz_min_valley, abund_min_valley = self.find_peak_valley(
249                                sim_mz_domain, summed_peaks_abun
250                            )
251
252                            if len(abund_min_valley) != 2:
253                                break
254
255                            delta_rp += self.rp_increments
256                            j += 1
257
258                            # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun )
259
260                        # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun )
261
262                        mass_shift_ppp = self.calc_error(
263                            mz_centroid[1], peak_obj.mz_exp, 1000000
264                        )
265                        # delta_mz = mz_centroid[1] - peak_obj.mz_exp
266                        height_shift_per = self.calc_error(
267                            abund_centroid[1], peak_obj.abundance, 100
268                        )
269                        # excitation_amplitude = str(mass_spectrum_obj.filename.stem).split("ex")[1].split("pc")[0]
270                        # ion_time = str(mass_spectrum_obj.filename.stem).split("0pt")[1].split("s")[0]
271                        peak_obj.predicted_std = mass_shift_ppp
272
273                        results_list.append(
274                            {
275                                "ms_index_position": peak_obj_idx,
276                                "predicted_std": mass_shift_ppp,
277                                "mz_exp": peak_obj.mz_exp,
278                                "nominal_mz_exp": peak_obj.nominal_mz_exp,
279                                "predicted_mz": mz_centroid[1],
280                                "s2n": peak_obj.signal_to_noise,
281                                "peak_height": peak_obj.abundance,
282                                "predicted_peak_height": abund_centroid[1],
283                                "peak_height_error": height_shift_per,
284                                "resolving_power": peak_obj.resolving_power,
285                                # "excitation_amplitude" : excitation_amplitude,
286                                # "ion_time" : ion_time
287                            }
288                        )
289
290                        indexes_without_results.remove(peak_obj_idx)
291                    # elif len(mz_centroid) == 3 and len(abund_min_valley) != 2:
292
293        for peak_obj_idx in indexes_without_results:
294            results_list.append(
295                {
296                    "ms_index_position": peak_obj_idx,
297                    "mz_exp": self.mass_spectrum_obj[peak_obj_idx].mz_exp,
298                    "nominal_mz_exp": self.mass_spectrum_obj[
299                        peak_obj_idx
300                    ].nominal_mz_exp,
301                    "s2n": self.mass_spectrum_obj[peak_obj_idx].signal_to_noise,
302                    "peak_height": self.mass_spectrum_obj[peak_obj_idx].abundance,
303                    "resolving_power": self.mass_spectrum_obj[
304                        peak_obj_idx
305                    ].resolving_power,
306                    # "excitation_amplitude" : excitation_amplitude,
307                    # "ion_time" : ion_time
308                }
309            )
310
311        df = DataFrame(results_list).sort_values("mz_exp")
312
313        df.interpolate(method="linear", limit_direction="backward", inplace=True)
314        df.interpolate(method="linear", limit_direction="forward", inplace=True)
315
316        # TODO improve interpolation for missing data
317        # f1 = interpolate.interp1d(x1, y1, kind='quadratic',fill_value="extrapolate")
318
319        for peak_obj_idx in indexes_without_results:
320            predicted_std = df.loc[peak_obj_idx].predicted_std
321
322            self.mass_spectrum_obj[peak_obj_idx].predicted_std = predicted_std
323
324        return df
325
326    def sum_data(self, tuple_mz_abun_list: tuple):
327        """Sum the abundances of the simulated peaks.
328
329        Parameters
330        ------
331        tuple_mz_abun_list : tuple
332            A tuple containing the mz and abundance lists.
333
334        Returns
335        -------
336        tuple
337            A tuple containing the summed mz and abundance lists.
338
339        """
340        all_mz = {}
341
342        for mz_list, abun_list in tuple_mz_abun_list:
343            for index, mz in enumerate(mz_list):
344                abundance = abun_list[index]
345
346                if mz in all_mz:
347                    all_mz[mz] = all_mz[mz] + abundance
348                else:
349                    all_mz[mz] = abundance
350
351        mz_all = []
352        abun_all = []
353
354        for mz in sorted(all_mz):
355            mz_all.append(mz)
356            abun_all.append(all_mz[mz])
357
358        return array(mz_all), array(abun_all)
359
360    def calc_error(self, mass_ref, mass_sim, factor):
361        """Calculate the error between two values.
362
363        Parameters
364        ----------
365        mass_ref : float
366            The reference value.
367        mass_sim : float
368            The simulated value.
369        factor : float
370            The factor to multiply the error by.
371
372        Returns
373        -------
374        float
375            The calculated error.
376
377
378        """
379        return (mass_sim - mass_ref / mass_ref) * factor
380
381    def find_peak_apex(self, mz, abund):
382        """Find the peak apex.
383
384        Parameters
385        ------
386        mz : array
387            The mz array.
388        abund : array
389            The abundance array.
390
391        Returns
392        -------
393        tuple
394            A tuple containing the peak apex mass and abundance.
395
396        """
397        dy = abund[1:] - abund[:-1]
398
399        # replaces nan for infinity'''
400        indices_nan = where(isnan(abund))[0]
401
402        if indices_nan.size:
403            abund[indices_nan] = inf
404            dy[where(isnan(dy))[0]] = inf
405
406        indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0]
407
408        if indexes.size:
409            return mz[indexes], abund[indexes]
410
411    def find_peak_valley(self, mz, abund):
412        """Find the peak valley.
413
414        Parameters
415        ------
416        mz : array
417            The mz array.
418        abund : array
419            The abundance array.
420
421        Returns
422        -------
423        tuple
424            A tuple containing the peak valley mz and abundance.
425        """
426        dy = abund[1:] - abund[:-1]
427
428        # replaces nan for infinity
429        indices_nan = where(isnan(abund))[0]
430
431        if indices_nan.size:
432            abund[indices_nan] = inf
433            dy[where(isnan(dy))[0]] = inf
434
435        indexes = where((hstack((dy, 0)) > 0) & (hstack((0, dy)) < 0))[0]
436
437        return mz[indexes], abund[indexes]

Class for mass error prediction.

Parameters
  • mass_spectrum (list): List of mass spectrum objects.
  • mz_overlay (int, optional): The mz overlay value for peak simulation. Default is 10.
  • rp_increments (int, optional): The resolving power increments for peak simulation. Default is 10000.
  • base_line_target (float, optional): The target value for the baseline resolution. Default is 0.01.
  • max_interation (int, optional): The maximum number of iterations for peak simulation. Default is 1000.
  • interpolation (str, optional): The interpolation method for missing data. Default is 'linear'.
Attributes
  • mass_spectrum_obj (list): List of mass spectrum objects.
  • mz_overlay (int): The mz overlay value for peak simulation.
  • rp_increments (int): The resolving power increments for peak simulation.
  • base_line_target (float): The target value for the baseline resolution.
  • max_interation (int): The maximum number of iterations for peak simulation.
  • df (DataFrame or None): The calculated error distribution dataframe.
  • interpolation (str): The interpolation method for missing data.
Methods
  • run(). Runs the mass error prediction calculation.
  • get_results(). Returns the calculated error distribution dataframe.
MassErrorPrediction( mass_spectrum, mz_overlay=10, rp_increments=10000, base_line_target: float = 0.01, max_interation=1000, interpolation='linear')
55    def __init__(
56        self,
57        mass_spectrum,
58        mz_overlay=10,
59        rp_increments=10000,
60        base_line_target: float = 0.01,
61        max_interation=1000,
62        interpolation="linear",
63    ):
64        Thread.__init__(self)
65
66        self.mass_spectrum_obj = mass_spectrum
67
68        self.mz_overlay = mz_overlay
69
70        self.rp_increments = rp_increments
71
72        self.base_line_target = base_line_target
73
74        self.max_interation = max_interation
75
76        self.df = None
77
78        self.interpolation = interpolation

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

mass_spectrum_obj
mz_overlay
rp_increments
base_line_target
max_interation
df
interpolation
def run(self):
80    def run(self):
81        """Runs the mass error prediction calculation."""
82        self.df = self.calc_error_dist()

Runs the mass error prediction calculation.

def get_results(self):
84    def get_results(self):
85        """Returns the calculated error distribution dataframe."""
86
87        if not self.df:
88            self.run()
89
90        return self.df

Returns the calculated error distribution dataframe.

def calc_error_dist(self):
 92    def calc_error_dist(self):
 93        """Calculate the error distribution."""
 94        verbose = self.mass_spectrum_obj.parameters.mass_spectrum.verbose_processing
 95        results_list = []
 96
 97        indexes_without_results = list(range(len(self.mass_spectrum_obj)))
 98        # loop trough mass spectrum
 99        for peak_obj_idx, peak_obj in enumerate(tqdm(self.mass_spectrum_obj), disable=not verbose):
100            # access ms peaks triplets ( peak_obj_idx -1, peak_obj_idx, and peak_obj_idx + 1)
101            # check lower and upper boundaries to not excesses mass spectrum range
102
103            if peak_obj_idx != 0 and peak_obj_idx != len(self.mass_spectrum_obj) - 1:
104                # current peak_obj initialted in the loop expression
105                # geting the peak on the left (previous_peak_obj) and the one in the right position (next_peak_obj)
106                next_peak_obj = self.mass_spectrum_obj[peak_obj_idx + 1]
107                previous_peak_obj = self.mass_spectrum_obj[peak_obj_idx - 1]
108
109                # check mz range defined in max_mz variable and check if peaks have same nominal mz
110                # keeping same mz for better plotting representation only, remove it for production
111                if (
112                    peak_obj.nominal_mz_exp == next_peak_obj.nominal_mz_exp
113                    and peak_obj.nominal_mz_exp == previous_peak_obj.nominal_mz_exp
114                ):
115                    # simulate peak shape
116                    sim_mz, sim_abun = peak_obj.gaussian(mz_overlay=self.mz_overlay)
117                    # update_plot(sim_mz,sim_abun, 0.5)
118
119                    # simulate peak shape
120                    next_sim_mz, next_sim_abun = next_peak_obj.gaussian(
121                        mz_overlay=self.mz_overlay
122                    )
123                    # update_plot(next_sim_mz, next_sim_abun, 0.5)
124
125                    # simulate peak shape
126                    previous_sim_mz, previous_sim_abun = previous_peak_obj.gaussian(
127                        mz_overlay=self.mz_overlay
128                    )
129                    # update_plot(previous_sim_mz,  previous_sim_abun, 0.5)
130
131                    sim_mz_domain, summed_peaks_abun = self.sum_data(
132                        (
133                            (previous_sim_mz, previous_sim_abun),
134                            (sim_mz, sim_abun),
135                            (next_sim_mz, next_sim_abun),
136                        )
137                    )
138                    # update_plot(sim_mz_domain,summed_peaks_abun, 0.5)
139
140                    # sum simulated abundances
141                    # summed_peaks_abun = (sim_abun + next_sim_abun + previous_sim_abun)
142
143                    # normalize abundances to 0-1
144                    # summed_peaks_abun = summed_peaks_abun/(max(summed_peaks_abun))
145
146                    # find appexes location (mz) and magnitude
147                    mz_centroid, abund_centroid = self.find_peak_apex(
148                        sim_mz_domain, summed_peaks_abun
149                    )
150
151                    # find valley location (mz_min_valley) and magnitude (abund_min_valley)
152                    mz_min_valley, abund_min_valley = self.find_peak_valley(
153                        sim_mz_domain, summed_peaks_abun
154                    )
155
156                    # clear delta_rp (global implementation) and store choose resolving power increments
157                    delta_rp = self.rp_increments
158
159                    # used to limited number of iterations
160                    i = 0
161                    j = 0
162
163                    # TODO: fit peak shape and decide best fit #gaussian, lorentz and voigt
164                    # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun )
165                    if len(mz_centroid) == 2:
166                        while len(mz_centroid) < 3 and i <= self.max_interation:
167                            previous_sim_mz, previous_sim_abun = (
168                                previous_peak_obj.gaussian(
169                                    delta_rp=delta_rp, mz_overlay=self.mz_overlay
170                                )
171                            )
172
173                            sim_mz, sim_abun = peak_obj.gaussian(
174                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
175                            )
176
177                            next_sim_mz, next_sim_abun = next_peak_obj.gaussian(
178                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
179                            )
180
181                            sim_mz_domain, summed_peaks_abun = self.sum_data(
182                                (
183                                    (previous_sim_mz, previous_sim_abun),
184                                    (sim_mz, sim_abun),
185                                    (next_sim_mz, next_sim_abun),
186                                )
187                            )
188
189                            # update_plot(sim_mz_domain,  summed_peaks_abun, 0.01)
190
191                            mz_centroid, abund_centroid = self.find_peak_apex(
192                                sim_mz_domain, summed_peaks_abun
193                            )
194
195                            delta_rp += self.rp_increments
196
197                            i += 1
198
199                        mz_min_valley, abund_min_valley = self.find_peak_valley(
200                            sim_mz_domain, summed_peaks_abun
201                        )
202
203                    if len(mz_centroid) == 3 and len(abund_min_valley) == 2:
204                        # increase all three peak resolving power until both valley magnitude is bellow the defined target
205                        # calculate peak shapes with the needed resolving power to have a baseline resolution for all peaks
206                        # calculate mass difference (ppm) between original centroid and the new simulated peak.
207
208                        while (
209                            abund_min_valley[0] > self.base_line_target
210                            or abund_min_valley[1] > self.base_line_target
211                            and j <= self.max_interation
212                        ):
213                            previous_sim_mz, previous_sim_abun = (
214                                previous_peak_obj.gaussian(
215                                    delta_rp=delta_rp, mz_overlay=self.mz_overlay
216                                )
217                            )
218
219                            sim_mz, sim_abun = peak_obj.gaussian(
220                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
221                            )
222
223                            next_sim_mz, next_sim_abun = next_peak_obj.gaussian(
224                                delta_rp=delta_rp, mz_overlay=self.mz_overlay
225                            )
226
227                            sim_mz_domain, summed_peaks_abun = self.sum_data(
228                                (
229                                    (previous_sim_mz, previous_sim_abun),
230                                    (sim_mz, sim_abun),
231                                    (next_sim_mz, next_sim_abun),
232                                )
233                            )
234
235                            # update_plot(sim_mz_domain,  summed_peaks_abun, 0.001)
236
237                            # summed_peaks_abun = (sim_abun + next_sim_abun + previous_sim_abun)
238
239                            # find appexes location (mz) and magnitude
240                            mz_centroid, abund_centroid = self.find_peak_apex(
241                                sim_mz_domain, summed_peaks_abun
242                            )
243
244                            # find valley location (mz_min_valley) and magnitude (abund_min_valley)
245                            summed_peaks_abun = summed_peaks_abun / (
246                                summed_peaks_abun.max()
247                            )
248                            mz_min_valley, abund_min_valley = self.find_peak_valley(
249                                sim_mz_domain, summed_peaks_abun
250                            )
251
252                            if len(abund_min_valley) != 2:
253                                break
254
255                            delta_rp += self.rp_increments
256                            j += 1
257
258                            # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun )
259
260                        # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun )
261
262                        mass_shift_ppp = self.calc_error(
263                            mz_centroid[1], peak_obj.mz_exp, 1000000
264                        )
265                        # delta_mz = mz_centroid[1] - peak_obj.mz_exp
266                        height_shift_per = self.calc_error(
267                            abund_centroid[1], peak_obj.abundance, 100
268                        )
269                        # excitation_amplitude = str(mass_spectrum_obj.filename.stem).split("ex")[1].split("pc")[0]
270                        # ion_time = str(mass_spectrum_obj.filename.stem).split("0pt")[1].split("s")[0]
271                        peak_obj.predicted_std = mass_shift_ppp
272
273                        results_list.append(
274                            {
275                                "ms_index_position": peak_obj_idx,
276                                "predicted_std": mass_shift_ppp,
277                                "mz_exp": peak_obj.mz_exp,
278                                "nominal_mz_exp": peak_obj.nominal_mz_exp,
279                                "predicted_mz": mz_centroid[1],
280                                "s2n": peak_obj.signal_to_noise,
281                                "peak_height": peak_obj.abundance,
282                                "predicted_peak_height": abund_centroid[1],
283                                "peak_height_error": height_shift_per,
284                                "resolving_power": peak_obj.resolving_power,
285                                # "excitation_amplitude" : excitation_amplitude,
286                                # "ion_time" : ion_time
287                            }
288                        )
289
290                        indexes_without_results.remove(peak_obj_idx)
291                    # elif len(mz_centroid) == 3 and len(abund_min_valley) != 2:
292
293        for peak_obj_idx in indexes_without_results:
294            results_list.append(
295                {
296                    "ms_index_position": peak_obj_idx,
297                    "mz_exp": self.mass_spectrum_obj[peak_obj_idx].mz_exp,
298                    "nominal_mz_exp": self.mass_spectrum_obj[
299                        peak_obj_idx
300                    ].nominal_mz_exp,
301                    "s2n": self.mass_spectrum_obj[peak_obj_idx].signal_to_noise,
302                    "peak_height": self.mass_spectrum_obj[peak_obj_idx].abundance,
303                    "resolving_power": self.mass_spectrum_obj[
304                        peak_obj_idx
305                    ].resolving_power,
306                    # "excitation_amplitude" : excitation_amplitude,
307                    # "ion_time" : ion_time
308                }
309            )
310
311        df = DataFrame(results_list).sort_values("mz_exp")
312
313        df.interpolate(method="linear", limit_direction="backward", inplace=True)
314        df.interpolate(method="linear", limit_direction="forward", inplace=True)
315
316        # TODO improve interpolation for missing data
317        # f1 = interpolate.interp1d(x1, y1, kind='quadratic',fill_value="extrapolate")
318
319        for peak_obj_idx in indexes_without_results:
320            predicted_std = df.loc[peak_obj_idx].predicted_std
321
322            self.mass_spectrum_obj[peak_obj_idx].predicted_std = predicted_std
323
324        return df

Calculate the error distribution.

def sum_data(self, tuple_mz_abun_list: tuple):
326    def sum_data(self, tuple_mz_abun_list: tuple):
327        """Sum the abundances of the simulated peaks.
328
329        Parameters
330        ------
331        tuple_mz_abun_list : tuple
332            A tuple containing the mz and abundance lists.
333
334        Returns
335        -------
336        tuple
337            A tuple containing the summed mz and abundance lists.
338
339        """
340        all_mz = {}
341
342        for mz_list, abun_list in tuple_mz_abun_list:
343            for index, mz in enumerate(mz_list):
344                abundance = abun_list[index]
345
346                if mz in all_mz:
347                    all_mz[mz] = all_mz[mz] + abundance
348                else:
349                    all_mz[mz] = abundance
350
351        mz_all = []
352        abun_all = []
353
354        for mz in sorted(all_mz):
355            mz_all.append(mz)
356            abun_all.append(all_mz[mz])
357
358        return array(mz_all), array(abun_all)

Sum the abundances of the simulated peaks.

Parameters
  • tuple_mz_abun_list (tuple): A tuple containing the mz and abundance lists.
Returns
  • tuple: A tuple containing the summed mz and abundance lists.
def calc_error(self, mass_ref, mass_sim, factor):
360    def calc_error(self, mass_ref, mass_sim, factor):
361        """Calculate the error between two values.
362
363        Parameters
364        ----------
365        mass_ref : float
366            The reference value.
367        mass_sim : float
368            The simulated value.
369        factor : float
370            The factor to multiply the error by.
371
372        Returns
373        -------
374        float
375            The calculated error.
376
377
378        """
379        return (mass_sim - mass_ref / mass_ref) * factor

Calculate the error between two values.

Parameters
  • mass_ref (float): The reference value.
  • mass_sim (float): The simulated value.
  • factor (float): The factor to multiply the error by.
Returns
  • float: The calculated error.
def find_peak_apex(self, mz, abund):
381    def find_peak_apex(self, mz, abund):
382        """Find the peak apex.
383
384        Parameters
385        ------
386        mz : array
387            The mz array.
388        abund : array
389            The abundance array.
390
391        Returns
392        -------
393        tuple
394            A tuple containing the peak apex mass and abundance.
395
396        """
397        dy = abund[1:] - abund[:-1]
398
399        # replaces nan for infinity'''
400        indices_nan = where(isnan(abund))[0]
401
402        if indices_nan.size:
403            abund[indices_nan] = inf
404            dy[where(isnan(dy))[0]] = inf
405
406        indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0]
407
408        if indexes.size:
409            return mz[indexes], abund[indexes]

Find the peak apex.

Parameters
  • mz (array): The mz array.
  • abund (array): The abundance array.
Returns
  • tuple: A tuple containing the peak apex mass and abundance.
def find_peak_valley(self, mz, abund):
411    def find_peak_valley(self, mz, abund):
412        """Find the peak valley.
413
414        Parameters
415        ------
416        mz : array
417            The mz array.
418        abund : array
419            The abundance array.
420
421        Returns
422        -------
423        tuple
424            A tuple containing the peak valley mz and abundance.
425        """
426        dy = abund[1:] - abund[:-1]
427
428        # replaces nan for infinity
429        indices_nan = where(isnan(abund))[0]
430
431        if indices_nan.size:
432            abund[indices_nan] = inf
433            dy[where(isnan(dy))[0]] = inf
434
435        indexes = where((hstack((dy, 0)) > 0) & (hstack((0, dy)) < 0))[0]
436
437        return mz[indexes], abund[indexes]

Find the peak valley.

Parameters
  • mz (array): The mz array.
  • abund (array): The abundance array.
Returns
  • tuple: A tuple containing the peak valley mz and abundance.
Inherited Members
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id