corems.mass_spectrum.calc.PeakPicking
@author: Yuri E. Corilo @date: Jun 27, 2019
1""" 2@author: Yuri E. Corilo 3@date: Jun 27, 2019 4""" 5 6import warnings 7from numpy import ( 8 hstack, 9 inf, 10 isnan, 11 where, 12 array, 13 polyfit, 14 nan, 15 pad, 16 zeros, 17 searchsorted, 18) 19from corems.encapsulation.constant import Labels 20from corems.mass_spectra.calc import SignalProcessing as sp 21 22 23class PeakPicking: 24 """Class for peak picking. 25 26 Parameters 27 ---------- 28 None 29 30 Attributes 31 ---------- 32 None 33 34 Methods 35 ------- 36 * prepare_peak_picking_data(). 37 Prepare the mz, abundance, and frequence data for peak picking. 38 * cut_mz_domain_peak_picking(). 39 Cut the m/z domain for peak picking. 40 * extrapolate_axes_for_pp(mz=None, abund=None, freq=None). 41 Extrapolate the m/z axis and fill the abundance axis with 0s. 42 * do_peak_picking(). 43 Perform peak picking. 44 * find_minima(apex_index, abundance, len_abundance, right=True). 45 Find the minima of a peak. 46 * linear_fit_calc(intes, massa, index_term, index_sign). 47 Algebraic solution to a linear fit. 48 * calculate_resolving_power(intes, massa, current_index). 49 Calculate the resolving power of a peak. 50 * cal_minima(mass, abun). 51 Calculate the minima of a peak. 52 * calc_centroid(mass, abund, freq). 53 Calculate the centroid of a peak. 54 * get_threshold(intes). 55 Get the intensity threshold for peak picking. 56 * algebraic_quadratic(list_mass, list_y). 57 Find the apex of a peak - algebraically. 58 * find_apex_fit_quadratic(mass, abund, freq, current_index). 59 Find the apex of a peak. 60 * check_prominence(abun, current_index, len_abundance, peak_height_diff). 61 Check the prominence of a peak. 62 * use_the_max(mass, abund, current_index, len_abundance, peak_height_diff). 63 Use the max peak height as the centroid. 64 * calc_centroid_legacy(mass, abund, freq). 65 Legacy centroid calculation. Deprecated - for deletion. 66 67 """ 68 69 def prepare_peak_picking_data(self): 70 """Prepare the data for peak picking. 71 72 This function will prepare the m/z, abundance, and frequency data for peak picking according to the settings. 73 74 Returns 75 ------- 76 mz : ndarray 77 The m/z axis. 78 abundance : ndarray 79 The abundance axis. 80 freq : ndarray or None 81 The frequency axis, if available. 82 """ 83 # First apply cut_mz_domain_peak_picking 84 mz, abundance, freq = self.cut_mz_domain_peak_picking() 85 86 # Then extrapolate the axes for peak picking 87 if self.settings.picking_point_extrapolate > 0: 88 mz, abundance, freq = self.extrapolate_axes_for_pp(mz, abundance, freq) 89 return mz, abundance, freq 90 91 def cut_mz_domain_peak_picking(self): 92 """ 93 Cut the m/z domain for peak picking. 94 95 Simplified function 96 97 Returns 98 ------- 99 mz_domain_X_low_cutoff : ndarray 100 The m/z values within the specified range. 101 mz_domain_low_Y_cutoff : ndarray 102 The abundance values within the specified range. 103 freq_domain_low_Y_cutoff : ndarray or None 104 The frequency values within the specified range, if available. 105 106 """ 107 max_picking_mz = self.settings.max_picking_mz 108 min_picking_mz = self.settings.min_picking_mz 109 110 # min_start = where(self.mz_exp_profile > min_picking_mz)[0][0] 111 # max_final = where(self.mz_exp_profile < max_picking_mz)[-1][-1] 112 min_start = searchsorted(a=self.mz_exp_profile, v=min_picking_mz) 113 max_final = searchsorted(a=self.mz_exp_profile, v=max_picking_mz) 114 115 if self.has_frequency: 116 if self.freq_exp_profile.any(): 117 return ( 118 self.mz_exp_profile[min_start:max_final], 119 self.abundance_profile[min_start:max_final], 120 self.freq_exp_profile[min_start:max_final], 121 ) 122 123 else: 124 return ( 125 self.mz_exp_profile[min_start:max_final], 126 self.abundance_profile[min_start:max_final], 127 None, 128 ) 129 130 def legacy_cut_mz_domain_peak_picking(self): 131 """ 132 Cut the m/z domain for peak picking. 133 DEPRECATED 134 Returns 135 ------- 136 mz_domain_X_low_cutoff : ndarray 137 The m/z values within the specified range. 138 mz_domain_low_Y_cutoff : ndarray 139 The abundance values within the specified range. 140 freq_domain_low_Y_cutoff : ndarray or None 141 The frequency values within the specified range, if available. 142 143 """ 144 max_picking_mz = self.settings.max_picking_mz 145 min_picking_mz = self.settings.min_picking_mz 146 147 min_final = where(self.mz_exp_profile > min_picking_mz)[-1][-1] 148 min_start = where(self.mz_exp_profile > min_picking_mz)[0][0] 149 150 ( 151 mz_domain_X_low_cutoff, 152 mz_domain_low_Y_cutoff, 153 ) = ( 154 self.mz_exp_profile[min_start:min_final], 155 self.abundance_profile[min_start:min_final], 156 ) 157 158 max_final = where(self.mz_exp_profile < max_picking_mz)[-1][-1] 159 max_start = where(self.mz_exp_profile < max_picking_mz)[0][0] 160 161 if self.has_frequency: 162 if self.freq_exp_profile.any(): 163 freq_domain_low_Y_cutoff = self.freq_exp_profile[min_start:min_final] 164 165 return ( 166 mz_domain_X_low_cutoff[max_start:max_final], 167 mz_domain_low_Y_cutoff[max_start:max_final], 168 freq_domain_low_Y_cutoff[max_start:max_final], 169 ) 170 171 else: 172 return ( 173 mz_domain_X_low_cutoff[max_start:max_final], 174 mz_domain_low_Y_cutoff[max_start:max_final], 175 None, 176 ) 177 178 @staticmethod 179 def extrapolate_axis(initial_array, pts): 180 """ 181 This function will extrapolate an input array in both directions by N pts. 182 183 Parameters 184 ---------- 185 initial_array : ndarray 186 The input array. 187 pts : int 188 The number of points to extrapolate. 189 190 Returns 191 ------- 192 ndarray 193 The extrapolated array. 194 195 Notes 196 -------- 197 This is a static method. 198 """ 199 initial_array_len = len(initial_array) 200 right_delta = initial_array[-1] - initial_array[-2] 201 left_delta = initial_array[1] - initial_array[0] 202 203 # Create an array with extra space for extrapolation 204 pad_array = zeros(initial_array_len + 2 * pts) 205 206 # Copy original array into the middle of the padded array 207 pad_array[pts : pts + initial_array_len] = initial_array 208 209 # Extrapolate the right side 210 for pt in range(pts): 211 final_value = initial_array[-1] 212 value_to_add = right_delta * (pt + 1) 213 new_value = final_value + value_to_add 214 pad_array[initial_array_len + pts + pt] = new_value 215 216 # Extrapolate the left side 217 for pt in range(pts): 218 first_value = initial_array[0] 219 value_to_subtract = left_delta * (pt + 1) 220 new_value = first_value - value_to_subtract 221 pad_array[pts - pt - 1] = new_value 222 223 return pad_array 224 225 def extrapolate_axes_for_pp(self, mz=None, abund=None, freq=None): 226 """Extrapolate the m/z axis and fill the abundance axis with 0s. 227 228 Parameters 229 ---------- 230 mz : ndarray or None 231 The m/z axis, if available. If None, the experimental m/z axis is used. 232 abund : ndarray or None 233 The abundance axis, if available. If None, the experimental abundance axis is used. 234 freq : ndarray or None 235 The frequency axis, if available. If None, the experimental frequency axis is used. 236 237 Returns 238 ------- 239 mz : ndarray 240 The extrapolated m/z axis. 241 abund : ndarray 242 The abundance axis with 0s filled. 243 freq : ndarray or None 244 The extrapolated frequency axis, if available. 245 246 Notes 247 -------- 248 This function will extrapolate the mz axis by the number of datapoints specified in the settings, 249 and fill the abundance axis with 0s. 250 This should prevent peak picking issues at the spectrum edge. 251 252 """ 253 # Check if the input arrays are provided 254 if mz is None or abund is None: 255 mz, abund = self.mz_exp_profile, self.abundance_profile 256 if self.has_frequency: 257 freq = self.freq_exp_profile 258 else: 259 freq = None 260 pts = self.settings.picking_point_extrapolate 261 if pts == 0: 262 return mz, abund, freq 263 264 mz = self.extrapolate_axis(mz, pts) 265 abund = pad(abund, (pts, pts), mode="constant", constant_values=(0, 0)) 266 if freq is not None: 267 freq = self.extrapolate_axis(freq, pts) 268 return mz, abund, freq 269 270 def do_peak_picking(self): 271 """Perform peak picking.""" 272 mz, abundance, freq = self.prepare_peak_picking_data() 273 274 if ( 275 self.label == Labels.bruker_frequency 276 or self.label == Labels.midas_frequency 277 ): 278 self.calc_centroid(mz, abundance, freq) 279 280 elif self.label == Labels.thermo_profile: 281 self.calc_centroid(mz, abundance, freq) 282 283 elif self.label == Labels.bruker_profile: 284 self.calc_centroid(mz, abundance, freq) 285 286 elif self.label == Labels.booster_profile: 287 self.calc_centroid(mz, abundance, freq) 288 289 elif self.label == Labels.simulated_profile: 290 self.calc_centroid(mz, abundance, freq) 291 292 else: 293 raise Exception("Unknow mass spectrum type", self.label) 294 295 def find_minima(self, apex_index, abundance, len_abundance, right=True): 296 """Find the minima of a peak. 297 298 Parameters 299 ---------- 300 apex_index : int 301 The index of the peak apex. 302 abundance : ndarray 303 The abundance values. 304 len_abundance : int 305 The length of the abundance array. 306 right : bool, optional 307 Flag indicating whether to search for minima to the right of the apex (default is True). 308 309 Returns 310 ------- 311 int 312 The index of the minima. 313 314 """ 315 j = apex_index 316 317 if right: 318 minima = abundance[j] > abundance[j + 1] 319 else: 320 minima = abundance[j] > abundance[j - 1] 321 322 while minima: 323 if j == 1 or j == len_abundance - 2: 324 break 325 326 if right: 327 j += 1 328 329 minima = abundance[j] >= abundance[j + 1] 330 331 else: 332 j -= 1 333 minima = abundance[j] >= abundance[j - 1] 334 335 if right: 336 return j 337 else: 338 return j 339 340 @staticmethod 341 def linear_fit_calc(intes, massa, index_term, index_sign): 342 """ 343 Algebraic solution to a linear fit - roughly 25-50x faster than numpy polyfit when passing only two vals and doing a 1st order fit 344 345 Parameters 346 ---------- 347 intes : ndarray 348 The intensity values. 349 massa : ndarray 350 The mass values. 351 index_term : int 352 The index of the current term. 353 index_sign : str 354 The index sign 355 356 Returns 357 ------- 358 ndarray 359 The coefficients of the linear fit. 360 361 Notes 362 -------- 363 This is a static method. 364 """ 365 if index_sign == "+": 366 x1, x2 = massa[index_term], massa[index_term + 1] 367 y1, y2 = intes[index_term], intes[index_term + 1] 368 elif index_sign == "-": 369 x1, x2 = massa[index_term], massa[index_term - 1] 370 y1, y2 = intes[index_term], intes[index_term - 1] 371 else: 372 warnings.warn("error in linear fit calc, unknown index sign") 373 374 # Calculate the slope (m) 375 slope = (y2 - y1) / (x2 - x1) 376 377 # Calculate the intercept (b) 378 intercept = y1 - slope * x1 379 380 # The coefficients array would be [slope, intercept] 381 coefficients = array([slope, intercept]) 382 return coefficients 383 384 def calculate_resolving_power(self, intes, massa, current_index): 385 """Calculate the resolving power of a peak. 386 387 Parameters 388 ---------- 389 intes : ndarray 390 The intensity values. 391 massa : ndarray 392 The mass values. 393 current_index : int 394 The index of the current peak. 395 396 Returns 397 ------- 398 float 399 The resolving power of the peak. 400 401 Notes 402 -------- 403 This is a conservative calculation of resolving power, 404 the peak need to be resolved at least at the half-maximum magnitude, 405 otherwise, the combined full width at half maximum is used to calculate resolving power. 406 407 """ 408 409 peak_height = intes[current_index] 410 target_peak_height = peak_height / 2 411 412 peak_height_minus = peak_height 413 peak_height_plus = peak_height 414 415 # There are issues when a peak is at the high or low limit of a spectrum in finding its local minima and maxima 416 # This solution will return nan for resolving power when a peak is possibly too close to an edge to avoid the issue 417 418 if current_index < 5: 419 warnings.warn("peak at low spectrum edge, returning no resolving power") 420 return nan 421 elif abs(current_index - len(intes)) < 5: 422 warnings.warn("peak at high spectrum edge, returning no resolving power") 423 return nan 424 else: 425 pass 426 427 index_minus = current_index 428 while peak_height_minus >= target_peak_height: 429 index_minus = index_minus - 1 430 if index_minus < 0: 431 warnings.warn( 432 "Res. calc. warning - peak index minus adjacent to spectrum edge \n \ 433 Zeroing the first 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported \n \ 434 Perhaps try to increase picking_point_extrapolate (e.g. to 3)" 435 ) 436 # Pad the first 5 data points with zeros and restart the loop 437 intes[:5] = 0 438 peak_height_minus = target_peak_height 439 index_minus = current_index 440 else: 441 peak_height_minus = intes[index_minus] 442 443 if self.mspeaks_settings.legacy_centroid_polyfit: 444 x = [massa[index_minus], massa[index_minus + 1]] 445 y = [intes[index_minus], intes[index_minus + 1]] 446 coefficients = polyfit(x, y, 1) 447 else: 448 coefficients = self.linear_fit_calc( 449 intes, massa, index_minus, index_sign="+" 450 ) 451 452 a = coefficients[0] 453 b = coefficients[1] 454 if self.mspeaks_settings.legacy_resolving_power: 455 y_intercept = intes[index_minus] + ( 456 (intes[index_minus + 1] - intes[index_minus]) / 2 457 ) 458 else: 459 y_intercept = target_peak_height 460 massa1 = (y_intercept - b) / a 461 462 index_plus = current_index 463 while peak_height_plus >= target_peak_height: 464 index_plus = index_plus + 1 465 466 try: 467 peak_height_plus = intes[index_plus] 468 except IndexError: 469 warnings.warn( 470 "Res. calc. warning - peak index plus adjacent to spectrum edge \n \ 471 Zeroing the last 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported\ 472 Perhaps try to increase picking_point_extrapolate (e.g. to 3)" 473 ) 474 # Pad the first 5 data points with zeros and restart the loop 475 intes[-5:] = 0 476 peak_height_plus = target_peak_height 477 index_plus = current_index 478 479 if self.mspeaks_settings.legacy_centroid_polyfit: 480 x = [massa[index_plus], massa[index_plus - 1]] 481 y = [intes[index_plus], intes[index_plus - 1]] 482 coefficients = polyfit(x, y, 1) 483 else: 484 coefficients = self.linear_fit_calc( 485 intes, massa, index_plus, index_sign="-" 486 ) 487 488 a = coefficients[0] 489 b = coefficients[1] 490 491 if self.mspeaks_settings.legacy_resolving_power: 492 y_intercept = intes[index_plus - 1] + ( 493 (intes[index_plus] - intes[index_plus - 1]) / 2 494 ) 495 else: 496 y_intercept = target_peak_height 497 498 massa2 = (y_intercept - b) / a 499 500 if massa1 > massa2: 501 resolvingpower = massa[current_index] / (massa1 - massa2) 502 503 else: 504 resolvingpower = massa[current_index] / (massa2 - massa1) 505 506 return resolvingpower 507 508 def cal_minima(self, mass, abun): 509 """Calculate the minima of a peak. 510 511 Parameters 512 ---------- 513 mass : ndarray 514 The mass values. 515 abun : ndarray 516 The abundance values. 517 518 Returns 519 ------- 520 ndarray or None 521 The mass values at the minima, if found. 522 523 """ 524 abun = -abun 525 526 dy = abun[1:] - abun[:-1] 527 528 # replaces nan for infinity 529 indices_nan = where(isnan(abun))[0] 530 531 if indices_nan.size: 532 abun[indices_nan] = inf 533 dy[where(isnan(dy))[0]] = inf 534 535 indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0] 536 537 if indexes.size: 538 return mass[indexes], abun[indexes] 539 540 def calc_centroid(self, mass, abund, freq): 541 """Calculate the centroid of a peak. 542 543 Parameters 544 ---------- 545 mass : ndarray 546 The mass values. 547 abund : ndarray 548 The abundance values. 549 freq : ndarray or None 550 The frequency values, if available. 551 552 Returns 553 ------- 554 None 555 556 """ 557 558 max_height = self.mspeaks_settings.peak_height_max_percent 559 max_prominence = self.mspeaks_settings.peak_max_prominence_percent 560 min_peak_datapoints = self.mspeaks_settings.min_peak_datapoints 561 peak_derivative_threshold = self.mspeaks_settings.peak_derivative_threshold 562 max_abun = max(abund) 563 peak_height_diff = lambda hi, li: ((abund[hi] - abund[li]) / max_abun) * 100 564 565 domain = mass 566 signal = abund 567 len_signal = len(signal) 568 569 signal_threshold, factor = self.get_threshold(abund) 570 max_signal = factor 571 572 correct_baseline = False 573 574 include_indexes = sp.peak_picking_first_derivative( 575 domain, 576 signal, 577 max_height, 578 max_prominence, 579 max_signal, 580 min_peak_datapoints, 581 peak_derivative_threshold, 582 signal_threshold=signal_threshold, 583 correct_baseline=correct_baseline, 584 abun_norm=1, 585 plot_res=False, 586 ) 587 588 for indexes_tuple in include_indexes: 589 apex_index = indexes_tuple[1] 590 591 peak_indexes = self.check_prominence( 592 abund, apex_index, len_signal, peak_height_diff 593 ) 594 595 if peak_indexes: 596 mz_exp_centroid, freq_centr, intes_centr = self.find_apex_fit_quadratic( 597 mass, abund, freq, apex_index 598 ) 599 600 if mz_exp_centroid: 601 peak_resolving_power = self.calculate_resolving_power( 602 abund, mass, apex_index 603 ) 604 s2n = intes_centr / self.baseline_noise_std 605 self.add_mspeak( 606 self.polarity, 607 mz_exp_centroid, 608 abund[apex_index], 609 peak_resolving_power, 610 s2n, 611 indexes_tuple, 612 exp_freq=freq_centr, 613 ms_parent=self, 614 ) 615 # pyplot.plot(domain[start_index: final_index + 1], signal[start_index:final_index + 1], c='black') 616 # pyplot.show() 617 618 def get_threshold(self, intes): 619 """Get the intensity threshold for peak picking. 620 621 Parameters 622 ---------- 623 intes : ndarray 624 The intensity values. 625 626 Returns 627 ------- 628 float 629 The intensity threshold. 630 float 631 The factor to multiply the intensity threshold by. 632 """ 633 634 intes = array(intes).astype(float) 635 636 noise_threshold_method = self.settings.noise_threshold_method 637 638 if noise_threshold_method == "minima": 639 if self.is_centroid: 640 warnings.warn( 641 "Auto threshould is disabled for centroid data, returning 0" 642 ) 643 factor = 1 644 abundance_threshold = 1e-20 645 # print(self.settings.noise_threshold_min_std) 646 else: 647 abundance_threshold = self.baseline_noise + ( 648 self.settings.noise_threshold_min_std * self.baseline_noise_std 649 ) 650 factor = 1 651 652 elif noise_threshold_method == "signal_noise": 653 abundance_threshold = self.settings.noise_threshold_min_s2n 654 if self.is_centroid: 655 factor = 1 656 else: 657 factor = self.baseline_noise_std 658 659 elif noise_threshold_method == "relative_abundance": 660 abundance_threshold = self.settings.noise_threshold_min_relative_abundance 661 factor = intes.max() / 100 662 663 elif noise_threshold_method == "absolute_abundance": 664 abundance_threshold = self.settings.noise_threshold_absolute_abundance 665 factor = 1 666 667 elif noise_threshold_method == "log": 668 if self.is_centroid: 669 raise Exception("log noise Not tested for centroid data") 670 abundance_threshold = self.settings.noise_threshold_log_nsigma 671 factor = self.baseline_noise_std 672 673 else: 674 raise Exception( 675 "%s method was not implemented, please refer to corems.mass_spectrum.calc.NoiseCalc Class" 676 % noise_threshold_method 677 ) 678 679 return abundance_threshold, factor 680 681 @staticmethod 682 def algebraic_quadratic(list_mass, list_y): 683 """ 684 Find the apex of a peak - algebraically. 685 Faster than using numpy polyfit by ~28x per fit. 686 687 Parameters 688 ---------- 689 list_mass : ndarray 690 list of m/z values (3 points) 691 list_y : ndarray 692 list of abundance values (3 points) 693 694 Returns 695 ------- 696 a, b, c: float 697 coefficients of the quadratic equation. 698 699 Notes 700 -------- 701 This is a static method. 702 """ 703 x_1, x_2, x_3 = list_mass 704 y_1, y_2, y_3 = list_y 705 706 a = ( 707 y_1 / ((x_1 - x_2) * (x_1 - x_3)) 708 + y_2 / ((x_2 - x_1) * (x_2 - x_3)) 709 + y_3 / ((x_3 - x_1) * (x_3 - x_2)) 710 ) 711 712 b = ( 713 -y_1 * (x_2 + x_3) / ((x_1 - x_2) * (x_1 - x_3)) 714 - y_2 * (x_1 + x_3) / ((x_2 - x_1) * (x_2 - x_3)) 715 - y_3 * (x_1 + x_2) / ((x_3 - x_1) * (x_3 - x_2)) 716 ) 717 718 c = ( 719 y_1 * x_2 * x_3 / ((x_1 - x_2) * (x_1 - x_3)) 720 + y_2 * x_1 * x_3 / ((x_2 - x_1) * (x_2 - x_3)) 721 + y_3 * x_1 * x_2 / ((x_3 - x_1) * (x_3 - x_2)) 722 ) 723 return a, b, c 724 725 def find_apex_fit_quadratic(self, mass, abund, freq, current_index): 726 """ 727 Find the apex of a peak. 728 729 Parameters 730 ---------- 731 mass : ndarray 732 The mass values. 733 abund : ndarray 734 The abundance values. 735 freq : ndarray or None 736 The frequency values, if available. 737 current_index : int 738 The index of the current peak. 739 740 741 Returns 742 ------- 743 float 744 The m/z value of the peak apex. 745 float 746 The frequency value of the peak apex, if available. 747 float 748 The abundance value of the peak apex. 749 750 """ 751 # calc prominence 752 # peak_indexes = self.check_prominence(abund, current_index, len_abundance, peak_height_diff ) 753 754 # if not peak_indexes: 755 756 # return None, None, None, None 757 758 # else: 759 760 # fit parabola to three most abundant datapoints 761 list_mass = [ 762 mass[current_index - 1], 763 mass[current_index], 764 mass[current_index + 1], 765 ] 766 list_y = [ 767 abund[current_index - 1], 768 abund[current_index], 769 abund[current_index + 1], 770 ] 771 772 if self.mspeaks_settings.legacy_centroid_polyfit: 773 z = polyfit(list_mass, list_y, 2) 774 a = z[0] 775 b = z[1] 776 else: 777 a, b, c = self.algebraic_quadratic(list_mass, list_y) 778 779 calculated = -b / (2 * a) 780 781 if calculated < 1 or int(calculated) != int(list_mass[1]): 782 mz_exp_centroid = list_mass[1] 783 784 else: 785 mz_exp_centroid = calculated 786 787 if ( 788 self.label == Labels.bruker_frequency 789 or self.label == Labels.midas_frequency 790 ): 791 # fit parabola to three most abundant frequency datapoints 792 list_freq = [ 793 freq[current_index - 1], 794 freq[current_index], 795 freq[current_index + 1], 796 ] 797 if self.mspeaks_settings.legacy_centroid_polyfit: 798 z = polyfit(list_mass, list_y, 2) 799 a = z[0] 800 b = z[1] 801 else: 802 a, b, c = self.algebraic_quadratic(list_mass, list_y) 803 804 calculated_freq = -b / (2 * a) 805 806 if calculated_freq < 1 or int(calculated_freq) != freq[current_index]: 807 freq_centr = list_freq[1] 808 809 else: 810 freq_centr = calculated_freq 811 812 else: 813 freq_centr = None 814 815 if self.mspeaks_settings.legacy_centroid_polyfit: 816 abundance_centroid = abund[current_index] 817 else: 818 abundance_centroid = a * mz_exp_centroid**2 + b * mz_exp_centroid + c 819 820 return mz_exp_centroid, freq_centr, abundance_centroid 821 822 def check_prominence( 823 self, abun, current_index, len_abundance, peak_height_diff 824 ) -> tuple or False: 825 """Check the prominence of a peak. 826 827 Parameters 828 ---------- 829 abun : ndarray 830 The abundance values. 831 current_index : int 832 The index of the current peak. 833 len_abundance : int 834 The length of the abundance array. 835 peak_height_diff : function 836 The function to calculate the peak height difference. 837 838 Returns 839 ------- 840 tuple or False 841 A tuple containing the indexes of the peak, if the prominence is above the threshold. 842 Otherwise, False. 843 844 """ 845 846 final_index = self.find_minima(current_index, abun, len_abundance, right=True) 847 848 start_index = self.find_minima(current_index, abun, len_abundance, right=False) 849 850 peak_indexes = (current_index - 1, current_index, current_index + 1) 851 852 if ( 853 min( 854 peak_height_diff(current_index, start_index), 855 peak_height_diff(current_index, final_index), 856 ) 857 > self.mspeaks_settings.peak_min_prominence_percent 858 ): 859 return peak_indexes 860 861 else: 862 return False 863 864 def use_the_max(self, mass, abund, current_index, len_abundance, peak_height_diff): 865 """Use the max peak height as the centroid 866 867 Parameters 868 ---------- 869 mass : ndarray 870 The mass values. 871 abund : ndarray 872 The abundance values. 873 current_index : int 874 The index of the current peak. 875 len_abundance : int 876 The length of the abundance array. 877 peak_height_diff : function 878 The function to calculate the peak height difference. 879 880 Returns 881 ------- 882 float 883 The m/z value of the peak apex. 884 float 885 The abundance value of the peak apex. 886 tuple or None 887 A tuple containing the indexes of the peak, if the prominence is above the threshold. 888 Otherwise, None. 889 """ 890 891 peak_indexes = self.check_prominence( 892 abund, current_index, len_abundance, peak_height_diff 893 ) 894 895 if not peak_indexes: 896 return None, None, None 897 898 else: 899 return mass[current_index], abund[current_index], peak_indexes 900 901 def calc_centroid_legacy(self, mass, abund, freq): 902 """Legacy centroid calculation 903 Deprecated - for deletion. 904 905 """ 906 warnings.warn( 907 "Legacy centroid calculation is deprecated. Please use the new centroid calculation method." 908 ) 909 pass 910 if False: 911 len_abundance = len(abund) 912 913 max_abundance = max(abund) 914 915 peak_height_diff = ( 916 lambda hi, li: ((abund[hi] - abund[li]) / max_abundance) * 100 917 ) 918 919 abundance_threshold, factor = self.get_threshold(abund) 920 # print(abundance_threshold, factor) 921 # find indices of all peaks 922 dy = abund[1:] - abund[:-1] 923 924 # replaces nan for infi nity 925 indices_nan = where(isnan(abund))[0] 926 927 if indices_nan.size: 928 abund[indices_nan] = inf 929 dy[where(isnan(dy))[0]] = inf 930 931 indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0] 932 933 # noise threshold 934 if indexes.size and abundance_threshold is not None: 935 indexes = indexes[abund[indexes] / factor >= abundance_threshold] 936 # filter out 'peaks' within 3 points of the spectrum limits 937 # remove entries within 3 points of upper limit 938 indexes = [x for x in indexes if (len_abundance - x) > 3] 939 # remove entries within 3 points of zero 940 indexes = [x for x in indexes if x > 3] 941 942 for current_index in indexes: 943 if self.label == Labels.simulated_profile: 944 mz_exp_centroid, intes_centr, peak_indexes = self.use_the_max( 945 mass, abund, current_index, len_abundance, peak_height_diff 946 ) 947 if mz_exp_centroid: 948 peak_resolving_power = self.calculate_resolving_power( 949 abund, mass, current_index 950 ) 951 s2n = intes_centr / self.baseline_noise_std 952 freq_centr = None 953 self.add_mspeak( 954 self.polarity, 955 mz_exp_centroid, 956 abund[current_index], 957 peak_resolving_power, 958 s2n, 959 peak_indexes, 960 exp_freq=freq_centr, 961 ms_parent=self, 962 ) 963 964 else: 965 mz_exp_centroid, freq_centr, intes_centr, peak_indexes = ( 966 self.find_apex_fit_quadratic( 967 mass, 968 abund, 969 freq, 970 current_index, 971 len_abundance, 972 peak_height_diff, 973 ) 974 ) 975 if mz_exp_centroid: 976 try: 977 peak_resolving_power = self.calculate_resolving_power( 978 abund, mass, current_index 979 ) 980 except IndexError: 981 print("index error, skipping peak") 982 continue 983 984 s2n = intes_centr / self.baseline_noise_std 985 self.add_mspeak( 986 self.polarity, 987 mz_exp_centroid, 988 abund[current_index], 989 peak_resolving_power, 990 s2n, 991 peak_indexes, 992 exp_freq=freq_centr, 993 ms_parent=self, 994 )
24class PeakPicking: 25 """Class for peak picking. 26 27 Parameters 28 ---------- 29 None 30 31 Attributes 32 ---------- 33 None 34 35 Methods 36 ------- 37 * prepare_peak_picking_data(). 38 Prepare the mz, abundance, and frequence data for peak picking. 39 * cut_mz_domain_peak_picking(). 40 Cut the m/z domain for peak picking. 41 * extrapolate_axes_for_pp(mz=None, abund=None, freq=None). 42 Extrapolate the m/z axis and fill the abundance axis with 0s. 43 * do_peak_picking(). 44 Perform peak picking. 45 * find_minima(apex_index, abundance, len_abundance, right=True). 46 Find the minima of a peak. 47 * linear_fit_calc(intes, massa, index_term, index_sign). 48 Algebraic solution to a linear fit. 49 * calculate_resolving_power(intes, massa, current_index). 50 Calculate the resolving power of a peak. 51 * cal_minima(mass, abun). 52 Calculate the minima of a peak. 53 * calc_centroid(mass, abund, freq). 54 Calculate the centroid of a peak. 55 * get_threshold(intes). 56 Get the intensity threshold for peak picking. 57 * algebraic_quadratic(list_mass, list_y). 58 Find the apex of a peak - algebraically. 59 * find_apex_fit_quadratic(mass, abund, freq, current_index). 60 Find the apex of a peak. 61 * check_prominence(abun, current_index, len_abundance, peak_height_diff). 62 Check the prominence of a peak. 63 * use_the_max(mass, abund, current_index, len_abundance, peak_height_diff). 64 Use the max peak height as the centroid. 65 * calc_centroid_legacy(mass, abund, freq). 66 Legacy centroid calculation. Deprecated - for deletion. 67 68 """ 69 70 def prepare_peak_picking_data(self): 71 """Prepare the data for peak picking. 72 73 This function will prepare the m/z, abundance, and frequency data for peak picking according to the settings. 74 75 Returns 76 ------- 77 mz : ndarray 78 The m/z axis. 79 abundance : ndarray 80 The abundance axis. 81 freq : ndarray or None 82 The frequency axis, if available. 83 """ 84 # First apply cut_mz_domain_peak_picking 85 mz, abundance, freq = self.cut_mz_domain_peak_picking() 86 87 # Then extrapolate the axes for peak picking 88 if self.settings.picking_point_extrapolate > 0: 89 mz, abundance, freq = self.extrapolate_axes_for_pp(mz, abundance, freq) 90 return mz, abundance, freq 91 92 def cut_mz_domain_peak_picking(self): 93 """ 94 Cut the m/z domain for peak picking. 95 96 Simplified function 97 98 Returns 99 ------- 100 mz_domain_X_low_cutoff : ndarray 101 The m/z values within the specified range. 102 mz_domain_low_Y_cutoff : ndarray 103 The abundance values within the specified range. 104 freq_domain_low_Y_cutoff : ndarray or None 105 The frequency values within the specified range, if available. 106 107 """ 108 max_picking_mz = self.settings.max_picking_mz 109 min_picking_mz = self.settings.min_picking_mz 110 111 # min_start = where(self.mz_exp_profile > min_picking_mz)[0][0] 112 # max_final = where(self.mz_exp_profile < max_picking_mz)[-1][-1] 113 min_start = searchsorted(a=self.mz_exp_profile, v=min_picking_mz) 114 max_final = searchsorted(a=self.mz_exp_profile, v=max_picking_mz) 115 116 if self.has_frequency: 117 if self.freq_exp_profile.any(): 118 return ( 119 self.mz_exp_profile[min_start:max_final], 120 self.abundance_profile[min_start:max_final], 121 self.freq_exp_profile[min_start:max_final], 122 ) 123 124 else: 125 return ( 126 self.mz_exp_profile[min_start:max_final], 127 self.abundance_profile[min_start:max_final], 128 None, 129 ) 130 131 def legacy_cut_mz_domain_peak_picking(self): 132 """ 133 Cut the m/z domain for peak picking. 134 DEPRECATED 135 Returns 136 ------- 137 mz_domain_X_low_cutoff : ndarray 138 The m/z values within the specified range. 139 mz_domain_low_Y_cutoff : ndarray 140 The abundance values within the specified range. 141 freq_domain_low_Y_cutoff : ndarray or None 142 The frequency values within the specified range, if available. 143 144 """ 145 max_picking_mz = self.settings.max_picking_mz 146 min_picking_mz = self.settings.min_picking_mz 147 148 min_final = where(self.mz_exp_profile > min_picking_mz)[-1][-1] 149 min_start = where(self.mz_exp_profile > min_picking_mz)[0][0] 150 151 ( 152 mz_domain_X_low_cutoff, 153 mz_domain_low_Y_cutoff, 154 ) = ( 155 self.mz_exp_profile[min_start:min_final], 156 self.abundance_profile[min_start:min_final], 157 ) 158 159 max_final = where(self.mz_exp_profile < max_picking_mz)[-1][-1] 160 max_start = where(self.mz_exp_profile < max_picking_mz)[0][0] 161 162 if self.has_frequency: 163 if self.freq_exp_profile.any(): 164 freq_domain_low_Y_cutoff = self.freq_exp_profile[min_start:min_final] 165 166 return ( 167 mz_domain_X_low_cutoff[max_start:max_final], 168 mz_domain_low_Y_cutoff[max_start:max_final], 169 freq_domain_low_Y_cutoff[max_start:max_final], 170 ) 171 172 else: 173 return ( 174 mz_domain_X_low_cutoff[max_start:max_final], 175 mz_domain_low_Y_cutoff[max_start:max_final], 176 None, 177 ) 178 179 @staticmethod 180 def extrapolate_axis(initial_array, pts): 181 """ 182 This function will extrapolate an input array in both directions by N pts. 183 184 Parameters 185 ---------- 186 initial_array : ndarray 187 The input array. 188 pts : int 189 The number of points to extrapolate. 190 191 Returns 192 ------- 193 ndarray 194 The extrapolated array. 195 196 Notes 197 -------- 198 This is a static method. 199 """ 200 initial_array_len = len(initial_array) 201 right_delta = initial_array[-1] - initial_array[-2] 202 left_delta = initial_array[1] - initial_array[0] 203 204 # Create an array with extra space for extrapolation 205 pad_array = zeros(initial_array_len + 2 * pts) 206 207 # Copy original array into the middle of the padded array 208 pad_array[pts : pts + initial_array_len] = initial_array 209 210 # Extrapolate the right side 211 for pt in range(pts): 212 final_value = initial_array[-1] 213 value_to_add = right_delta * (pt + 1) 214 new_value = final_value + value_to_add 215 pad_array[initial_array_len + pts + pt] = new_value 216 217 # Extrapolate the left side 218 for pt in range(pts): 219 first_value = initial_array[0] 220 value_to_subtract = left_delta * (pt + 1) 221 new_value = first_value - value_to_subtract 222 pad_array[pts - pt - 1] = new_value 223 224 return pad_array 225 226 def extrapolate_axes_for_pp(self, mz=None, abund=None, freq=None): 227 """Extrapolate the m/z axis and fill the abundance axis with 0s. 228 229 Parameters 230 ---------- 231 mz : ndarray or None 232 The m/z axis, if available. If None, the experimental m/z axis is used. 233 abund : ndarray or None 234 The abundance axis, if available. If None, the experimental abundance axis is used. 235 freq : ndarray or None 236 The frequency axis, if available. If None, the experimental frequency axis is used. 237 238 Returns 239 ------- 240 mz : ndarray 241 The extrapolated m/z axis. 242 abund : ndarray 243 The abundance axis with 0s filled. 244 freq : ndarray or None 245 The extrapolated frequency axis, if available. 246 247 Notes 248 -------- 249 This function will extrapolate the mz axis by the number of datapoints specified in the settings, 250 and fill the abundance axis with 0s. 251 This should prevent peak picking issues at the spectrum edge. 252 253 """ 254 # Check if the input arrays are provided 255 if mz is None or abund is None: 256 mz, abund = self.mz_exp_profile, self.abundance_profile 257 if self.has_frequency: 258 freq = self.freq_exp_profile 259 else: 260 freq = None 261 pts = self.settings.picking_point_extrapolate 262 if pts == 0: 263 return mz, abund, freq 264 265 mz = self.extrapolate_axis(mz, pts) 266 abund = pad(abund, (pts, pts), mode="constant", constant_values=(0, 0)) 267 if freq is not None: 268 freq = self.extrapolate_axis(freq, pts) 269 return mz, abund, freq 270 271 def do_peak_picking(self): 272 """Perform peak picking.""" 273 mz, abundance, freq = self.prepare_peak_picking_data() 274 275 if ( 276 self.label == Labels.bruker_frequency 277 or self.label == Labels.midas_frequency 278 ): 279 self.calc_centroid(mz, abundance, freq) 280 281 elif self.label == Labels.thermo_profile: 282 self.calc_centroid(mz, abundance, freq) 283 284 elif self.label == Labels.bruker_profile: 285 self.calc_centroid(mz, abundance, freq) 286 287 elif self.label == Labels.booster_profile: 288 self.calc_centroid(mz, abundance, freq) 289 290 elif self.label == Labels.simulated_profile: 291 self.calc_centroid(mz, abundance, freq) 292 293 else: 294 raise Exception("Unknow mass spectrum type", self.label) 295 296 def find_minima(self, apex_index, abundance, len_abundance, right=True): 297 """Find the minima of a peak. 298 299 Parameters 300 ---------- 301 apex_index : int 302 The index of the peak apex. 303 abundance : ndarray 304 The abundance values. 305 len_abundance : int 306 The length of the abundance array. 307 right : bool, optional 308 Flag indicating whether to search for minima to the right of the apex (default is True). 309 310 Returns 311 ------- 312 int 313 The index of the minima. 314 315 """ 316 j = apex_index 317 318 if right: 319 minima = abundance[j] > abundance[j + 1] 320 else: 321 minima = abundance[j] > abundance[j - 1] 322 323 while minima: 324 if j == 1 or j == len_abundance - 2: 325 break 326 327 if right: 328 j += 1 329 330 minima = abundance[j] >= abundance[j + 1] 331 332 else: 333 j -= 1 334 minima = abundance[j] >= abundance[j - 1] 335 336 if right: 337 return j 338 else: 339 return j 340 341 @staticmethod 342 def linear_fit_calc(intes, massa, index_term, index_sign): 343 """ 344 Algebraic solution to a linear fit - roughly 25-50x faster than numpy polyfit when passing only two vals and doing a 1st order fit 345 346 Parameters 347 ---------- 348 intes : ndarray 349 The intensity values. 350 massa : ndarray 351 The mass values. 352 index_term : int 353 The index of the current term. 354 index_sign : str 355 The index sign 356 357 Returns 358 ------- 359 ndarray 360 The coefficients of the linear fit. 361 362 Notes 363 -------- 364 This is a static method. 365 """ 366 if index_sign == "+": 367 x1, x2 = massa[index_term], massa[index_term + 1] 368 y1, y2 = intes[index_term], intes[index_term + 1] 369 elif index_sign == "-": 370 x1, x2 = massa[index_term], massa[index_term - 1] 371 y1, y2 = intes[index_term], intes[index_term - 1] 372 else: 373 warnings.warn("error in linear fit calc, unknown index sign") 374 375 # Calculate the slope (m) 376 slope = (y2 - y1) / (x2 - x1) 377 378 # Calculate the intercept (b) 379 intercept = y1 - slope * x1 380 381 # The coefficients array would be [slope, intercept] 382 coefficients = array([slope, intercept]) 383 return coefficients 384 385 def calculate_resolving_power(self, intes, massa, current_index): 386 """Calculate the resolving power of a peak. 387 388 Parameters 389 ---------- 390 intes : ndarray 391 The intensity values. 392 massa : ndarray 393 The mass values. 394 current_index : int 395 The index of the current peak. 396 397 Returns 398 ------- 399 float 400 The resolving power of the peak. 401 402 Notes 403 -------- 404 This is a conservative calculation of resolving power, 405 the peak need to be resolved at least at the half-maximum magnitude, 406 otherwise, the combined full width at half maximum is used to calculate resolving power. 407 408 """ 409 410 peak_height = intes[current_index] 411 target_peak_height = peak_height / 2 412 413 peak_height_minus = peak_height 414 peak_height_plus = peak_height 415 416 # There are issues when a peak is at the high or low limit of a spectrum in finding its local minima and maxima 417 # This solution will return nan for resolving power when a peak is possibly too close to an edge to avoid the issue 418 419 if current_index < 5: 420 warnings.warn("peak at low spectrum edge, returning no resolving power") 421 return nan 422 elif abs(current_index - len(intes)) < 5: 423 warnings.warn("peak at high spectrum edge, returning no resolving power") 424 return nan 425 else: 426 pass 427 428 index_minus = current_index 429 while peak_height_minus >= target_peak_height: 430 index_minus = index_minus - 1 431 if index_minus < 0: 432 warnings.warn( 433 "Res. calc. warning - peak index minus adjacent to spectrum edge \n \ 434 Zeroing the first 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported \n \ 435 Perhaps try to increase picking_point_extrapolate (e.g. to 3)" 436 ) 437 # Pad the first 5 data points with zeros and restart the loop 438 intes[:5] = 0 439 peak_height_minus = target_peak_height 440 index_minus = current_index 441 else: 442 peak_height_minus = intes[index_minus] 443 444 if self.mspeaks_settings.legacy_centroid_polyfit: 445 x = [massa[index_minus], massa[index_minus + 1]] 446 y = [intes[index_minus], intes[index_minus + 1]] 447 coefficients = polyfit(x, y, 1) 448 else: 449 coefficients = self.linear_fit_calc( 450 intes, massa, index_minus, index_sign="+" 451 ) 452 453 a = coefficients[0] 454 b = coefficients[1] 455 if self.mspeaks_settings.legacy_resolving_power: 456 y_intercept = intes[index_minus] + ( 457 (intes[index_minus + 1] - intes[index_minus]) / 2 458 ) 459 else: 460 y_intercept = target_peak_height 461 massa1 = (y_intercept - b) / a 462 463 index_plus = current_index 464 while peak_height_plus >= target_peak_height: 465 index_plus = index_plus + 1 466 467 try: 468 peak_height_plus = intes[index_plus] 469 except IndexError: 470 warnings.warn( 471 "Res. calc. warning - peak index plus adjacent to spectrum edge \n \ 472 Zeroing the last 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported\ 473 Perhaps try to increase picking_point_extrapolate (e.g. to 3)" 474 ) 475 # Pad the first 5 data points with zeros and restart the loop 476 intes[-5:] = 0 477 peak_height_plus = target_peak_height 478 index_plus = current_index 479 480 if self.mspeaks_settings.legacy_centroid_polyfit: 481 x = [massa[index_plus], massa[index_plus - 1]] 482 y = [intes[index_plus], intes[index_plus - 1]] 483 coefficients = polyfit(x, y, 1) 484 else: 485 coefficients = self.linear_fit_calc( 486 intes, massa, index_plus, index_sign="-" 487 ) 488 489 a = coefficients[0] 490 b = coefficients[1] 491 492 if self.mspeaks_settings.legacy_resolving_power: 493 y_intercept = intes[index_plus - 1] + ( 494 (intes[index_plus] - intes[index_plus - 1]) / 2 495 ) 496 else: 497 y_intercept = target_peak_height 498 499 massa2 = (y_intercept - b) / a 500 501 if massa1 > massa2: 502 resolvingpower = massa[current_index] / (massa1 - massa2) 503 504 else: 505 resolvingpower = massa[current_index] / (massa2 - massa1) 506 507 return resolvingpower 508 509 def cal_minima(self, mass, abun): 510 """Calculate the minima of a peak. 511 512 Parameters 513 ---------- 514 mass : ndarray 515 The mass values. 516 abun : ndarray 517 The abundance values. 518 519 Returns 520 ------- 521 ndarray or None 522 The mass values at the minima, if found. 523 524 """ 525 abun = -abun 526 527 dy = abun[1:] - abun[:-1] 528 529 # replaces nan for infinity 530 indices_nan = where(isnan(abun))[0] 531 532 if indices_nan.size: 533 abun[indices_nan] = inf 534 dy[where(isnan(dy))[0]] = inf 535 536 indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0] 537 538 if indexes.size: 539 return mass[indexes], abun[indexes] 540 541 def calc_centroid(self, mass, abund, freq): 542 """Calculate the centroid of a peak. 543 544 Parameters 545 ---------- 546 mass : ndarray 547 The mass values. 548 abund : ndarray 549 The abundance values. 550 freq : ndarray or None 551 The frequency values, if available. 552 553 Returns 554 ------- 555 None 556 557 """ 558 559 max_height = self.mspeaks_settings.peak_height_max_percent 560 max_prominence = self.mspeaks_settings.peak_max_prominence_percent 561 min_peak_datapoints = self.mspeaks_settings.min_peak_datapoints 562 peak_derivative_threshold = self.mspeaks_settings.peak_derivative_threshold 563 max_abun = max(abund) 564 peak_height_diff = lambda hi, li: ((abund[hi] - abund[li]) / max_abun) * 100 565 566 domain = mass 567 signal = abund 568 len_signal = len(signal) 569 570 signal_threshold, factor = self.get_threshold(abund) 571 max_signal = factor 572 573 correct_baseline = False 574 575 include_indexes = sp.peak_picking_first_derivative( 576 domain, 577 signal, 578 max_height, 579 max_prominence, 580 max_signal, 581 min_peak_datapoints, 582 peak_derivative_threshold, 583 signal_threshold=signal_threshold, 584 correct_baseline=correct_baseline, 585 abun_norm=1, 586 plot_res=False, 587 ) 588 589 for indexes_tuple in include_indexes: 590 apex_index = indexes_tuple[1] 591 592 peak_indexes = self.check_prominence( 593 abund, apex_index, len_signal, peak_height_diff 594 ) 595 596 if peak_indexes: 597 mz_exp_centroid, freq_centr, intes_centr = self.find_apex_fit_quadratic( 598 mass, abund, freq, apex_index 599 ) 600 601 if mz_exp_centroid: 602 peak_resolving_power = self.calculate_resolving_power( 603 abund, mass, apex_index 604 ) 605 s2n = intes_centr / self.baseline_noise_std 606 self.add_mspeak( 607 self.polarity, 608 mz_exp_centroid, 609 abund[apex_index], 610 peak_resolving_power, 611 s2n, 612 indexes_tuple, 613 exp_freq=freq_centr, 614 ms_parent=self, 615 ) 616 # pyplot.plot(domain[start_index: final_index + 1], signal[start_index:final_index + 1], c='black') 617 # pyplot.show() 618 619 def get_threshold(self, intes): 620 """Get the intensity threshold for peak picking. 621 622 Parameters 623 ---------- 624 intes : ndarray 625 The intensity values. 626 627 Returns 628 ------- 629 float 630 The intensity threshold. 631 float 632 The factor to multiply the intensity threshold by. 633 """ 634 635 intes = array(intes).astype(float) 636 637 noise_threshold_method = self.settings.noise_threshold_method 638 639 if noise_threshold_method == "minima": 640 if self.is_centroid: 641 warnings.warn( 642 "Auto threshould is disabled for centroid data, returning 0" 643 ) 644 factor = 1 645 abundance_threshold = 1e-20 646 # print(self.settings.noise_threshold_min_std) 647 else: 648 abundance_threshold = self.baseline_noise + ( 649 self.settings.noise_threshold_min_std * self.baseline_noise_std 650 ) 651 factor = 1 652 653 elif noise_threshold_method == "signal_noise": 654 abundance_threshold = self.settings.noise_threshold_min_s2n 655 if self.is_centroid: 656 factor = 1 657 else: 658 factor = self.baseline_noise_std 659 660 elif noise_threshold_method == "relative_abundance": 661 abundance_threshold = self.settings.noise_threshold_min_relative_abundance 662 factor = intes.max() / 100 663 664 elif noise_threshold_method == "absolute_abundance": 665 abundance_threshold = self.settings.noise_threshold_absolute_abundance 666 factor = 1 667 668 elif noise_threshold_method == "log": 669 if self.is_centroid: 670 raise Exception("log noise Not tested for centroid data") 671 abundance_threshold = self.settings.noise_threshold_log_nsigma 672 factor = self.baseline_noise_std 673 674 else: 675 raise Exception( 676 "%s method was not implemented, please refer to corems.mass_spectrum.calc.NoiseCalc Class" 677 % noise_threshold_method 678 ) 679 680 return abundance_threshold, factor 681 682 @staticmethod 683 def algebraic_quadratic(list_mass, list_y): 684 """ 685 Find the apex of a peak - algebraically. 686 Faster than using numpy polyfit by ~28x per fit. 687 688 Parameters 689 ---------- 690 list_mass : ndarray 691 list of m/z values (3 points) 692 list_y : ndarray 693 list of abundance values (3 points) 694 695 Returns 696 ------- 697 a, b, c: float 698 coefficients of the quadratic equation. 699 700 Notes 701 -------- 702 This is a static method. 703 """ 704 x_1, x_2, x_3 = list_mass 705 y_1, y_2, y_3 = list_y 706 707 a = ( 708 y_1 / ((x_1 - x_2) * (x_1 - x_3)) 709 + y_2 / ((x_2 - x_1) * (x_2 - x_3)) 710 + y_3 / ((x_3 - x_1) * (x_3 - x_2)) 711 ) 712 713 b = ( 714 -y_1 * (x_2 + x_3) / ((x_1 - x_2) * (x_1 - x_3)) 715 - y_2 * (x_1 + x_3) / ((x_2 - x_1) * (x_2 - x_3)) 716 - y_3 * (x_1 + x_2) / ((x_3 - x_1) * (x_3 - x_2)) 717 ) 718 719 c = ( 720 y_1 * x_2 * x_3 / ((x_1 - x_2) * (x_1 - x_3)) 721 + y_2 * x_1 * x_3 / ((x_2 - x_1) * (x_2 - x_3)) 722 + y_3 * x_1 * x_2 / ((x_3 - x_1) * (x_3 - x_2)) 723 ) 724 return a, b, c 725 726 def find_apex_fit_quadratic(self, mass, abund, freq, current_index): 727 """ 728 Find the apex of a peak. 729 730 Parameters 731 ---------- 732 mass : ndarray 733 The mass values. 734 abund : ndarray 735 The abundance values. 736 freq : ndarray or None 737 The frequency values, if available. 738 current_index : int 739 The index of the current peak. 740 741 742 Returns 743 ------- 744 float 745 The m/z value of the peak apex. 746 float 747 The frequency value of the peak apex, if available. 748 float 749 The abundance value of the peak apex. 750 751 """ 752 # calc prominence 753 # peak_indexes = self.check_prominence(abund, current_index, len_abundance, peak_height_diff ) 754 755 # if not peak_indexes: 756 757 # return None, None, None, None 758 759 # else: 760 761 # fit parabola to three most abundant datapoints 762 list_mass = [ 763 mass[current_index - 1], 764 mass[current_index], 765 mass[current_index + 1], 766 ] 767 list_y = [ 768 abund[current_index - 1], 769 abund[current_index], 770 abund[current_index + 1], 771 ] 772 773 if self.mspeaks_settings.legacy_centroid_polyfit: 774 z = polyfit(list_mass, list_y, 2) 775 a = z[0] 776 b = z[1] 777 else: 778 a, b, c = self.algebraic_quadratic(list_mass, list_y) 779 780 calculated = -b / (2 * a) 781 782 if calculated < 1 or int(calculated) != int(list_mass[1]): 783 mz_exp_centroid = list_mass[1] 784 785 else: 786 mz_exp_centroid = calculated 787 788 if ( 789 self.label == Labels.bruker_frequency 790 or self.label == Labels.midas_frequency 791 ): 792 # fit parabola to three most abundant frequency datapoints 793 list_freq = [ 794 freq[current_index - 1], 795 freq[current_index], 796 freq[current_index + 1], 797 ] 798 if self.mspeaks_settings.legacy_centroid_polyfit: 799 z = polyfit(list_mass, list_y, 2) 800 a = z[0] 801 b = z[1] 802 else: 803 a, b, c = self.algebraic_quadratic(list_mass, list_y) 804 805 calculated_freq = -b / (2 * a) 806 807 if calculated_freq < 1 or int(calculated_freq) != freq[current_index]: 808 freq_centr = list_freq[1] 809 810 else: 811 freq_centr = calculated_freq 812 813 else: 814 freq_centr = None 815 816 if self.mspeaks_settings.legacy_centroid_polyfit: 817 abundance_centroid = abund[current_index] 818 else: 819 abundance_centroid = a * mz_exp_centroid**2 + b * mz_exp_centroid + c 820 821 return mz_exp_centroid, freq_centr, abundance_centroid 822 823 def check_prominence( 824 self, abun, current_index, len_abundance, peak_height_diff 825 ) -> tuple or False: 826 """Check the prominence of a peak. 827 828 Parameters 829 ---------- 830 abun : ndarray 831 The abundance values. 832 current_index : int 833 The index of the current peak. 834 len_abundance : int 835 The length of the abundance array. 836 peak_height_diff : function 837 The function to calculate the peak height difference. 838 839 Returns 840 ------- 841 tuple or False 842 A tuple containing the indexes of the peak, if the prominence is above the threshold. 843 Otherwise, False. 844 845 """ 846 847 final_index = self.find_minima(current_index, abun, len_abundance, right=True) 848 849 start_index = self.find_minima(current_index, abun, len_abundance, right=False) 850 851 peak_indexes = (current_index - 1, current_index, current_index + 1) 852 853 if ( 854 min( 855 peak_height_diff(current_index, start_index), 856 peak_height_diff(current_index, final_index), 857 ) 858 > self.mspeaks_settings.peak_min_prominence_percent 859 ): 860 return peak_indexes 861 862 else: 863 return False 864 865 def use_the_max(self, mass, abund, current_index, len_abundance, peak_height_diff): 866 """Use the max peak height as the centroid 867 868 Parameters 869 ---------- 870 mass : ndarray 871 The mass values. 872 abund : ndarray 873 The abundance values. 874 current_index : int 875 The index of the current peak. 876 len_abundance : int 877 The length of the abundance array. 878 peak_height_diff : function 879 The function to calculate the peak height difference. 880 881 Returns 882 ------- 883 float 884 The m/z value of the peak apex. 885 float 886 The abundance value of the peak apex. 887 tuple or None 888 A tuple containing the indexes of the peak, if the prominence is above the threshold. 889 Otherwise, None. 890 """ 891 892 peak_indexes = self.check_prominence( 893 abund, current_index, len_abundance, peak_height_diff 894 ) 895 896 if not peak_indexes: 897 return None, None, None 898 899 else: 900 return mass[current_index], abund[current_index], peak_indexes 901 902 def calc_centroid_legacy(self, mass, abund, freq): 903 """Legacy centroid calculation 904 Deprecated - for deletion. 905 906 """ 907 warnings.warn( 908 "Legacy centroid calculation is deprecated. Please use the new centroid calculation method." 909 ) 910 pass 911 if False: 912 len_abundance = len(abund) 913 914 max_abundance = max(abund) 915 916 peak_height_diff = ( 917 lambda hi, li: ((abund[hi] - abund[li]) / max_abundance) * 100 918 ) 919 920 abundance_threshold, factor = self.get_threshold(abund) 921 # print(abundance_threshold, factor) 922 # find indices of all peaks 923 dy = abund[1:] - abund[:-1] 924 925 # replaces nan for infi nity 926 indices_nan = where(isnan(abund))[0] 927 928 if indices_nan.size: 929 abund[indices_nan] = inf 930 dy[where(isnan(dy))[0]] = inf 931 932 indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0] 933 934 # noise threshold 935 if indexes.size and abundance_threshold is not None: 936 indexes = indexes[abund[indexes] / factor >= abundance_threshold] 937 # filter out 'peaks' within 3 points of the spectrum limits 938 # remove entries within 3 points of upper limit 939 indexes = [x for x in indexes if (len_abundance - x) > 3] 940 # remove entries within 3 points of zero 941 indexes = [x for x in indexes if x > 3] 942 943 for current_index in indexes: 944 if self.label == Labels.simulated_profile: 945 mz_exp_centroid, intes_centr, peak_indexes = self.use_the_max( 946 mass, abund, current_index, len_abundance, peak_height_diff 947 ) 948 if mz_exp_centroid: 949 peak_resolving_power = self.calculate_resolving_power( 950 abund, mass, current_index 951 ) 952 s2n = intes_centr / self.baseline_noise_std 953 freq_centr = None 954 self.add_mspeak( 955 self.polarity, 956 mz_exp_centroid, 957 abund[current_index], 958 peak_resolving_power, 959 s2n, 960 peak_indexes, 961 exp_freq=freq_centr, 962 ms_parent=self, 963 ) 964 965 else: 966 mz_exp_centroid, freq_centr, intes_centr, peak_indexes = ( 967 self.find_apex_fit_quadratic( 968 mass, 969 abund, 970 freq, 971 current_index, 972 len_abundance, 973 peak_height_diff, 974 ) 975 ) 976 if mz_exp_centroid: 977 try: 978 peak_resolving_power = self.calculate_resolving_power( 979 abund, mass, current_index 980 ) 981 except IndexError: 982 print("index error, skipping peak") 983 continue 984 985 s2n = intes_centr / self.baseline_noise_std 986 self.add_mspeak( 987 self.polarity, 988 mz_exp_centroid, 989 abund[current_index], 990 peak_resolving_power, 991 s2n, 992 peak_indexes, 993 exp_freq=freq_centr, 994 ms_parent=self, 995 )
Class for peak picking.
Parameters
- None
Attributes
- None
Methods
- prepare_peak_picking_data(). Prepare the mz, abundance, and frequence data for peak picking.
- cut_mz_domain_peak_picking(). Cut the m/z domain for peak picking.
- extrapolate_axes_for_pp(mz=None, abund=None, freq=None). Extrapolate the m/z axis and fill the abundance axis with 0s.
- do_peak_picking(). Perform peak picking.
- find_minima(apex_index, abundance, len_abundance, right=True). Find the minima of a peak.
- linear_fit_calc(intes, massa, index_term, index_sign). Algebraic solution to a linear fit.
- calculate_resolving_power(intes, massa, current_index). Calculate the resolving power of a peak.
- cal_minima(mass, abun). Calculate the minima of a peak.
- calc_centroid(mass, abund, freq). Calculate the centroid of a peak.
- get_threshold(intes). Get the intensity threshold for peak picking.
- algebraic_quadratic(list_mass, list_y). Find the apex of a peak - algebraically.
- find_apex_fit_quadratic(mass, abund, freq, current_index). Find the apex of a peak.
- check_prominence(abun, current_index, len_abundance, peak_height_diff). Check the prominence of a peak.
- use_the_max(mass, abund, current_index, len_abundance, peak_height_diff). Use the max peak height as the centroid.
- calc_centroid_legacy(mass, abund, freq). Legacy centroid calculation. Deprecated - for deletion.
70 def prepare_peak_picking_data(self): 71 """Prepare the data for peak picking. 72 73 This function will prepare the m/z, abundance, and frequency data for peak picking according to the settings. 74 75 Returns 76 ------- 77 mz : ndarray 78 The m/z axis. 79 abundance : ndarray 80 The abundance axis. 81 freq : ndarray or None 82 The frequency axis, if available. 83 """ 84 # First apply cut_mz_domain_peak_picking 85 mz, abundance, freq = self.cut_mz_domain_peak_picking() 86 87 # Then extrapolate the axes for peak picking 88 if self.settings.picking_point_extrapolate > 0: 89 mz, abundance, freq = self.extrapolate_axes_for_pp(mz, abundance, freq) 90 return mz, abundance, freq
Prepare the data for peak picking.
This function will prepare the m/z, abundance, and frequency data for peak picking according to the settings.
Returns
- mz (ndarray): The m/z axis.
- abundance (ndarray): The abundance axis.
- freq (ndarray or None): The frequency axis, if available.
92 def cut_mz_domain_peak_picking(self): 93 """ 94 Cut the m/z domain for peak picking. 95 96 Simplified function 97 98 Returns 99 ------- 100 mz_domain_X_low_cutoff : ndarray 101 The m/z values within the specified range. 102 mz_domain_low_Y_cutoff : ndarray 103 The abundance values within the specified range. 104 freq_domain_low_Y_cutoff : ndarray or None 105 The frequency values within the specified range, if available. 106 107 """ 108 max_picking_mz = self.settings.max_picking_mz 109 min_picking_mz = self.settings.min_picking_mz 110 111 # min_start = where(self.mz_exp_profile > min_picking_mz)[0][0] 112 # max_final = where(self.mz_exp_profile < max_picking_mz)[-1][-1] 113 min_start = searchsorted(a=self.mz_exp_profile, v=min_picking_mz) 114 max_final = searchsorted(a=self.mz_exp_profile, v=max_picking_mz) 115 116 if self.has_frequency: 117 if self.freq_exp_profile.any(): 118 return ( 119 self.mz_exp_profile[min_start:max_final], 120 self.abundance_profile[min_start:max_final], 121 self.freq_exp_profile[min_start:max_final], 122 ) 123 124 else: 125 return ( 126 self.mz_exp_profile[min_start:max_final], 127 self.abundance_profile[min_start:max_final], 128 None, 129 )
Cut the m/z domain for peak picking.
Simplified function
Returns
- mz_domain_X_low_cutoff (ndarray): The m/z values within the specified range.
- mz_domain_low_Y_cutoff (ndarray): The abundance values within the specified range.
- freq_domain_low_Y_cutoff (ndarray or None): The frequency values within the specified range, if available.
131 def legacy_cut_mz_domain_peak_picking(self): 132 """ 133 Cut the m/z domain for peak picking. 134 DEPRECATED 135 Returns 136 ------- 137 mz_domain_X_low_cutoff : ndarray 138 The m/z values within the specified range. 139 mz_domain_low_Y_cutoff : ndarray 140 The abundance values within the specified range. 141 freq_domain_low_Y_cutoff : ndarray or None 142 The frequency values within the specified range, if available. 143 144 """ 145 max_picking_mz = self.settings.max_picking_mz 146 min_picking_mz = self.settings.min_picking_mz 147 148 min_final = where(self.mz_exp_profile > min_picking_mz)[-1][-1] 149 min_start = where(self.mz_exp_profile > min_picking_mz)[0][0] 150 151 ( 152 mz_domain_X_low_cutoff, 153 mz_domain_low_Y_cutoff, 154 ) = ( 155 self.mz_exp_profile[min_start:min_final], 156 self.abundance_profile[min_start:min_final], 157 ) 158 159 max_final = where(self.mz_exp_profile < max_picking_mz)[-1][-1] 160 max_start = where(self.mz_exp_profile < max_picking_mz)[0][0] 161 162 if self.has_frequency: 163 if self.freq_exp_profile.any(): 164 freq_domain_low_Y_cutoff = self.freq_exp_profile[min_start:min_final] 165 166 return ( 167 mz_domain_X_low_cutoff[max_start:max_final], 168 mz_domain_low_Y_cutoff[max_start:max_final], 169 freq_domain_low_Y_cutoff[max_start:max_final], 170 ) 171 172 else: 173 return ( 174 mz_domain_X_low_cutoff[max_start:max_final], 175 mz_domain_low_Y_cutoff[max_start:max_final], 176 None, 177 )
Cut the m/z domain for peak picking. DEPRECATED
Returns
- mz_domain_X_low_cutoff (ndarray): The m/z values within the specified range.
- mz_domain_low_Y_cutoff (ndarray): The abundance values within the specified range.
- freq_domain_low_Y_cutoff (ndarray or None): The frequency values within the specified range, if available.
179 @staticmethod 180 def extrapolate_axis(initial_array, pts): 181 """ 182 This function will extrapolate an input array in both directions by N pts. 183 184 Parameters 185 ---------- 186 initial_array : ndarray 187 The input array. 188 pts : int 189 The number of points to extrapolate. 190 191 Returns 192 ------- 193 ndarray 194 The extrapolated array. 195 196 Notes 197 -------- 198 This is a static method. 199 """ 200 initial_array_len = len(initial_array) 201 right_delta = initial_array[-1] - initial_array[-2] 202 left_delta = initial_array[1] - initial_array[0] 203 204 # Create an array with extra space for extrapolation 205 pad_array = zeros(initial_array_len + 2 * pts) 206 207 # Copy original array into the middle of the padded array 208 pad_array[pts : pts + initial_array_len] = initial_array 209 210 # Extrapolate the right side 211 for pt in range(pts): 212 final_value = initial_array[-1] 213 value_to_add = right_delta * (pt + 1) 214 new_value = final_value + value_to_add 215 pad_array[initial_array_len + pts + pt] = new_value 216 217 # Extrapolate the left side 218 for pt in range(pts): 219 first_value = initial_array[0] 220 value_to_subtract = left_delta * (pt + 1) 221 new_value = first_value - value_to_subtract 222 pad_array[pts - pt - 1] = new_value 223 224 return pad_array
This function will extrapolate an input array in both directions by N pts.
Parameters
- initial_array (ndarray): The input array.
- pts (int): The number of points to extrapolate.
Returns
- ndarray: The extrapolated array.
Notes
This is a static method.
226 def extrapolate_axes_for_pp(self, mz=None, abund=None, freq=None): 227 """Extrapolate the m/z axis and fill the abundance axis with 0s. 228 229 Parameters 230 ---------- 231 mz : ndarray or None 232 The m/z axis, if available. If None, the experimental m/z axis is used. 233 abund : ndarray or None 234 The abundance axis, if available. If None, the experimental abundance axis is used. 235 freq : ndarray or None 236 The frequency axis, if available. If None, the experimental frequency axis is used. 237 238 Returns 239 ------- 240 mz : ndarray 241 The extrapolated m/z axis. 242 abund : ndarray 243 The abundance axis with 0s filled. 244 freq : ndarray or None 245 The extrapolated frequency axis, if available. 246 247 Notes 248 -------- 249 This function will extrapolate the mz axis by the number of datapoints specified in the settings, 250 and fill the abundance axis with 0s. 251 This should prevent peak picking issues at the spectrum edge. 252 253 """ 254 # Check if the input arrays are provided 255 if mz is None or abund is None: 256 mz, abund = self.mz_exp_profile, self.abundance_profile 257 if self.has_frequency: 258 freq = self.freq_exp_profile 259 else: 260 freq = None 261 pts = self.settings.picking_point_extrapolate 262 if pts == 0: 263 return mz, abund, freq 264 265 mz = self.extrapolate_axis(mz, pts) 266 abund = pad(abund, (pts, pts), mode="constant", constant_values=(0, 0)) 267 if freq is not None: 268 freq = self.extrapolate_axis(freq, pts) 269 return mz, abund, freq
Extrapolate the m/z axis and fill the abundance axis with 0s.
Parameters
- mz (ndarray or None): The m/z axis, if available. If None, the experimental m/z axis is used.
- abund (ndarray or None): The abundance axis, if available. If None, the experimental abundance axis is used.
- freq (ndarray or None): The frequency axis, if available. If None, the experimental frequency axis is used.
Returns
- mz (ndarray): The extrapolated m/z axis.
- abund (ndarray): The abundance axis with 0s filled.
- freq (ndarray or None): The extrapolated frequency axis, if available.
Notes
This function will extrapolate the mz axis by the number of datapoints specified in the settings, and fill the abundance axis with 0s. This should prevent peak picking issues at the spectrum edge.
271 def do_peak_picking(self): 272 """Perform peak picking.""" 273 mz, abundance, freq = self.prepare_peak_picking_data() 274 275 if ( 276 self.label == Labels.bruker_frequency 277 or self.label == Labels.midas_frequency 278 ): 279 self.calc_centroid(mz, abundance, freq) 280 281 elif self.label == Labels.thermo_profile: 282 self.calc_centroid(mz, abundance, freq) 283 284 elif self.label == Labels.bruker_profile: 285 self.calc_centroid(mz, abundance, freq) 286 287 elif self.label == Labels.booster_profile: 288 self.calc_centroid(mz, abundance, freq) 289 290 elif self.label == Labels.simulated_profile: 291 self.calc_centroid(mz, abundance, freq) 292 293 else: 294 raise Exception("Unknow mass spectrum type", self.label)
Perform peak picking.
296 def find_minima(self, apex_index, abundance, len_abundance, right=True): 297 """Find the minima of a peak. 298 299 Parameters 300 ---------- 301 apex_index : int 302 The index of the peak apex. 303 abundance : ndarray 304 The abundance values. 305 len_abundance : int 306 The length of the abundance array. 307 right : bool, optional 308 Flag indicating whether to search for minima to the right of the apex (default is True). 309 310 Returns 311 ------- 312 int 313 The index of the minima. 314 315 """ 316 j = apex_index 317 318 if right: 319 minima = abundance[j] > abundance[j + 1] 320 else: 321 minima = abundance[j] > abundance[j - 1] 322 323 while minima: 324 if j == 1 or j == len_abundance - 2: 325 break 326 327 if right: 328 j += 1 329 330 minima = abundance[j] >= abundance[j + 1] 331 332 else: 333 j -= 1 334 minima = abundance[j] >= abundance[j - 1] 335 336 if right: 337 return j 338 else: 339 return j
Find the minima of a peak.
Parameters
- apex_index (int): The index of the peak apex.
- abundance (ndarray): The abundance values.
- len_abundance (int): The length of the abundance array.
- right (bool, optional): Flag indicating whether to search for minima to the right of the apex (default is True).
Returns
- int: The index of the minima.
341 @staticmethod 342 def linear_fit_calc(intes, massa, index_term, index_sign): 343 """ 344 Algebraic solution to a linear fit - roughly 25-50x faster than numpy polyfit when passing only two vals and doing a 1st order fit 345 346 Parameters 347 ---------- 348 intes : ndarray 349 The intensity values. 350 massa : ndarray 351 The mass values. 352 index_term : int 353 The index of the current term. 354 index_sign : str 355 The index sign 356 357 Returns 358 ------- 359 ndarray 360 The coefficients of the linear fit. 361 362 Notes 363 -------- 364 This is a static method. 365 """ 366 if index_sign == "+": 367 x1, x2 = massa[index_term], massa[index_term + 1] 368 y1, y2 = intes[index_term], intes[index_term + 1] 369 elif index_sign == "-": 370 x1, x2 = massa[index_term], massa[index_term - 1] 371 y1, y2 = intes[index_term], intes[index_term - 1] 372 else: 373 warnings.warn("error in linear fit calc, unknown index sign") 374 375 # Calculate the slope (m) 376 slope = (y2 - y1) / (x2 - x1) 377 378 # Calculate the intercept (b) 379 intercept = y1 - slope * x1 380 381 # The coefficients array would be [slope, intercept] 382 coefficients = array([slope, intercept]) 383 return coefficients
Algebraic solution to a linear fit - roughly 25-50x faster than numpy polyfit when passing only two vals and doing a 1st order fit
Parameters
- intes (ndarray): The intensity values.
- massa (ndarray): The mass values.
- index_term (int): The index of the current term.
- index_sign (str): The index sign
Returns
- ndarray: The coefficients of the linear fit.
Notes
This is a static method.
385 def calculate_resolving_power(self, intes, massa, current_index): 386 """Calculate the resolving power of a peak. 387 388 Parameters 389 ---------- 390 intes : ndarray 391 The intensity values. 392 massa : ndarray 393 The mass values. 394 current_index : int 395 The index of the current peak. 396 397 Returns 398 ------- 399 float 400 The resolving power of the peak. 401 402 Notes 403 -------- 404 This is a conservative calculation of resolving power, 405 the peak need to be resolved at least at the half-maximum magnitude, 406 otherwise, the combined full width at half maximum is used to calculate resolving power. 407 408 """ 409 410 peak_height = intes[current_index] 411 target_peak_height = peak_height / 2 412 413 peak_height_minus = peak_height 414 peak_height_plus = peak_height 415 416 # There are issues when a peak is at the high or low limit of a spectrum in finding its local minima and maxima 417 # This solution will return nan for resolving power when a peak is possibly too close to an edge to avoid the issue 418 419 if current_index < 5: 420 warnings.warn("peak at low spectrum edge, returning no resolving power") 421 return nan 422 elif abs(current_index - len(intes)) < 5: 423 warnings.warn("peak at high spectrum edge, returning no resolving power") 424 return nan 425 else: 426 pass 427 428 index_minus = current_index 429 while peak_height_minus >= target_peak_height: 430 index_minus = index_minus - 1 431 if index_minus < 0: 432 warnings.warn( 433 "Res. calc. warning - peak index minus adjacent to spectrum edge \n \ 434 Zeroing the first 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported \n \ 435 Perhaps try to increase picking_point_extrapolate (e.g. to 3)" 436 ) 437 # Pad the first 5 data points with zeros and restart the loop 438 intes[:5] = 0 439 peak_height_minus = target_peak_height 440 index_minus = current_index 441 else: 442 peak_height_minus = intes[index_minus] 443 444 if self.mspeaks_settings.legacy_centroid_polyfit: 445 x = [massa[index_minus], massa[index_minus + 1]] 446 y = [intes[index_minus], intes[index_minus + 1]] 447 coefficients = polyfit(x, y, 1) 448 else: 449 coefficients = self.linear_fit_calc( 450 intes, massa, index_minus, index_sign="+" 451 ) 452 453 a = coefficients[0] 454 b = coefficients[1] 455 if self.mspeaks_settings.legacy_resolving_power: 456 y_intercept = intes[index_minus] + ( 457 (intes[index_minus + 1] - intes[index_minus]) / 2 458 ) 459 else: 460 y_intercept = target_peak_height 461 massa1 = (y_intercept - b) / a 462 463 index_plus = current_index 464 while peak_height_plus >= target_peak_height: 465 index_plus = index_plus + 1 466 467 try: 468 peak_height_plus = intes[index_plus] 469 except IndexError: 470 warnings.warn( 471 "Res. calc. warning - peak index plus adjacent to spectrum edge \n \ 472 Zeroing the last 5 data points of abundance. Peaks at spectrum edge may be incorrectly reported\ 473 Perhaps try to increase picking_point_extrapolate (e.g. to 3)" 474 ) 475 # Pad the first 5 data points with zeros and restart the loop 476 intes[-5:] = 0 477 peak_height_plus = target_peak_height 478 index_plus = current_index 479 480 if self.mspeaks_settings.legacy_centroid_polyfit: 481 x = [massa[index_plus], massa[index_plus - 1]] 482 y = [intes[index_plus], intes[index_plus - 1]] 483 coefficients = polyfit(x, y, 1) 484 else: 485 coefficients = self.linear_fit_calc( 486 intes, massa, index_plus, index_sign="-" 487 ) 488 489 a = coefficients[0] 490 b = coefficients[1] 491 492 if self.mspeaks_settings.legacy_resolving_power: 493 y_intercept = intes[index_plus - 1] + ( 494 (intes[index_plus] - intes[index_plus - 1]) / 2 495 ) 496 else: 497 y_intercept = target_peak_height 498 499 massa2 = (y_intercept - b) / a 500 501 if massa1 > massa2: 502 resolvingpower = massa[current_index] / (massa1 - massa2) 503 504 else: 505 resolvingpower = massa[current_index] / (massa2 - massa1) 506 507 return resolvingpower
Calculate the resolving power of a peak.
Parameters
- intes (ndarray): The intensity values.
- massa (ndarray): The mass values.
- current_index (int): The index of the current peak.
Returns
- float: The resolving power of the peak.
Notes
This is a conservative calculation of resolving power, the peak need to be resolved at least at the half-maximum magnitude, otherwise, the combined full width at half maximum is used to calculate resolving power.
509 def cal_minima(self, mass, abun): 510 """Calculate the minima of a peak. 511 512 Parameters 513 ---------- 514 mass : ndarray 515 The mass values. 516 abun : ndarray 517 The abundance values. 518 519 Returns 520 ------- 521 ndarray or None 522 The mass values at the minima, if found. 523 524 """ 525 abun = -abun 526 527 dy = abun[1:] - abun[:-1] 528 529 # replaces nan for infinity 530 indices_nan = where(isnan(abun))[0] 531 532 if indices_nan.size: 533 abun[indices_nan] = inf 534 dy[where(isnan(dy))[0]] = inf 535 536 indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0] 537 538 if indexes.size: 539 return mass[indexes], abun[indexes]
Calculate the minima of a peak.
Parameters
- mass (ndarray): The mass values.
- abun (ndarray): The abundance values.
Returns
- ndarray or None: The mass values at the minima, if found.
541 def calc_centroid(self, mass, abund, freq): 542 """Calculate the centroid of a peak. 543 544 Parameters 545 ---------- 546 mass : ndarray 547 The mass values. 548 abund : ndarray 549 The abundance values. 550 freq : ndarray or None 551 The frequency values, if available. 552 553 Returns 554 ------- 555 None 556 557 """ 558 559 max_height = self.mspeaks_settings.peak_height_max_percent 560 max_prominence = self.mspeaks_settings.peak_max_prominence_percent 561 min_peak_datapoints = self.mspeaks_settings.min_peak_datapoints 562 peak_derivative_threshold = self.mspeaks_settings.peak_derivative_threshold 563 max_abun = max(abund) 564 peak_height_diff = lambda hi, li: ((abund[hi] - abund[li]) / max_abun) * 100 565 566 domain = mass 567 signal = abund 568 len_signal = len(signal) 569 570 signal_threshold, factor = self.get_threshold(abund) 571 max_signal = factor 572 573 correct_baseline = False 574 575 include_indexes = sp.peak_picking_first_derivative( 576 domain, 577 signal, 578 max_height, 579 max_prominence, 580 max_signal, 581 min_peak_datapoints, 582 peak_derivative_threshold, 583 signal_threshold=signal_threshold, 584 correct_baseline=correct_baseline, 585 abun_norm=1, 586 plot_res=False, 587 ) 588 589 for indexes_tuple in include_indexes: 590 apex_index = indexes_tuple[1] 591 592 peak_indexes = self.check_prominence( 593 abund, apex_index, len_signal, peak_height_diff 594 ) 595 596 if peak_indexes: 597 mz_exp_centroid, freq_centr, intes_centr = self.find_apex_fit_quadratic( 598 mass, abund, freq, apex_index 599 ) 600 601 if mz_exp_centroid: 602 peak_resolving_power = self.calculate_resolving_power( 603 abund, mass, apex_index 604 ) 605 s2n = intes_centr / self.baseline_noise_std 606 self.add_mspeak( 607 self.polarity, 608 mz_exp_centroid, 609 abund[apex_index], 610 peak_resolving_power, 611 s2n, 612 indexes_tuple, 613 exp_freq=freq_centr, 614 ms_parent=self, 615 ) 616 # pyplot.plot(domain[start_index: final_index + 1], signal[start_index:final_index + 1], c='black') 617 # pyplot.show()
Calculate the centroid of a peak.
Parameters
- mass (ndarray): The mass values.
- abund (ndarray): The abundance values.
- freq (ndarray or None): The frequency values, if available.
Returns
- None
619 def get_threshold(self, intes): 620 """Get the intensity threshold for peak picking. 621 622 Parameters 623 ---------- 624 intes : ndarray 625 The intensity values. 626 627 Returns 628 ------- 629 float 630 The intensity threshold. 631 float 632 The factor to multiply the intensity threshold by. 633 """ 634 635 intes = array(intes).astype(float) 636 637 noise_threshold_method = self.settings.noise_threshold_method 638 639 if noise_threshold_method == "minima": 640 if self.is_centroid: 641 warnings.warn( 642 "Auto threshould is disabled for centroid data, returning 0" 643 ) 644 factor = 1 645 abundance_threshold = 1e-20 646 # print(self.settings.noise_threshold_min_std) 647 else: 648 abundance_threshold = self.baseline_noise + ( 649 self.settings.noise_threshold_min_std * self.baseline_noise_std 650 ) 651 factor = 1 652 653 elif noise_threshold_method == "signal_noise": 654 abundance_threshold = self.settings.noise_threshold_min_s2n 655 if self.is_centroid: 656 factor = 1 657 else: 658 factor = self.baseline_noise_std 659 660 elif noise_threshold_method == "relative_abundance": 661 abundance_threshold = self.settings.noise_threshold_min_relative_abundance 662 factor = intes.max() / 100 663 664 elif noise_threshold_method == "absolute_abundance": 665 abundance_threshold = self.settings.noise_threshold_absolute_abundance 666 factor = 1 667 668 elif noise_threshold_method == "log": 669 if self.is_centroid: 670 raise Exception("log noise Not tested for centroid data") 671 abundance_threshold = self.settings.noise_threshold_log_nsigma 672 factor = self.baseline_noise_std 673 674 else: 675 raise Exception( 676 "%s method was not implemented, please refer to corems.mass_spectrum.calc.NoiseCalc Class" 677 % noise_threshold_method 678 ) 679 680 return abundance_threshold, factor
Get the intensity threshold for peak picking.
Parameters
- intes (ndarray): The intensity values.
Returns
- float: The intensity threshold.
- float: The factor to multiply the intensity threshold by.
682 @staticmethod 683 def algebraic_quadratic(list_mass, list_y): 684 """ 685 Find the apex of a peak - algebraically. 686 Faster than using numpy polyfit by ~28x per fit. 687 688 Parameters 689 ---------- 690 list_mass : ndarray 691 list of m/z values (3 points) 692 list_y : ndarray 693 list of abundance values (3 points) 694 695 Returns 696 ------- 697 a, b, c: float 698 coefficients of the quadratic equation. 699 700 Notes 701 -------- 702 This is a static method. 703 """ 704 x_1, x_2, x_3 = list_mass 705 y_1, y_2, y_3 = list_y 706 707 a = ( 708 y_1 / ((x_1 - x_2) * (x_1 - x_3)) 709 + y_2 / ((x_2 - x_1) * (x_2 - x_3)) 710 + y_3 / ((x_3 - x_1) * (x_3 - x_2)) 711 ) 712 713 b = ( 714 -y_1 * (x_2 + x_3) / ((x_1 - x_2) * (x_1 - x_3)) 715 - y_2 * (x_1 + x_3) / ((x_2 - x_1) * (x_2 - x_3)) 716 - y_3 * (x_1 + x_2) / ((x_3 - x_1) * (x_3 - x_2)) 717 ) 718 719 c = ( 720 y_1 * x_2 * x_3 / ((x_1 - x_2) * (x_1 - x_3)) 721 + y_2 * x_1 * x_3 / ((x_2 - x_1) * (x_2 - x_3)) 722 + y_3 * x_1 * x_2 / ((x_3 - x_1) * (x_3 - x_2)) 723 ) 724 return a, b, c
Find the apex of a peak - algebraically. Faster than using numpy polyfit by ~28x per fit.
Parameters
- list_mass (ndarray): list of m/z values (3 points)
- list_y (ndarray): list of abundance values (3 points)
Returns
- a, b, c (float): coefficients of the quadratic equation.
Notes
This is a static method.
726 def find_apex_fit_quadratic(self, mass, abund, freq, current_index): 727 """ 728 Find the apex of a peak. 729 730 Parameters 731 ---------- 732 mass : ndarray 733 The mass values. 734 abund : ndarray 735 The abundance values. 736 freq : ndarray or None 737 The frequency values, if available. 738 current_index : int 739 The index of the current peak. 740 741 742 Returns 743 ------- 744 float 745 The m/z value of the peak apex. 746 float 747 The frequency value of the peak apex, if available. 748 float 749 The abundance value of the peak apex. 750 751 """ 752 # calc prominence 753 # peak_indexes = self.check_prominence(abund, current_index, len_abundance, peak_height_diff ) 754 755 # if not peak_indexes: 756 757 # return None, None, None, None 758 759 # else: 760 761 # fit parabola to three most abundant datapoints 762 list_mass = [ 763 mass[current_index - 1], 764 mass[current_index], 765 mass[current_index + 1], 766 ] 767 list_y = [ 768 abund[current_index - 1], 769 abund[current_index], 770 abund[current_index + 1], 771 ] 772 773 if self.mspeaks_settings.legacy_centroid_polyfit: 774 z = polyfit(list_mass, list_y, 2) 775 a = z[0] 776 b = z[1] 777 else: 778 a, b, c = self.algebraic_quadratic(list_mass, list_y) 779 780 calculated = -b / (2 * a) 781 782 if calculated < 1 or int(calculated) != int(list_mass[1]): 783 mz_exp_centroid = list_mass[1] 784 785 else: 786 mz_exp_centroid = calculated 787 788 if ( 789 self.label == Labels.bruker_frequency 790 or self.label == Labels.midas_frequency 791 ): 792 # fit parabola to three most abundant frequency datapoints 793 list_freq = [ 794 freq[current_index - 1], 795 freq[current_index], 796 freq[current_index + 1], 797 ] 798 if self.mspeaks_settings.legacy_centroid_polyfit: 799 z = polyfit(list_mass, list_y, 2) 800 a = z[0] 801 b = z[1] 802 else: 803 a, b, c = self.algebraic_quadratic(list_mass, list_y) 804 805 calculated_freq = -b / (2 * a) 806 807 if calculated_freq < 1 or int(calculated_freq) != freq[current_index]: 808 freq_centr = list_freq[1] 809 810 else: 811 freq_centr = calculated_freq 812 813 else: 814 freq_centr = None 815 816 if self.mspeaks_settings.legacy_centroid_polyfit: 817 abundance_centroid = abund[current_index] 818 else: 819 abundance_centroid = a * mz_exp_centroid**2 + b * mz_exp_centroid + c 820 821 return mz_exp_centroid, freq_centr, abundance_centroid
Find the apex of a peak.
Parameters
- mass (ndarray): The mass values.
- abund (ndarray): The abundance values.
- freq (ndarray or None): The frequency values, if available.
- current_index (int): The index of the current peak.
Returns
- float: The m/z value of the peak apex.
- float: The frequency value of the peak apex, if available.
- float: The abundance value of the peak apex.
823 def check_prominence( 824 self, abun, current_index, len_abundance, peak_height_diff 825 ) -> tuple or False: 826 """Check the prominence of a peak. 827 828 Parameters 829 ---------- 830 abun : ndarray 831 The abundance values. 832 current_index : int 833 The index of the current peak. 834 len_abundance : int 835 The length of the abundance array. 836 peak_height_diff : function 837 The function to calculate the peak height difference. 838 839 Returns 840 ------- 841 tuple or False 842 A tuple containing the indexes of the peak, if the prominence is above the threshold. 843 Otherwise, False. 844 845 """ 846 847 final_index = self.find_minima(current_index, abun, len_abundance, right=True) 848 849 start_index = self.find_minima(current_index, abun, len_abundance, right=False) 850 851 peak_indexes = (current_index - 1, current_index, current_index + 1) 852 853 if ( 854 min( 855 peak_height_diff(current_index, start_index), 856 peak_height_diff(current_index, final_index), 857 ) 858 > self.mspeaks_settings.peak_min_prominence_percent 859 ): 860 return peak_indexes 861 862 else: 863 return False
Check the prominence of a peak.
Parameters
- abun (ndarray): The abundance values.
- current_index (int): The index of the current peak.
- len_abundance (int): The length of the abundance array.
- peak_height_diff (function): The function to calculate the peak height difference.
Returns
- tuple or False: A tuple containing the indexes of the peak, if the prominence is above the threshold. Otherwise, False.
865 def use_the_max(self, mass, abund, current_index, len_abundance, peak_height_diff): 866 """Use the max peak height as the centroid 867 868 Parameters 869 ---------- 870 mass : ndarray 871 The mass values. 872 abund : ndarray 873 The abundance values. 874 current_index : int 875 The index of the current peak. 876 len_abundance : int 877 The length of the abundance array. 878 peak_height_diff : function 879 The function to calculate the peak height difference. 880 881 Returns 882 ------- 883 float 884 The m/z value of the peak apex. 885 float 886 The abundance value of the peak apex. 887 tuple or None 888 A tuple containing the indexes of the peak, if the prominence is above the threshold. 889 Otherwise, None. 890 """ 891 892 peak_indexes = self.check_prominence( 893 abund, current_index, len_abundance, peak_height_diff 894 ) 895 896 if not peak_indexes: 897 return None, None, None 898 899 else: 900 return mass[current_index], abund[current_index], peak_indexes
Use the max peak height as the centroid
Parameters
- mass (ndarray): The mass values.
- abund (ndarray): The abundance values.
- current_index (int): The index of the current peak.
- len_abundance (int): The length of the abundance array.
- peak_height_diff (function): The function to calculate the peak height difference.
Returns
- float: The m/z value of the peak apex.
- float: The abundance value of the peak apex.
- tuple or None: A tuple containing the indexes of the peak, if the prominence is above the threshold. Otherwise, None.
902 def calc_centroid_legacy(self, mass, abund, freq): 903 """Legacy centroid calculation 904 Deprecated - for deletion. 905 906 """ 907 warnings.warn( 908 "Legacy centroid calculation is deprecated. Please use the new centroid calculation method." 909 ) 910 pass 911 if False: 912 len_abundance = len(abund) 913 914 max_abundance = max(abund) 915 916 peak_height_diff = ( 917 lambda hi, li: ((abund[hi] - abund[li]) / max_abundance) * 100 918 ) 919 920 abundance_threshold, factor = self.get_threshold(abund) 921 # print(abundance_threshold, factor) 922 # find indices of all peaks 923 dy = abund[1:] - abund[:-1] 924 925 # replaces nan for infi nity 926 indices_nan = where(isnan(abund))[0] 927 928 if indices_nan.size: 929 abund[indices_nan] = inf 930 dy[where(isnan(dy))[0]] = inf 931 932 indexes = where((hstack((dy, 0)) < 0) & (hstack((0, dy)) > 0))[0] 933 934 # noise threshold 935 if indexes.size and abundance_threshold is not None: 936 indexes = indexes[abund[indexes] / factor >= abundance_threshold] 937 # filter out 'peaks' within 3 points of the spectrum limits 938 # remove entries within 3 points of upper limit 939 indexes = [x for x in indexes if (len_abundance - x) > 3] 940 # remove entries within 3 points of zero 941 indexes = [x for x in indexes if x > 3] 942 943 for current_index in indexes: 944 if self.label == Labels.simulated_profile: 945 mz_exp_centroid, intes_centr, peak_indexes = self.use_the_max( 946 mass, abund, current_index, len_abundance, peak_height_diff 947 ) 948 if mz_exp_centroid: 949 peak_resolving_power = self.calculate_resolving_power( 950 abund, mass, current_index 951 ) 952 s2n = intes_centr / self.baseline_noise_std 953 freq_centr = None 954 self.add_mspeak( 955 self.polarity, 956 mz_exp_centroid, 957 abund[current_index], 958 peak_resolving_power, 959 s2n, 960 peak_indexes, 961 exp_freq=freq_centr, 962 ms_parent=self, 963 ) 964 965 else: 966 mz_exp_centroid, freq_centr, intes_centr, peak_indexes = ( 967 self.find_apex_fit_quadratic( 968 mass, 969 abund, 970 freq, 971 current_index, 972 len_abundance, 973 peak_height_diff, 974 ) 975 ) 976 if mz_exp_centroid: 977 try: 978 peak_resolving_power = self.calculate_resolving_power( 979 abund, mass, current_index 980 ) 981 except IndexError: 982 print("index error, skipping peak") 983 continue 984 985 s2n = intes_centr / self.baseline_noise_std 986 self.add_mspeak( 987 self.polarity, 988 mz_exp_centroid, 989 abund[current_index], 990 peak_resolving_power, 991 s2n, 992 peak_indexes, 993 exp_freq=freq_centr, 994 ms_parent=self, 995 )
Legacy centroid calculation Deprecated - for deletion.