corems.chroma_peak.calc.ChromaPeakCalc
import numpy as np
from bisect import bisect_left
from scipy.optimize import curve_fit


__author__ = "Yuri E. Corilo"
__date__ = "March 11, 2020"


class GCPeakCalculation(object):
    """
    Class for performing peak calculations in GC chromatography.

    Methods
    -------
    * `calc_area(self, tic: List[float], dx: float) -> None`: Calculate the area under the curve of the chromatogram.
    * `linear_ri(self, right_ri: float, left_ri: float, left_rt: float, right_rt: float) -> float`: Calculate the retention index using linear interpolation.
    * `calc_ri(self, rt_ri_pairs: List[Tuple[float, float]]) -> None`: Calculate the retention index based on the given retention time - retention index pairs.
    """

    def calc_area(self, tic: list[float], dx: float) -> None:
        """
        Calculate the area under the curve of the chromatogram.

        The result is stored in the `_area` attribute.

        Parameters
        ----------
        tic : List[float]
            The total ion current (TIC) values.
        dx : float
            The spacing between data points.
        """
        yy = tic[self.start_scan : self.final_scan]
        # np.trapz was renamed to np.trapezoid in NumPy 2.0; support both.
        trapezoid = getattr(np, "trapezoid", getattr(np, "trapz", None))
        self._area = trapezoid(yy, dx=dx)

    def linear_ri(
        self, right_ri: float, left_ri: float, left_rt: float, right_rt: float
    ) -> float:
        """
        Calculate the retention index using linear interpolation.

        Parameters
        ----------
        right_ri : float
            The retention index at the right reference point.
        left_ri : float
            The retention index at the left reference point.
        left_rt : float
            The retention time at the left reference point.
        right_rt : float
            The retention time at the right reference point.

        Returns
        -------
        float
            The calculated retention index.
        """
        return left_ri + (
            (right_ri - left_ri)
            * (self.retention_time - left_rt)
            / (right_rt - left_rt)
        )

    def calc_ri(self, rt_ri_pairs: list[tuple[float, float]]) -> None:
        """
        Calculate the retention index based on the given retention time - retention index pairs.

        The result is stored in the `_ri` attribute.

        Parameters
        ----------
        rt_ri_pairs : List[Tuple[float, float]]
            The list of retention time - retention index pairs,
            sorted by retention time.
        """
        current_rt = self.retention_time

        # Locate the insertion point of the current RT among the reference RTs
        rts = [rt_ri[0] for rt_ri in rt_ri_pairs]
        index = bisect_left(rts, current_rt)

        # Clamp to the last pair when the RT falls past the calibration range
        if index >= len(rt_ri_pairs):
            index -= 1

        current_ref = rt_ri_pairs[index]

        if current_rt == current_ref[0]:
            # Exact match with a reference point: use its RI directly
            self._ri = current_ref[1]

        else:
            # Interpolate between the two bracketing reference points
            # (extrapolates at the left edge of the calibration range)
            if index == 0:
                index += 1

            left_rt = rt_ri_pairs[index - 1][0]
            left_ri = rt_ri_pairs[index - 1][1]

            right_rt = rt_ri_pairs[index][0]
            right_ri = rt_ri_pairs[index][1]

            self._ri = self.linear_ri(right_ri, left_ri, left_rt, right_rt)


