corems.mass_spectra.calc.GC_Deconvolution
1from matplotlib import pyplot as plt 2import numpy as np 3 4from corems.encapsulation.constant import Labels 5from corems.encapsulation.factory.parameters import default_parameters 6from corems.mass_spectrum.factory.MassSpectrumClasses import MassSpecCentroidLowRes 7from corems.chroma_peak.factory.chroma_peak_classes import GCPeakDeconvolved 8from corems.mass_spectra.calc import SignalProcessing as sp 9 10 11class MassDeconvolution: 12 def run_deconvolution(self, plot_res=False): 13 eic_dict = self.ion_extracted_chroma(self._ms) 14 15 peaks_entity_data = self.find_peaks_entity(eic_dict) 16 17 """ select model peaks, create Mass Spectrum objs, GCPeak objs, store results in GC_Class gcpeaks obj""" 18 self.deconvolution(peaks_entity_data, plot_res) 19 20 def centroid_detector(self, tic, rt): 21 """this function has been replaced with sp.peak_picking_first_derivative 22 and it not used 23 """ 24 noise_std = self.chromatogram_settings.std_noise_threshold 25 26 method = self.chromatogram_settings.noise_threshold_method 27 28 """ peak picking""" 29 min_height = self.chromatogram_settings.peak_height_min_percent 30 min_datapoints = self.chromatogram_settings.min_peak_datapoints 31 32 """ baseline detection""" 33 max_prominence = self.chromatogram_settings.peak_max_prominence_percent 34 max_height = self.chromatogram_settings.peak_height_max_percent 35 36 peak_indexes_generator = sp.peak_detector_generator( 37 tic, 38 noise_std, 39 method, 40 rt, 41 max_height, 42 min_height, 43 max_prominence, 44 min_datapoints, 45 ) 46 47 return peak_indexes_generator 48 49 def ion_extracted_chroma(self, mass_spectra_obj): 50 eic_dict = {} 51 52 for scan_number, ms_obj in mass_spectra_obj.items(): 53 mz_list = ms_obj.mz_exp 54 abundance_list = ms_obj.abundance 55 # add list of scan numbers 56 for index, mz in enumerate(mz_list): 57 # dict of mz and tuple (mass spectrum abundances index, and scan number) 58 if mz not in eic_dict.keys(): 59 eic_dict[mz] = [[abundance_list[index]], [ms_obj.retention_time]] 60 61 else: 62 eic_dict[mz][0].append(ms_obj.abundance[index]) 63 eic_dict[mz][1].append(ms_obj.retention_time) 64 65 return eic_dict 66 67 def hc(self, X, Y, max_rt_distance=0.025): 68 from scipy.cluster.hierarchy import dendrogram, linkage 69 from scipy.cluster.hierarchy import fcluster 70 # from matplotlib import pyplot as plt 71 72 Z = linkage(np.reshape(X, (len(X), 1)), method="ward") 73 # Z = linkage(X, method = "ward") 74 75 max_d = max_rt_distance 76 distance_clusters = fcluster(Z, max_d, criterion="distance") 77 # print("distance") 78 # print(distance_clusters) 79 80 # inconsistency_cluster = fcluster(Z, 2, depth=2) 81 # max_cluster = fcluster(Z, 2, criterion='maxclust') 82 83 grouped_rt = {} 84 85 for index_obj, group in enumerate(distance_clusters): 86 if group not in grouped_rt.keys(): 87 grouped_rt[group] = [X[index_obj]] 88 else: 89 grouped_rt[group].append(X[index_obj]) 90 91 # print(distance_clusters, grouped_rt) 92 return grouped_rt 93 94 # plt.figure(figsize=(10, 8)) 95 # plt.scatter(X, Y, c=distance_clusters, cmap='prism') # plot points with cluster dependent colors 96 # plt.show() 97 # labelList = range(int(min(X)), int(max(X))) 98 99 # plt.figure(figsize=(10, 7)) 100 # dendrogram(Z, 101 # orientation='top', 102 # distance_sort='descending', 103 # show_leaf_counts=True) 104 # plt.show() 105 # print(Z) 106 107 def find_peaks_entity(self, eic_dict): 108 """combine eic with mathing rt apexes""" 109 max_prominence = self.chromatogram_settings.peak_max_prominence_percent 110 111 max_height = self.chromatogram_settings.peak_height_max_percent 112 113 signal_threshold = self.chromatogram_settings.eic_signal_threshold 114 115 min_peak_datapoints = self.chromatogram_settings.min_peak_datapoints 116 117 peak_derivative_threshold = self.chromatogram_settings.peak_derivative_threshold 118 119 correct_baseline = False 120 peaks_entity_data = {} 121 122 max_eic = 0 123 for mz, eic_scan_index_rt in eic_dict.items(): 124 ind_max_eic = max(eic_scan_index_rt[0]) 125 max_eic = ind_max_eic if ind_max_eic > max_eic else max_eic 126 127 for mz, eic_scan_index_rt in eic_dict.items(): 128 eic = eic_scan_index_rt[0] 129 rt_list = eic_scan_index_rt[1] 130 131 if len(eic) >= min_peak_datapoints: 132 smooth_eic = self.smooth_tic(eic) 133 134 include_indexes = sp.peak_picking_first_derivative( 135 rt_list, 136 smooth_eic, 137 max_height, 138 max_prominence, 139 max_eic, 140 min_peak_datapoints, 141 peak_derivative_threshold, 142 signal_threshold=signal_threshold, 143 correct_baseline=correct_baseline, 144 ) 145 146 for initial_scan, apex_scan, final_scan in include_indexes: 147 rt_corrected_therm = self.quadratic_interpolation( 148 rt_list, smooth_eic, apex_scan 149 ) 150 151 ref_apex_rt = round(rt_list[apex_scan] + rt_corrected_therm, 4) 152 153 apex_rt = rt_list[apex_scan] 154 # apex_abundance = smooth_eic[apex_scan] 155 156 # maximum_tic = apex_abundance if apex_abundance > maximum_tic else maximum_tic 157 158 for scan_index in range(initial_scan, final_scan): 159 peak_rt = rt_list[scan_index] 160 peak_abundance = smooth_eic[scan_index] 161 162 if peak_abundance > 0: 163 dict_data = { 164 peak_rt: { 165 "mz": [mz], 166 "abundance": [peak_abundance], 167 "scan_number": [scan_index], 168 }, 169 "ref_apex_rt": ref_apex_rt, 170 } 171 172 if apex_rt not in peaks_entity_data.keys(): 173 peaks_entity_data[apex_rt] = dict_data 174 175 else: 176 if peak_rt not in peaks_entity_data[apex_rt].keys(): 177 peaks_entity_data[apex_rt][peak_rt] = dict_data.get( 178 peak_rt 179 ) 180 181 else: 182 existing_data = peaks_entity_data[apex_rt].get( 183 peak_rt 184 ) 185 186 existing_data["mz"].append(mz) 187 existing_data["abundance"].append(peak_abundance) 188 existing_data["scan_number"].append(scan_index) 189 190 return peaks_entity_data 191 192 def mass_spec_factory(self, rt, datadict): 193 # tic = sum(datadict.get('abundance')) 194 195 scan_index = datadict["scan_number"][0] 196 197 mz_list, abundance_list = zip( 198 *sorted(zip(datadict["mz"], datadict["abundance"])) 199 ) 200 201 data_dict = {Labels.mz: mz_list, Labels.abundance: abundance_list} 202 203 d_params = default_parameters(self._ms[scan_index]._filename) 204 205 d_params["rt"] = rt 206 207 d_params["scan_number"] = scan_index 208 209 d_params["label"] = Labels.gcms_centroid 210 211 d_params["polarity"] = self._ms[scan_index].polarity 212 213 d_params["analyzer"] = self._ms[scan_index].analyzer 214 215 d_params["instrument_label"] = self._ms[scan_index].instrument_label 216 217 d_params["filename_path"] = self._ms[scan_index].instrument_label 218 219 ms = MassSpecCentroidLowRes(data_dict, d_params) 220 221 return ms 222 223 def smooth_signal(self, signal): 224 implemented_smooth_method = self.chromatogram_settings.implemented_smooth_method 225 226 pol_order = self.chromatogram_settings.savgol_pol_order 227 228 window_len = self.chromatogram_settings.smooth_window 229 230 window = self.chromatogram_settings.smooth_method 231 232 return sp.smooth_signal( 233 signal, window_len, window, pol_order, implemented_smooth_method 234 ) 235 236 def add_gcpeak( 237 self, 238 new_apex_index, 239 start_rt, 240 final_rt, 241 peak_rt, 242 smoothed_tic, 243 datadict, 244 plot_res, 245 ): 246 if start_rt <= peak_rt[new_apex_index[1]] <= final_rt: 247 rt_list = peak_rt[new_apex_index[0] : new_apex_index[2]] 248 tic_list = smoothed_tic[new_apex_index[0] : new_apex_index[2]] 249 250 apex_rt = peak_rt[new_apex_index[1]] 251 apex_i = rt_list.index(apex_rt) 252 253 """workaround for peak picking missing some local minimas""" 254 if apex_rt not in self.processed_appexes: 255 self.processed_appexes.append(apex_rt) 256 257 mass_spectra = ( 258 self.mass_spec_factory(rt, datadict.get(rt)) for rt in rt_list 259 ) 260 261 gc_peak = GCPeakDeconvolved( 262 self, mass_spectra, apex_i, rt_list, tic_list 263 ) 264 265 gc_peak.calc_area(tic_list, 1) 266 267 self.gcpeaks.append(gc_peak) 268 269 if plot_res: 270 plt.plot(gc_peak.rt_list, gc_peak.tic_list) 271 plt.plot( 272 gc_peak.retention_time, 273 gc_peak.tic, 274 c="black", 275 marker="^", 276 linewidth=0, 277 ) 278 279 def deconvolution(self, peaks_entity_data, plot_res): 280 # plot_res = True 281 domain = self.retention_time 282 signal = self._processed_tic 283 max_height = self.chromatogram_settings.peak_height_max_percent 284 max_prominence = self.chromatogram_settings.peak_max_prominence_percent 285 min_peak_datapoints = self.chromatogram_settings.min_peak_datapoints 286 signal_threshold = self.chromatogram_settings.peak_height_min_percent 287 max_rt_distance = self.chromatogram_settings.max_rt_distance 288 peak_derivative_threshold = self.chromatogram_settings.peak_derivative_threshold 289 290 max_signal = max(signal) 291 correct_baseline = False 292 293 include_indexes = sp.peak_picking_first_derivative( 294 domain, 295 signal, 296 max_height, 297 max_prominence, 298 max_signal, 299 min_peak_datapoints, 300 peak_derivative_threshold, 301 signal_threshold=signal_threshold, 302 correct_baseline=correct_baseline, 303 plot_res=False, 304 ) 305 306 """ deconvolution window is defined by the TIC peak region""" 307 all_apexes_rt = np.array(list(peaks_entity_data.keys())) 308 309 """workaround for peak picking missing some local minimas""" 310 self.processed_appexes = [] 311 312 for indexes_tuple in include_indexes: 313 start_rt = self.retention_time[indexes_tuple[0]] 314 # apex_rt = self.retention_time[indexes_tuple[1]] 315 final_rt = self.retention_time[indexes_tuple[2]] 316 317 """ find all features within TIC peak window""" 318 peak_features_indexes = np.where( 319 (all_apexes_rt > start_rt) & (all_apexes_rt < final_rt) 320 )[0] 321 peak_features_rts = all_apexes_rt[peak_features_indexes] 322 323 # print(start_rt, apex_rt, final_rt ) 324 325 filtered_features_rt = [] 326 filtered_features_abundance = [] 327 328 for each_apex_rt in peak_features_rts: 329 apex_data = peaks_entity_data.get(each_apex_rt).get(each_apex_rt) 330 331 peak_features_tic = sum( 332 peaks_entity_data.get(each_apex_rt) 333 .get(each_apex_rt) 334 .get("abundance") 335 ) 336 337 norm_smooth_tic = (peak_features_tic / max_signal) * 100 338 339 """ TODO: 340 Improve Peak Filtering 341 342 Calculate peaks sharpness here and filter it out (Amax - An /n)? 343 Peak Fit and Calculate Peak Gaussian Similarity? 344 Currentely using flat % tic relative abundance threshold and min 3 m/z per mass spectrum 345 """ 346 if norm_smooth_tic > signal_threshold and len(apex_data["mz"]) > 1: 347 # print(len(apex_data['mz'])) 348 filtered_features_rt.append(each_apex_rt) 349 filtered_features_abundance.append(peak_features_tic) 350 351 if len(filtered_features_rt) > 1: 352 """ more than one peak feature identified inside a TIC peak """ 353 # plt.plot(self.retention_time[indexes_tuple[0]:indexes_tuple[2]], signal[indexes_tuple[0]:indexes_tuple[2]], c='black') 354 355 # print(filtered_features_rt) 356 grouped_rt = self.hc( 357 filtered_features_rt, 358 filtered_features_abundance, 359 max_rt_distance=max_rt_distance, 360 ) 361 # print(grouped_rt) 362 363 for group, apex_rt_list in grouped_rt.items(): 364 """ each group is a peak feature defined by the hierarchical clutter algorithm 365 366 """ 367 group_datadict = {} 368 group_datadict["ref_apex_rt"] = [] 369 370 for each_group_apex_rt in apex_rt_list: 371 datadict = peaks_entity_data.get(each_group_apex_rt) 372 373 for rt, each_datadict in datadict.items(): 374 if rt == "ref_apex_rt": 375 group_datadict["ref_apex_rt"].append(each_datadict) 376 377 else: 378 if rt in group_datadict.keys(): 379 mz_list = each_datadict.get("mz") 380 abundance_list = each_datadict.get("abundance") 381 382 each_mz_abun = dict(zip(mz_list, abundance_list)) 383 384 for index_mz, mz in enumerate( 385 group_datadict[rt].get("mz") 386 ): 387 if mz in each_mz_abun.keys(): 388 each_mz_abun[mz] = ( 389 each_mz_abun[mz] 390 + group_datadict[rt].get("abundance")[ 391 index_mz 392 ] 393 ) 394 395 else: 396 each_mz_abun[mz] = group_datadict[rt].get( 397 "abundance" 398 )[index_mz] 399 400 group_datadict[rt] = { 401 "mz": list(each_mz_abun.keys()), 402 "abundance": list(each_mz_abun.values()), 403 "scan_number": each_datadict.get("scan_number"), 404 } 405 406 else: 407 group_datadict[rt] = each_datadict 408 409 peak_rt = [] 410 peak_tic = [] 411 412 # print(group_datadict.get('ref_apex_rt')) 413 for rt, each_datadict in group_datadict.items(): 414 if rt != "ref_apex_rt": 415 peak_rt.append(rt) 416 peak_tic.append(sum(each_datadict["abundance"])) 417 418 peak_rt, peak_tic = zip(*sorted(zip(peak_rt, peak_tic))) 419 420 smoothed_tic = self.smooth_signal(peak_tic) 421 422 include_indexes = sp.peak_picking_first_derivative( 423 peak_rt, 424 smoothed_tic, 425 max_height, 426 max_prominence, 427 max_signal, 428 min_peak_datapoints, 429 peak_derivative_threshold, 430 signal_threshold=signal_threshold, 431 correct_baseline=False, 432 plot_res=False, 433 ) 434 435 include_indexes = list(include_indexes) 436 437 if include_indexes: 438 if len(include_indexes) > 1: 439 """ after sum there are two apexes 440 check if it is inside the deconvolution window, otherwise ignores it 441 """ 442 443 for new_apex_index in include_indexes: 444 # pass 445 self.add_gcpeak( 446 new_apex_index, 447 start_rt, 448 final_rt, 449 peak_rt, 450 smoothed_tic, 451 group_datadict, 452 plot_res, 453 ) 454 455 else: 456 """ after sum there is on apex 457 save it 458 """ 459 new_apex_index = include_indexes[0] 460 # print(include_indexes, group, apex_rt_list) 461 self.add_gcpeak( 462 new_apex_index, 463 start_rt, 464 final_rt, 465 peak_rt, 466 smoothed_tic, 467 group_datadict, 468 plot_res, 469 ) 470 471 elif len(filtered_features_rt) == 1: 472 """ only one peak feature inside deconvolution window """ 473 474 each_apex_rt = filtered_features_rt[0] 475 476 datadict = peaks_entity_data.get(each_apex_rt) 477 478 peak_rt = [] 479 peak_tic = [] 480 481 for rt, each_datadict in datadict.items(): 482 if rt != "ref_apex_rt": 483 peak_rt.append(rt) 484 peak_tic.append(sum(each_datadict["abundance"])) 485 486 peak_rt, peak_tic = zip(*sorted(zip(peak_rt, peak_tic))) 487 488 smoothed_tic = self.smooth_signal(peak_tic) 489 490 include_indexes = sp.peak_picking_first_derivative( 491 peak_rt, 492 smoothed_tic, 493 max_height, 494 max_prominence, 495 max_signal, 496 min_peak_datapoints, 497 peak_derivative_threshold, 498 signal_threshold=signal_threshold, 499 correct_baseline=False, 500 plot_res=False, 501 ) 502 include_indexes = list(include_indexes) 503 504 if include_indexes: 505 """ after sum there are two apexes 506 check if it is inside the deconvolution window, otherwise ignores it""" 507 if len(include_indexes) > 1: 508 for new_apex_index in include_indexes: 509 # pass 510 self.add_gcpeak( 511 new_apex_index, 512 start_rt, 513 final_rt, 514 peak_rt, 515 smoothed_tic, 516 datadict, 517 plot_res, 518 ) 519 520 else: 521 """ after sum there is one apex 522 save it 523 includes_indexes = (start, apex, final )""" 524 525 new_apex_index = include_indexes[0] 526 527 self.add_gcpeak( 528 new_apex_index, 529 start_rt, 530 final_rt, 531 peak_rt, 532 smoothed_tic, 533 datadict, 534 plot_res, 535 ) 536 537 else: 538 # print('no data after filter') 539 pass 540 if plot_res: 541 plt.plot(self.retention_time, self._processed_tic, c="black") 542 plt.show() 543 544 def quadratic_interpolation(self, rt_list, tic_list, apex_index): 545 rt_list = np.array(rt_list) 546 tic_list = np.array(tic_list) 547 three_highest_i = [i for i in range(apex_index - 1, apex_index + 2)] 548 549 z = np.poly1d( 550 np.polyfit(rt_list[three_highest_i], tic_list[three_highest_i], 2) 551 ) 552 a = z[2] 553 b = z[1] 554 555 corrected_apex_rt = -b / (2 * a) 556 initial_rt = rt_list[apex_index] 557 558 return initial_rt - corrected_apex_rt
class
MassDeconvolution:
12class MassDeconvolution: 13 def run_deconvolution(self, plot_res=False): 14 eic_dict = self.ion_extracted_chroma(self._ms) 15 16 peaks_entity_data = self.find_peaks_entity(eic_dict) 17 18 """ select model peaks, create Mass Spectrum objs, GCPeak objs, store results in GC_Class gcpeaks obj""" 19 self.deconvolution(peaks_entity_data, plot_res) 20 21 def centroid_detector(self, tic, rt): 22 """this function has been replaced with sp.peak_picking_first_derivative 23 and it not used 24 """ 25 noise_std = self.chromatogram_settings.std_noise_threshold 26 27 method = self.chromatogram_settings.noise_threshold_method 28 29 """ peak picking""" 30 min_height = self.chromatogram_settings.peak_height_min_percent 31 min_datapoints = self.chromatogram_settings.min_peak_datapoints 32 33 """ baseline detection""" 34 max_prominence = self.chromatogram_settings.peak_max_prominence_percent 35 max_height = self.chromatogram_settings.peak_height_max_percent 36 37 peak_indexes_generator = sp.peak_detector_generator( 38 tic, 39 noise_std, 40 method, 41 rt, 42 max_height, 43 min_height, 44 max_prominence, 45 min_datapoints, 46 ) 47 48 return peak_indexes_generator 49 50 def ion_extracted_chroma(self, mass_spectra_obj): 51 eic_dict = {} 52 53 for scan_number, ms_obj in mass_spectra_obj.items(): 54 mz_list = ms_obj.mz_exp 55 abundance_list = ms_obj.abundance 56 # add list of scan numbers 57 for index, mz in enumerate(mz_list): 58 # dict of mz and tuple (mass spectrum abundances index, and scan number) 59 if mz not in eic_dict.keys(): 60 eic_dict[mz] = [[abundance_list[index]], [ms_obj.retention_time]] 61 62 else: 63 eic_dict[mz][0].append(ms_obj.abundance[index]) 64 eic_dict[mz][1].append(ms_obj.retention_time) 65 66 return eic_dict 67 68 def hc(self, X, Y, max_rt_distance=0.025): 69 from scipy.cluster.hierarchy import dendrogram, linkage 70 from scipy.cluster.hierarchy import fcluster 71 # from matplotlib import pyplot as plt 72 73 Z = linkage(np.reshape(X, (len(X), 1)), method="ward") 74 # Z = linkage(X, method = "ward") 75 76 max_d = max_rt_distance 77 distance_clusters = fcluster(Z, max_d, criterion="distance") 78 # print("distance") 79 # print(distance_clusters) 80 81 # inconsistency_cluster = fcluster(Z, 2, depth=2) 82 # max_cluster = fcluster(Z, 2, criterion='maxclust') 83 84 grouped_rt = {} 85 86 for index_obj, group in enumerate(distance_clusters): 87 if group not in grouped_rt.keys(): 88 grouped_rt[group] = [X[index_obj]] 89 else: 90 grouped_rt[group].append(X[index_obj]) 91 92 # print(distance_clusters, grouped_rt) 93 return grouped_rt 94 95 # plt.figure(figsize=(10, 8)) 96 # plt.scatter(X, Y, c=distance_clusters, cmap='prism') # plot points with cluster dependent colors 97 # plt.show() 98 # labelList = range(int(min(X)), int(max(X))) 99 100 # plt.figure(figsize=(10, 7)) 101 # dendrogram(Z, 102 # orientation='top', 103 # distance_sort='descending', 104 # show_leaf_counts=True) 105 # plt.show() 106 # print(Z) 107 108 def find_peaks_entity(self, eic_dict): 109 """combine eic with mathing rt apexes""" 110 max_prominence = self.chromatogram_settings.peak_max_prominence_percent 111 112 max_height = self.chromatogram_settings.peak_height_max_percent 113 114 signal_threshold = self.chromatogram_settings.eic_signal_threshold 115 116 min_peak_datapoints = self.chromatogram_settings.min_peak_datapoints 117 118 peak_derivative_threshold = self.chromatogram_settings.peak_derivative_threshold 119 120 correct_baseline = False 121 peaks_entity_data = {} 122 123 max_eic = 0 124 for mz, eic_scan_index_rt in eic_dict.items(): 125 ind_max_eic = max(eic_scan_index_rt[0]) 126 max_eic = ind_max_eic if ind_max_eic > max_eic else max_eic 127 128 for mz, eic_scan_index_rt in eic_dict.items(): 129 eic = eic_scan_index_rt[0] 130 rt_list = eic_scan_index_rt[1] 131 132 if len(eic) >= min_peak_datapoints: 133 smooth_eic = self.smooth_tic(eic) 134 135 include_indexes = sp.peak_picking_first_derivative( 136 rt_list, 137 smooth_eic, 138 max_height, 139 max_prominence, 140 max_eic, 141 min_peak_datapoints, 142 peak_derivative_threshold, 143 signal_threshold=signal_threshold, 144 correct_baseline=correct_baseline, 145 ) 146 147 for initial_scan, apex_scan, final_scan in include_indexes: 148 rt_corrected_therm = self.quadratic_interpolation( 149 rt_list, smooth_eic, apex_scan 150 ) 151 152 ref_apex_rt = round(rt_list[apex_scan] + rt_corrected_therm, 4) 153 154 apex_rt = rt_list[apex_scan] 155 # apex_abundance = smooth_eic[apex_scan] 156 157 # maximum_tic = apex_abundance if apex_abundance > maximum_tic else maximum_tic 158 159 for scan_index in range(initial_scan, final_scan): 160 peak_rt = rt_list[scan_index] 161 peak_abundance = smooth_eic[scan_index] 162 163 if peak_abundance > 0: 164 dict_data = { 165 peak_rt: { 166 "mz": [mz], 167 "abundance": [peak_abundance], 168 "scan_number": [scan_index], 169 }, 170 "ref_apex_rt": ref_apex_rt, 171 } 172 173 if apex_rt not in peaks_entity_data.keys(): 174 peaks_entity_data[apex_rt] = dict_data 175 176 else: 177 if peak_rt not in peaks_entity_data[apex_rt].keys(): 178 peaks_entity_data[apex_rt][peak_rt] = dict_data.get( 179 peak_rt 180 ) 181 182 else: 183 existing_data = peaks_entity_data[apex_rt].get( 184 peak_rt 185 ) 186 187 existing_data["mz"].append(mz) 188 existing_data["abundance"].append(peak_abundance) 189 existing_data["scan_number"].append(scan_index) 190 191 return peaks_entity_data 192 193 def mass_spec_factory(self, rt, datadict): 194 # tic = sum(datadict.get('abundance')) 195 196 scan_index = datadict["scan_number"][0] 197 198 mz_list, abundance_list = zip( 199 *sorted(zip(datadict["mz"], datadict["abundance"])) 200 ) 201 202 data_dict = {Labels.mz: mz_list, Labels.abundance: abundance_list} 203 204 d_params = default_parameters(self._ms[scan_index]._filename) 205 206 d_params["rt"] = rt 207 208 d_params["scan_number"] = scan_index 209 210 d_params["label"] = Labels.gcms_centroid 211 212 d_params["polarity"] = self._ms[scan_index].polarity 213 214 d_params["analyzer"] = self._ms[scan_index].analyzer 215 216 d_params["instrument_label"] = self._ms[scan_index].instrument_label 217 218 d_params["filename_path"] = self._ms[scan_index].instrument_label 219 220 ms = MassSpecCentroidLowRes(data_dict, d_params) 221 222 return ms 223 224 def smooth_signal(self, signal): 225 implemented_smooth_method = self.chromatogram_settings.implemented_smooth_method 226 227 pol_order = self.chromatogram_settings.savgol_pol_order 228 229 window_len = self.chromatogram_settings.smooth_window 230 231 window = self.chromatogram_settings.smooth_method 232 233 return sp.smooth_signal( 234 signal, window_len, window, pol_order, implemented_smooth_method 235 ) 236 237 def add_gcpeak( 238 self, 239 new_apex_index, 240 start_rt, 241 final_rt, 242 peak_rt, 243 smoothed_tic, 244 datadict, 245 plot_res, 246 ): 247 if start_rt <= peak_rt[new_apex_index[1]] <= final_rt: 248 rt_list = peak_rt[new_apex_index[0] : new_apex_index[2]] 249 tic_list = smoothed_tic[new_apex_index[0] : new_apex_index[2]] 250 251 apex_rt = peak_rt[new_apex_index[1]] 252 apex_i = rt_list.index(apex_rt) 253 254 """workaround for peak picking missing some local minimas""" 255 if apex_rt not in self.processed_appexes: 256 self.processed_appexes.append(apex_rt) 257 258 mass_spectra = ( 259 self.mass_spec_factory(rt, datadict.get(rt)) for rt in rt_list 260 ) 261 262 gc_peak = GCPeakDeconvolved( 263 self, mass_spectra, apex_i, rt_list, tic_list 264 ) 265 266 gc_peak.calc_area(tic_list, 1) 267 268 self.gcpeaks.append(gc_peak) 269 270 if plot_res: 271 plt.plot(gc_peak.rt_list, gc_peak.tic_list) 272 plt.plot( 273 gc_peak.retention_time, 274 gc_peak.tic, 275 c="black", 276 marker="^", 277 linewidth=0, 278 ) 279 280 def deconvolution(self, peaks_entity_data, plot_res): 281 # plot_res = True 282 domain = self.retention_time 283 signal = self._processed_tic 284 max_height = self.chromatogram_settings.peak_height_max_percent 285 max_prominence = self.chromatogram_settings.peak_max_prominence_percent 286 min_peak_datapoints = self.chromatogram_settings.min_peak_datapoints 287 signal_threshold = self.chromatogram_settings.peak_height_min_percent 288 max_rt_distance = self.chromatogram_settings.max_rt_distance 289 peak_derivative_threshold = self.chromatogram_settings.peak_derivative_threshold 290 291 max_signal = max(signal) 292 correct_baseline = False 293 294 include_indexes = sp.peak_picking_first_derivative( 295 domain, 296 signal, 297 max_height, 298 max_prominence, 299 max_signal, 300 min_peak_datapoints, 301 peak_derivative_threshold, 302 signal_threshold=signal_threshold, 303 correct_baseline=correct_baseline, 304 plot_res=False, 305 ) 306 307 """ deconvolution window is defined by the TIC peak region""" 308 all_apexes_rt = np.array(list(peaks_entity_data.keys())) 309 310 """workaround for peak picking missing some local minimas""" 311 self.processed_appexes = [] 312 313 for indexes_tuple in include_indexes: 314 start_rt = self.retention_time[indexes_tuple[0]] 315 # apex_rt = self.retention_time[indexes_tuple[1]] 316 final_rt = self.retention_time[indexes_tuple[2]] 317 318 """ find all features within TIC peak window""" 319 peak_features_indexes = np.where( 320 (all_apexes_rt > start_rt) & (all_apexes_rt < final_rt) 321 )[0] 322 peak_features_rts = all_apexes_rt[peak_features_indexes] 323 324 # print(start_rt, apex_rt, final_rt ) 325 326 filtered_features_rt = [] 327 filtered_features_abundance = [] 328 329 for each_apex_rt in peak_features_rts: 330 apex_data = peaks_entity_data.get(each_apex_rt).get(each_apex_rt) 331 332 peak_features_tic = sum( 333 peaks_entity_data.get(each_apex_rt) 334 .get(each_apex_rt) 335 .get("abundance") 336 ) 337 338 norm_smooth_tic = (peak_features_tic / max_signal) * 100 339 340 """ TODO: 341 Improve Peak Filtering 342 343 Calculate peaks sharpness here and filter it out (Amax - An /n)? 344 Peak Fit and Calculate Peak Gaussian Similarity? 345 Currentely using flat % tic relative abundance threshold and min 3 m/z per mass spectrum 346 """ 347 if norm_smooth_tic > signal_threshold and len(apex_data["mz"]) > 1: 348 # print(len(apex_data['mz'])) 349 filtered_features_rt.append(each_apex_rt) 350 filtered_features_abundance.append(peak_features_tic) 351 352 if len(filtered_features_rt) > 1: 353 """ more than one peak feature identified inside a TIC peak """ 354 # plt.plot(self.retention_time[indexes_tuple[0]:indexes_tuple[2]], signal[indexes_tuple[0]:indexes_tuple[2]], c='black') 355 356 # print(filtered_features_rt) 357 grouped_rt = self.hc( 358 filtered_features_rt, 359 filtered_features_abundance, 360 max_rt_distance=max_rt_distance, 361 ) 362 # print(grouped_rt) 363 364 for group, apex_rt_list in grouped_rt.items(): 365 """ each group is a peak feature defined by the hierarchical clutter algorithm 366 367 """ 368 group_datadict = {} 369 group_datadict["ref_apex_rt"] = [] 370 371 for each_group_apex_rt in apex_rt_list: 372 datadict = peaks_entity_data.get(each_group_apex_rt) 373 374 for rt, each_datadict in datadict.items(): 375 if rt == "ref_apex_rt": 376 group_datadict["ref_apex_rt"].append(each_datadict) 377 378 else: 379 if rt in group_datadict.keys(): 380 mz_list = each_datadict.get("mz") 381 abundance_list = each_datadict.get("abundance") 382 383 each_mz_abun = dict(zip(mz_list, abundance_list)) 384 385 for index_mz, mz in enumerate( 386 group_datadict[rt].get("mz") 387 ): 388 if mz in each_mz_abun.keys(): 389 each_mz_abun[mz] = ( 390 each_mz_abun[mz] 391 + group_datadict[rt].get("abundance")[ 392 index_mz 393 ] 394 ) 395 396 else: 397 each_mz_abun[mz] = group_datadict[rt].get( 398 "abundance" 399 )[index_mz] 400 401 group_datadict[rt] = { 402 "mz": list(each_mz_abun.keys()), 403 "abundance": list(each_mz_abun.values()), 404 "scan_number": each_datadict.get("scan_number"), 405 } 406 407 else: 408 group_datadict[rt] = each_datadict 409 410 peak_rt = [] 411 peak_tic = [] 412 413 # print(group_datadict.get('ref_apex_rt')) 414 for rt, each_datadict in group_datadict.items(): 415 if rt != "ref_apex_rt": 416 peak_rt.append(rt) 417 peak_tic.append(sum(each_datadict["abundance"])) 418 419 peak_rt, peak_tic = zip(*sorted(zip(peak_rt, peak_tic))) 420 421 smoothed_tic = self.smooth_signal(peak_tic) 422 423 include_indexes = sp.peak_picking_first_derivative( 424 peak_rt, 425 smoothed_tic, 426 max_height, 427 max_prominence, 428 max_signal, 429 min_peak_datapoints, 430 peak_derivative_threshold, 431 signal_threshold=signal_threshold, 432 correct_baseline=False, 433 plot_res=False, 434 ) 435 436 include_indexes = list(include_indexes) 437 438 if include_indexes: 439 if len(include_indexes) > 1: 440 """ after sum there are two apexes 441 check if it is inside the deconvolution window, otherwise ignores it 442 """ 443 444 for new_apex_index in include_indexes: 445 # pass 446 self.add_gcpeak( 447 new_apex_index, 448 start_rt, 449 final_rt, 450 peak_rt, 451 smoothed_tic, 452 group_datadict, 453 plot_res, 454 ) 455 456 else: 457 """ after sum there is on apex 458 save it 459 """ 460 new_apex_index = include_indexes[0] 461 # print(include_indexes, group, apex_rt_list) 462 self.add_gcpeak( 463 new_apex_index, 464 start_rt, 465 final_rt, 466 peak_rt, 467 smoothed_tic, 468 group_datadict, 469 plot_res, 470 ) 471 472 elif len(filtered_features_rt) == 1: 473 """ only one peak feature inside deconvolution window """ 474 475 each_apex_rt = filtered_features_rt[0] 476 477 datadict = peaks_entity_data.get(each_apex_rt) 478 479 peak_rt = [] 480 peak_tic = [] 481 482 for rt, each_datadict in datadict.items(): 483 if rt != "ref_apex_rt": 484 peak_rt.append(rt) 485 peak_tic.append(sum(each_datadict["abundance"])) 486 487 peak_rt, peak_tic = zip(*sorted(zip(peak_rt, peak_tic))) 488 489 smoothed_tic = self.smooth_signal(peak_tic) 490 491 include_indexes = sp.peak_picking_first_derivative( 492 peak_rt, 493 smoothed_tic, 494 max_height, 495 max_prominence, 496 max_signal, 497 min_peak_datapoints, 498 peak_derivative_threshold, 499 signal_threshold=signal_threshold, 500 correct_baseline=False, 501 plot_res=False, 502 ) 503 include_indexes = list(include_indexes) 504 505 if include_indexes: 506 """ after sum there are two apexes 507 check if it is inside the deconvolution window, otherwise ignores it""" 508 if len(include_indexes) > 1: 509 for new_apex_index in include_indexes: 510 # pass 511 self.add_gcpeak( 512 new_apex_index, 513 start_rt, 514 final_rt, 515 peak_rt, 516 smoothed_tic, 517 datadict, 518 plot_res, 519 ) 520 521 else: 522 """ after sum there is one apex 523 save it 524 includes_indexes = (start, apex, final )""" 525 526 new_apex_index = include_indexes[0] 527 528 self.add_gcpeak( 529 new_apex_index, 530 start_rt, 531 final_rt, 532 peak_rt, 533 smoothed_tic, 534 datadict, 535 plot_res, 536 ) 537 538 else: 539 # print('no data after filter') 540 pass 541 if plot_res: 542 plt.plot(self.retention_time, self._processed_tic, c="black") 543 plt.show() 544 545 def quadratic_interpolation(self, rt_list, tic_list, apex_index): 546 rt_list = np.array(rt_list) 547 tic_list = np.array(tic_list) 548 three_highest_i = [i for i in range(apex_index - 1, apex_index + 2)] 549 550 z = np.poly1d( 551 np.polyfit(rt_list[three_highest_i], tic_list[three_highest_i], 2) 552 ) 553 a = z[2] 554 b = z[1] 555 556 corrected_apex_rt = -b / (2 * a) 557 initial_rt = rt_list[apex_index] 558 559 return initial_rt - corrected_apex_rt
def
run_deconvolution(self, plot_res=False):
13 def run_deconvolution(self, plot_res=False): 14 eic_dict = self.ion_extracted_chroma(self._ms) 15 16 peaks_entity_data = self.find_peaks_entity(eic_dict) 17 18 """ select model peaks, create Mass Spectrum objs, GCPeak objs, store results in GC_Class gcpeaks obj""" 19 self.deconvolution(peaks_entity_data, plot_res)
def
centroid_detector(self, tic, rt):
21 def centroid_detector(self, tic, rt): 22 """this function has been replaced with sp.peak_picking_first_derivative 23 and it not used 24 """ 25 noise_std = self.chromatogram_settings.std_noise_threshold 26 27 method = self.chromatogram_settings.noise_threshold_method 28 29 """ peak picking""" 30 min_height = self.chromatogram_settings.peak_height_min_percent 31 min_datapoints = self.chromatogram_settings.min_peak_datapoints 32 33 """ baseline detection""" 34 max_prominence = self.chromatogram_settings.peak_max_prominence_percent 35 max_height = self.chromatogram_settings.peak_height_max_percent 36 37 peak_indexes_generator = sp.peak_detector_generator( 38 tic, 39 noise_std, 40 method, 41 rt, 42 max_height, 43 min_height, 44 max_prominence, 45 min_datapoints, 46 ) 47 48 return peak_indexes_generator
this function has been replaced with sp.peak_picking_first_derivative and it not used
def
ion_extracted_chroma(self, mass_spectra_obj):
50 def ion_extracted_chroma(self, mass_spectra_obj): 51 eic_dict = {} 52 53 for scan_number, ms_obj in mass_spectra_obj.items(): 54 mz_list = ms_obj.mz_exp 55 abundance_list = ms_obj.abundance 56 # add list of scan numbers 57 for index, mz in enumerate(mz_list): 58 # dict of mz and tuple (mass spectrum abundances index, and scan number) 59 if mz not in eic_dict.keys(): 60 eic_dict[mz] = [[abundance_list[index]], [ms_obj.retention_time]] 61 62 else: 63 eic_dict[mz][0].append(ms_obj.abundance[index]) 64 eic_dict[mz][1].append(ms_obj.retention_time) 65 66 return eic_dict
def
hc(self, X, Y, max_rt_distance=0.025):
68 def hc(self, X, Y, max_rt_distance=0.025): 69 from scipy.cluster.hierarchy import dendrogram, linkage 70 from scipy.cluster.hierarchy import fcluster 71 # from matplotlib import pyplot as plt 72 73 Z = linkage(np.reshape(X, (len(X), 1)), method="ward") 74 # Z = linkage(X, method = "ward") 75 76 max_d = max_rt_distance 77 distance_clusters = fcluster(Z, max_d, criterion="distance") 78 # print("distance") 79 # print(distance_clusters) 80 81 # inconsistency_cluster = fcluster(Z, 2, depth=2) 82 # max_cluster = fcluster(Z, 2, criterion='maxclust') 83 84 grouped_rt = {} 85 86 for index_obj, group in enumerate(distance_clusters): 87 if group not in grouped_rt.keys(): 88 grouped_rt[group] = [X[index_obj]] 89 else: 90 grouped_rt[group].append(X[index_obj]) 91 92 # print(distance_clusters, grouped_rt) 93 return grouped_rt 94 95 # plt.figure(figsize=(10, 8)) 96 # plt.scatter(X, Y, c=distance_clusters, cmap='prism') # plot points with cluster dependent colors 97 # plt.show() 98 # labelList = range(int(min(X)), int(max(X))) 99 100 # plt.figure(figsize=(10, 7)) 101 # dendrogram(Z, 102 # orientation='top', 103 # distance_sort='descending', 104 # show_leaf_counts=True) 105 # plt.show() 106 # print(Z)
def
find_peaks_entity(self, eic_dict):
108 def find_peaks_entity(self, eic_dict): 109 """combine eic with mathing rt apexes""" 110 max_prominence = self.chromatogram_settings.peak_max_prominence_percent 111 112 max_height = self.chromatogram_settings.peak_height_max_percent 113 114 signal_threshold = self.chromatogram_settings.eic_signal_threshold 115 116 min_peak_datapoints = self.chromatogram_settings.min_peak_datapoints 117 118 peak_derivative_threshold = self.chromatogram_settings.peak_derivative_threshold 119 120 correct_baseline = False 121 peaks_entity_data = {} 122 123 max_eic = 0 124 for mz, eic_scan_index_rt in eic_dict.items(): 125 ind_max_eic = max(eic_scan_index_rt[0]) 126 max_eic = ind_max_eic if ind_max_eic > max_eic else max_eic 127 128 for mz, eic_scan_index_rt in eic_dict.items(): 129 eic = eic_scan_index_rt[0] 130 rt_list = eic_scan_index_rt[1] 131 132 if len(eic) >= min_peak_datapoints: 133 smooth_eic = self.smooth_tic(eic) 134 135 include_indexes = sp.peak_picking_first_derivative( 136 rt_list, 137 smooth_eic, 138 max_height, 139 max_prominence, 140 max_eic, 141 min_peak_datapoints, 142 peak_derivative_threshold, 143 signal_threshold=signal_threshold, 144 correct_baseline=correct_baseline, 145 ) 146 147 for initial_scan, apex_scan, final_scan in include_indexes: 148 rt_corrected_therm = self.quadratic_interpolation( 149 rt_list, smooth_eic, apex_scan 150 ) 151 152 ref_apex_rt = round(rt_list[apex_scan] + rt_corrected_therm, 4) 153 154 apex_rt = rt_list[apex_scan] 155 # apex_abundance = smooth_eic[apex_scan] 156 157 # maximum_tic = apex_abundance if apex_abundance > maximum_tic else maximum_tic 158 159 for scan_index in range(initial_scan, final_scan): 160 peak_rt = rt_list[scan_index] 161 peak_abundance = smooth_eic[scan_index] 162 163 if peak_abundance > 0: 164 dict_data = { 165 peak_rt: { 166 "mz": [mz], 167 "abundance": [peak_abundance], 168 "scan_number": [scan_index], 169 }, 170 "ref_apex_rt": ref_apex_rt, 171 } 172 173 if apex_rt not in peaks_entity_data.keys(): 174 peaks_entity_data[apex_rt] = dict_data 175 176 else: 177 if peak_rt not in peaks_entity_data[apex_rt].keys(): 178 peaks_entity_data[apex_rt][peak_rt] = dict_data.get( 179 peak_rt 180 ) 181 182 else: 183 existing_data = peaks_entity_data[apex_rt].get( 184 peak_rt 185 ) 186 187 existing_data["mz"].append(mz) 188 existing_data["abundance"].append(peak_abundance) 189 existing_data["scan_number"].append(scan_index) 190 191 return peaks_entity_data
combine eic with mathing rt apexes
def
mass_spec_factory(self, rt, datadict):
193 def mass_spec_factory(self, rt, datadict): 194 # tic = sum(datadict.get('abundance')) 195 196 scan_index = datadict["scan_number"][0] 197 198 mz_list, abundance_list = zip( 199 *sorted(zip(datadict["mz"], datadict["abundance"])) 200 ) 201 202 data_dict = {Labels.mz: mz_list, Labels.abundance: abundance_list} 203 204 d_params = default_parameters(self._ms[scan_index]._filename) 205 206 d_params["rt"] = rt 207 208 d_params["scan_number"] = scan_index 209 210 d_params["label"] = Labels.gcms_centroid 211 212 d_params["polarity"] = self._ms[scan_index].polarity 213 214 d_params["analyzer"] = self._ms[scan_index].analyzer 215 216 d_params["instrument_label"] = self._ms[scan_index].instrument_label 217 218 d_params["filename_path"] = self._ms[scan_index].instrument_label 219 220 ms = MassSpecCentroidLowRes(data_dict, d_params) 221 222 return ms
def
smooth_signal(self, signal):
224 def smooth_signal(self, signal): 225 implemented_smooth_method = self.chromatogram_settings.implemented_smooth_method 226 227 pol_order = self.chromatogram_settings.savgol_pol_order 228 229 window_len = self.chromatogram_settings.smooth_window 230 231 window = self.chromatogram_settings.smooth_method 232 233 return sp.smooth_signal( 234 signal, window_len, window, pol_order, implemented_smooth_method 235 )
def
add_gcpeak( self, new_apex_index, start_rt, final_rt, peak_rt, smoothed_tic, datadict, plot_res):
237 def add_gcpeak( 238 self, 239 new_apex_index, 240 start_rt, 241 final_rt, 242 peak_rt, 243 smoothed_tic, 244 datadict, 245 plot_res, 246 ): 247 if start_rt <= peak_rt[new_apex_index[1]] <= final_rt: 248 rt_list = peak_rt[new_apex_index[0] : new_apex_index[2]] 249 tic_list = smoothed_tic[new_apex_index[0] : new_apex_index[2]] 250 251 apex_rt = peak_rt[new_apex_index[1]] 252 apex_i = rt_list.index(apex_rt) 253 254 """workaround for peak picking missing some local minimas""" 255 if apex_rt not in self.processed_appexes: 256 self.processed_appexes.append(apex_rt) 257 258 mass_spectra = ( 259 self.mass_spec_factory(rt, datadict.get(rt)) for rt in rt_list 260 ) 261 262 gc_peak = GCPeakDeconvolved( 263 self, mass_spectra, apex_i, rt_list, tic_list 264 ) 265 266 gc_peak.calc_area(tic_list, 1) 267 268 self.gcpeaks.append(gc_peak) 269 270 if plot_res: 271 plt.plot(gc_peak.rt_list, gc_peak.tic_list) 272 plt.plot( 273 gc_peak.retention_time, 274 gc_peak.tic, 275 c="black", 276 marker="^", 277 linewidth=0, 278 )
def
deconvolution(self, peaks_entity_data, plot_res):
280 def deconvolution(self, peaks_entity_data, plot_res): 281 # plot_res = True 282 domain = self.retention_time 283 signal = self._processed_tic 284 max_height = self.chromatogram_settings.peak_height_max_percent 285 max_prominence = self.chromatogram_settings.peak_max_prominence_percent 286 min_peak_datapoints = self.chromatogram_settings.min_peak_datapoints 287 signal_threshold = self.chromatogram_settings.peak_height_min_percent 288 max_rt_distance = self.chromatogram_settings.max_rt_distance 289 peak_derivative_threshold = self.chromatogram_settings.peak_derivative_threshold 290 291 max_signal = max(signal) 292 correct_baseline = False 293 294 include_indexes = sp.peak_picking_first_derivative( 295 domain, 296 signal, 297 max_height, 298 max_prominence, 299 max_signal, 300 min_peak_datapoints, 301 peak_derivative_threshold, 302 signal_threshold=signal_threshold, 303 correct_baseline=correct_baseline, 304 plot_res=False, 305 ) 306 307 """ deconvolution window is defined by the TIC peak region""" 308 all_apexes_rt = np.array(list(peaks_entity_data.keys())) 309 310 """workaround for peak picking missing some local minimas""" 311 self.processed_appexes = [] 312 313 for indexes_tuple in include_indexes: 314 start_rt = self.retention_time[indexes_tuple[0]] 315 # apex_rt = self.retention_time[indexes_tuple[1]] 316 final_rt = self.retention_time[indexes_tuple[2]] 317 318 """ find all features within TIC peak window""" 319 peak_features_indexes = np.where( 320 (all_apexes_rt > start_rt) & (all_apexes_rt < final_rt) 321 )[0] 322 peak_features_rts = all_apexes_rt[peak_features_indexes] 323 324 # print(start_rt, apex_rt, final_rt ) 325 326 filtered_features_rt = [] 327 filtered_features_abundance = [] 328 329 for each_apex_rt in peak_features_rts: 330 apex_data = peaks_entity_data.get(each_apex_rt).get(each_apex_rt) 331 332 peak_features_tic = sum( 333 peaks_entity_data.get(each_apex_rt) 334 .get(each_apex_rt) 335 .get("abundance") 336 ) 337 338 norm_smooth_tic = (peak_features_tic / max_signal) * 100 339 340 """ TODO: 341 Improve Peak Filtering 342 343 Calculate peaks sharpness here and filter it out (Amax - An /n)? 344 Peak Fit and Calculate Peak Gaussian Similarity? 345 Currentely using flat % tic relative abundance threshold and min 3 m/z per mass spectrum 346 """ 347 if norm_smooth_tic > signal_threshold and len(apex_data["mz"]) > 1: 348 # print(len(apex_data['mz'])) 349 filtered_features_rt.append(each_apex_rt) 350 filtered_features_abundance.append(peak_features_tic) 351 352 if len(filtered_features_rt) > 1: 353 """ more than one peak feature identified inside a TIC peak """ 354 # plt.plot(self.retention_time[indexes_tuple[0]:indexes_tuple[2]], signal[indexes_tuple[0]:indexes_tuple[2]], c='black') 355 356 # print(filtered_features_rt) 357 grouped_rt = self.hc( 358 filtered_features_rt, 359 filtered_features_abundance, 360 max_rt_distance=max_rt_distance, 361 ) 362 # print(grouped_rt) 363 364 for group, apex_rt_list in grouped_rt.items(): 365 """ each group is a peak feature defined by the hierarchical clutter algorithm 366 367 """ 368 group_datadict = {} 369 group_datadict["ref_apex_rt"] = [] 370 371 for each_group_apex_rt in apex_rt_list: 372 datadict = peaks_entity_data.get(each_group_apex_rt) 373 374 for rt, each_datadict in datadict.items(): 375 if rt == "ref_apex_rt": 376 group_datadict["ref_apex_rt"].append(each_datadict) 377 378 else: 379 if rt in group_datadict.keys(): 380 mz_list = each_datadict.get("mz") 381 abundance_list = each_datadict.get("abundance") 382 383 each_mz_abun = dict(zip(mz_list, abundance_list)) 384 385 for index_mz, mz in enumerate( 386 group_datadict[rt].get("mz") 387 ): 388 if mz in each_mz_abun.keys(): 389 each_mz_abun[mz] = ( 390 each_mz_abun[mz] 391 + group_datadict[rt].get("abundance")[ 392 index_mz 393 ] 394 ) 395 396 else: 397 each_mz_abun[mz] = group_datadict[rt].get( 398 "abundance" 399 )[index_mz] 400 401 group_datadict[rt] = { 402 "mz": list(each_mz_abun.keys()), 403 "abundance": list(each_mz_abun.values()), 404 "scan_number": each_datadict.get("scan_number"), 405 } 406 407 else: 408 group_datadict[rt] = each_datadict 409 410 peak_rt = [] 411 peak_tic = [] 412 413 # print(group_datadict.get('ref_apex_rt')) 414 for rt, each_datadict in group_datadict.items(): 415 if rt != "ref_apex_rt": 416 peak_rt.append(rt) 417 peak_tic.append(sum(each_datadict["abundance"])) 418 419 peak_rt, peak_tic = zip(*sorted(zip(peak_rt, peak_tic))) 420 421 smoothed_tic = self.smooth_signal(peak_tic) 422 423 include_indexes = sp.peak_picking_first_derivative( 424 peak_rt, 425 smoothed_tic, 426 max_height, 427 max_prominence, 428 max_signal, 429 min_peak_datapoints, 430 peak_derivative_threshold, 431 signal_threshold=signal_threshold, 432 correct_baseline=False, 433 plot_res=False, 434 ) 435 436 include_indexes = list(include_indexes) 437 438 if include_indexes: 439 if len(include_indexes) > 1: 440 """ after sum there are two apexes 441 check if it is inside the deconvolution window, otherwise ignores it 442 """ 443 444 for new_apex_index in include_indexes: 445 # pass 446 self.add_gcpeak( 447 new_apex_index, 448 start_rt, 449 final_rt, 450 peak_rt, 451 smoothed_tic, 452 group_datadict, 453 plot_res, 454 ) 455 456 else: 457 """ after sum there is on apex 458 save it 459 """ 460 new_apex_index = include_indexes[0] 461 # print(include_indexes, group, apex_rt_list) 462 self.add_gcpeak( 463 new_apex_index, 464 start_rt, 465 final_rt, 466 peak_rt, 467 smoothed_tic, 468 group_datadict, 469 plot_res, 470 ) 471 472 elif len(filtered_features_rt) == 1: 473 """ only one peak feature inside deconvolution window """ 474 475 each_apex_rt = filtered_features_rt[0] 476 477 datadict = peaks_entity_data.get(each_apex_rt) 478 479 peak_rt = [] 480 peak_tic = [] 481 482 for rt, each_datadict in datadict.items(): 483 if rt != "ref_apex_rt": 484 peak_rt.append(rt) 485 peak_tic.append(sum(each_datadict["abundance"])) 486 487 peak_rt, peak_tic = zip(*sorted(zip(peak_rt, peak_tic))) 488 489 smoothed_tic = self.smooth_signal(peak_tic) 490 491 include_indexes = sp.peak_picking_first_derivative( 492 peak_rt, 493 smoothed_tic, 494 max_height, 495 max_prominence, 496 max_signal, 497 min_peak_datapoints, 498 peak_derivative_threshold, 499 signal_threshold=signal_threshold, 500 correct_baseline=False, 501 plot_res=False, 502 ) 503 include_indexes = list(include_indexes) 504 505 if include_indexes: 506 """ after sum there are two apexes 507 check if it is inside the deconvolution window, otherwise ignores it""" 508 if len(include_indexes) > 1: 509 for new_apex_index in include_indexes: 510 # pass 511 self.add_gcpeak( 512 new_apex_index, 513 start_rt, 514 final_rt, 515 peak_rt, 516 smoothed_tic, 517 datadict, 518 plot_res, 519 ) 520 521 else: 522 """ after sum there is one apex 523 save it 524 includes_indexes = (start, apex, final )""" 525 526 new_apex_index = include_indexes[0] 527 528 self.add_gcpeak( 529 new_apex_index, 530 start_rt, 531 final_rt, 532 peak_rt, 533 smoothed_tic, 534 datadict, 535 plot_res, 536 ) 537 538 else: 539 # print('no data after filter') 540 pass 541 if plot_res: 542 plt.plot(self.retention_time, self._processed_tic, c="black") 543 plt.show()
def
quadratic_interpolation(self, rt_list, tic_list, apex_index):
545 def quadratic_interpolation(self, rt_list, tic_list, apex_index): 546 rt_list = np.array(rt_list) 547 tic_list = np.array(tic_list) 548 three_highest_i = [i for i in range(apex_index - 1, apex_index + 2)] 549 550 z = np.poly1d( 551 np.polyfit(rt_list[three_highest_i], tic_list[three_highest_i], 2) 552 ) 553 a = z[2] 554 b = z[1] 555 556 corrected_apex_rt = -b / (2 * a) 557 initial_rt = rt_list[apex_index] 558 559 return initial_rt - corrected_apex_rt