corems.mass_spectrum.calc.MassErrorPrediction
1__author__ = "Yuri E. Corilo" 2__date__ = "03/31/2020" 3 4from threading import Thread 5from pandas import DataFrame 6from numpy import hstack, inf, isnan, where, array 7from tqdm import tqdm 8 9 10class MassErrorPrediction(Thread): 11 """Class for mass error prediction. 12 13 Parameters 14 ---------- 15 mass_spectrum : list 16 List of mass spectrum objects. 17 mz_overlay : int, optional 18 The mz overlay value for peak simulation. Default is 10. 19 rp_increments : int, optional 20 The resolving power increments for peak simulation. Default is 10000. 21 base_line_target : float, optional 22 The target value for the baseline resolution. Default is 0.01. 23 max_interation : int, optional 24 The maximum number of iterations for peak simulation. Default is 1000. 25 interpolation : str, optional 26 The interpolation method for missing data. Default is 'linear'. 27 28 Attributes 29 ---------- 30 mass_spectrum_obj : list 31 List of mass spectrum objects. 32 mz_overlay : int 33 The mz overlay value for peak simulation. 34 rp_increments : int 35 The resolving power increments for peak simulation. 36 base_line_target : float 37 The target value for the baseline resolution. 38 max_interation : int 39 The maximum number of iterations for peak simulation. 40 df : DataFrame or None 41 The calculated error distribution dataframe. 42 interpolation : str 43 The interpolation method for missing data. 44 45 Methods 46 ------- 47 * run(). 48 Runs the mass error prediction calculation. 49 * get_results(). 50 Returns the calculated error distribution dataframe. 51 52 """ 53 54 def __init__( 55 self, 56 mass_spectrum, 57 mz_overlay=10, 58 rp_increments=10000, 59 base_line_target: float = 0.01, 60 max_interation=1000, 61 interpolation="linear", 62 ): 63 Thread.__init__(self) 64 65 self.mass_spectrum_obj = mass_spectrum 66 67 self.mz_overlay = mz_overlay 68 69 self.rp_increments = rp_increments 70 71 self.base_line_target = base_line_target 72 73 self.max_interation = max_interation 74 75 self.df = None 76 77 self.interpolation = interpolation 78 79 def run(self): 80 """Runs the mass error prediction calculation.""" 81 self.df = self.calc_error_dist() 82 83 def get_results(self): 84 """Returns the calculated error distribution dataframe.""" 85 86 if not self.df: 87 self.run() 88 89 return self.df 90 91 def calc_error_dist(self): 92 """Calculate the error distribution.""" 93 verbose = self.mass_spectrum_obj.parameters.mass_spectrum.verbose_processing 94 results_list = [] 95 96 indexes_without_results = list(range(len(self.mass_spectrum_obj))) 97 # loop trough mass spectrum 98 for peak_obj_idx, peak_obj in enumerate(tqdm(self.mass_spectrum_obj), disable=not verbose): 99 # access ms peaks triplets ( peak_obj_idx -1, peak_obj_idx, and peak_obj_idx + 1) 100 # check lower and upper boundaries to not excesses mass spectrum range 101 102 if peak_obj_idx != 0 and peak_obj_idx != len(self.mass_spectrum_obj) - 1: 103 # current peak_obj initialted in the loop expression 104 # geting the peak on the left (previous_peak_obj) and the one in the right position (next_peak_obj) 105 next_peak_obj = self.mass_spectrum_obj[peak_obj_idx + 1] 106 previous_peak_obj = self.mass_spectrum_obj[peak_obj_idx - 1] 107 108 # check mz range defined in max_mz variable and check if peaks have same nominal mz 109 # keeping same mz for better plotting representation only, remove it for production 110 if ( 111 peak_obj.nominal_mz_exp == next_peak_obj.nominal_mz_exp 112 and peak_obj.nominal_mz_exp == previous_peak_obj.nominal_mz_exp 113 ): 114 # 
simulate peak shape 115 sim_mz, sim_abun = peak_obj.gaussian(mz_overlay=self.mz_overlay) 116 # update_plot(sim_mz,sim_abun, 0.5) 117 118 # simulate peak shape 119 next_sim_mz, next_sim_abun = next_peak_obj.gaussian( 120 mz_overlay=self.mz_overlay 121 ) 122 # update_plot(next_sim_mz, next_sim_abun, 0.5) 123 124 # simulate peak shape 125 previous_sim_mz, previous_sim_abun = previous_peak_obj.gaussian( 126 mz_overlay=self.mz_overlay 127 ) 128 # update_plot(previous_sim_mz, previous_sim_abun, 0.5) 129 130 sim_mz_domain, summed_peaks_abun = self.sum_data( 131 ( 132 (previous_sim_mz, previous_sim_abun), 133 (sim_mz, sim_abun), 134 (next_sim_mz, next_sim_abun), 135 ) 136 ) 137 # update_plot(sim_mz_domain,summed_peaks_abun, 0.5) 138 139 # sum simulated abundances 140 # summed_peaks_abun = (sim_abun + next_sim_abun + previous_sim_abun) 141 142 # normalize abundances to 0-1 143 # summed_peaks_abun = summed_peaks_abun/(max(summed_peaks_abun)) 144 145 # find appexes location (mz) and magnitude 146 mz_centroid, abund_centroid = self.find_peak_apex( 147 sim_mz_domain, summed_peaks_abun 148 ) 149 150 # find valley location (mz_min_valley) and magnitude (abund_min_valley) 151 mz_min_valley, abund_min_valley = self.find_peak_valley( 152 sim_mz_domain, summed_peaks_abun 153 ) 154 155 # clear delta_rp (global implementation) and store choose resolving power increments 156 delta_rp = self.rp_increments 157 158 # used to limited number of iterations 159 i = 0 160 j = 0 161 162 # TODO: fit peak shape and decide best fit #gaussian, lorentz and voigt 163 # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun ) 164 if len(mz_centroid) == 2: 165 while len(mz_centroid) < 3 and i <= self.max_interation: 166 previous_sim_mz, previous_sim_abun = ( 167 previous_peak_obj.gaussian( 168 delta_rp=delta_rp, mz_overlay=self.mz_overlay 169 ) 170 ) 171 172 sim_mz, sim_abun = peak_obj.gaussian( 173 delta_rp=delta_rp, mz_overlay=self.mz_overlay 174 ) 175 176 next_sim_mz, next_sim_abun = next_peak_obj.gaussian( 177 delta_rp=delta_rp, mz_overlay=self.mz_overlay 178 ) 179 180 sim_mz_domain, summed_peaks_abun = self.sum_data( 181 ( 182 (previous_sim_mz, previous_sim_abun), 183 (sim_mz, sim_abun), 184 (next_sim_mz, next_sim_abun), 185 ) 186 ) 187 188 # update_plot(sim_mz_domain, summed_peaks_abun, 0.01) 189 190 mz_centroid, abund_centroid = self.find_peak_apex( 191 sim_mz_domain, summed_peaks_abun 192 ) 193 194 delta_rp += self.rp_increments 195 196 i += 1 197 198 mz_min_valley, abund_min_valley = self.find_peak_valley( 199 sim_mz_domain, summed_peaks_abun 200 ) 201 202 if len(mz_centroid) == 3 and len(abund_min_valley) == 2: 203 # increase all three peak resolving power until both valley magnitude is bellow the defined target 204 # calculate peak shapes with the needed resolving power to have a baseline resolution for all peaks 205 # calculate mass difference (ppm) between original centroid and the new simulated peak. 
206 207 while ( 208 abund_min_valley[0] > self.base_line_target 209 or abund_min_valley[1] > self.base_line_target 210 and j <= self.max_interation 211 ): 212 previous_sim_mz, previous_sim_abun = ( 213 previous_peak_obj.gaussian( 214 delta_rp=delta_rp, mz_overlay=self.mz_overlay 215 ) 216 ) 217 218 sim_mz, sim_abun = peak_obj.gaussian( 219 delta_rp=delta_rp, mz_overlay=self.mz_overlay 220 ) 221 222 next_sim_mz, next_sim_abun = next_peak_obj.gaussian( 223 delta_rp=delta_rp, mz_overlay=self.mz_overlay 224 ) 225 226 sim_mz_domain, summed_peaks_abun = self.sum_data( 227 ( 228 (previous_sim_mz, previous_sim_abun), 229 (sim_mz, sim_abun), 230 (next_sim_mz, next_sim_abun), 231 ) 232 ) 233 234 # update_plot(sim_mz_domain, summed_peaks_abun, 0.001) 235 236 # summed_peaks_abun = (sim_abun + next_sim_abun + previous_sim_abun) 237 238 # find appexes location (mz) and magnitude 239 mz_centroid, abund_centroid = self.find_peak_apex( 240 sim_mz_domain, summed_peaks_abun 241 ) 242 243 # find valley location (mz_min_valley) and magnitude (abund_min_valley) 244 summed_peaks_abun = summed_peaks_abun / ( 245 summed_peaks_abun.max() 246 ) 247 mz_min_valley, abund_min_valley = self.find_peak_valley( 248 sim_mz_domain, summed_peaks_abun 249 ) 250 251 if len(abund_min_valley) != 2: 252 break 253 254 delta_rp += self.rp_increments 255 j += 1 256 257 # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun ) 258 259 # plot_triplets(mz_centroid,abund_centroid, mz_min_valley, abund_min_valley, sim_mz_domain, summed_peaks_abun ) 260 261 mass_shift_ppp = self.calc_error( 262 mz_centroid[1], peak_obj.mz_exp, 1000000 263 ) 264 # delta_mz = mz_centroid[1] - peak_obj.mz_exp 265 height_shift_per = self.calc_error( 266 abund_centroid[1], peak_obj.abundance, 100 267 ) 268 # excitation_amplitude = str(mass_spectrum_obj.filename.stem).split("ex")[1].split("pc")[0] 269 # ion_time = str(mass_spectrum_obj.filename.stem).split("0pt")[1].split("s")[0] 270 peak_obj.predicted_std = mass_shift_ppp 271 272 results_list.append( 273 { 274 "ms_index_position": peak_obj_idx, 275 "predicted_std": mass_shift_ppp, 276 "mz_exp": peak_obj.mz_exp, 277 "nominal_mz_exp": peak_obj.nominal_mz_exp, 278 "predicted_mz": mz_centroid[1], 279 "s2n": peak_obj.signal_to_noise, 280 "peak_height": peak_obj.abundance, 281 "predicted_peak_height": abund_centroid[1], 282 "peak_height_error": height_shift_per, 283 "resolving_power": peak_obj.resolving_power, 284 # "excitation_amplitude" : excitation_amplitude, 285 # "ion_time" : ion_time 286 } 287 ) 288 289 indexes_without_results.remove(peak_obj_idx) 290 # elif len(mz_centroid) == 3 and len(abund_min_valley) != 2: 291 292 for peak_obj_idx in indexes_without_results: 293 results_list.append( 294 { 295 "ms_index_position": peak_obj_idx, 296 "mz_exp": self.mass_spectrum_obj[peak_obj_idx].mz_exp, 297 "nominal_mz_exp": self.mass_spectrum_obj[ 298 peak_obj_idx 299 ].nominal_mz_exp, 300 "s2n": self.mass_spectrum_obj[peak_obj_idx].signal_to_noise, 301 "peak_height": self.mass_spectrum_obj[peak_obj_idx].abundance, 302 "resolving_power": self.mass_spectrum_obj[ 303 peak_obj_idx 304 ].resolving_power, 305 # "excitation_amplitude" : excitation_amplitude, 306 # "ion_time" : ion_time 307 } 308 ) 309 310 df = DataFrame(results_list).sort_values("mz_exp") 311 312 df.interpolate(method="linear", limit_direction="backward", inplace=True) 313 df.interpolate(method="linear", limit_direction="forward", inplace=True) 314 315 # TODO improve interpolation for missing data 316 # f1 = 
interpolate.interp1d(x1, y1, kind='quadratic',fill_value="extrapolate") 317 318 for peak_obj_idx in indexes_without_results: 319 predicted_std = df.loc[peak_obj_idx].predicted_std 320 321 self.mass_spectrum_obj[peak_obj_idx].predicted_std = predicted_std 322 323 return df 324 325 def sum_data(self, tuple_mz_abun_list: tuple): 326 """Sum the abundances of the simulated peaks. 327 328 Parameters 329 ------ 330 tuple_mz_abun_list : tuple 331 A tuple containing the mz and abundance lists. 332 333 Returns 334 ------- 335 tuple 336 A tuple containing the summed mz and abundance lists. 337 338 """ 339 all_mz = {} 340 341 for mz_list, abun_list in tuple_mz_abun_list: 342 for index, mz in enumerate(mz_list): 343 abundance = abun_list[index] 344 345 if mz in all_mz: 346 all_mz[mz] = all_mz[mz] + abundance 347 else: 348 all_mz[mz] = abundance 349 350 mz_all = [] 351 abun_all = [] 352 353 for mz in sorted(all_mz): 354 mz_all.append(mz) 355 abun_all.append(all_mz[mz]) 356 357 return array(mz_all), array(abun_all) 358 359 def calc_error(self, mass_ref, mass_sim, factor): 360 """Calculate the error between two values. 361 362 Parameters 363 ---------- 364 mass_ref : float 365 The reference value. 366 mass_sim : float 367 The simulated value. 368 factor : float 369 The factor to multiply the error by. 370 371 Returns 372 ------- 373 float 374 The calculated error. 375 376 377 """ 378 return (mass_sim - mass_ref / mass_ref) * factor 379 380 def find_peak_apex(self, mz, abund): 381 """Find the peak apex. 382 383 Parameters 384 ------ 385 mz : array 386 The mz array. 387 abund : array 388 The abundance array. 389 390 Returns 391 ------- 392 tuple 393 A tuple containing the peak apex mass and abundance. 394 395 """ 396 dy = abund[1:] - abund[:-1] 397 398 # replaces nan for infinity''' 399 indices_nan = where(isnan(abund))[0] 400 401 if indices_nan.size: 402 abund[indices_nan] = inf 403 dy[where(isnan(dy))[0]] = inf 404 405 indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0] 406 407 if indexes.size: 408 return mz[indexes], abund[indexes] 409 410 def find_peak_valley(self, mz, abund): 411 """Find the peak valley. 412 413 Parameters 414 ------ 415 mz : array 416 The mz array. 417 abund : array 418 The abundance array. 419 420 Returns 421 ------- 422 tuple 423 A tuple containing the peak valley mz and abundance. 424 """ 425 dy = abund[1:] - abund[:-1] 426 427 # replaces nan for infinity 428 indices_nan = where(isnan(abund))[0] 429 430 if indices_nan.size: 431 abund[indices_nan] = inf 432 dy[where(isnan(dy))[0]] = inf 433 434 indexes = where((hstack((dy, 0)) > 0) & (hstack((0, dy)) < 0))[0] 435 436 return mz[indexes], abund[indexes]
Class for mass error prediction.
Parameters
- mass_spectrum (MassSpectrum): The mass spectrum object whose MS peaks are evaluated.
- mz_overlay (int, optional): The mz overlay value for peak simulation. Default is 10.
- rp_increments (int, optional): The resolving power increments for peak simulation. Default is 10000.
- base_line_target (float, optional): The target value for the baseline resolution. Default is 0.01.
- max_interation (int, optional): The maximum number of iterations for peak simulation. Default is 1000.
- interpolation (str, optional): The interpolation method for missing data. Default is 'linear'.
Attributes
- mass_spectrum_obj (MassSpectrum): The mass spectrum object whose MS peaks are evaluated.
- mz_overlay (int): The mz overlay value for peak simulation.
- rp_increments (int): The resolving power increments for peak simulation.
- base_line_target (float): The target value for the baseline resolution.
- max_interation (int): The maximum number of iterations for peak simulation.
- df (DataFrame or None): The calculated error distribution dataframe.
- interpolation (str): The interpolation method for missing data.
Methods
- run(). Runs the mass error prediction calculation.
- get_results(). Returns the calculated error distribution dataframe.
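A short usage sketch follows. It assumes a CoreMS mass spectrum object is already available in a variable called mass_spectrum_obj (obtained elsewhere; the variable name is illustrative), and it only touches the public pieces documented on this page.

    from corems.mass_spectrum.calc.MassErrorPrediction import MassErrorPrediction

    # mass_spectrum_obj: an existing CoreMS mass spectrum object (assumed available)
    mass_error_prediction = MassErrorPrediction(
        mass_spectrum_obj,
        mz_overlay=10,           # m/z overlay used for peak simulation
        rp_increments=10000,     # resolving-power step used while separating overlapped peaks
        base_line_target=0.01,   # valley height treated as baseline resolution
    )

    # Synchronous use: get_results() calls run() on first use and returns the DataFrame.
    df = mass_error_prediction.get_results()

    # Threaded use: the class subclasses threading.Thread, so it can also be started
    # in the background and joined before collecting the results.
    # mass_error_prediction.start()
    # mass_error_prediction.join()
    # df = mass_error_prediction.get_results()

    print(df[["mz_exp", "predicted_mz", "predicted_std"]].head())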
MassErrorPrediction(mass_spectrum, mz_overlay=10, rp_increments=10000, base_line_target: float = 0.01, max_interation=1000, interpolation='linear')
The constructor stores the mass spectrum object and the peak-simulation settings (mz_overlay, rp_increments, base_line_target, max_interation, interpolation) on the instance and initialises df to None. Because the class subclasses threading.Thread, Thread.__init__() is invoked first, as required before doing anything else with the thread; the prediction can then be executed in a separate thread via start()/join(), or synchronously by calling get_results(), which runs the calculation on first use.
run(self)
Runs the mass error prediction calculation.
get_results(self)
Returns the calculated error distribution dataframe.
calc_error_dist(self)
Calculate the error distribution. For each MS peak that shares its nominal m/z with both of its neighbours, the three peak shapes are simulated as Gaussians and summed; the simulated resolving power is then raised in rp_increments steps until the summed profile shows three apexes and both valleys fall below base_line_target (baseline resolution), up to max_interation iterations. The ppm difference between the apex recovered for the centre peak and its experimental m/z is stored on the peak and in the result as predicted_std, together with the corresponding peak-height error in percent. Peaks for which no prediction could be made are filled in by interpolating over the resulting DataFrame, which is returned sorted by mz_exp.
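The column names below are the ones assembled in calc_error_dist; this snippet is only a sketch of how the returned DataFrame might be inspected, reusing the mass_error_prediction instance from the usage sketch above (the key argument of sort_values needs pandas 1.1 or newer).

    df = mass_error_prediction.get_results()

    # peaks with the largest predicted mass error (ppm) first
    worst = df.sort_values("predicted_std", key=abs, ascending=False)
    print(worst[["ms_index_position", "mz_exp", "predicted_mz",
                 "predicted_std", "peak_height_error", "s2n"]].head(10))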
sum_data(self, tuple_mz_abun_list: tuple)
Sum the abundances of the simulated peaks.
Parameters
- tuple_mz_abun_list (tuple): A tuple of (mz_array, abundance_array) pairs, one pair per simulated peak.
Returns
- tuple: The merged m/z array, sorted ascending, and the abundance array with values summed wherever the input m/z points coincide.
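A self-contained illustration with made-up numbers: abundances are added only where m/z values in the inputs are exactly equal, and the merged m/z axis comes back sorted (mass_error_prediction is the instance from the usage sketch; sum_data does not otherwise depend on instance state).

    from numpy import array

    peak_a = (array([100.00, 100.01, 100.02]), array([1.0, 5.0, 1.0]))
    peak_b = (array([100.01, 100.02, 100.03]), array([2.0, 4.0, 2.0]))

    mz_all, abun_all = mass_error_prediction.sum_data((peak_a, peak_b))
    # mz_all   -> [100.0, 100.01, 100.02, 100.03]
    # abun_all -> [1.0, 7.0, 5.0, 2.0]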
calc_error(self, mass_ref, mass_sim, factor)
Calculate the relative error between a reference value and a simulated value, scaled by a factor.
Parameters
- mass_ref (float): The reference value.
- mass_sim (float): The simulated value.
- factor (float): Scale factor applied to the relative error; 1e6 yields ppm and 100 yields percent.
Returns
- float: The calculated error.
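A worked example of the scaled relative error, ((mass_sim - mass_ref) / mass_ref) * factor, as used in calc_error_dist; the numbers are made up and the call reuses the instance from the usage sketch.

    mass_ref = 200.0000   # reference m/z
    mass_sim = 200.0003   # simulated m/z

    error_ppm = mass_error_prediction.calc_error(mass_ref, mass_sim, 1_000_000)
    # ((200.0003 - 200.0000) / 200.0000) * 1e6 = 1.5 ppm

    error_percent = mass_error_prediction.calc_error(mass_ref, mass_sim, 100)
    # the same relative difference expressed as a percentage: 0.00015 %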
find_peak_apex(self, mz, abund)
Find the local maxima (peak apexes) of a simulated profile.
Parameters
- mz (array): The mz array.
- abund (array): The abundance array.
Returns
- tuple or None: The apex m/z and abundance arrays, or None when no apex is found.
find_peak_valley(self, mz, abund)
Find the local minima (valleys) of a simulated profile.
Parameters
- mz (array): The mz array.
- abund (array): The abundance array.
Returns
- tuple: A tuple containing the peak valley mz and abundance.
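Both detectors operate on the first difference of the abundance array: an apex is a point where the difference flips from positive to negative, a valley where it flips from negative to positive. A small illustration with made-up values, reusing the instance from the usage sketch:

    from numpy import array

    mz = array([100.00, 100.01, 100.02, 100.03, 100.04])
    abund = array([0.1, 1.0, 0.3, 0.8, 0.1])

    apex_mz, apex_abund = mass_error_prediction.find_peak_apex(mz, abund)
    # apex_mz    -> [100.01, 100.03]   (profile rises, then falls)
    # apex_abund -> [1.0, 0.8]

    valley_mz, valley_abund = mass_error_prediction.find_peak_valley(mz, abund)
    # valley_mz    -> [100.02]         (profile falls, then rises)
    # valley_abund -> [0.3]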
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id