class LCMSMassFeatureCalculation:
    """Class for performing peak calculations in LC-MS mass spectrometry.

    This class is intended to be used as a mixin class for the LCMSMassFeature class.
    """

    def calc_dispersity_index(self):
        """
        Calculate the dispersity index of the mass feature.

        This function calculates the dispersity index of the mass feature and
        stores the result in the `_dispersity_index` attribute. The dispersity index is calculated as the standard
        deviation of the retention times that account for 50% of the cumulative intensity, starting from the most
        intense point, as described in [1]. Note that this calculation is done within the integration bounds with
        a pad according to the window factor, where the window factor is parameterized and encapsulated in the
        parent LCMS object (or, if not available, defaults to 3.0 minutes before and after the apex).

        Returns
        -------
        None, stores the result in the `_dispersity_index` attribute of the class and the `_normalized_dispersity_index` attribute,
        which is the dispersity index normalized to the total time window used for the calculation (unitless, fraction of total window).

        Raises
        ------
        ValueError
            If the EIC data are not available.

        References
        ----------
        1) Boiteau, Rene M., et al. "Relating Molecular Properties to the Persistence of Marine Dissolved
        Organic Matter with Liquid Chromatography–Ultrahigh-Resolution Mass Spectrometry."
        Environmental Science & Technology 58.7 (2024): 3267-3277.
        """
        # Check if LCMSMassFeature has a parent LCMS object with a window factor
        if hasattr(self, "mass_spectrum_obj"):
            window_min = self.mass_spectrum_obj.parameters.lc_ms.dispersity_index_window
        else:
            window_min = 3.0  # minutes

        # Check if the EIC data is available
        if self.eic_list is None:
            raise ValueError(
                "EIC data are not available. Please add the EIC data first."
            )

        # Define start and end of the window around the apex
        apex_rt = self.retention_time
        full_time = self._eic_data.time
        full_eic = self._eic_data.eic
        left_start = apex_rt - window_min
        right_end = apex_rt + window_min

        # Extract the EIC data within the defined window
        time_mask = (full_time >= left_start) & (full_time <= right_end)
        eic_subset = full_eic[time_mask]
        time_subset = full_time[time_mask]

        # Sort the EIC data and RT data by descending intensity
        # (compute the ordering once and reuse it for both arrays)
        order = eic_subset.argsort()[::-1]
        sorted_eic = eic_subset[order]
        sorted_rt = time_subset[order]

        # Calculate the dispersity index: std of the RTs of the most intense
        # points that together account for <50% of the cumulative intensity
        cum_sum = np.cumsum(sorted_eic) / np.sum(sorted_eic)
        rt_summ = sorted_rt[np.where(cum_sum < 0.5)]
        if len(rt_summ) > 1:
            d = np.std(rt_summ)
            self._dispersity_index = d  # minutes
            self._normalized_dispersity_index = d / (
                time_subset[-1] - time_subset[0]
            )  # unitless (fraction of total window used)
        elif len(rt_summ) == 1:
            self._dispersity_index = 0
            self._normalized_dispersity_index = 0
        # NOTE: when no point falls below the 50% cumulative threshold, the
        # attributes are intentionally left unset (original behavior).

    def calc_fraction_height_width(self, fraction: float):
        """
        Calculate the height width of the mass feature at a specific fraction of the maximum intensity.

        This function returns a tuple with the minimum and maximum width at the requested
        fraction of the peak height, based on scan resolution.

        Parameters
        ----------
        fraction : float
            The fraction of the maximum intensity to calculate the height width.
            For example, 0.5 will calculate the half-height width.

        Returns
        -------
        Tuple[float, float, bool]
            The minimum and maximum half-height width based on scan resolution (in minutes), and a boolean indicating if the width was estimated.
        """

        # Pull out the EIC data
        eic = self._eic_data.eic_smoothed

        # Walk outward from the apex until the intensity drops below the
        # requested fraction of the apex intensity (or the trace ends)
        max_index = np.where(self._eic_data.scans == self.apex_scan)[0][0]
        left_index = max_index
        right_index = max_index
        while eic[left_index] > eic[max_index] * fraction and left_index > 0:
            left_index -= 1
        while (
            eic[right_index] > eic[max_index] * fraction and right_index < len(eic) - 1
        ):
            right_index += 1

        # Get the retention times of the indexes just below the fractional height
        left_rt = self._eic_data.time[left_index]
        right_rt = self._eic_data.time[right_index]

        # If left_rt and right_rt are outside the bounds of the integration, set them to the bounds and set estimated to True
        estimated = False
        if left_rt < self.eic_rt_list[0]:
            left_rt = self.eic_rt_list[0]
            left_index = np.where(self._eic_data.scans == self._eic_data.apexes[0][0])[
                0
            ][0]
            estimated = True
        if right_rt > self.eic_rt_list[-1]:
            right_rt = self.eic_rt_list[-1]
            right_index = np.where(
                self._eic_data.scans == self._eic_data.apexes[0][-1]
            )[0][0]
            estimated = True
        half_height_width_max = right_rt - left_rt

        # Get the retention times of the indexes just above the fractional height
        left_rt = self._eic_data.time[left_index + 1]
        right_rt = self._eic_data.time[right_index - 1]
        half_height_width_min = right_rt - left_rt

        return half_height_width_min, half_height_width_max, estimated

    def calc_half_height_width(self, accept_estimated: bool = False):
        """
        Calculate the half-height width of the mass feature.

        This function calculates the half-height width of the mass feature and
        stores the result in the `_half_height_width` attribute.

        Parameters
        ----------
        accept_estimated : bool, optional
            If True, store the result even when the width had to be estimated
            from the integration bounds. Default is False.

        Returns
        -------
        None, stores the result in the `_half_height_width` attribute of the class.
        """
        min_, max_, estimated = self.calc_fraction_height_width(0.5)
        if not estimated or accept_estimated:
            self._half_height_width = np.array([min_, max_])

    def calc_tailing_factor(self, accept_estimated: bool = False):
        """
        Calculate the peak asymmetry of the mass feature.

        This function calculates the peak asymmetry of the mass feature and
        stores the result in the `_tailing_factor` attribute.
        Calculations completed at 5% of the peak height in accordance with the USP tailing factor calculation.

        Parameters
        ----------
        accept_estimated : bool, optional
            If True, compute the result even when the 5% width had to be
            estimated from the integration bounds. Default is False.

        Returns
        -------
        None, stores the result in the `_tailing_factor` attribute of the class.

        References
        ----------
        1) JIS K0124:2011 General rules for high performance liquid chromatography
        2) JIS K0214:2013 Technical terms for analytical chemistry
        """
        # First calculate the total width of the peak at 5% of the peak height
        width_min, width_max, estimated = self.calc_fraction_height_width(0.05)

        if not estimated or accept_estimated:
            # Next determine the front (leading-edge) half-width: walk left
            # from the apex to the 5% crossing point
            eic = self._eic_data.eic_smoothed
            max_index = np.where(self._eic_data.scans == self.apex_scan)[0][0]
            left_index = max_index
            while eic[left_index] > eic[max_index] * 0.05 and left_index > 0:
                left_index -= 1

            left_half_time_min = (
                self._eic_data.time[max_index] - self._eic_data.time[left_index]
            )
            left_half_time_max = (
                self._eic_data.time[max_index] - self._eic_data.time[left_index + 1]
            )

            # USP tailing factor: total 5% width over twice the front half-width
            tailing_factor = np.mean([width_min, width_max]) / (
                2 * np.mean([left_half_time_min, left_half_time_max])
            )

            self._tailing_factor = tailing_factor

    def calc_gaussian_similarity(self):
        """
        Calculate the Gaussian similarity score of the mass feature.

        This function fits a Gaussian curve to the EIC data and evaluates
        the goodness of fit using R-squared. Note that this only uses data within
        the set integration bounds of the mass feature. A score close to 1 indicates
        the peak closely resembles an ideal Gaussian shape.

        Returns
        -------
        None, stores the result in the `_gaussian_similarity` attribute of the class.

        Raises
        ------
        ValueError
            If the EIC data are not available.
        """
        # Check if the EIC data is available
        if self.eic_list is None:
            raise ValueError(
                "EIC data are not available. Please add the EIC data first."
            )

        # Get EIC data within integration bounds
        time_data = np.array(self.eic_rt_list)
        intensity_data = np.array(self.eic_list)

        if len(time_data) < 4:  # Need minimum points for meaningful fit
            self._gaussian_similarity = np.nan
            return

        # Check for valid intensity data
        max_intensity = np.max(intensity_data)
        if max_intensity == 0:
            self._gaussian_similarity = np.nan
            return

        try:
            # Gaussian model with a constant baseline offset
            def gaussian(x, amplitude, mean, stddev, baseline):
                return (
                    amplitude * np.exp(-((x - mean) ** 2) / (2 * stddev**2)) + baseline
                )

            # Initial parameter estimates
            amplitude_init = max_intensity
            mean_init = time_data[np.argmax(intensity_data)]
            stddev_init = (time_data[-1] - time_data[0]) / 6  # Rough estimate
            baseline_init = np.min(intensity_data)

            # Fit Gaussian curve
            popt, _ = curve_fit(
                gaussian,
                time_data,
                intensity_data,
                p0=[amplitude_init, mean_init, stddev_init, baseline_init],
                maxfev=1000,
                bounds=(
                    [0, time_data[0], 0, 0],  # Lower bounds
                    [np.inf, time_data[-1], np.inf, max_intensity],  # Upper bounds
                ),
            )

            # Calculate fitted values
            fitted_intensities = gaussian(time_data, *popt)

            # Calculate R-squared (coefficient of determination)
            ss_res = np.sum((intensity_data - fitted_intensities) ** 2)
            ss_tot = np.sum((intensity_data - np.mean(intensity_data)) ** 2)

            if ss_tot == 0:
                self._gaussian_similarity = np.nan
            else:
                r_squared = 1 - (ss_res / ss_tot)
                # R-squared should be between 0 and 1 for reasonable fits.
                # If negative, the model is worse than the mean - treat as non-computable
                self._gaussian_similarity = r_squared if r_squared >= 0 else np.nan

        except (RuntimeError, ValueError, TypeError):
            # Fitting failed, assign NaN
            self._gaussian_similarity = np.nan

    @staticmethod
    def _flanking_noise_score(noise, peak_intensity):
        """Score one flanking noise region against the peak apex intensity.

        Returns np.nan when the region is empty or cannot be assessed, 0.0 when
        the peak does not rise above the local baseline, otherwise a saturating
        SNR transform in (0, 1] (an SNR of 10 maps to 0.5).
        """
        if len(noise) == 0:
            return np.nan
        baseline = np.median(noise)
        noise_std = np.std(noise)
        if noise_std == 0:
            # Perfectly flat region: full score if the peak rises above it at all
            return 1.0 if peak_intensity > baseline else np.nan
        signal = peak_intensity - baseline
        if signal <= 0:
            return 0.0
        snr = signal / noise_std
        return min(1.0, snr / (snr + 10.0))

    def calc_noise_score(self):
        """
        Calculate the noise score of the mass feature separately for left and right sides.

        This function estimates the signal-to-noise ratio by comparing the peak
        intensity to the baseline noise level in surrounding regions. It calculates
        separate scores for the left and right sides of the peak, which are stored as a tuple
        in the `_noise_score` attribute. The noise estimation windows are encapsulated in the
        parent LCMS object (or, if not available, defaults to twice the peak width on each side).

        Returns
        -------
        None, stores the result in the `_noise_score` attribute as a tuple (left_score, right_score).

        Raises
        ------
        ValueError
            If the EIC data are not available.
        """
        # Check if the EIC data is available
        if self.eic_list is None:
            raise ValueError(
                "EIC data are not available. Please add the EIC data first."
            )

        # Check if LCMSMassFeature has a parent LCMS object with a window factor
        if hasattr(self, "mass_spectrum_obj"):
            noise_window_factor = (
                self.mass_spectrum_obj.parameters.lc_ms.noise_window_factor
            )
        else:
            noise_window_factor = 2.0  # times the peak width

        # Get full EIC data (not just integration bounds)
        full_time = self._eic_data.time
        full_eic = self._eic_data.eic

        # Get peak information
        apex_rt = self.retention_time
        peak_intensity = np.max(self.eic_list)

        # Retrieve width from integration bounds
        peak_width = self.eic_rt_list[-1] - self.eic_rt_list[0]

        # Define noise estimation windows: each sits one peak-width away from
        # the apex and extends noise_window_size further out
        noise_window_size = peak_width * noise_window_factor  # in minutes
        left_noise_mask = (full_time >= apex_rt - peak_width - noise_window_size) & (
            full_time <= apex_rt - peak_width
        )
        right_noise_mask = (full_time >= apex_rt + peak_width) & (
            full_time <= apex_rt + peak_width + noise_window_size
        )

        # Score each side with the shared helper
        left_score = self._flanking_noise_score(
            full_eic[left_noise_mask], peak_intensity
        )
        right_score = self._flanking_noise_score(
            full_eic[right_noise_mask], peak_intensity
        )

        # Store as tuple
        self._noise_score = (left_score, right_score)
