corems.mass_spectra.calc.SignalProcessing
1import numpy as np 2 3from scipy.signal import savgol_filter 4from scipy.signal.windows import boxcar 5from scipy import interpolate 6from matplotlib import pyplot as plt 7from numpy import abs 8from numpy import array, polyfit, asarray 9 10 11def peak_detector(tic, max_tic): # TODO remove max_tic argument? 12 """ 13 Find peaks by detecting minima in the first derivative of the data 14 Used in LC/GC data processing 15 16 Parameters 17 ---------- 18 tic : array 19 array of data points to find the peaks 20 max_tic : float 21 maximum value of the data points 22 23 Returns 24 ------- 25 tuple 26 tuple of indexes of the start, apex and final points of the peak 27 28 """ 29 dy = derivate(tic) 30 31 indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0] 32 33 for index in indexes: 34 start_index = find_minima(index, tic, right=False) 35 final_index = find_minima(index, tic) 36 37 yield (start_index, index, final_index) 38 39 40def find_nearest_scan(data, nodes): 41 """ 42 Find nearest data point in a list of nodes (derivated data) 43 in LC/GC this is 'scan', in MS this is 'm/z' data point 44 45 Parameters 46 ---------- 47 data : float 48 data point to find the nearest node 49 nodes : array 50 array of nodes to search for the nearest node 51 52 Returns 53 ------- 54 float 55 nearest node to the data point 56 """ 57 58 array_data = asarray(nodes) 59 60 scan_index = (abs(array_data - data)).argmin() 61 62 return nodes[scan_index] 63 64 65def check_corrected_abundance( 66 closest_left, 67 closest_right, 68 apex_index, 69 signal, 70 max_signal, 71 signal_threshold, 72 abun_norm, 73): 74 """ 75 Check the corrected abundance of the peak 76 77 Parameters 78 ---------- 79 closest_left : int 80 index of the closest left node 81 closest_right : int 82 index of the closest right node 83 apex_index : int 84 index of the apex node 85 signal : array 86 array of data points to find the peaks 87 max_signal : float 88 maximum value of the data points 89 signal_threshold : float 90 threshold for the signal 91 abun_norm : float 92 abundance normalization factor 93 94 Returns 95 ------- 96 float 97 corrected abundance of the peak 98 99 100 """ 101 x = [closest_left, closest_right] 102 y = [signal[closest_left], signal[closest_right]] 103 104 pol = polyfit(x, y, 1) # TODO replace with faster method in this file 105 106 corrected_peak_height = signal[apex_index] - pol(apex_index) 107 108 if (corrected_peak_height / max_signal) * abun_norm > signal_threshold: 109 return corrected_peak_height 110 else: 111 return False 112 113 114def peak_picking_first_derivative( 115 domain, 116 signal, 117 max_height, 118 max_prominence, 119 max_signal, 120 min_peak_datapoints, 121 peak_derivative_threshold, 122 signal_threshold=0.1, 123 correct_baseline=True, 124 plot_res=False, 125 abun_norm=100, 126 check_abundance=False, 127 apex_indexes=[], 128): 129 """ 130 Find peaks by detecting minima in the first derivative of the data 131 Used in LC/GC and MS data processing 132 Optional baseline correction, then peak apex detection via 1st derivative. 133 For each apex the peak datapoints surrounding the apex are determined. 134 Some basic thresholding is applied (signal, number of datapoints, etc). 135 136 Parameters 137 ---------- 138 domain : array 139 array of data points to find the peaks 140 signal : array 141 array of data points to find the peaks 142 max_height : float 143 maximum height of the peak 144 max_prominence : float 145 maximum prominence of the peak 146 max_signal : float 147 maximum signal of the peak 148 min_peak_datapoints : int 149 minimum number of data points in the peak 150 peak_derivative_threshold : float 151 threshold for the peak derivative 152 signal_threshold : float 153 threshold for the signal 154 correct_baseline : bool 155 flag to correct the baseline 156 plot_res : bool 157 flag to plot the results 158 abun_norm : float 159 abundance normalization factor 160 check_abundance : bool 161 flag to check the abundance 162 163 164 Returns 165 ------- 166 tuple 167 tuple of indexes of the start, apex and final points of the peak 168 169 170 """ 171 if correct_baseline: 172 signal = signal - baseline_detector(signal, domain, max_height, max_prominence) 173 174 domain = np.array(domain) 175 signal = np.array(signal) 176 177 dy = derivate(signal) 178 if len(apex_indexes) == 0: 179 # Find apexes 180 apex_indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0] 181 else: 182 apex_indexes = np.array(apex_indexes) 183 184 if apex_indexes.size and apex_indexes is not None: 185 apex_indexes = apex_indexes[ 186 signal[apex_indexes] / max_signal >= signal_threshold 187 ] 188 189 signal = signal / max(signal) 190 start_peak = [] 191 end_peak = [] 192 193 pos_dy_threshold = peak_derivative_threshold # max(dy) * peak_derivative_threshold 194 neg_dy_threshold = -peak_derivative_threshold # min(dy) * peak_derivative_threshold 195 len_dy = len(dy) 196 # take apex_index and move left to find start 197 for index in apex_indexes: 198 # catch for starting position 199 200 if index == 0: 201 index_start = index 202 else: 203 index_start = index - 1 204 205 # catch for ending position 206 if (index + 1) >= dy.shape[0]: 207 index_end = index - 1 208 else: 209 index_end = index + 1 210 211 # while dy[index_start-1] > 0 and index_start != 0: 212 while dy[index_start - 1] > pos_dy_threshold and index_start > 0: 213 index_start = index_start - 1 214 start_peak.append(index_start) 215 216 # while dy[index_end] < 0 and index_end != (len(dy) - 1): 217 while dy[index_end] < neg_dy_threshold and index_end != (len_dy - 1): 218 index_end = index_end + 1 219 end_peak.append(index_end) 220 221 start_peak = array(start_peak) 222 end_peak = array(end_peak) 223 224 for apex_index in apex_indexes: 225 # index_gt_apex = np.where(end_peak >= apex_index)[0] 226 # index_lt_apex = np.where(start_peak <= apex_index)[0] 227 index_gt_apex = np.arange(np.searchsorted(end_peak, apex_index), len(end_peak)) 228 index_lt_apex = np.arange( 229 0, np.searchsorted(start_peak, apex_index, side="right") 230 ) 231 232 if not index_gt_apex.size == 0 and not index_lt_apex.size == 0: 233 closest_right = find_nearest_scan(apex_index, end_peak[index_gt_apex]) 234 closest_left = find_nearest_scan(apex_index, start_peak[index_lt_apex]) 235 if check_abundance: 236 corrected_peak_height = check_corrected_abundance( 237 closest_left, 238 closest_right, 239 apex_index, 240 signal, 241 max_signal, 242 signal_threshold, 243 abun_norm, 244 ) 245 else: 246 corrected_peak_height = signal[apex_index] 247 248 if (closest_right - closest_left) >= min_peak_datapoints: 249 if plot_res: 250 plt.plot( 251 domain[closest_left : closest_right + 1], 252 dy[closest_left : closest_right + 1], 253 c="red", 254 ) 255 plt.plot( 256 domain[closest_left : closest_right + 1], 257 signal[closest_left : closest_right + 1], 258 c="black", 259 ) 260 plt.title(str((corrected_peak_height / max_signal) * 100)) 261 plt.show() 262 263 yield (closest_left, apex_index, closest_right) 264 265 266def find_minima(index, tic, right=True): 267 """ 268 Find the index of the local minima in the given time-of-flight (TOF) intensity array. 269 270 Parameters: 271 ----------- 272 index: int 273 The starting index to search for the minima. 274 tic: list 275 TIC data points 276 right : bool, optional 277 Determines the direction of the search. If True, search to the right of the index. If False, search to the left of the index. Default is True. 278 279 Returns: 280 -------- 281 int 282 The index of the local minima in the TIC array. 283 """ 284 285 j = index 286 # apex_abundance = tic[index] 287 tic_len = len(tic) 288 289 if right: 290 minima = tic[j] >= tic[j + 1] 291 else: 292 minima = tic[j] >= tic[j - 1] 293 294 while minima: 295 if j == 1 or j == tic_len - 2: 296 break 297 298 if right: 299 j += 1 300 301 minima = tic[j] >= tic[j + 1] 302 303 else: 304 j -= 1 305 minima = tic[j] >= tic[j - 1] 306 307 if right: 308 return j 309 else: 310 return j 311 312 313def derivate(data_array): 314 """ 315 Calculate derivative of the data points. 316 Replaces nan with infinity 317 318 Parameters 319 ---------- 320 data_array : array 321 array of data points 322 323 Returns 324 ------- 325 array 326 array of the derivative of the data points 327 """ 328 data_array = np.array(data_array) 329 330 dy = data_array[1:] - data_array[:-1] 331 332 # replaces nan for infinity 333 indices_nan = np.where(np.isnan(data_array))[0] 334 335 if indices_nan.size: 336 data_array[indices_nan] = np.inf 337 dy[np.where(np.isnan(dy))[0]] = np.inf 338 339 return dy 340 341 342def minima_detector(tic, max_tic, peak_height_max_percent, peak_max_prominence_percent): 343 """ 344 Minima detector for the TIC data points. 345 346 Parameters 347 ---------- 348 tic : array 349 array of data points to find the peaks 350 max_tic : float 351 maximum value of the data points 352 peak_height_max_percent : float 353 maximum height of the peak 354 peak_max_prominence_percent : float 355 maximum prominence of the peak 356 357 Returns 358 ------- 359 generator 360 generator of the indexes of the minima in the TIC array 361 362 """ 363 peak_height_diff = lambda hi, li: ((tic[hi] - tic[li]) / max_tic) * 100 364 365 for start_index, index, final_index in peak_detector(tic, max_tic): 366 # abundance max threshold 367 if (tic[index] / max_tic) * 100 < peak_height_max_percent: 368 # calculates prominence and filter 369 if ( 370 peak_height_diff(index, start_index) 371 and peak_height_diff(index, final_index) < peak_max_prominence_percent 372 ): 373 yield from (start_index, final_index) 374 375 376def baseline_detector( 377 tic, rt, peak_height_max_percent, peak_max_prominence_percent, do_interpolation=True 378): 379 """ 380 Baseline detector for the TIC data points. 381 For LC/GC data processing 382 383 Parameters 384 ---------- 385 tic : array 386 array of data points to find the peaks 387 rt : array 388 array of retention time data points 389 peak_height_max_percent : float 390 maximum height of the peak 391 peak_max_prominence_percent : float 392 maximum prominence of the peak 393 do_interpolation : bool, optional 394 flag to interpolate the data points. Default is True 395 396 Returns 397 ------- 398 array 399 array of the baseline corrected data points 400 401 """ 402 rt = np.array(rt) 403 404 max_tic = max(tic) 405 406 indexes = sorted( 407 list( 408 set( 409 i 410 for i in minima_detector( 411 tic, max_tic, peak_height_max_percent, peak_max_prominence_percent 412 ) 413 ) 414 ) 415 ) 416 417 y = -tic 418 419 x1 = rt[indexes] 420 421 y1 = y[indexes] 422 423 if len(x1) <= 5: 424 return tic 425 426 if not do_interpolation: 427 y0 = np.zeros(tic.shape) 428 y0[indexes] = y[indexes] 429 430 return y0 431 432 else: 433 f1 = interpolate.interp1d(x1, y1, kind="quadratic", fill_value="extrapolate") 434 435 ynew1 = f1(list(rt)) 436 437 # from matplotlib import pyplot as plt 438 # if self.deconv_rt_list and self.deconv_mz == 51: 439 440 # plt.plot(rt, tic-(-1* ynew1), color='green') 441 442 # plt.plot(rt, -1* ynew1, c='black') 443 444 # s = self.smooth(s, 10, 'blackman') 445 446 # plt.plot(self.retention_time, -s) 447 448 # plt.show() 449 450 return -1 * ynew1 451 452 453def peak_detector_generator( 454 tic, stds, method, rt, max_height, min_height, max_prominence, min_datapoints 455): 456 """ 457 Peak detector generator for the TIC data points. 458 459 Parameters 460 ---------- 461 tic : array 462 array of data points to find the peaks 463 stds : float 464 standard deviation 465 method : str 466 method to detect the peaks 467 Available methods: 'manual_relative_abundance', 'auto_relative_abundance', 'second_derivative' 468 rt : array 469 array of retention time data points 470 max_height : float 471 maximum height of the peak 472 min_height : float 473 minimum height of the peak 474 max_prominence : float 475 maximum prominence of the peak 476 min_datapoints : int 477 minimum number of data points in the peak 478 479 Returns 480 ------- 481 generator 482 generator of the indexes of the peaks in the TIC array 483 484 """ 485 max_tic = max(tic) 486 487 if method == "manual_relative_abundance": 488 tic = tic - baseline_detector(tic, rt, max_height, max_prominence) 489 490 norm_tic = (tic / max_tic) * 100 491 492 remove_indexes = np.where(norm_tic < min_height)[0] 493 494 # if self.deconv_rt_list and self.deconv_mz == 51: 495 # plt.plot(self.deconv_rt_list, tic, label=self.deconv_mz) 496 497 elif method == "auto_relative_abundance": 498 tic = tic - baseline_detector(tic, rt, max_height, max_prominence) 499 500 baseline = baseline_detector(tic, rt, max_height, max_prominence) 501 502 peak_detect_threshold = np.nanmean(baseline) + (stds * np.std(baseline)) 503 504 remove_indexes = np.where(tic < peak_detect_threshold)[0] 505 506 elif method == "second_derivative": 507 remove_indexes = second_derivative_threshold( 508 tic, stds, rt, max_height, max_prominence 509 ) 510 511 else: 512 NotImplemented(method) 513 514 peak_height_diff = lambda hi, li: ((tic[hi] - tic[li]) / max_tic) * 100 515 516 dy = derivate(tic) 517 518 include_indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0] 519 520 final_indexes = sorted(set(include_indexes) - set(remove_indexes)) 521 522 # from matplotlib import pyplot as plt 523 524 # plt.plot(self.retention_time, tic, color='black') 525 # plt.scatter(self.retention_time[remove_indexes], tic[remove_indexes], color='red') 526 # plt.scatter(self.retention_time[include_indexes], tic[include_indexes], color='blue') 527 # plt.scatter(self.retention_time[final_indexes], tic[final_indexes], color='blue') 528 529 # plt.show() 530 531 for index in final_indexes: 532 start_index = find_minima(index, tic, right=False) 533 final_index = find_minima(index, tic) 534 535 if final_index - start_index > min_datapoints: 536 # if min( peak_height_diff(index,start_index), peak_height_diff(index,final_index) )> self.chromatogram_settings.peak_min_prominence_percent : 537 538 yield (start_index, index, final_index) 539 540 541def smooth_signal(x, window_len, window, pol_order, implemented_smooth_method): 542 """ 543 Smooth the data using a window with requested size. 544 545 This method is based on the convolution of a scaled window with the signal. 546 The signal is prepared by introducing reflected copies of the signal 547 (with the window size) in both ends so that transient parts are minimized 548 in the begining and end part of the output signal. 549 550 Parameters 551 ---------- 552 x: array 553 the input signal 554 window_len: int 555 the dimension of the smoothing window; should be an odd integer 556 window: str 557 the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' 558 pol_order: int 559 the order of the polynomial to fit the data 560 implemented_smooth_method: list 561 list of implemented smoothing methods 562 563 Returns 564 ------- 565 y: array 566 the smoothed signal 567 568 Notes: 569 ----- 570 See also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve 571 scipy.signal.savgol_filter 572 573 """ 574 x = np.array(x) 575 576 if x.ndim != 1: 577 raise ValueError("smooth only accepts 1 dimension arrays.") 578 579 if x.size < window_len: 580 raise ValueError("Input array needs to be bigger than window size") 581 582 # if window_len < 3: 583 # return x 584 585 if not window in implemented_smooth_method: 586 raise ValueError( 587 "Window method should be 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'" 588 ) 589 590 s = np.r_[x[window_len - 1 : 0 : -1], x, x[-1:-window_len:-1]] 591 592 if window == "savgol": 593 return savgol_filter(x, window_len, pol_order) 594 595 elif window == "boxcar": # moving average 596 w = boxcar(window_len) 597 598 y = np.convolve(w, s, mode="valid") 599 600 elif window == "flat": # moving average 601 w = np.ones(window_len, "d") 602 603 y = np.convolve(w / w.sum(), s, mode="valid") 604 605 else: 606 w = eval(window + "(window_len)") 607 608 y = np.convolve(w / w.sum(), s, mode="valid") 609 610 return y[int(window_len / 2 - 1) : int(-window_len / 2)] 611 612 613def second_derivative_threshold( 614 tic, stds, rt, peak_height_max_percent, peak_max_prominence_percent 615): 616 """ 617 Second derivative threshold for the TIC data points. 618 For LC/GC data processing 619 620 Parameters 621 ---------- 622 tic : array 623 array of data points to find the peaks 624 stds : float 625 standard deviation 626 rt : array 627 array of retention time data points 628 peak_height_max_percent : float 629 maximum height of the peak 630 631 Returns 632 ------- 633 array 634 array of the indexes of the data points to remove 635 636 """ 637 638 dy = derivate(tic) 639 640 dydy = derivate(dy) 641 dydy = np.hstack((dydy, 0)) 642 dydy = np.hstack((0, dydy)) 643 644 baseline = baseline_detector( 645 dydy, 646 rt, 647 peak_height_max_percent, 648 peak_max_prominence_percent, 649 do_interpolation=False, 650 ) 651 652 threshold_median = np.median(baseline) - (stds * np.std(baseline)) 653 654 remove_indexes = np.where(dydy > threshold_median)[0] 655 656 return remove_indexes
12def peak_detector(tic, max_tic): # TODO remove max_tic argument? 13 """ 14 Find peaks by detecting minima in the first derivative of the data 15 Used in LC/GC data processing 16 17 Parameters 18 ---------- 19 tic : array 20 array of data points to find the peaks 21 max_tic : float 22 maximum value of the data points 23 24 Returns 25 ------- 26 tuple 27 tuple of indexes of the start, apex and final points of the peak 28 29 """ 30 dy = derivate(tic) 31 32 indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0] 33 34 for index in indexes: 35 start_index = find_minima(index, tic, right=False) 36 final_index = find_minima(index, tic) 37 38 yield (start_index, index, final_index)
Find peaks by detecting minima in the first derivative of the data Used in LC/GC data processing
Parameters
- tic (array): array of data points to find the peaks
- max_tic (float): maximum value of the data points
Returns
- tuple: tuple of indexes of the start, apex and final points of the peak
41def find_nearest_scan(data, nodes): 42 """ 43 Find nearest data point in a list of nodes (derivated data) 44 in LC/GC this is 'scan', in MS this is 'm/z' data point 45 46 Parameters 47 ---------- 48 data : float 49 data point to find the nearest node 50 nodes : array 51 array of nodes to search for the nearest node 52 53 Returns 54 ------- 55 float 56 nearest node to the data point 57 """ 58 59 array_data = asarray(nodes) 60 61 scan_index = (abs(array_data - data)).argmin() 62 63 return nodes[scan_index]
Find nearest data point in a list of nodes (derivated data) in LC/GC this is 'scan', in MS this is 'm/z' data point
Parameters
- data (float): data point to find the nearest node
- nodes (array): array of nodes to search for the nearest node
Returns
- float: nearest node to the data point
66def check_corrected_abundance( 67 closest_left, 68 closest_right, 69 apex_index, 70 signal, 71 max_signal, 72 signal_threshold, 73 abun_norm, 74): 75 """ 76 Check the corrected abundance of the peak 77 78 Parameters 79 ---------- 80 closest_left : int 81 index of the closest left node 82 closest_right : int 83 index of the closest right node 84 apex_index : int 85 index of the apex node 86 signal : array 87 array of data points to find the peaks 88 max_signal : float 89 maximum value of the data points 90 signal_threshold : float 91 threshold for the signal 92 abun_norm : float 93 abundance normalization factor 94 95 Returns 96 ------- 97 float 98 corrected abundance of the peak 99 100 101 """ 102 x = [closest_left, closest_right] 103 y = [signal[closest_left], signal[closest_right]] 104 105 pol = polyfit(x, y, 1) # TODO replace with faster method in this file 106 107 corrected_peak_height = signal[apex_index] - pol(apex_index) 108 109 if (corrected_peak_height / max_signal) * abun_norm > signal_threshold: 110 return corrected_peak_height 111 else: 112 return False
Check the corrected abundance of the peak
Parameters
- closest_left (int): index of the closest left node
- closest_right (int): index of the closest right node
- apex_index (int): index of the apex node
- signal (array): array of data points to find the peaks
- max_signal (float): maximum value of the data points
- signal_threshold (float): threshold for the signal
- abun_norm (float): abundance normalization factor
Returns
- float: corrected abundance of the peak
115def peak_picking_first_derivative( 116 domain, 117 signal, 118 max_height, 119 max_prominence, 120 max_signal, 121 min_peak_datapoints, 122 peak_derivative_threshold, 123 signal_threshold=0.1, 124 correct_baseline=True, 125 plot_res=False, 126 abun_norm=100, 127 check_abundance=False, 128 apex_indexes=[], 129): 130 """ 131 Find peaks by detecting minima in the first derivative of the data 132 Used in LC/GC and MS data processing 133 Optional baseline correction, then peak apex detection via 1st derivative. 134 For each apex the peak datapoints surrounding the apex are determined. 135 Some basic thresholding is applied (signal, number of datapoints, etc). 136 137 Parameters 138 ---------- 139 domain : array 140 array of data points to find the peaks 141 signal : array 142 array of data points to find the peaks 143 max_height : float 144 maximum height of the peak 145 max_prominence : float 146 maximum prominence of the peak 147 max_signal : float 148 maximum signal of the peak 149 min_peak_datapoints : int 150 minimum number of data points in the peak 151 peak_derivative_threshold : float 152 threshold for the peak derivative 153 signal_threshold : float 154 threshold for the signal 155 correct_baseline : bool 156 flag to correct the baseline 157 plot_res : bool 158 flag to plot the results 159 abun_norm : float 160 abundance normalization factor 161 check_abundance : bool 162 flag to check the abundance 163 164 165 Returns 166 ------- 167 tuple 168 tuple of indexes of the start, apex and final points of the peak 169 170 171 """ 172 if correct_baseline: 173 signal = signal - baseline_detector(signal, domain, max_height, max_prominence) 174 175 domain = np.array(domain) 176 signal = np.array(signal) 177 178 dy = derivate(signal) 179 if len(apex_indexes) == 0: 180 # Find apexes 181 apex_indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0] 182 else: 183 apex_indexes = np.array(apex_indexes) 184 185 if apex_indexes.size and apex_indexes is not None: 186 apex_indexes = apex_indexes[ 187 signal[apex_indexes] / max_signal >= signal_threshold 188 ] 189 190 signal = signal / max(signal) 191 start_peak = [] 192 end_peak = [] 193 194 pos_dy_threshold = peak_derivative_threshold # max(dy) * peak_derivative_threshold 195 neg_dy_threshold = -peak_derivative_threshold # min(dy) * peak_derivative_threshold 196 len_dy = len(dy) 197 # take apex_index and move left to find start 198 for index in apex_indexes: 199 # catch for starting position 200 201 if index == 0: 202 index_start = index 203 else: 204 index_start = index - 1 205 206 # catch for ending position 207 if (index + 1) >= dy.shape[0]: 208 index_end = index - 1 209 else: 210 index_end = index + 1 211 212 # while dy[index_start-1] > 0 and index_start != 0: 213 while dy[index_start - 1] > pos_dy_threshold and index_start > 0: 214 index_start = index_start - 1 215 start_peak.append(index_start) 216 217 # while dy[index_end] < 0 and index_end != (len(dy) - 1): 218 while dy[index_end] < neg_dy_threshold and index_end != (len_dy - 1): 219 index_end = index_end + 1 220 end_peak.append(index_end) 221 222 start_peak = array(start_peak) 223 end_peak = array(end_peak) 224 225 for apex_index in apex_indexes: 226 # index_gt_apex = np.where(end_peak >= apex_index)[0] 227 # index_lt_apex = np.where(start_peak <= apex_index)[0] 228 index_gt_apex = np.arange(np.searchsorted(end_peak, apex_index), len(end_peak)) 229 index_lt_apex = np.arange( 230 0, np.searchsorted(start_peak, apex_index, side="right") 231 ) 232 233 if not index_gt_apex.size == 0 and not index_lt_apex.size == 0: 234 closest_right = find_nearest_scan(apex_index, end_peak[index_gt_apex]) 235 closest_left = find_nearest_scan(apex_index, start_peak[index_lt_apex]) 236 if check_abundance: 237 corrected_peak_height = check_corrected_abundance( 238 closest_left, 239 closest_right, 240 apex_index, 241 signal, 242 max_signal, 243 signal_threshold, 244 abun_norm, 245 ) 246 else: 247 corrected_peak_height = signal[apex_index] 248 249 if (closest_right - closest_left) >= min_peak_datapoints: 250 if plot_res: 251 plt.plot( 252 domain[closest_left : closest_right + 1], 253 dy[closest_left : closest_right + 1], 254 c="red", 255 ) 256 plt.plot( 257 domain[closest_left : closest_right + 1], 258 signal[closest_left : closest_right + 1], 259 c="black", 260 ) 261 plt.title(str((corrected_peak_height / max_signal) * 100)) 262 plt.show() 263 264 yield (closest_left, apex_index, closest_right)
Find peaks by detecting minima in the first derivative of the data Used in LC/GC and MS data processing Optional baseline correction, then peak apex detection via 1st derivative. For each apex the peak datapoints surrounding the apex are determined. Some basic thresholding is applied (signal, number of datapoints, etc).
Parameters
- domain (array): array of data points to find the peaks
- signal (array): array of data points to find the peaks
- max_height (float): maximum height of the peak
- max_prominence (float): maximum prominence of the peak
- max_signal (float): maximum signal of the peak
- min_peak_datapoints (int): minimum number of data points in the peak
- peak_derivative_threshold (float): threshold for the peak derivative
- signal_threshold (float): threshold for the signal
- correct_baseline (bool): flag to correct the baseline
- plot_res (bool): flag to plot the results
- abun_norm (float): abundance normalization factor
- check_abundance (bool): flag to check the abundance
Returns
- tuple: tuple of indexes of the start, apex and final points of the peak
267def find_minima(index, tic, right=True): 268 """ 269 Find the index of the local minima in the given time-of-flight (TOF) intensity array. 270 271 Parameters: 272 ----------- 273 index: int 274 The starting index to search for the minima. 275 tic: list 276 TIC data points 277 right : bool, optional 278 Determines the direction of the search. If True, search to the right of the index. If False, search to the left of the index. Default is True. 279 280 Returns: 281 -------- 282 int 283 The index of the local minima in the TIC array. 284 """ 285 286 j = index 287 # apex_abundance = tic[index] 288 tic_len = len(tic) 289 290 if right: 291 minima = tic[j] >= tic[j + 1] 292 else: 293 minima = tic[j] >= tic[j - 1] 294 295 while minima: 296 if j == 1 or j == tic_len - 2: 297 break 298 299 if right: 300 j += 1 301 302 minima = tic[j] >= tic[j + 1] 303 304 else: 305 j -= 1 306 minima = tic[j] >= tic[j - 1] 307 308 if right: 309 return j 310 else: 311 return j
Find the index of the local minima in the given time-of-flight (TOF) intensity array.
Parameters:
index: int The starting index to search for the minima. tic: list TIC data points right : bool, optional Determines the direction of the search. If True, search to the right of the index. If False, search to the left of the index. Default is True.
Returns:
int The index of the local minima in the TIC array.
314def derivate(data_array): 315 """ 316 Calculate derivative of the data points. 317 Replaces nan with infinity 318 319 Parameters 320 ---------- 321 data_array : array 322 array of data points 323 324 Returns 325 ------- 326 array 327 array of the derivative of the data points 328 """ 329 data_array = np.array(data_array) 330 331 dy = data_array[1:] - data_array[:-1] 332 333 # replaces nan for infinity 334 indices_nan = np.where(np.isnan(data_array))[0] 335 336 if indices_nan.size: 337 data_array[indices_nan] = np.inf 338 dy[np.where(np.isnan(dy))[0]] = np.inf 339 340 return dy
Calculate derivative of the data points. Replaces nan with infinity
Parameters
- data_array (array): array of data points
Returns
- array: array of the derivative of the data points
343def minima_detector(tic, max_tic, peak_height_max_percent, peak_max_prominence_percent): 344 """ 345 Minima detector for the TIC data points. 346 347 Parameters 348 ---------- 349 tic : array 350 array of data points to find the peaks 351 max_tic : float 352 maximum value of the data points 353 peak_height_max_percent : float 354 maximum height of the peak 355 peak_max_prominence_percent : float 356 maximum prominence of the peak 357 358 Returns 359 ------- 360 generator 361 generator of the indexes of the minima in the TIC array 362 363 """ 364 peak_height_diff = lambda hi, li: ((tic[hi] - tic[li]) / max_tic) * 100 365 366 for start_index, index, final_index in peak_detector(tic, max_tic): 367 # abundance max threshold 368 if (tic[index] / max_tic) * 100 < peak_height_max_percent: 369 # calculates prominence and filter 370 if ( 371 peak_height_diff(index, start_index) 372 and peak_height_diff(index, final_index) < peak_max_prominence_percent 373 ): 374 yield from (start_index, final_index)
Minima detector for the TIC data points.
Parameters
- tic (array): array of data points to find the peaks
- max_tic (float): maximum value of the data points
- peak_height_max_percent (float): maximum height of the peak
- peak_max_prominence_percent (float): maximum prominence of the peak
Returns
- generator: generator of the indexes of the minima in the TIC array
377def baseline_detector( 378 tic, rt, peak_height_max_percent, peak_max_prominence_percent, do_interpolation=True 379): 380 """ 381 Baseline detector for the TIC data points. 382 For LC/GC data processing 383 384 Parameters 385 ---------- 386 tic : array 387 array of data points to find the peaks 388 rt : array 389 array of retention time data points 390 peak_height_max_percent : float 391 maximum height of the peak 392 peak_max_prominence_percent : float 393 maximum prominence of the peak 394 do_interpolation : bool, optional 395 flag to interpolate the data points. Default is True 396 397 Returns 398 ------- 399 array 400 array of the baseline corrected data points 401 402 """ 403 rt = np.array(rt) 404 405 max_tic = max(tic) 406 407 indexes = sorted( 408 list( 409 set( 410 i 411 for i in minima_detector( 412 tic, max_tic, peak_height_max_percent, peak_max_prominence_percent 413 ) 414 ) 415 ) 416 ) 417 418 y = -tic 419 420 x1 = rt[indexes] 421 422 y1 = y[indexes] 423 424 if len(x1) <= 5: 425 return tic 426 427 if not do_interpolation: 428 y0 = np.zeros(tic.shape) 429 y0[indexes] = y[indexes] 430 431 return y0 432 433 else: 434 f1 = interpolate.interp1d(x1, y1, kind="quadratic", fill_value="extrapolate") 435 436 ynew1 = f1(list(rt)) 437 438 # from matplotlib import pyplot as plt 439 # if self.deconv_rt_list and self.deconv_mz == 51: 440 441 # plt.plot(rt, tic-(-1* ynew1), color='green') 442 443 # plt.plot(rt, -1* ynew1, c='black') 444 445 # s = self.smooth(s, 10, 'blackman') 446 447 # plt.plot(self.retention_time, -s) 448 449 # plt.show() 450 451 return -1 * ynew1
Baseline detector for the TIC data points. For LC/GC data processing
Parameters
- tic (array): array of data points to find the peaks
- rt (array): array of retention time data points
- peak_height_max_percent (float): maximum height of the peak
- peak_max_prominence_percent (float): maximum prominence of the peak
- do_interpolation (bool, optional): flag to interpolate the data points. Default is True
Returns
- array: array of the baseline corrected data points
454def peak_detector_generator( 455 tic, stds, method, rt, max_height, min_height, max_prominence, min_datapoints 456): 457 """ 458 Peak detector generator for the TIC data points. 459 460 Parameters 461 ---------- 462 tic : array 463 array of data points to find the peaks 464 stds : float 465 standard deviation 466 method : str 467 method to detect the peaks 468 Available methods: 'manual_relative_abundance', 'auto_relative_abundance', 'second_derivative' 469 rt : array 470 array of retention time data points 471 max_height : float 472 maximum height of the peak 473 min_height : float 474 minimum height of the peak 475 max_prominence : float 476 maximum prominence of the peak 477 min_datapoints : int 478 minimum number of data points in the peak 479 480 Returns 481 ------- 482 generator 483 generator of the indexes of the peaks in the TIC array 484 485 """ 486 max_tic = max(tic) 487 488 if method == "manual_relative_abundance": 489 tic = tic - baseline_detector(tic, rt, max_height, max_prominence) 490 491 norm_tic = (tic / max_tic) * 100 492 493 remove_indexes = np.where(norm_tic < min_height)[0] 494 495 # if self.deconv_rt_list and self.deconv_mz == 51: 496 # plt.plot(self.deconv_rt_list, tic, label=self.deconv_mz) 497 498 elif method == "auto_relative_abundance": 499 tic = tic - baseline_detector(tic, rt, max_height, max_prominence) 500 501 baseline = baseline_detector(tic, rt, max_height, max_prominence) 502 503 peak_detect_threshold = np.nanmean(baseline) + (stds * np.std(baseline)) 504 505 remove_indexes = np.where(tic < peak_detect_threshold)[0] 506 507 elif method == "second_derivative": 508 remove_indexes = second_derivative_threshold( 509 tic, stds, rt, max_height, max_prominence 510 ) 511 512 else: 513 NotImplemented(method) 514 515 peak_height_diff = lambda hi, li: ((tic[hi] - tic[li]) / max_tic) * 100 516 517 dy = derivate(tic) 518 519 include_indexes = np.where((np.hstack((dy, 0)) < 0) & (np.hstack((0, dy)) > 0))[0] 520 521 final_indexes = sorted(set(include_indexes) - set(remove_indexes)) 522 523 # from matplotlib import pyplot as plt 524 525 # plt.plot(self.retention_time, tic, color='black') 526 # plt.scatter(self.retention_time[remove_indexes], tic[remove_indexes], color='red') 527 # plt.scatter(self.retention_time[include_indexes], tic[include_indexes], color='blue') 528 # plt.scatter(self.retention_time[final_indexes], tic[final_indexes], color='blue') 529 530 # plt.show() 531 532 for index in final_indexes: 533 start_index = find_minima(index, tic, right=False) 534 final_index = find_minima(index, tic) 535 536 if final_index - start_index > min_datapoints: 537 # if min( peak_height_diff(index,start_index), peak_height_diff(index,final_index) )> self.chromatogram_settings.peak_min_prominence_percent : 538 539 yield (start_index, index, final_index)
Peak detector generator for the TIC data points.
Parameters
- tic (array): array of data points to find the peaks
- stds (float): standard deviation
- method (str): method to detect the peaks Available methods: 'manual_relative_abundance', 'auto_relative_abundance', 'second_derivative'
- rt (array): array of retention time data points
- max_height (float): maximum height of the peak
- min_height (float): minimum height of the peak
- max_prominence (float): maximum prominence of the peak
- min_datapoints (int): minimum number of data points in the peak
Returns
- generator: generator of the indexes of the peaks in the TIC array
542def smooth_signal(x, window_len, window, pol_order, implemented_smooth_method): 543 """ 544 Smooth the data using a window with requested size. 545 546 This method is based on the convolution of a scaled window with the signal. 547 The signal is prepared by introducing reflected copies of the signal 548 (with the window size) in both ends so that transient parts are minimized 549 in the begining and end part of the output signal. 550 551 Parameters 552 ---------- 553 x: array 554 the input signal 555 window_len: int 556 the dimension of the smoothing window; should be an odd integer 557 window: str 558 the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' 559 pol_order: int 560 the order of the polynomial to fit the data 561 implemented_smooth_method: list 562 list of implemented smoothing methods 563 564 Returns 565 ------- 566 y: array 567 the smoothed signal 568 569 Notes: 570 ----- 571 See also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve 572 scipy.signal.savgol_filter 573 574 """ 575 x = np.array(x) 576 577 if x.ndim != 1: 578 raise ValueError("smooth only accepts 1 dimension arrays.") 579 580 if x.size < window_len: 581 raise ValueError("Input array needs to be bigger than window size") 582 583 # if window_len < 3: 584 # return x 585 586 if not window in implemented_smooth_method: 587 raise ValueError( 588 "Window method should be 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'" 589 ) 590 591 s = np.r_[x[window_len - 1 : 0 : -1], x, x[-1:-window_len:-1]] 592 593 if window == "savgol": 594 return savgol_filter(x, window_len, pol_order) 595 596 elif window == "boxcar": # moving average 597 w = boxcar(window_len) 598 599 y = np.convolve(w, s, mode="valid") 600 601 elif window == "flat": # moving average 602 w = np.ones(window_len, "d") 603 604 y = np.convolve(w / w.sum(), s, mode="valid") 605 606 else: 607 w = eval(window + "(window_len)") 608 609 y = np.convolve(w / w.sum(), s, mode="valid") 610 611 return y[int(window_len / 2 - 1) : int(-window_len / 2)]
Smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the begining and end part of the output signal.
Parameters
- x (array): the input signal
- window_len (int): the dimension of the smoothing window; should be an odd integer
- window (str): the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
- pol_order (int): the order of the polynomial to fit the data
- implemented_smooth_method (list): list of implemented smoothing methods
Returns
- y (array): the smoothed signal
- Notes:
- -----
See also (numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve):
scipy.signal.savgol_filter
614def second_derivative_threshold( 615 tic, stds, rt, peak_height_max_percent, peak_max_prominence_percent 616): 617 """ 618 Second derivative threshold for the TIC data points. 619 For LC/GC data processing 620 621 Parameters 622 ---------- 623 tic : array 624 array of data points to find the peaks 625 stds : float 626 standard deviation 627 rt : array 628 array of retention time data points 629 peak_height_max_percent : float 630 maximum height of the peak 631 632 Returns 633 ------- 634 array 635 array of the indexes of the data points to remove 636 637 """ 638 639 dy = derivate(tic) 640 641 dydy = derivate(dy) 642 dydy = np.hstack((dydy, 0)) 643 dydy = np.hstack((0, dydy)) 644 645 baseline = baseline_detector( 646 dydy, 647 rt, 648 peak_height_max_percent, 649 peak_max_prominence_percent, 650 do_interpolation=False, 651 ) 652 653 threshold_median = np.median(baseline) - (stds * np.std(baseline)) 654 655 remove_indexes = np.where(dydy > threshold_median)[0] 656 657 return remove_indexes
Second derivative threshold for the TIC data points. For LC/GC data processing
Parameters
- tic (array): array of data points to find the peaks
- stds (float): standard deviation
- rt (array): array of retention time data points
- peak_height_max_percent (float): maximum height of the peak
Returns
- array: array of the indexes of the data points to remove