class GCPeakCalculation(object):
    """
    Class for performing peak calculations in GC chromatography.

    Methods
    -------
    * `calc_area(self, tic: List[float], dx: float) -> None`: Calculate the area under the curve of the chromatogram.
    * `linear_ri(self, right_ri: float, left_ri: float, left_rt: float, right_rt: float) -> float`: Calculate the retention index using linear interpolation.
    * `calc_ri(self, rt_ri_pairs: List[Tuple[float, float]]) -> None`: Calculate the retention index based on the given retention time - retention index pairs.
    """

    def calc_area(self, tic: list[float], dx: float) -> None:
        """
        Calculate the area under the curve of the chromatogram.

        The result is stored in the `_area` attribute.

        Parameters
        ----------
        tic : List[float]
            The total ion current (TIC) values.
        dx : float
            The spacing between data points.
        """
        yy = tic[self.start_scan : self.final_scan]
        # np.trapz was renamed to np.trapezoid in NumPy 2.0; support both.
        trapezoid = getattr(np, "trapezoid", getattr(np, "trapz", None))
        self._area = trapezoid(yy, dx=dx)

    def linear_ri(
        self, right_ri: float, left_ri: float, left_rt: float, right_rt: float
    ) -> float:
        """
        Calculate the retention index using linear interpolation.

        Parameters
        ----------
        right_ri : float
            The retention index at the right reference point.
        left_ri : float
            The retention index at the left reference point.
        left_rt : float
            The retention time at the left reference point.
        right_rt : float
            The retention time at the right reference point.

        Returns
        -------
        float
            The calculated retention index.
        """
        return left_ri + (
            (right_ri - left_ri)
            * (self.retention_time - left_rt)
            / (right_rt - left_rt)
        )

    def calc_ri(self, rt_ri_pairs: list[tuple[float, float]]) -> None:
        """
        Calculate the retention index based on the given retention time - retention index pairs.

        The result is stored in the `_ri` attribute.

        Parameters
        ----------
        rt_ri_pairs : List[Tuple[float, float]]
            The list of retention time - retention index pairs,
            sorted by retention time.
        """
        current_rt = self.retention_time

        # Locate the insertion point of the current RT among the reference RTs
        rts = [rt_ri[0] for rt_ri in rt_ri_pairs]
        index = bisect_left(rts, current_rt)

        # Clamp to the last pair when the RT falls past the calibration range
        if index >= len(rt_ri_pairs):
            index -= 1

        current_ref = rt_ri_pairs[index]

        if current_rt == current_ref[0]:
            # Exact match with a reference point: use its RI directly
            self._ri = current_ref[1]

        else:
            # Interpolate between the two bracketing reference points
            # (extrapolates at the left edge of the calibration range)
            if index == 0:
                index += 1

            left_rt = rt_ri_pairs[index - 1][0]
            left_ri = rt_ri_pairs[index - 1][1]

            right_rt = rt_ri_pairs[index][0]
            right_ri = rt_ri_pairs[index][1]

            self._ri = self.linear_ri(right_ri, left_ri, left_rt, right_rt)
Class for performing peak calculations in GC chromatography.
Methods
* `calc_area(self, tic: List[float], dx: float) -> None`: Calculate the area under the curve of the chromatogram.
* `linear_ri(self, right_ri: float, left_ri: float, left_rt: float, right_rt: float) -> float`: Calculate the retention index using linear interpolation.
* `calc_ri(self, rt_ri_pairs: List[Tuple[float, float]]) -> None`: Calculate the retention index based on the given retention time - retention index pairs.
def calc_area(self, tic: list[float], dx: float) -> None:
    """
    Calculate the area under the curve of the chromatogram.

    The result is stored in the `_area` attribute.

    Parameters
    ----------
    tic : List[float]
        The total ion current (TIC) values.
    dx : float
        The spacing between data points.
    """
    yy = tic[self.start_scan : self.final_scan]
    # np.trapz was renamed to np.trapezoid in NumPy 2.0; support both.
    trapezoid = getattr(np, "trapezoid", getattr(np, "trapz", None))
    self._area = trapezoid(yy, dx=dx)
Calculate the area under the curve of the chromatogram.
Parameters
- tic (List[float]): The total ion current (TIC) values.
- dx (float): The spacing between data points.
def linear_ri(
    self, right_ri: float, left_ri: float, left_rt: float, right_rt: float
) -> float:
    """
    Calculate the retention index using linear interpolation.

    Parameters
    ----------
    right_ri : float
        The retention index at the right reference point.
    left_ri : float
        The retention index at the left reference point.
    left_rt : float
        The retention time at the left reference point.
    right_rt : float
        The retention time at the right reference point.

    Returns
    -------
    float
        The calculated retention index.
    """
    return left_ri + (
        (right_ri - left_ri)
        * (self.retention_time - left_rt)
        / (right_rt - left_rt)
    )
Calculate the retention index using linear interpolation.
Parameters
- right_ri (float): The retention index at the right reference point.
- left_ri (float): The retention index at the left reference point.
- left_rt (float): The retention time at the left reference point.
- right_rt (float): The retention time at the right reference point.
Returns
- float: The calculated retention index.
def calc_ri(self, rt_ri_pairs: list[tuple[float, float]]) -> None:
    """
    Calculate the retention index based on the given retention time - retention index pairs.

    The result is stored in the `_ri` attribute.

    Parameters
    ----------
    rt_ri_pairs : List[Tuple[float, float]]
        The list of retention time - retention index pairs,
        sorted by retention time.
    """
    current_rt = self.retention_time

    # Locate the insertion point of the current RT among the reference RTs
    rts = [rt_ri[0] for rt_ri in rt_ri_pairs]
    index = bisect_left(rts, current_rt)

    # Clamp to the last pair when the RT falls past the calibration range
    if index >= len(rt_ri_pairs):
        index -= 1

    current_ref = rt_ri_pairs[index]

    if current_rt == current_ref[0]:
        # Exact match with a reference point: use its RI directly
        self._ri = current_ref[1]

    else:
        # Interpolate between the two bracketing reference points
        # (extrapolates at the left edge of the calibration range)
        if index == 0:
            index += 1

        left_rt = rt_ri_pairs[index - 1][0]
        left_ri = rt_ri_pairs[index - 1][1]

        right_rt = rt_ri_pairs[index][0]
        right_ri = rt_ri_pairs[index][1]

        self._ri = self.linear_ri(right_ri, left_ri, left_rt, right_rt)
Calculate the retention index based on the given retention time - retention index pairs.
Parameters
- rt_ri_pairs (List[Tuple[float, float]]): The list of retention time - retention index pairs.
class LCMSMassFeatureCalculation:
    """Class for performing peak calculations in LC-MS mass spectrometry.

    This class is intended to be used as a mixin class for the LCMSMassFeature class.
    """

    def calc_dispersity_index(self):
        """
        Calculate the dispersity index of the mass feature.

        This function calculates the dispersity index of the mass feature and
        stores the result in the `_dispersity_index` attribute. The dispersity index is calculated as the standard
        deviation of the retention times that account for 50% of the cumulative intensity, starting from the most
        intense point, as described in [1]. Note that this calculation is done within the integration bounds with
        a pad according to the window factor, where the window factor is parameterized and encapsulated in the
        parent LCMS object (or, if not available, defaults to 3.0 minutes before and after the apex).

        Returns
        -------
        None, stores the result in the `_dispersity_index` attribute of the class and the `_normalized_dispersity_index` attribute,
        which is the dispersity index normalized to the total time window used for the calculation (unitless, fraction of total window).

        Raises
        ------
        ValueError
            If the EIC data are not available.

        References
        ----------
        1) Boiteau, Rene M., et al. "Relating Molecular Properties to the Persistence of Marine Dissolved
        Organic Matter with Liquid Chromatography–Ultrahigh-Resolution Mass Spectrometry."
        Environmental Science & Technology 58.7 (2024): 3267-3277.
        """
        # Check if LCMSMassFeature has a parent LCMS object with a window factor
        if hasattr(self, "mass_spectrum_obj"):
            window_min = self.mass_spectrum_obj.parameters.lc_ms.dispersity_index_window
        else:
            window_min = 3.0  # minutes

        # Check if the EIC data is available
        if self.eic_list is None:
            raise ValueError(
                "EIC data are not available. Please add the EIC data first."
            )

        # Define start and end of the window around the apex
        apex_rt = self.retention_time
        full_time = self._eic_data.time
        full_eic = self._eic_data.eic
        left_start = apex_rt - window_min
        right_end = apex_rt + window_min

        # Extract the EIC data within the defined window
        time_mask = (full_time >= left_start) & (full_time <= right_end)
        eic_subset = full_eic[time_mask]
        time_subset = full_time[time_mask]

        # Sort the EIC data and RT data by descending intensity
        # (compute the ordering once and reuse it for both arrays)
        order = eic_subset.argsort()[::-1]
        sorted_eic = eic_subset[order]
        sorted_rt = time_subset[order]

        # Calculate the dispersity index: std of the RTs of the most intense
        # points that together account for <50% of the cumulative intensity
        cum_sum = np.cumsum(sorted_eic) / np.sum(sorted_eic)
        rt_summ = sorted_rt[np.where(cum_sum < 0.5)]
        if len(rt_summ) > 1:
            d = np.std(rt_summ)
            self._dispersity_index = d  # minutes
            self._normalized_dispersity_index = d / (
                time_subset[-1] - time_subset[0]
            )  # unitless (fraction of total window used)
        elif len(rt_summ) == 1:
            self._dispersity_index = 0
            self._normalized_dispersity_index = 0
        # NOTE: when no point falls below the 50% cumulative threshold, the
        # attributes are intentionally left unset (original behavior).

    def calc_fraction_height_width(self, fraction: float):
        """
        Calculate the height width of the mass feature at a specific fraction of the maximum intensity.

        This function returns a tuple with the minimum and maximum width at the requested
        fraction of the peak height, based on scan resolution.

        Parameters
        ----------
        fraction : float
            The fraction of the maximum intensity to calculate the height width.
            For example, 0.5 will calculate the half-height width.

        Returns
        -------
        Tuple[float, float, bool]
            The minimum and maximum half-height width based on scan resolution (in minutes), and a boolean indicating if the width was estimated.
        """

        # Pull out the EIC data
        eic = self._eic_data.eic_smoothed

        # Walk outward from the apex until the intensity drops below the
        # requested fraction of the apex intensity (or the trace ends)
        max_index = np.where(self._eic_data.scans == self.apex_scan)[0][0]
        left_index = max_index
        right_index = max_index
        while eic[left_index] > eic[max_index] * fraction and left_index > 0:
            left_index -= 1
        while (
            eic[right_index] > eic[max_index] * fraction and right_index < len(eic) - 1
        ):
            right_index += 1

        # Get the retention times of the indexes just below the fractional height
        left_rt = self._eic_data.time[left_index]
        right_rt = self._eic_data.time[right_index]

        # If left_rt and right_rt are outside the bounds of the integration, set them to the bounds and set estimated to True
        estimated = False
        if left_rt < self.eic_rt_list[0]:
            left_rt = self.eic_rt_list[0]
            left_index = np.where(self._eic_data.scans == self._eic_data.apexes[0][0])[
                0
            ][0]
            estimated = True
        if right_rt > self.eic_rt_list[-1]:
            right_rt = self.eic_rt_list[-1]
            right_index = np.where(
                self._eic_data.scans == self._eic_data.apexes[0][-1]
            )[0][0]
            estimated = True
        half_height_width_max = right_rt - left_rt

        # Get the retention times of the indexes just above the fractional height
        left_rt = self._eic_data.time[left_index + 1]
        right_rt = self._eic_data.time[right_index - 1]
        half_height_width_min = right_rt - left_rt

        return half_height_width_min, half_height_width_max, estimated

    def calc_half_height_width(self, accept_estimated: bool = False):
        """
        Calculate the half-height width of the mass feature.

        This function calculates the half-height width of the mass feature and
        stores the result in the `_half_height_width` attribute.

        Parameters
        ----------
        accept_estimated : bool, optional
            If True, store the result even when the width had to be estimated
            from the integration bounds. Default is False.

        Returns
        -------
        None, stores the result in the `_half_height_width` attribute of the class.
        """
        min_, max_, estimated = self.calc_fraction_height_width(0.5)
        if not estimated or accept_estimated:
            self._half_height_width = np.array([min_, max_])

    def calc_tailing_factor(self, accept_estimated: bool = False):
        """
        Calculate the peak asymmetry of the mass feature.

        This function calculates the peak asymmetry of the mass feature and
        stores the result in the `_tailing_factor` attribute.
        Calculations completed at 5% of the peak height in accordance with the USP tailing factor calculation.

        Parameters
        ----------
        accept_estimated : bool, optional
            If True, compute the result even when the 5% width had to be
            estimated from the integration bounds. Default is False.

        Returns
        -------
        None, stores the result in the `_tailing_factor` attribute of the class.

        References
        ----------
        1) JIS K0124:2011 General rules for high performance liquid chromatography
        2) JIS K0214:2013 Technical terms for analytical chemistry
        """
        # First calculate the total width of the peak at 5% of the peak height
        width_min, width_max, estimated = self.calc_fraction_height_width(0.05)

        if not estimated or accept_estimated:
            # Next determine the front (leading-edge) half-width: walk left
            # from the apex to the 5% crossing point
            eic = self._eic_data.eic_smoothed
            max_index = np.where(self._eic_data.scans == self.apex_scan)[0][0]
            left_index = max_index
            while eic[left_index] > eic[max_index] * 0.05 and left_index > 0:
                left_index -= 1

            left_half_time_min = (
                self._eic_data.time[max_index] - self._eic_data.time[left_index]
            )
            left_half_time_max = (
                self._eic_data.time[max_index] - self._eic_data.time[left_index + 1]
            )

            # USP tailing factor: total 5% width over twice the front half-width
            tailing_factor = np.mean([width_min, width_max]) / (
                2 * np.mean([left_half_time_min, left_half_time_max])
            )

            self._tailing_factor = tailing_factor

    def calc_gaussian_similarity(self):
        """
        Calculate the Gaussian similarity score of the mass feature.

        This function fits a Gaussian curve to the EIC data and evaluates
        the goodness of fit using R-squared. Note that this only uses data within
        the set integration bounds of the mass feature. A score close to 1 indicates
        the peak closely resembles an ideal Gaussian shape.

        Returns
        -------
        None, stores the result in the `_gaussian_similarity` attribute of the class.

        Raises
        ------
        ValueError
            If the EIC data are not available.
        """
        # Check if the EIC data is available
        if self.eic_list is None:
            raise ValueError(
                "EIC data are not available. Please add the EIC data first."
            )

        # Get EIC data within integration bounds
        time_data = np.array(self.eic_rt_list)
        intensity_data = np.array(self.eic_list)

        if len(time_data) < 4:  # Need minimum points for meaningful fit
            self._gaussian_similarity = np.nan
            return

        # Check for valid intensity data
        max_intensity = np.max(intensity_data)
        if max_intensity == 0:
            self._gaussian_similarity = np.nan
            return

        try:
            # Gaussian model with a constant baseline offset
            def gaussian(x, amplitude, mean, stddev, baseline):
                return (
                    amplitude * np.exp(-((x - mean) ** 2) / (2 * stddev**2)) + baseline
                )

            # Initial parameter estimates
            amplitude_init = max_intensity
            mean_init = time_data[np.argmax(intensity_data)]
            stddev_init = (time_data[-1] - time_data[0]) / 6  # Rough estimate
            baseline_init = np.min(intensity_data)

            # Fit Gaussian curve
            popt, _ = curve_fit(
                gaussian,
                time_data,
                intensity_data,
                p0=[amplitude_init, mean_init, stddev_init, baseline_init],
                maxfev=1000,
                bounds=(
                    [0, time_data[0], 0, 0],  # Lower bounds
                    [np.inf, time_data[-1], np.inf, max_intensity],  # Upper bounds
                ),
            )

            # Calculate fitted values
            fitted_intensities = gaussian(time_data, *popt)

            # Calculate R-squared (coefficient of determination)
            ss_res = np.sum((intensity_data - fitted_intensities) ** 2)
            ss_tot = np.sum((intensity_data - np.mean(intensity_data)) ** 2)

            if ss_tot == 0:
                self._gaussian_similarity = np.nan
            else:
                r_squared = 1 - (ss_res / ss_tot)
                # R-squared should be between 0 and 1 for reasonable fits.
                # If negative, the model is worse than the mean - treat as non-computable
                self._gaussian_similarity = r_squared if r_squared >= 0 else np.nan

        except (RuntimeError, ValueError, TypeError):
            # Fitting failed, assign NaN
            self._gaussian_similarity = np.nan

    @staticmethod
    def _flanking_noise_score(noise, peak_intensity):
        """Score one flanking noise region against the peak apex intensity.

        Returns np.nan when the region is empty or cannot be assessed, 0.0 when
        the peak does not rise above the local baseline, otherwise a saturating
        SNR transform in (0, 1] (an SNR of 10 maps to 0.5).
        """
        if len(noise) == 0:
            return np.nan
        baseline = np.median(noise)
        noise_std = np.std(noise)
        if noise_std == 0:
            # Perfectly flat region: full score if the peak rises above it at all
            return 1.0 if peak_intensity > baseline else np.nan
        signal = peak_intensity - baseline
        if signal <= 0:
            return 0.0
        snr = signal / noise_std
        return min(1.0, snr / (snr + 10.0))

    def calc_noise_score(self):
        """
        Calculate the noise score of the mass feature separately for left and right sides.

        This function estimates the signal-to-noise ratio by comparing the peak
        intensity to the baseline noise level in surrounding regions. It calculates
        separate scores for the left and right sides of the peak, which are stored as a tuple
        in the `_noise_score` attribute. The noise estimation windows are encapsulated in the
        parent LCMS object (or, if not available, defaults to twice the peak width on each side).

        Returns
        -------
        None, stores the result in the `_noise_score` attribute as a tuple (left_score, right_score).

        Raises
        ------
        ValueError
            If the EIC data are not available.
        """
        # Check if the EIC data is available
        if self.eic_list is None:
            raise ValueError(
                "EIC data are not available. Please add the EIC data first."
            )

        # Check if LCMSMassFeature has a parent LCMS object with a window factor
        if hasattr(self, "mass_spectrum_obj"):
            noise_window_factor = (
                self.mass_spectrum_obj.parameters.lc_ms.noise_window_factor
            )
        else:
            noise_window_factor = 2.0  # times the peak width

        # Get full EIC data (not just integration bounds)
        full_time = self._eic_data.time
        full_eic = self._eic_data.eic

        # Get peak information
        apex_rt = self.retention_time
        peak_intensity = np.max(self.eic_list)

        # Retrieve width from integration bounds
        peak_width = self.eic_rt_list[-1] - self.eic_rt_list[0]

        # Define noise estimation windows: each sits one peak-width away from
        # the apex and extends noise_window_size further out
        noise_window_size = peak_width * noise_window_factor  # in minutes
        left_noise_mask = (full_time >= apex_rt - peak_width - noise_window_size) & (
            full_time <= apex_rt - peak_width
        )
        right_noise_mask = (full_time >= apex_rt + peak_width) & (
            full_time <= apex_rt + peak_width + noise_window_size
        )

        # Score each side with the shared helper
        left_score = self._flanking_noise_score(
            full_eic[left_noise_mask], peak_intensity
        )
        right_score = self._flanking_noise_score(
            full_eic[right_noise_mask], peak_intensity
        )

        # Store as tuple
        self._noise_score = (left_score, right_score)
Class for performing peak calculations in LC-MS mass spectrometry.
This class is intended to be used as a mixin class for the LCMSMassFeature class.
106 def calc_dispersity_index(self): 107 """ 108 Calculate the dispersity index of the mass feature. 109 110 This function calculates the dispersity index of the mass feature and 111 stores the result in the `_dispersity_index` attribute. The dispersity index is calculated as the standard 112 deviation of the retention times that account for 50% of the cummulative intensity, starting from the most 113 intense point, as described in [1]. Note that this calculation is done within the integration bounds with 114 a pad according to the window factor, where the window factor is parameterized and encapsulated in the 115 parent LCMS object (or, if not available, defaults to 2.0 minutes before and after the apex 116 117 Returns 118 ------- 119 None, stores the result in the `_dispersity_index` attribute of the class and the `_normalized_dispersity_index` attribute, 120 which is the dispersity index normalized to the total time window used for the calculation (unitless, fraction of total window). 121 122 Raises 123 ------ 124 ValueError 125 If the EIC data are not available. 126 127 References 128 ---------- 129 1) Boiteau, Rene M., et al. "Relating Molecular Properties to the Persistence of Marine Dissolved 130 Organic Matter with Liquid Chromatography–Ultrahigh-Resolution Mass Spectrometry." 131 Environmental Science & Technology 58.7 (2024): 3267-3277. 132 """ 133 # Check if LCMSMassFeature has a parent LCMS object with a window factor 134 if hasattr(self, "mass_spectrum_obj"): 135 window_min = self.mass_spectrum_obj.parameters.lc_ms.dispersity_index_window 136 else: 137 window_min = 3.0 # minutes 138 139 # Check if the EIC data is available 140 if self.eic_list is None: 141 raise ValueError( 142 "EIC data are not available. Please add the EIC data first." 
143 ) 144 145 # Define start and end of the window around the apex 146 apex_rt = self.retention_time 147 full_time = self._eic_data.time 148 full_eic = self._eic_data.eic 149 left_start = apex_rt - window_min 150 right_end = apex_rt + window_min 151 152 # Extract the EIC data within the defined window 153 time_mask = (full_time >= left_start) & (full_time <= right_end) 154 eic_subset = full_eic[time_mask] 155 time_subset = full_time[time_mask] 156 157 # Sort the EIC data and RT data by descending intensity 158 sorted_eic = eic_subset[eic_subset.argsort()[::-1]] 159 sorted_rt = time_subset[eic_subset.argsort()[::-1]] 160 161 # Calculate the dispersity index 162 cum_sum = np.cumsum(sorted_eic) / np.sum(sorted_eic) 163 rt_summ = sorted_rt[np.where(cum_sum < 0.5)] 164 if len(rt_summ) > 1: 165 d = np.std(rt_summ) 166 self._dispersity_index = d # minutes 167 self._normalized_dispersity_index = d / ( 168 time_subset[-1] - time_subset[0] 169 ) # unitless (fraction of total window used) 170 elif len(rt_summ) == 1: 171 self._dispersity_index = 0 172 self._normalized_dispersity_index = 0
Calculate the dispersity index of the mass feature.
This function calculates the dispersity index of the mass feature and
stores the result in the _dispersity_index attribute. The dispersity index is calculated as the standard
deviation of the retention times that account for 50% of the cumulative intensity, starting from the most
intense point, as described in [1]. Note that this calculation is done within the integration bounds with
a pad according to the window factor, where the window factor is parameterized and encapsulated in the
parent LCMS object (or, if not available, defaults to 3.0 minutes before and after the apex).
Returns
- None; stores the result in the `_dispersity_index` attribute of the class and the `_normalized_dispersity_index` attribute, which is the dispersity index normalized to the total time window used for the calculation (unitless, fraction of total window).
Raises
- ValueError: If the EIC data are not available.
References
1) Boiteau, Rene M., et al. "Relating Molecular Properties to the Persistence of Marine Dissolved Organic Matter with Liquid Chromatography–Ultrahigh-Resolution Mass Spectrometry." Environmental Science & Technology 58.7 (2024): 3267-3277.
    def calc_fraction_height_width(self, fraction: float) -> "tuple[float, float, bool]":
        """
        Calculate the height width of the mass feature at a specific fraction of the maximum intensity.

        This function returns a tuple with the minimum and maximum half-height width based on scan resolution.

        Parameters
        ----------
        fraction : float
            The fraction of the maximum intensity to calculate the height width.
            For example, 0.5 will calculate the half-height width.

        Returns
        -------
        Tuple[float, float, bool]
            The minimum and maximum half-height width based on scan resolution (in minutes), and a boolean indicating if the width was estimated.
        """

        # Pull out the EIC data
        eic = self._eic_data.eic_smoothed

        # Find the indices of the maximum intensity on either side.
        # Walk outward from the apex until the smoothed intensity drops to
        # (or below) fraction * apex intensity, or the trace runs out.
        max_index = np.where(self._eic_data.scans == self.apex_scan)[0][0]
        left_index = max_index
        right_index = max_index
        while eic[left_index] > eic[max_index] * fraction and left_index > 0:
            left_index -= 1
        while (
            eic[right_index] > eic[max_index] * fraction and right_index < len(eic) - 1
        ):
            right_index += 1

        # Get the retention times of the indexes just below the half height
        left_rt = self._eic_data.time[left_index]
        right_rt = self._eic_data.time[right_index]

        # If left_rt and right_rt are outside the bounds of the integration, set them to the bounds and set estimated to True.
        # NOTE(review): assumes self._eic_data.apexes[0] holds the (start, end)
        # scan numbers of this feature's integration bounds - confirm against
        # the EIC data structure.
        estimated = False
        if left_rt < self.eic_rt_list[0]:
            left_rt = self.eic_rt_list[0]
            left_index = np.where(self._eic_data.scans == self._eic_data.apexes[0][0])[
                0
            ][0]
            estimated = True
        if right_rt > self.eic_rt_list[-1]:
            right_rt = self.eic_rt_list[-1]
            right_index = np.where(
                self._eic_data.scans == self._eic_data.apexes[0][-1]
            )[0][0]
            estimated = True
        half_height_width_max = right_rt - left_rt

        # Get the retention times of the indexes just above the half height
        # (one scan inward on each side), giving the lower bound of the width
        left_rt = self._eic_data.time[left_index + 1]
        right_rt = self._eic_data.time[right_index - 1]
        half_height_width_min = right_rt - left_rt

        return half_height_width_min, half_height_width_max, estimated
Calculate the height width of the mass feature at a specific fraction of the maximum intensity.
This function returns a tuple with the minimum and maximum half-height width based on scan resolution.
Parameters
- fraction (float): The fraction of the maximum intensity to calculate the height width. For example, 0.5 will calculate the half-height width.
Returns
- Tuple[float, float, bool]: The minimum and maximum half-height width based on scan resolution (in minutes), and a boolean indicating if the width was estimated.
233 def calc_half_height_width(self, accept_estimated: bool = False): 234 """ 235 Calculate the half-height width of the mass feature. 236 237 This function calculates the half-height width of the mass feature and 238 stores the result in the `_half_height_width` attribute 239 240 Returns 241 ------- 242 None, stores the result in the `_half_height_width` attribute of the class. 243 """ 244 min_, max_, estimated = self.calc_fraction_height_width(0.5) 245 if not estimated or accept_estimated: 246 self._half_height_width = np.array([min_, max_])
Calculate the half-height width of the mass feature.
This function calculates the half-height width of the mass feature and
stores the result in the _half_height_width attribute
Returns
- None; stores the result in the `_half_height_width` attribute of the class.
248 def calc_tailing_factor(self, accept_estimated: bool = False): 249 """ 250 Calculate the peak asymmetry of the mass feature. 251 252 This function calculates the peak asymmetry of the mass feature and 253 stores the result in the `_tailing_factor` attribute. 254 Calculations completed at 5% of the peak height in accordance with the USP tailing factor calculation. 255 256 Returns 257 ------- 258 None, stores the result in the `_tailing_factor` attribute of the class. 259 260 References 261 ---------- 262 1) JIS K0124:2011 General rules for high performance liquid chromatography 263 2) JIS K0214:2013 Technical terms for analytical chemistry 264 """ 265 # First calculate the width of the peak at 5% of the peak height 266 width_min, width_max, estimated = self.calc_fraction_height_width(0.05) 267 268 if not estimated or accept_estimated: 269 # Next calculate the width of the peak at 95% of the peak height 270 eic = self._eic_data.eic_smoothed 271 max_index = np.where(self._eic_data.scans == self.apex_scan)[0][0] 272 left_index = max_index 273 while eic[left_index] > eic[max_index] * 0.05 and left_index > 0: 274 left_index -= 1 275 276 left_half_time_min = ( 277 self._eic_data.time[max_index] - self._eic_data.time[left_index] 278 ) 279 left_half_time_max = ( 280 self._eic_data.time[max_index] - self._eic_data.time[left_index + 1] 281 ) 282 283 tailing_factor = np.mean([width_min, width_max]) / ( 284 2 * np.mean([left_half_time_min, left_half_time_max]) 285 ) 286 287 self._tailing_factor = tailing_factor
Calculate the peak asymmetry of the mass feature.
This function calculates the peak asymmetry of the mass feature and
stores the result in the _tailing_factor attribute.
Calculations completed at 5% of the peak height in accordance with the USP tailing factor calculation.
Returns
- None; stores the result in the `_tailing_factor` attribute of the class.
References
1) JIS K0124:2011 General rules for high performance liquid chromatography 2) JIS K0214:2013 Technical terms for analytical chemistry
289 def calc_gaussian_similarity(self): 290 """ 291 Calculate the Gaussian similarity score of the mass feature. 292 293 This function fits a Gaussian curve to the EIC data and evaluates 294 the goodness of fit using R-squared. Note that this only uses data within 295 the set integration bounds of the mass feature. A score close to 1 indicates 296 the peak closely resembles an ideal Gaussian shape. 297 298 Returns 299 ------- 300 None, stores the result in the `_gaussian_similarity` attribute of the class. 301 302 Raises 303 ------ 304 ValueError 305 If the EIC data are not available. 306 """ 307 # Check if the EIC data is available 308 if self.eic_list is None: 309 raise ValueError( 310 "EIC data are not available. Please add the EIC data first." 311 ) 312 313 # Get EIC data within integration bounds 314 time_data = np.array(self.eic_rt_list) 315 intensity_data = np.array(self.eic_list) 316 317 if len(time_data) < 4: # Need minimum points for meaningful fit 318 self._gaussian_similarity = np.nan 319 return 320 321 # Check for valid intensity data 322 max_intensity = np.max(intensity_data) 323 if max_intensity == 0: 324 self._gaussian_similarity = np.nan 325 return 326 327 try: 328 # Define Gaussian function 329 def gaussian(x, amplitude, mean, stddev, baseline): 330 return ( 331 amplitude * np.exp(-((x - mean) ** 2) / (2 * stddev**2)) + baseline 332 ) 333 334 # Initial parameter estimates 335 amplitude_init = max_intensity 336 mean_init = time_data[np.argmax(intensity_data)] 337 stddev_init = (time_data[-1] - time_data[0]) / 6 # Rough estimate 338 baseline_init = np.min(intensity_data) 339 340 # Fit Gaussian curve 341 popt, _ = curve_fit( 342 gaussian, 343 time_data, 344 intensity_data, 345 p0=[amplitude_init, mean_init, stddev_init, baseline_init], 346 maxfev=1000, 347 bounds=( 348 [0, time_data[0], 0, 0], # Lower bounds 349 [np.inf, time_data[-1], np.inf, max_intensity], # Upper bounds 350 ), 351 ) 352 353 # Calculate fitted values 354 fitted_intensities = 
gaussian(time_data, *popt) 355 356 # Calculate R-squared (coefficient of determination) 357 ss_res = np.sum((intensity_data - fitted_intensities) ** 2) 358 ss_tot = np.sum((intensity_data - np.mean(intensity_data)) ** 2) 359 360 if ss_tot == 0: 361 self._gaussian_similarity = np.nan 362 else: 363 r_squared = 1 - (ss_res / ss_tot) 364 # R² should be between 0 and 1 for reasonable fits 365 # If negative, the model is worse than the mean - treat as non-computable 366 self._gaussian_similarity = r_squared if r_squared >= 0 else np.nan 367 368 except (RuntimeError, ValueError, TypeError): 369 # Fitting failed, assign NaN 370 self._gaussian_similarity = np.nan
Calculate the Gaussian similarity score of the mass feature.
This function fits a Gaussian curve to the EIC data and evaluates the goodness of fit using R-squared. Note that this only uses data within the set integration bounds of the mass feature. A score close to 1 indicates the peak closely resembles an ideal Gaussian shape.
Returns
- None; stores the result in the `_gaussian_similarity` attribute of the class.
Raises
- ValueError: If the EIC data are not available.
372 def calc_noise_score(self): 373 """ 374 Calculate the noise score of the mass feature separately for left and right sides. 375 376 This function estimates the signal-to-noise ratio by comparing the peak 377 intensity to the baseline noise level in surrounding regions. It calculates 378 separate scores for the left and right sides of the peak, which are stored as a tuple 379 in the `_noise_score` attribute. The noise estimation windows are encapsulated in the 380 parent LCMS object (or, if not available, defaults to twice the peak width on each side). 381 382 383 Returns 384 ------- 385 None, stores the result in the `_noise_score` attribute as a tuple (left_score, right_score). 386 387 Raises 388 ------ 389 ValueError 390 If the EIC data are not available. 391 """ 392 # Check if the EIC data is available 393 if self.eic_list is None: 394 raise ValueError( 395 "EIC data are not available. Please add the EIC data first." 396 ) 397 398 # Check if LCMSMassFeature has a parent LCMS object with a window factor 399 if hasattr(self, "mass_spectrum_obj"): 400 noise_window_factor = ( 401 self.mass_spectrum_obj.parameters.lc_ms.noise_window_factor 402 ) 403 else: 404 noise_window_factor = 2.0 # times the peak width 405 406 # Get full EIC data (not just integration bounds) 407 full_time = self._eic_data.time 408 full_eic = self._eic_data.eic 409 410 # Get peak information 411 apex_rt = self.retention_time 412 peak_intensity = np.max(self.eic_list) 413 414 # Retrieve width from integration bounds 415 peak_width = self.eic_rt_list[-1] - self.eic_rt_list[0] 416 417 # Define noise estimation windows 418 noise_window_size = peak_width * noise_window_factor # in minutes 419 left_noise_start = apex_rt - peak_width - noise_window_size 420 left_noise_end = apex_rt - peak_width 421 right_noise_start = apex_rt + peak_width 422 right_noise_end = apex_rt + peak_width + noise_window_size 423 424 # Extract noise regions 425 left_noise_mask = (full_time >= left_noise_start) & ( 426 
full_time <= left_noise_end 427 ) 428 right_noise_mask = (full_time >= right_noise_start) & ( 429 full_time <= right_noise_end 430 ) 431 432 left_noise = full_eic[left_noise_mask] 433 right_noise = full_eic[right_noise_mask] 434 435 # Calculate left noise score 436 if len(left_noise) == 0: 437 left_score = np.nan 438 else: 439 left_baseline = np.median(left_noise) 440 left_noise_std = np.std(left_noise) 441 442 if left_noise_std == 0: 443 if peak_intensity > left_baseline: 444 left_score = 1.0 445 else: 446 left_score = np.nan 447 else: 448 left_signal = peak_intensity - left_baseline 449 if left_signal <= 0: 450 left_score = 0.0 451 else: 452 left_snr = left_signal / left_noise_std 453 left_score = min(1.0, left_snr / (left_snr + 10.0)) 454 455 # Calculate right noise score 456 if len(right_noise) == 0: 457 right_score = np.nan 458 else: 459 right_baseline = np.median(right_noise) 460 right_noise_std = np.std(right_noise) 461 462 if right_noise_std == 0: 463 if peak_intensity > right_baseline: 464 right_score = 1.0 465 else: 466 right_score = np.nan 467 else: 468 right_signal = peak_intensity - right_baseline 469 if right_signal <= 0: 470 right_score = 0.0 471 else: 472 right_snr = right_signal / right_noise_std 473 right_score = min(1.0, right_snr / (right_snr + 10.0)) 474 475 # Store as tuple 476 self._noise_score = (left_score, right_score)
Calculate the noise score of the mass feature separately for left and right sides.
This function estimates the signal-to-noise ratio by comparing the peak
intensity to the baseline noise level in surrounding regions. It calculates
separate scores for the left and right sides of the peak, which are stored as a tuple
in the _noise_score attribute. The noise estimation windows are encapsulated in the
parent LCMS object (or, if not available, defaults to twice the peak width on each side).
Returns
- None; stores the result in the `_noise_score` attribute as a tuple (left_score, right_score).
Raises
- ValueError: If the EIC data are not available.