corems.mass_spectrum.calc.Calibration
Created on Wed May 13 02:16:09 2020
@author: Will Kew
1# -*- coding: utf-8 -*- 2""" 3Created on Wed May 13 02:16:09 2020 4 5@author: Will Kew 6""" 7 8# import modules 9import csv 10import warnings 11from io import BytesIO, StringIO 12from pathlib import Path 13 14import numpy as np 15import pandas as pd 16from s3path import S3Path 17 18# import scipy modules for calibration 19from scipy.optimize import minimize 20 21 22class MzDomainCalibration: 23 """MzDomainCalibration class for recalibrating mass spectra 24 25 Parameters 26 ---------- 27 mass_spectrum : CoreMS MassSpectrum Object 28 The mass spectrum to be calibrated. 29 ref_masslist : str 30 The path to a reference mass list. 31 mzsegment : tuple of floats, optional 32 The mz range to recalibrate, or None. Used for calibration of specific parts of the mz domain at a time. 33 Future work - allow multiple mzsegments to be passed. 34 35 Attributes 36 ---------- 37 mass_spectrum : CoreMS MassSpectrum Object 38 The mass spectrum to be calibrated. 39 mzsegment : tuple of floats or None 40 The mz range to recalibrate, or None. 41 ref_mass_list_path : str or Path 42 The path to the reference mass list. 43 44 Methods 45 ------- 46 * run(). 47 Main function to run this class. 48 * load_ref_mass_list(). 49 Load reference mass list (Bruker format). 50 * gen_ref_mass_list_from_assigned(min_conf=0.7). 51 Generate reference mass list from assigned masses. 52 * find_calibration_points(df_ref, calib_ppm_error_threshold=(-1, 1), calib_snr_threshold=5). 53 Find calibration points in the mass spectrum based on the reference mass list. 54 * robust_calib(param, cal_peaks_mz, cal_refs_mz, order=1). 55 Recalibration function. 56 * recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=1, diagnostic=False). 57 Main recalibration function which uses a robust linear regression. 58 59 60 """ 61 62 def __init__(self, mass_spectrum, ref_masslist, mzsegment=None): 63 self.mass_spectrum = mass_spectrum 64 self.mzsegment = mzsegment 65 66 # define reference mass list - bruker .ref format 67 self.ref_mass_list_path = ref_masslist 68 if self.mass_spectrum.percentile_assigned(mute_output=True)[0] != 0: 69 warnings.warn( 70 "Warning: calibrating spectra which have already been assigned may yield erroneous results" 71 ) 72 self.mass_spectrum.mz_cal = self.mass_spectrum.mz_exp 73 self.mass_spectrum.mz_cal_profile = self.mass_spectrum._mz_exp 74 75 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 76 print( 77 "MS Obj loaded - " + str(len(mass_spectrum.mspeaks)) + " peaks found." 78 ) 79 80 def load_ref_mass_list(self): 81 """ 82 Load reference mass list (Bruker format). 83 84 Loads in a reference mass list from a .ref file. Some versions of 85 Bruker's software produce .ref files with a different format where 86 the header lines (starting with '#' or '##') and delimiters may vary. 87 The file may be located locally or on S3 and will be handled accordingly. 88 89 Returns 90 ------- 91 df_ref : Pandas DataFrame 92 Reference mass list object. 93 """ 94 # Get a Path-like object from the input path string or S3Path 95 refmasslist = (Path(self.ref_mass_list_path) 96 if isinstance(self.ref_mass_list_path, str) 97 else self.ref_mass_list_path) 98 99 # Make sure the file exists 100 if not refmasslist.exists(): 101 raise FileExistsError("File does not exist: %s" % refmasslist) 102 103 # Read all lines from the file (handling S3 vs local differently) 104 if isinstance(refmasslist, S3Path): 105 # For S3, read the file in binary, then decode to string and split into lines. 106 content = refmasslist.open("rb").read() 107 all_lines = content.decode("utf-8").splitlines(keepends=True) 108 else: 109 # For a local file, open in text mode and read lines. 110 with refmasslist.open("r") as f: 111 all_lines = f.readlines() 112 113 # Identify the index of the first line of the actual data. 114 # We assume header lines start with '#' (or '##') and ignore blank lines. 115 data_start_idx = 0 116 for idx, line in enumerate(all_lines): 117 if line.strip() and not line.lstrip().startswith("#"): 118 data_start_idx = idx 119 break 120 121 # If there are not enough lines to guess the dialect, throw an error 122 if data_start_idx >= len(all_lines): 123 raise ValueError("The file does not appear to contain any data lines.") 124 125 # Use a couple of the data lines to let csv.Sniffer detect the delimiter 126 sample_lines = "".join(all_lines[data_start_idx:data_start_idx+2]) 127 try: 128 dialect = csv.Sniffer().sniff(sample_lines) 129 delimiter = dialect.delimiter 130 except csv.Error: 131 # If csv.Sniffer fails, default to a common delimiter (e.g., comma) 132 delimiter = "," 133 134 # Join the lines from the beginning of data (this might include a blank line) 135 joined_data = "".join(all_lines[data_start_idx:]) 136 137 # Depending on whether the file is S3 or local, wrap the data as needed for pandas 138 if isinstance(refmasslist, S3Path): 139 data_buffer = BytesIO(joined_data.encode("utf-8")) 140 else: 141 data_buffer = StringIO(joined_data) 142 143 # Read data into a DataFrame. 144 # Adjust columns and names as needed – here we assume at least 2 columns: 145 df_ref = pd.read_csv(data_buffer, 146 sep=delimiter, 147 header=None, 148 usecols=[0, 1], # Modify if more columns are required. 149 names=["Formula", "m/z"]) 150 151 df_ref.sort_values(by="m/z", ascending=True, inplace=True) 152 153 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 154 print("Reference mass list loaded - {} calibration masses loaded.".format(len(df_ref))) 155 156 return df_ref 157 158 def gen_ref_mass_list_from_assigned(self, min_conf: float = 0.7): 159 """Generate reference mass list from assigned masses 160 161 This function will generate a ref mass dataframe object from an assigned corems mass spec obj 162 using assigned masses above a certain minimum confidence threshold. 163 164 This function needs to be retested and check it is covered in the unit tests. 165 166 Parameters 167 ---------- 168 min_conf : float, optional 169 minimum confidence score. The default is 0.7. 170 171 Returns 172 ------- 173 df_ref : Pandas DataFrame 174 reference mass list - based on calculated masses. 175 176 """ 177 # TODO this function needs to be retested and check it is covered in the unit tests 178 df = self.mass_spectrum.to_dataframe() 179 df = df[df["Confidence Score"] > min_conf] 180 df_ref = pd.DataFrame(columns=["m/z"]) 181 df_ref["m/z"] = df["Calculated m/z"] 182 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 183 print( 184 "Reference mass list generated - " 185 + str(len(df_ref)) 186 + " calibration masses." 187 ) 188 return df_ref 189 190 def find_calibration_points( 191 self, 192 df_ref, 193 calib_ppm_error_threshold: tuple[float, float] = (-1, 1), 194 calib_snr_threshold: float = 5, 195 calibration_ref_match_method: str = "legacy", 196 calibration_ref_match_tolerance: float = 0.003, 197 calibration_ref_match_std_raw_error_limit: float = 1.5, 198 ): 199 """Function to find calibration points in the mass spectrum 200 201 Based on the reference mass list. 202 203 Parameters 204 ---------- 205 df_ref : Pandas DataFrame 206 reference mass list for recalibration. 207 calib_ppm_error_threshold : tuple of floats, optional 208 ppm error for finding calibration masses in the spectrum. The default is -1,1. 209 Note: This is based on the calculation of ppm = ((mz_measure - mz_theoretical)/mz_theoretical)*1e6. 210 Some software does this the other way around and value signs must be inverted for that to work. 211 calib_snr_threshold : float, optional 212 snr threshold for finding calibration masses in the spectrum. The default is 5. 213 214 Returns 215 ------- 216 cal_peaks_mz : list of floats 217 masses of measured ions to use in calibration routine 218 cal_refs_mz : list of floats 219 reference mz values of found calibration points. 220 221 """ 222 223 # This approach is much more efficient and expedient than the original implementation. 224 peaks_mz = [] 225 for x in self.mass_spectrum.mspeaks: 226 if x.signal_to_noise > calib_snr_threshold: 227 if self.mzsegment: 228 if min(self.mzsegment) <= x.mz_exp <= max(self.mzsegment): 229 peaks_mz.append(x.mz_exp) 230 else: 231 peaks_mz.append(x.mz_exp) 232 peaks_mz = np.asarray(peaks_mz) 233 234 if calibration_ref_match_method == "legacy": 235 # This legacy approach iterates through each reference match and finds the entries within 1 mz and within the user defined PPM error threshold 236 # Then it removes ambiguities - which means the calibration threshold hasto be very tight. 237 cal_peaks_mz = [] 238 cal_refs_mz = [] 239 for mzref in df_ref["m/z"]: 240 tmp_peaks_mz = peaks_mz[abs(peaks_mz - mzref) < 1] 241 for mzmeas in tmp_peaks_mz: 242 delta_mass = ((mzmeas - mzref) / mzref) * 1e6 243 if delta_mass < max(calib_ppm_error_threshold): 244 if delta_mass > min(calib_ppm_error_threshold): 245 cal_peaks_mz.append(mzmeas) 246 cal_refs_mz.append(mzref) 247 248 # To remove entries with duplicated indices (reference masses matching multiple peaks) 249 tmpdf = pd.Series(index=cal_refs_mz, data=cal_peaks_mz, dtype=float) 250 tmpdf = tmpdf[~tmpdf.index.duplicated(keep=False)] 251 252 cal_peaks_mz = list(tmpdf.values) 253 cal_refs_mz = list(tmpdf.index) 254 elif calibration_ref_match_method == "merged": 255 #warnings.warn("Using experimental new reference mass list merging") 256 # This is a new approach (August 2024) which uses Pandas 'merged_asof' to find the peaks closest in m/z between 257 # reference and measured masses. This is a quicker way to match, and seems to get more matches. 258 # It may not work as well when the data are far from correc initial mass 259 # e.g. if the correct peak is further from the reference than an incorrect peak. 260 meas_df = pd.DataFrame(columns=["meas_m/z"], data=peaks_mz) 261 tolerance = calibration_ref_match_tolerance 262 merged_df = pd.merge_asof( 263 df_ref, 264 meas_df, 265 left_on="m/z", 266 right_on="meas_m/z", 267 tolerance=tolerance, 268 direction="nearest", 269 ) 270 merged_df.dropna(how="any", inplace=True) 271 merged_df["Error_ppm"] = ( 272 (merged_df["meas_m/z"] - merged_df["m/z"]) / merged_df["m/z"] 273 ) * 1e6 274 median_raw_error = merged_df["Error_ppm"].median() 275 std_raw_error = merged_df["Error_ppm"].std() 276 if std_raw_error > calibration_ref_match_std_raw_error_limit: 277 std_raw_error = calibration_ref_match_std_raw_error_limit 278 self.mass_spectrum.calibration_raw_error_median = median_raw_error 279 self.mass_spectrum.calibration_raw_error_stdev = std_raw_error 280 merged_df = merged_df[ 281 (merged_df["Error_ppm"] > (median_raw_error - 1.5 * std_raw_error)) 282 & (merged_df["Error_ppm"] < (median_raw_error + 1.5 * std_raw_error)) 283 ] 284 # merged_df= merged_df[(merged_df['Error_ppm']>min(calib_ppm_error_threshold))&(merged_df['Error_ppm']<max(calib_ppm_error_threshold))] 285 cal_peaks_mz = list(merged_df["meas_m/z"]) 286 cal_refs_mz = list(merged_df["m/z"]) 287 else: 288 raise ValueError(f"{calibration_ref_match_method} not allowed.") 289 290 if False: 291 min_calib_ppm_error = calib_ppm_error_threshold[0] 292 max_calib_ppm_error = calib_ppm_error_threshold[1] 293 df_raw = self.mass_spectrum.to_dataframe() 294 295 df_raw = df_raw[df_raw["S/N"] > calib_snr_threshold] 296 # optionally further subset that based on minimum S/N, RP, Peak Height 297 # to ensure only valid points are utilized 298 # in this example, only a S/N threshold is implemented. 299 imzmeas = [] 300 mzrefs = [] 301 302 for mzref in df_ref["m/z"]: 303 # find all peaks within a defined ppm error threshold 304 tmpdf = df_raw[ 305 ((df_raw["m/z"] - mzref) / mzref) * 1e6 < max_calib_ppm_error 306 ] 307 # Error is relative to the theoretical, so the divisor should be divisor 308 309 tmpdf = tmpdf[ 310 ((tmpdf["m/z"] - mzref) / mzref) * 1e6 > min_calib_ppm_error 311 ] 312 313 # only use the calibration point if only one peak is within the thresholds 314 # This may require some optimization of the threshold tolerances 315 if len(tmpdf) == 1: 316 imzmeas.append(int(tmpdf.index.values)) 317 mzrefs.append(mzref) 318 319 # it is crucial the mass lists are in same order 320 # corems likes to do masses from high to low. 321 cal_refs_mz.sort(reverse=False) 322 cal_peaks_mz.sort(reverse=False) 323 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 324 print( 325 str(len(cal_peaks_mz)) 326 + " calibration points matched within thresholds." 327 ) 328 return cal_peaks_mz, cal_refs_mz 329 330 def robust_calib( 331 self, 332 param: list[float], 333 cal_peaks_mz: list[float], 334 cal_refs_mz: list[float], 335 order: int = 1, 336 ): 337 """Recalibration function 338 339 Computes the rms of m/z errors to minimize when calibrating. 340 This is adapted from from spike. 341 342 Parameters 343 ---------- 344 param : list of floats 345 generated by minimize function from scipy optimize. 346 cal_peaks_mz : list of floats 347 masses of measured peaks to use in mass calibration. 348 cal_peaks_mz : list of floats 349 reference mz values of found calibration points. 350 order : int, optional 351 order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1. 352 353 Returns 354 ------- 355 rmserror : float 356 root mean square mass error for calibration points. 357 358 """ 359 Aterm = param[0] 360 Bterm = param[1] 361 try: 362 Cterm = param[2] 363 except IndexError: 364 pass 365 366 # get the mspeaks from the mass spectrum object which were calibration points 367 # mspeaks = [self.mass_spectrum.mspeaks[x] for x in imzmeas] 368 # get their calibrated mass values 369 # mspeakmzs = [x.mz_cal for x in mspeaks] 370 cal_peaks_mz = np.asarray(cal_peaks_mz) 371 372 # linearz 373 if order == 1: 374 ref_recal_points = (Aterm * cal_peaks_mz) + Bterm 375 # quadratic 376 elif order == 2: 377 ref_recal_points = (Aterm * (cal_peaks_mz)) + ( 378 Bterm * np.power((cal_peaks_mz), 2) + Cterm 379 ) 380 381 # sort both the calibration points (measured, recalibrated) 382 ref_recal_points.sort() 383 # and sort the calibration points (theoretical, predefined) 384 cal_refs_mz.sort() 385 386 # calculate the ppm error for each calibration point 387 error = ((ref_recal_points - cal_refs_mz) / cal_refs_mz) * 1e6 388 # calculate the root mean square error - this is our target to minimize 389 rmserror = np.sqrt(np.mean(error**2)) 390 return rmserror 391 392 def recalibrate_mass_spectrum( 393 self, 394 cal_peaks_mz: list[float], 395 cal_refs_mz: list[float], 396 order: int = 1, 397 diagnostic: bool = False, 398 ): 399 """Main recalibration function which uses a robust linear regression 400 401 This function performs the recalibration of the mass spectrum object. 402 It iteratively applies 403 404 Parameters 405 ---------- 406 cal_peaks_mz : list of float 407 masses of measured peaks to use in mass calibration. 408 cal_refs_mz : list of float 409 reference mz values of found calibration points. 410 order : int, optional 411 order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1. 412 413 Returns 414 ------- 415 mass_spectrum : CoreMS mass spectrum object 416 Calibrated mass spectrum object 417 418 419 Notes 420 ----- 421 This function is adapted, in part, from the SPIKE project [1,2] and is based on the robust linear regression method. 422 423 References 424 ---------- 425 1. Chiron L., Coutouly M-A., Starck J-P., Rolando C., Delsuc M-A. 426 SPIKE a Processing Software dedicated to Fourier Spectroscopies 427 https://arxiv.org/abs/1608.06777 (2016) 428 2. SPIKE - https://github.com/spike-project/spike 429 430 """ 431 # initialise parameters for recalibration 432 # these are the 'Aterm, Bterm, Cterm' 433 # as spectra are already freq->mz calibrated, these terms are very small 434 # may be beneficial to formally separate them from the freq->mz terms 435 if order == 1: 436 Po = [1, 0] 437 elif order == 2: 438 Po = [1, 0, 0] 439 440 if len(cal_peaks_mz) >= 2: 441 if self.mzsegment: # If only part of the spectrum is to be recalibrated 442 mz_exp_peaks = np.array( 443 [mspeak.mz_exp for mspeak in self.mass_spectrum] 444 ) 445 # Split the array into two parts - one to recailbrate, one to keep unchanged. 446 mz_exp_peaks_tocal = mz_exp_peaks[ 447 (mz_exp_peaks >= min(self.mzsegment)) 448 & (mz_exp_peaks <= max(self.mzsegment)) 449 ] 450 mz_exp_peaks_unchanged = mz_exp_peaks[ 451 ~(mz_exp_peaks >= min(self.mzsegment)) 452 | ~(mz_exp_peaks <= max(self.mzsegment)) 453 ] 454 # TODO: - segmented calibration needs a way to better track the calibration args/values... 455 if not self.mass_spectrum.is_centroid: 456 mz_exp_profile = np.array(self.mass_spectrum.mz_exp_profile) 457 # Split the array into two parts - one to recailbrate, one to keep unchanged. 458 mz_exp_profile_tocal = mz_exp_profile[ 459 (mz_exp_profile >= min(self.mzsegment)) 460 & (mz_exp_profile <= max(self.mzsegment)) 461 ] 462 mz_exp_profile_unchanged = mz_exp_profile[ 463 ~(mz_exp_profile >= min(self.mzsegment)) 464 | ~(mz_exp_profile <= max(self.mzsegment)) 465 ] 466 else: # if just recalibrating the whole spectrum 467 mz_exp_peaks_tocal = np.array( 468 [mspeak.mz_exp for mspeak in self.mass_spectrum] 469 ) 470 if not self.mass_spectrum.is_centroid: 471 mz_exp_profile_tocal = np.array(self.mass_spectrum.mz_exp_profile) 472 473 minimize_method = self.mass_spectrum.settings.calib_minimize_method 474 res = minimize( 475 self.robust_calib, 476 Po, 477 args=(cal_peaks_mz, cal_refs_mz, order), 478 method=minimize_method, 479 ) 480 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 481 print( 482 "minimize function completed with RMS error of: {:0.3f} ppm".format( 483 res["fun"] 484 ) 485 ) 486 print( 487 "minimize function performed {:1d} fn evals and {:1d} iterations".format( 488 res["nfev"], res["nit"] 489 ) 490 ) 491 Pn = res.x 492 493 # mz_exp_ms = np.array([mspeak.mz_exp for mspeak in self.mass_spectrum]) 494 495 if order == 1: 496 mz_domain = (Pn[0] * mz_exp_peaks_tocal) + Pn[1] 497 if not self.mass_spectrum.is_centroid: 498 mz_profile_calc = (Pn[0] * mz_exp_profile_tocal) + Pn[1] 499 500 elif order == 2: 501 mz_domain = (Pn[0] * (mz_exp_peaks_tocal)) + ( 502 Pn[1] * np.power((mz_exp_peaks_tocal), 2) + Pn[2] 503 ) 504 505 if not self.mass_spectrum.is_centroid: 506 mz_profile_calc = (Pn[0] * (mz_exp_profile_tocal)) + ( 507 Pn[1] * np.power((mz_exp_profile_tocal), 2) + Pn[2] 508 ) 509 510 if self.mzsegment: 511 # Recombine the mass domains 512 mz_domain = np.concatenate([mz_domain, mz_exp_peaks_unchanged]) 513 mz_domain.sort() 514 if not self.mass_spectrum.is_centroid: 515 mz_profile_calc = np.concatenate( 516 [mz_profile_calc, mz_exp_profile_unchanged] 517 ) 518 mz_profile_calc.sort() 519 # Sort them 520 if ( 521 mz_exp_peaks[0] > mz_exp_peaks[1] 522 ): # If originally descending mass order 523 mz_domain = mz_domain[::-1] 524 if not self.mass_spectrum.is_centroid: 525 mz_profile_calc = mz_profile_calc[::-1] 526 527 self.mass_spectrum.mz_cal = mz_domain 528 if not self.mass_spectrum.is_centroid: 529 self.mass_spectrum.mz_cal_profile = mz_profile_calc 530 531 self.mass_spectrum.calibration_order = order 532 self.mass_spectrum.calibration_RMS = float(res["fun"]) 533 self.mass_spectrum.calibration_points = int(len(cal_refs_mz)) 534 self.mass_spectrum.calibration_ref_mzs = cal_refs_mz 535 self.mass_spectrum.calibration_meas_mzs = cal_peaks_mz 536 537 self.mass_spectrum.calibration_segment = self.mzsegment 538 539 if diagnostic: 540 return self.mass_spectrum, res 541 return self.mass_spectrum 542 else: 543 warnings.warn("Too few calibration points - aborting.") 544 return self.mass_spectrum 545 546 def run(self): 547 """Run the calibration routine 548 549 This function runs the calibration routine. 550 551 """ 552 calib_ppm_error_threshold = self.mass_spectrum.settings.calib_sn_threshold 553 max_calib_ppm_error = self.mass_spectrum.settings.max_calib_ppm_error 554 min_calib_ppm_error = self.mass_spectrum.settings.min_calib_ppm_error 555 calib_pol_order = self.mass_spectrum.settings.calib_pol_order 556 calibration_ref_match_method = ( 557 self.mass_spectrum.settings.calibration_ref_match_method 558 ) 559 calibration_ref_match_tolerance = ( 560 self.mass_spectrum.settings.calibration_ref_match_tolerance 561 ) 562 calibration_ref_match_std_raw_error_limit = ( 563 self.mass_spectrum.settings.calibration_ref_match_std_raw_error_limit 564 ) 565 566 # load reference mass list 567 df_ref = self.load_ref_mass_list() 568 569 # find calibration points 570 cal_peaks_mz, cal_refs_mz = self.find_calibration_points( 571 df_ref, 572 calib_ppm_error_threshold=(min_calib_ppm_error, max_calib_ppm_error), 573 calib_snr_threshold=calib_ppm_error_threshold, 574 calibration_ref_match_method=calibration_ref_match_method, 575 calibration_ref_match_tolerance=calibration_ref_match_tolerance, 576 calibration_ref_match_std_raw_error_limit=calibration_ref_match_std_raw_error_limit, 577 ) 578 if len(cal_peaks_mz) == 2: 579 self.mass_spectrum.settings.calib_pol_order = 1 580 calib_pol_order = 1 581 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 582 print("Only 2 calibration points found, forcing a linear recalibration") 583 elif len(cal_peaks_mz) < 2: 584 warnings.warn("Too few calibration points found, function will fail") 585 self.recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=calib_pol_order)
23class MzDomainCalibration: 24 """MzDomainCalibration class for recalibrating mass spectra 25 26 Parameters 27 ---------- 28 mass_spectrum : CoreMS MassSpectrum Object 29 The mass spectrum to be calibrated. 30 ref_masslist : str 31 The path to a reference mass list. 32 mzsegment : tuple of floats, optional 33 The mz range to recalibrate, or None. Used for calibration of specific parts of the mz domain at a time. 34 Future work - allow multiple mzsegments to be passed. 35 36 Attributes 37 ---------- 38 mass_spectrum : CoreMS MassSpectrum Object 39 The mass spectrum to be calibrated. 40 mzsegment : tuple of floats or None 41 The mz range to recalibrate, or None. 42 ref_mass_list_path : str or Path 43 The path to the reference mass list. 44 45 Methods 46 ------- 47 * run(). 48 Main function to run this class. 49 * load_ref_mass_list(). 50 Load reference mass list (Bruker format). 51 * gen_ref_mass_list_from_assigned(min_conf=0.7). 52 Generate reference mass list from assigned masses. 53 * find_calibration_points(df_ref, calib_ppm_error_threshold=(-1, 1), calib_snr_threshold=5). 54 Find calibration points in the mass spectrum based on the reference mass list. 55 * robust_calib(param, cal_peaks_mz, cal_refs_mz, order=1). 56 Recalibration function. 57 * recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=1, diagnostic=False). 58 Main recalibration function which uses a robust linear regression. 59 60 61 """ 62 63 def __init__(self, mass_spectrum, ref_masslist, mzsegment=None): 64 self.mass_spectrum = mass_spectrum 65 self.mzsegment = mzsegment 66 67 # define reference mass list - bruker .ref format 68 self.ref_mass_list_path = ref_masslist 69 if self.mass_spectrum.percentile_assigned(mute_output=True)[0] != 0: 70 warnings.warn( 71 "Warning: calibrating spectra which have already been assigned may yield erroneous results" 72 ) 73 self.mass_spectrum.mz_cal = self.mass_spectrum.mz_exp 74 self.mass_spectrum.mz_cal_profile = self.mass_spectrum._mz_exp 75 76 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 77 print( 78 "MS Obj loaded - " + str(len(mass_spectrum.mspeaks)) + " peaks found." 79 ) 80 81 def load_ref_mass_list(self): 82 """ 83 Load reference mass list (Bruker format). 84 85 Loads in a reference mass list from a .ref file. Some versions of 86 Bruker's software produce .ref files with a different format where 87 the header lines (starting with '#' or '##') and delimiters may vary. 88 The file may be located locally or on S3 and will be handled accordingly. 89 90 Returns 91 ------- 92 df_ref : Pandas DataFrame 93 Reference mass list object. 94 """ 95 # Get a Path-like object from the input path string or S3Path 96 refmasslist = (Path(self.ref_mass_list_path) 97 if isinstance(self.ref_mass_list_path, str) 98 else self.ref_mass_list_path) 99 100 # Make sure the file exists 101 if not refmasslist.exists(): 102 raise FileExistsError("File does not exist: %s" % refmasslist) 103 104 # Read all lines from the file (handling S3 vs local differently) 105 if isinstance(refmasslist, S3Path): 106 # For S3, read the file in binary, then decode to string and split into lines. 107 content = refmasslist.open("rb").read() 108 all_lines = content.decode("utf-8").splitlines(keepends=True) 109 else: 110 # For a local file, open in text mode and read lines. 111 with refmasslist.open("r") as f: 112 all_lines = f.readlines() 113 114 # Identify the index of the first line of the actual data. 115 # We assume header lines start with '#' (or '##') and ignore blank lines. 116 data_start_idx = 0 117 for idx, line in enumerate(all_lines): 118 if line.strip() and not line.lstrip().startswith("#"): 119 data_start_idx = idx 120 break 121 122 # If there are not enough lines to guess the dialect, throw an error 123 if data_start_idx >= len(all_lines): 124 raise ValueError("The file does not appear to contain any data lines.") 125 126 # Use a couple of the data lines to let csv.Sniffer detect the delimiter 127 sample_lines = "".join(all_lines[data_start_idx:data_start_idx+2]) 128 try: 129 dialect = csv.Sniffer().sniff(sample_lines) 130 delimiter = dialect.delimiter 131 except csv.Error: 132 # If csv.Sniffer fails, default to a common delimiter (e.g., comma) 133 delimiter = "," 134 135 # Join the lines from the beginning of data (this might include a blank line) 136 joined_data = "".join(all_lines[data_start_idx:]) 137 138 # Depending on whether the file is S3 or local, wrap the data as needed for pandas 139 if isinstance(refmasslist, S3Path): 140 data_buffer = BytesIO(joined_data.encode("utf-8")) 141 else: 142 data_buffer = StringIO(joined_data) 143 144 # Read data into a DataFrame. 145 # Adjust columns and names as needed – here we assume at least 2 columns: 146 df_ref = pd.read_csv(data_buffer, 147 sep=delimiter, 148 header=None, 149 usecols=[0, 1], # Modify if more columns are required. 150 names=["Formula", "m/z"]) 151 152 df_ref.sort_values(by="m/z", ascending=True, inplace=True) 153 154 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 155 print("Reference mass list loaded - {} calibration masses loaded.".format(len(df_ref))) 156 157 return df_ref 158 159 def gen_ref_mass_list_from_assigned(self, min_conf: float = 0.7): 160 """Generate reference mass list from assigned masses 161 162 This function will generate a ref mass dataframe object from an assigned corems mass spec obj 163 using assigned masses above a certain minimum confidence threshold. 164 165 This function needs to be retested and check it is covered in the unit tests. 166 167 Parameters 168 ---------- 169 min_conf : float, optional 170 minimum confidence score. The default is 0.7. 171 172 Returns 173 ------- 174 df_ref : Pandas DataFrame 175 reference mass list - based on calculated masses. 176 177 """ 178 # TODO this function needs to be retested and check it is covered in the unit tests 179 df = self.mass_spectrum.to_dataframe() 180 df = df[df["Confidence Score"] > min_conf] 181 df_ref = pd.DataFrame(columns=["m/z"]) 182 df_ref["m/z"] = df["Calculated m/z"] 183 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 184 print( 185 "Reference mass list generated - " 186 + str(len(df_ref)) 187 + " calibration masses." 188 ) 189 return df_ref 190 191 def find_calibration_points( 192 self, 193 df_ref, 194 calib_ppm_error_threshold: tuple[float, float] = (-1, 1), 195 calib_snr_threshold: float = 5, 196 calibration_ref_match_method: str = "legacy", 197 calibration_ref_match_tolerance: float = 0.003, 198 calibration_ref_match_std_raw_error_limit: float = 1.5, 199 ): 200 """Function to find calibration points in the mass spectrum 201 202 Based on the reference mass list. 203 204 Parameters 205 ---------- 206 df_ref : Pandas DataFrame 207 reference mass list for recalibration. 208 calib_ppm_error_threshold : tuple of floats, optional 209 ppm error for finding calibration masses in the spectrum. The default is -1,1. 210 Note: This is based on the calculation of ppm = ((mz_measure - mz_theoretical)/mz_theoretical)*1e6. 211 Some software does this the other way around and value signs must be inverted for that to work. 212 calib_snr_threshold : float, optional 213 snr threshold for finding calibration masses in the spectrum. The default is 5. 214 215 Returns 216 ------- 217 cal_peaks_mz : list of floats 218 masses of measured ions to use in calibration routine 219 cal_refs_mz : list of floats 220 reference mz values of found calibration points. 221 222 """ 223 224 # This approach is much more efficient and expedient than the original implementation. 225 peaks_mz = [] 226 for x in self.mass_spectrum.mspeaks: 227 if x.signal_to_noise > calib_snr_threshold: 228 if self.mzsegment: 229 if min(self.mzsegment) <= x.mz_exp <= max(self.mzsegment): 230 peaks_mz.append(x.mz_exp) 231 else: 232 peaks_mz.append(x.mz_exp) 233 peaks_mz = np.asarray(peaks_mz) 234 235 if calibration_ref_match_method == "legacy": 236 # This legacy approach iterates through each reference match and finds the entries within 1 mz and within the user defined PPM error threshold 237 # Then it removes ambiguities - which means the calibration threshold hasto be very tight. 238 cal_peaks_mz = [] 239 cal_refs_mz = [] 240 for mzref in df_ref["m/z"]: 241 tmp_peaks_mz = peaks_mz[abs(peaks_mz - mzref) < 1] 242 for mzmeas in tmp_peaks_mz: 243 delta_mass = ((mzmeas - mzref) / mzref) * 1e6 244 if delta_mass < max(calib_ppm_error_threshold): 245 if delta_mass > min(calib_ppm_error_threshold): 246 cal_peaks_mz.append(mzmeas) 247 cal_refs_mz.append(mzref) 248 249 # To remove entries with duplicated indices (reference masses matching multiple peaks) 250 tmpdf = pd.Series(index=cal_refs_mz, data=cal_peaks_mz, dtype=float) 251 tmpdf = tmpdf[~tmpdf.index.duplicated(keep=False)] 252 253 cal_peaks_mz = list(tmpdf.values) 254 cal_refs_mz = list(tmpdf.index) 255 elif calibration_ref_match_method == "merged": 256 #warnings.warn("Using experimental new reference mass list merging") 257 # This is a new approach (August 2024) which uses Pandas 'merged_asof' to find the peaks closest in m/z between 258 # reference and measured masses. This is a quicker way to match, and seems to get more matches. 259 # It may not work as well when the data are far from correc initial mass 260 # e.g. if the correct peak is further from the reference than an incorrect peak. 261 meas_df = pd.DataFrame(columns=["meas_m/z"], data=peaks_mz) 262 tolerance = calibration_ref_match_tolerance 263 merged_df = pd.merge_asof( 264 df_ref, 265 meas_df, 266 left_on="m/z", 267 right_on="meas_m/z", 268 tolerance=tolerance, 269 direction="nearest", 270 ) 271 merged_df.dropna(how="any", inplace=True) 272 merged_df["Error_ppm"] = ( 273 (merged_df["meas_m/z"] - merged_df["m/z"]) / merged_df["m/z"] 274 ) * 1e6 275 median_raw_error = merged_df["Error_ppm"].median() 276 std_raw_error = merged_df["Error_ppm"].std() 277 if std_raw_error > calibration_ref_match_std_raw_error_limit: 278 std_raw_error = calibration_ref_match_std_raw_error_limit 279 self.mass_spectrum.calibration_raw_error_median = median_raw_error 280 self.mass_spectrum.calibration_raw_error_stdev = std_raw_error 281 merged_df = merged_df[ 282 (merged_df["Error_ppm"] > (median_raw_error - 1.5 * std_raw_error)) 283 & (merged_df["Error_ppm"] < (median_raw_error + 1.5 * std_raw_error)) 284 ] 285 # merged_df= merged_df[(merged_df['Error_ppm']>min(calib_ppm_error_threshold))&(merged_df['Error_ppm']<max(calib_ppm_error_threshold))] 286 cal_peaks_mz = list(merged_df["meas_m/z"]) 287 cal_refs_mz = list(merged_df["m/z"]) 288 else: 289 raise ValueError(f"{calibration_ref_match_method} not allowed.") 290 291 if False: 292 min_calib_ppm_error = calib_ppm_error_threshold[0] 293 max_calib_ppm_error = calib_ppm_error_threshold[1] 294 df_raw = self.mass_spectrum.to_dataframe() 295 296 df_raw = df_raw[df_raw["S/N"] > calib_snr_threshold] 297 # optionally further subset that based on minimum S/N, RP, Peak Height 298 # to ensure only valid points are utilized 299 # in this example, only a S/N threshold is implemented. 300 imzmeas = [] 301 mzrefs = [] 302 303 for mzref in df_ref["m/z"]: 304 # find all peaks within a defined ppm error threshold 305 tmpdf = df_raw[ 306 ((df_raw["m/z"] - mzref) / mzref) * 1e6 < max_calib_ppm_error 307 ] 308 # Error is relative to the theoretical, so the divisor should be divisor 309 310 tmpdf = tmpdf[ 311 ((tmpdf["m/z"] - mzref) / mzref) * 1e6 > min_calib_ppm_error 312 ] 313 314 # only use the calibration point if only one peak is within the thresholds 315 # This may require some optimization of the threshold tolerances 316 if len(tmpdf) == 1: 317 imzmeas.append(int(tmpdf.index.values)) 318 mzrefs.append(mzref) 319 320 # it is crucial the mass lists are in same order 321 # corems likes to do masses from high to low. 322 cal_refs_mz.sort(reverse=False) 323 cal_peaks_mz.sort(reverse=False) 324 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 325 print( 326 str(len(cal_peaks_mz)) 327 + " calibration points matched within thresholds." 328 ) 329 return cal_peaks_mz, cal_refs_mz 330 331 def robust_calib( 332 self, 333 param: list[float], 334 cal_peaks_mz: list[float], 335 cal_refs_mz: list[float], 336 order: int = 1, 337 ): 338 """Recalibration function 339 340 Computes the rms of m/z errors to minimize when calibrating. 341 This is adapted from from spike. 342 343 Parameters 344 ---------- 345 param : list of floats 346 generated by minimize function from scipy optimize. 347 cal_peaks_mz : list of floats 348 masses of measured peaks to use in mass calibration. 349 cal_peaks_mz : list of floats 350 reference mz values of found calibration points. 351 order : int, optional 352 order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1. 353 354 Returns 355 ------- 356 rmserror : float 357 root mean square mass error for calibration points. 358 359 """ 360 Aterm = param[0] 361 Bterm = param[1] 362 try: 363 Cterm = param[2] 364 except IndexError: 365 pass 366 367 # get the mspeaks from the mass spectrum object which were calibration points 368 # mspeaks = [self.mass_spectrum.mspeaks[x] for x in imzmeas] 369 # get their calibrated mass values 370 # mspeakmzs = [x.mz_cal for x in mspeaks] 371 cal_peaks_mz = np.asarray(cal_peaks_mz) 372 373 # linearz 374 if order == 1: 375 ref_recal_points = (Aterm * cal_peaks_mz) + Bterm 376 # quadratic 377 elif order == 2: 378 ref_recal_points = (Aterm * (cal_peaks_mz)) + ( 379 Bterm * np.power((cal_peaks_mz), 2) + Cterm 380 ) 381 382 # sort both the calibration points (measured, recalibrated) 383 ref_recal_points.sort() 384 # and sort the calibration points (theoretical, predefined) 385 cal_refs_mz.sort() 386 387 # calculate the ppm error for each calibration point 388 error = ((ref_recal_points - cal_refs_mz) / cal_refs_mz) * 1e6 389 # calculate the root mean square error - this is our target to minimize 390 rmserror = np.sqrt(np.mean(error**2)) 391 return rmserror 392 393 def recalibrate_mass_spectrum( 394 self, 395 cal_peaks_mz: list[float], 396 cal_refs_mz: list[float], 397 order: int = 1, 398 diagnostic: bool = False, 399 ): 400 """Main recalibration function which uses a robust linear regression 401 402 This function performs the recalibration of the mass spectrum object. 403 It iteratively applies 404 405 Parameters 406 ---------- 407 cal_peaks_mz : list of float 408 masses of measured peaks to use in mass calibration. 409 cal_refs_mz : list of float 410 reference mz values of found calibration points. 411 order : int, optional 412 order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1. 413 414 Returns 415 ------- 416 mass_spectrum : CoreMS mass spectrum object 417 Calibrated mass spectrum object 418 419 420 Notes 421 ----- 422 This function is adapted, in part, from the SPIKE project [1,2] and is based on the robust linear regression method. 423 424 References 425 ---------- 426 1. Chiron L., Coutouly M-A., Starck J-P., Rolando C., Delsuc M-A. 427 SPIKE a Processing Software dedicated to Fourier Spectroscopies 428 https://arxiv.org/abs/1608.06777 (2016) 429 2. SPIKE - https://github.com/spike-project/spike 430 431 """ 432 # initialise parameters for recalibration 433 # these are the 'Aterm, Bterm, Cterm' 434 # as spectra are already freq->mz calibrated, these terms are very small 435 # may be beneficial to formally separate them from the freq->mz terms 436 if order == 1: 437 Po = [1, 0] 438 elif order == 2: 439 Po = [1, 0, 0] 440 441 if len(cal_peaks_mz) >= 2: 442 if self.mzsegment: # If only part of the spectrum is to be recalibrated 443 mz_exp_peaks = np.array( 444 [mspeak.mz_exp for mspeak in self.mass_spectrum] 445 ) 446 # Split the array into two parts - one to recailbrate, one to keep unchanged. 447 mz_exp_peaks_tocal = mz_exp_peaks[ 448 (mz_exp_peaks >= min(self.mzsegment)) 449 & (mz_exp_peaks <= max(self.mzsegment)) 450 ] 451 mz_exp_peaks_unchanged = mz_exp_peaks[ 452 ~(mz_exp_peaks >= min(self.mzsegment)) 453 | ~(mz_exp_peaks <= max(self.mzsegment)) 454 ] 455 # TODO: - segmented calibration needs a way to better track the calibration args/values... 456 if not self.mass_spectrum.is_centroid: 457 mz_exp_profile = np.array(self.mass_spectrum.mz_exp_profile) 458 # Split the array into two parts - one to recailbrate, one to keep unchanged. 459 mz_exp_profile_tocal = mz_exp_profile[ 460 (mz_exp_profile >= min(self.mzsegment)) 461 & (mz_exp_profile <= max(self.mzsegment)) 462 ] 463 mz_exp_profile_unchanged = mz_exp_profile[ 464 ~(mz_exp_profile >= min(self.mzsegment)) 465 | ~(mz_exp_profile <= max(self.mzsegment)) 466 ] 467 else: # if just recalibrating the whole spectrum 468 mz_exp_peaks_tocal = np.array( 469 [mspeak.mz_exp for mspeak in self.mass_spectrum] 470 ) 471 if not self.mass_spectrum.is_centroid: 472 mz_exp_profile_tocal = np.array(self.mass_spectrum.mz_exp_profile) 473 474 minimize_method = self.mass_spectrum.settings.calib_minimize_method 475 res = minimize( 476 self.robust_calib, 477 Po, 478 args=(cal_peaks_mz, cal_refs_mz, order), 479 method=minimize_method, 480 ) 481 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 482 print( 483 "minimize function completed with RMS error of: {:0.3f} ppm".format( 484 res["fun"] 485 ) 486 ) 487 print( 488 "minimize function performed {:1d} fn evals and {:1d} iterations".format( 489 res["nfev"], res["nit"] 490 ) 491 ) 492 Pn = res.x 493 494 # mz_exp_ms = np.array([mspeak.mz_exp for mspeak in self.mass_spectrum]) 495 496 if order == 1: 497 mz_domain = (Pn[0] * mz_exp_peaks_tocal) + Pn[1] 498 if not self.mass_spectrum.is_centroid: 499 mz_profile_calc = (Pn[0] * mz_exp_profile_tocal) + Pn[1] 500 501 elif order == 2: 502 mz_domain = (Pn[0] * (mz_exp_peaks_tocal)) + ( 503 Pn[1] * np.power((mz_exp_peaks_tocal), 2) + Pn[2] 504 ) 505 506 if not self.mass_spectrum.is_centroid: 507 mz_profile_calc = (Pn[0] * (mz_exp_profile_tocal)) + ( 508 Pn[1] * np.power((mz_exp_profile_tocal), 2) + Pn[2] 509 ) 510 511 if self.mzsegment: 512 # Recombine the mass domains 513 mz_domain = np.concatenate([mz_domain, mz_exp_peaks_unchanged]) 514 mz_domain.sort() 515 if not self.mass_spectrum.is_centroid: 516 mz_profile_calc = np.concatenate( 517 [mz_profile_calc, mz_exp_profile_unchanged] 518 ) 519 mz_profile_calc.sort() 520 # Sort them 521 if ( 522 mz_exp_peaks[0] > mz_exp_peaks[1] 523 ): # If originally descending mass order 524 mz_domain = mz_domain[::-1] 525 if not self.mass_spectrum.is_centroid: 526 mz_profile_calc = mz_profile_calc[::-1] 527 528 self.mass_spectrum.mz_cal = mz_domain 529 if not self.mass_spectrum.is_centroid: 530 self.mass_spectrum.mz_cal_profile = mz_profile_calc 531 532 self.mass_spectrum.calibration_order = order 533 self.mass_spectrum.calibration_RMS = float(res["fun"]) 534 self.mass_spectrum.calibration_points = int(len(cal_refs_mz)) 535 self.mass_spectrum.calibration_ref_mzs = cal_refs_mz 536 self.mass_spectrum.calibration_meas_mzs = cal_peaks_mz 537 538 self.mass_spectrum.calibration_segment = self.mzsegment 539 540 if diagnostic: 541 return self.mass_spectrum, res 542 return self.mass_spectrum 543 else: 544 warnings.warn("Too few calibration points - aborting.") 545 return self.mass_spectrum 546 547 def run(self): 548 """Run the calibration routine 549 550 This function runs the calibration routine. 551 552 """ 553 calib_ppm_error_threshold = self.mass_spectrum.settings.calib_sn_threshold 554 max_calib_ppm_error = self.mass_spectrum.settings.max_calib_ppm_error 555 min_calib_ppm_error = self.mass_spectrum.settings.min_calib_ppm_error 556 calib_pol_order = self.mass_spectrum.settings.calib_pol_order 557 calibration_ref_match_method = ( 558 self.mass_spectrum.settings.calibration_ref_match_method 559 ) 560 calibration_ref_match_tolerance = ( 561 self.mass_spectrum.settings.calibration_ref_match_tolerance 562 ) 563 calibration_ref_match_std_raw_error_limit = ( 564 self.mass_spectrum.settings.calibration_ref_match_std_raw_error_limit 565 ) 566 567 # load reference mass list 568 df_ref = self.load_ref_mass_list() 569 570 # find calibration points 571 cal_peaks_mz, cal_refs_mz = self.find_calibration_points( 572 df_ref, 573 calib_ppm_error_threshold=(min_calib_ppm_error, max_calib_ppm_error), 574 calib_snr_threshold=calib_ppm_error_threshold, 575 calibration_ref_match_method=calibration_ref_match_method, 576 calibration_ref_match_tolerance=calibration_ref_match_tolerance, 577 calibration_ref_match_std_raw_error_limit=calibration_ref_match_std_raw_error_limit, 578 ) 579 if len(cal_peaks_mz) == 2: 580 self.mass_spectrum.settings.calib_pol_order = 1 581 calib_pol_order = 1 582 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 583 print("Only 2 calibration points found, forcing a linear recalibration") 584 elif len(cal_peaks_mz) < 2: 585 warnings.warn("Too few calibration points found, function will fail") 586 self.recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=calib_pol_order)
MzDomainCalibration class for recalibrating mass spectra
Parameters
- mass_spectrum (CoreMS MassSpectrum Object): The mass spectrum to be calibrated.
- ref_masslist (str): The path to a reference mass list.
- mzsegment (tuple of floats, optional): The mz range to recalibrate, or None. Used for calibration of specific parts of the mz domain at a time. Future work - allow multiple mzsegments to be passed.
Attributes
- mass_spectrum (CoreMS MassSpectrum Object): The mass spectrum to be calibrated.
- mzsegment (tuple of floats or None): The mz range to recalibrate, or None.
- ref_mass_list_path (str or Path): The path to the reference mass list.
Methods
- run(). Main function to run this class.
- load_ref_mass_list(). Load reference mass list (Bruker format).
- gen_ref_mass_list_from_assigned(min_conf=0.7). Generate reference mass list from assigned masses.
- find_calibration_points(df_ref, calib_ppm_error_threshold=(-1, 1), calib_snr_threshold=5). Find calibration points in the mass spectrum based on the reference mass list.
- robust_calib(param, cal_peaks_mz, cal_refs_mz, order=1). Recalibration function.
- recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=1, diagnostic=False). Main recalibration function which uses a robust linear regression.
63 def __init__(self, mass_spectrum, ref_masslist, mzsegment=None): 64 self.mass_spectrum = mass_spectrum 65 self.mzsegment = mzsegment 66 67 # define reference mass list - bruker .ref format 68 self.ref_mass_list_path = ref_masslist 69 if self.mass_spectrum.percentile_assigned(mute_output=True)[0] != 0: 70 warnings.warn( 71 "Warning: calibrating spectra which have already been assigned may yield erroneous results" 72 ) 73 self.mass_spectrum.mz_cal = self.mass_spectrum.mz_exp 74 self.mass_spectrum.mz_cal_profile = self.mass_spectrum._mz_exp 75 76 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 77 print( 78 "MS Obj loaded - " + str(len(mass_spectrum.mspeaks)) + " peaks found." 79 )
81 def load_ref_mass_list(self): 82 """ 83 Load reference mass list (Bruker format). 84 85 Loads in a reference mass list from a .ref file. Some versions of 86 Bruker's software produce .ref files with a different format where 87 the header lines (starting with '#' or '##') and delimiters may vary. 88 The file may be located locally or on S3 and will be handled accordingly. 89 90 Returns 91 ------- 92 df_ref : Pandas DataFrame 93 Reference mass list object. 94 """ 95 # Get a Path-like object from the input path string or S3Path 96 refmasslist = (Path(self.ref_mass_list_path) 97 if isinstance(self.ref_mass_list_path, str) 98 else self.ref_mass_list_path) 99 100 # Make sure the file exists 101 if not refmasslist.exists(): 102 raise FileExistsError("File does not exist: %s" % refmasslist) 103 104 # Read all lines from the file (handling S3 vs local differently) 105 if isinstance(refmasslist, S3Path): 106 # For S3, read the file in binary, then decode to string and split into lines. 107 content = refmasslist.open("rb").read() 108 all_lines = content.decode("utf-8").splitlines(keepends=True) 109 else: 110 # For a local file, open in text mode and read lines. 111 with refmasslist.open("r") as f: 112 all_lines = f.readlines() 113 114 # Identify the index of the first line of the actual data. 115 # We assume header lines start with '#' (or '##') and ignore blank lines. 116 data_start_idx = 0 117 for idx, line in enumerate(all_lines): 118 if line.strip() and not line.lstrip().startswith("#"): 119 data_start_idx = idx 120 break 121 122 # If there are not enough lines to guess the dialect, throw an error 123 if data_start_idx >= len(all_lines): 124 raise ValueError("The file does not appear to contain any data lines.") 125 126 # Use a couple of the data lines to let csv.Sniffer detect the delimiter 127 sample_lines = "".join(all_lines[data_start_idx:data_start_idx+2]) 128 try: 129 dialect = csv.Sniffer().sniff(sample_lines) 130 delimiter = dialect.delimiter 131 except csv.Error: 132 # If csv.Sniffer fails, default to a common delimiter (e.g., comma) 133 delimiter = "," 134 135 # Join the lines from the beginning of data (this might include a blank line) 136 joined_data = "".join(all_lines[data_start_idx:]) 137 138 # Depending on whether the file is S3 or local, wrap the data as needed for pandas 139 if isinstance(refmasslist, S3Path): 140 data_buffer = BytesIO(joined_data.encode("utf-8")) 141 else: 142 data_buffer = StringIO(joined_data) 143 144 # Read data into a DataFrame. 145 # Adjust columns and names as needed – here we assume at least 2 columns: 146 df_ref = pd.read_csv(data_buffer, 147 sep=delimiter, 148 header=None, 149 usecols=[0, 1], # Modify if more columns are required. 150 names=["Formula", "m/z"]) 151 152 df_ref.sort_values(by="m/z", ascending=True, inplace=True) 153 154 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 155 print("Reference mass list loaded - {} calibration masses loaded.".format(len(df_ref))) 156 157 return df_ref
Load reference mass list (Bruker format).
Loads in a reference mass list from a .ref file. Some versions of Bruker's software produce .ref files with a different format where the header lines (starting with '#' or '##') and delimiters may vary. The file may be located locally or on S3 and will be handled accordingly.
Returns
- df_ref (Pandas DataFrame): Reference mass list object.
159 def gen_ref_mass_list_from_assigned(self, min_conf: float = 0.7): 160 """Generate reference mass list from assigned masses 161 162 This function will generate a ref mass dataframe object from an assigned corems mass spec obj 163 using assigned masses above a certain minimum confidence threshold. 164 165 This function needs to be retested and check it is covered in the unit tests. 166 167 Parameters 168 ---------- 169 min_conf : float, optional 170 minimum confidence score. The default is 0.7. 171 172 Returns 173 ------- 174 df_ref : Pandas DataFrame 175 reference mass list - based on calculated masses. 176 177 """ 178 # TODO this function needs to be retested and check it is covered in the unit tests 179 df = self.mass_spectrum.to_dataframe() 180 df = df[df["Confidence Score"] > min_conf] 181 df_ref = pd.DataFrame(columns=["m/z"]) 182 df_ref["m/z"] = df["Calculated m/z"] 183 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 184 print( 185 "Reference mass list generated - " 186 + str(len(df_ref)) 187 + " calibration masses." 188 ) 189 return df_ref
Generate reference mass list from assigned masses
This function will generate a ref mass dataframe object from an assigned corems mass spec obj using assigned masses above a certain minimum confidence threshold.
This function needs to be retested and check it is covered in the unit tests.
Parameters
- min_conf (float, optional): minimum confidence score. The default is 0.7.
Returns
- df_ref (Pandas DataFrame): reference mass list - based on calculated masses.
191 def find_calibration_points( 192 self, 193 df_ref, 194 calib_ppm_error_threshold: tuple[float, float] = (-1, 1), 195 calib_snr_threshold: float = 5, 196 calibration_ref_match_method: str = "legacy", 197 calibration_ref_match_tolerance: float = 0.003, 198 calibration_ref_match_std_raw_error_limit: float = 1.5, 199 ): 200 """Function to find calibration points in the mass spectrum 201 202 Based on the reference mass list. 203 204 Parameters 205 ---------- 206 df_ref : Pandas DataFrame 207 reference mass list for recalibration. 208 calib_ppm_error_threshold : tuple of floats, optional 209 ppm error for finding calibration masses in the spectrum. The default is -1,1. 210 Note: This is based on the calculation of ppm = ((mz_measure - mz_theoretical)/mz_theoretical)*1e6. 211 Some software does this the other way around and value signs must be inverted for that to work. 212 calib_snr_threshold : float, optional 213 snr threshold for finding calibration masses in the spectrum. The default is 5. 214 215 Returns 216 ------- 217 cal_peaks_mz : list of floats 218 masses of measured ions to use in calibration routine 219 cal_refs_mz : list of floats 220 reference mz values of found calibration points. 221 222 """ 223 224 # This approach is much more efficient and expedient than the original implementation. 225 peaks_mz = [] 226 for x in self.mass_spectrum.mspeaks: 227 if x.signal_to_noise > calib_snr_threshold: 228 if self.mzsegment: 229 if min(self.mzsegment) <= x.mz_exp <= max(self.mzsegment): 230 peaks_mz.append(x.mz_exp) 231 else: 232 peaks_mz.append(x.mz_exp) 233 peaks_mz = np.asarray(peaks_mz) 234 235 if calibration_ref_match_method == "legacy": 236 # This legacy approach iterates through each reference match and finds the entries within 1 mz and within the user defined PPM error threshold 237 # Then it removes ambiguities - which means the calibration threshold hasto be very tight. 238 cal_peaks_mz = [] 239 cal_refs_mz = [] 240 for mzref in df_ref["m/z"]: 241 tmp_peaks_mz = peaks_mz[abs(peaks_mz - mzref) < 1] 242 for mzmeas in tmp_peaks_mz: 243 delta_mass = ((mzmeas - mzref) / mzref) * 1e6 244 if delta_mass < max(calib_ppm_error_threshold): 245 if delta_mass > min(calib_ppm_error_threshold): 246 cal_peaks_mz.append(mzmeas) 247 cal_refs_mz.append(mzref) 248 249 # To remove entries with duplicated indices (reference masses matching multiple peaks) 250 tmpdf = pd.Series(index=cal_refs_mz, data=cal_peaks_mz, dtype=float) 251 tmpdf = tmpdf[~tmpdf.index.duplicated(keep=False)] 252 253 cal_peaks_mz = list(tmpdf.values) 254 cal_refs_mz = list(tmpdf.index) 255 elif calibration_ref_match_method == "merged": 256 #warnings.warn("Using experimental new reference mass list merging") 257 # This is a new approach (August 2024) which uses Pandas 'merged_asof' to find the peaks closest in m/z between 258 # reference and measured masses. This is a quicker way to match, and seems to get more matches. 259 # It may not work as well when the data are far from correc initial mass 260 # e.g. if the correct peak is further from the reference than an incorrect peak. 261 meas_df = pd.DataFrame(columns=["meas_m/z"], data=peaks_mz) 262 tolerance = calibration_ref_match_tolerance 263 merged_df = pd.merge_asof( 264 df_ref, 265 meas_df, 266 left_on="m/z", 267 right_on="meas_m/z", 268 tolerance=tolerance, 269 direction="nearest", 270 ) 271 merged_df.dropna(how="any", inplace=True) 272 merged_df["Error_ppm"] = ( 273 (merged_df["meas_m/z"] - merged_df["m/z"]) / merged_df["m/z"] 274 ) * 1e6 275 median_raw_error = merged_df["Error_ppm"].median() 276 std_raw_error = merged_df["Error_ppm"].std() 277 if std_raw_error > calibration_ref_match_std_raw_error_limit: 278 std_raw_error = calibration_ref_match_std_raw_error_limit 279 self.mass_spectrum.calibration_raw_error_median = median_raw_error 280 self.mass_spectrum.calibration_raw_error_stdev = std_raw_error 281 merged_df = merged_df[ 282 (merged_df["Error_ppm"] > (median_raw_error - 1.5 * std_raw_error)) 283 & (merged_df["Error_ppm"] < (median_raw_error + 1.5 * std_raw_error)) 284 ] 285 # merged_df= merged_df[(merged_df['Error_ppm']>min(calib_ppm_error_threshold))&(merged_df['Error_ppm']<max(calib_ppm_error_threshold))] 286 cal_peaks_mz = list(merged_df["meas_m/z"]) 287 cal_refs_mz = list(merged_df["m/z"]) 288 else: 289 raise ValueError(f"{calibration_ref_match_method} not allowed.") 290 291 if False: 292 min_calib_ppm_error = calib_ppm_error_threshold[0] 293 max_calib_ppm_error = calib_ppm_error_threshold[1] 294 df_raw = self.mass_spectrum.to_dataframe() 295 296 df_raw = df_raw[df_raw["S/N"] > calib_snr_threshold] 297 # optionally further subset that based on minimum S/N, RP, Peak Height 298 # to ensure only valid points are utilized 299 # in this example, only a S/N threshold is implemented. 300 imzmeas = [] 301 mzrefs = [] 302 303 for mzref in df_ref["m/z"]: 304 # find all peaks within a defined ppm error threshold 305 tmpdf = df_raw[ 306 ((df_raw["m/z"] - mzref) / mzref) * 1e6 < max_calib_ppm_error 307 ] 308 # Error is relative to the theoretical, so the divisor should be divisor 309 310 tmpdf = tmpdf[ 311 ((tmpdf["m/z"] - mzref) / mzref) * 1e6 > min_calib_ppm_error 312 ] 313 314 # only use the calibration point if only one peak is within the thresholds 315 # This may require some optimization of the threshold tolerances 316 if len(tmpdf) == 1: 317 imzmeas.append(int(tmpdf.index.values)) 318 mzrefs.append(mzref) 319 320 # it is crucial the mass lists are in same order 321 # corems likes to do masses from high to low. 322 cal_refs_mz.sort(reverse=False) 323 cal_peaks_mz.sort(reverse=False) 324 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 325 print( 326 str(len(cal_peaks_mz)) 327 + " calibration points matched within thresholds." 328 ) 329 return cal_peaks_mz, cal_refs_mz
Function to find calibration points in the mass spectrum
Based on the reference mass list.
Parameters
- df_ref (Pandas DataFrame): reference mass list for recalibration.
- calib_ppm_error_threshold (tuple of floats, optional): ppm error for finding calibration masses in the spectrum. The default is -1,1. Note: This is based on the calculation of ppm = ((mz_measure - mz_theoretical)/mz_theoretical)*1e6. Some software does this the other way around and value signs must be inverted for that to work.
- calib_snr_threshold (float, optional): snr threshold for finding calibration masses in the spectrum. The default is 5.
Returns
- cal_peaks_mz (list of floats): masses of measured ions to use in calibration routine
- cal_refs_mz (list of floats): reference mz values of found calibration points.
331 def robust_calib( 332 self, 333 param: list[float], 334 cal_peaks_mz: list[float], 335 cal_refs_mz: list[float], 336 order: int = 1, 337 ): 338 """Recalibration function 339 340 Computes the rms of m/z errors to minimize when calibrating. 341 This is adapted from from spike. 342 343 Parameters 344 ---------- 345 param : list of floats 346 generated by minimize function from scipy optimize. 347 cal_peaks_mz : list of floats 348 masses of measured peaks to use in mass calibration. 349 cal_peaks_mz : list of floats 350 reference mz values of found calibration points. 351 order : int, optional 352 order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1. 353 354 Returns 355 ------- 356 rmserror : float 357 root mean square mass error for calibration points. 358 359 """ 360 Aterm = param[0] 361 Bterm = param[1] 362 try: 363 Cterm = param[2] 364 except IndexError: 365 pass 366 367 # get the mspeaks from the mass spectrum object which were calibration points 368 # mspeaks = [self.mass_spectrum.mspeaks[x] for x in imzmeas] 369 # get their calibrated mass values 370 # mspeakmzs = [x.mz_cal for x in mspeaks] 371 cal_peaks_mz = np.asarray(cal_peaks_mz) 372 373 # linearz 374 if order == 1: 375 ref_recal_points = (Aterm * cal_peaks_mz) + Bterm 376 # quadratic 377 elif order == 2: 378 ref_recal_points = (Aterm * (cal_peaks_mz)) + ( 379 Bterm * np.power((cal_peaks_mz), 2) + Cterm 380 ) 381 382 # sort both the calibration points (measured, recalibrated) 383 ref_recal_points.sort() 384 # and sort the calibration points (theoretical, predefined) 385 cal_refs_mz.sort() 386 387 # calculate the ppm error for each calibration point 388 error = ((ref_recal_points - cal_refs_mz) / cal_refs_mz) * 1e6 389 # calculate the root mean square error - this is our target to minimize 390 rmserror = np.sqrt(np.mean(error**2)) 391 return rmserror
Recalibration function
Computes the rms of m/z errors to minimize when calibrating. This is adapted from from spike.
Parameters
- param (list of floats): generated by minimize function from scipy optimize.
- cal_peaks_mz (list of floats): masses of measured peaks to use in mass calibration.
- cal_peaks_mz (list of floats): reference mz values of found calibration points.
- order (int, optional): order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
Returns
- rmserror (float): root mean square mass error for calibration points.
393 def recalibrate_mass_spectrum( 394 self, 395 cal_peaks_mz: list[float], 396 cal_refs_mz: list[float], 397 order: int = 1, 398 diagnostic: bool = False, 399 ): 400 """Main recalibration function which uses a robust linear regression 401 402 This function performs the recalibration of the mass spectrum object. 403 It iteratively applies 404 405 Parameters 406 ---------- 407 cal_peaks_mz : list of float 408 masses of measured peaks to use in mass calibration. 409 cal_refs_mz : list of float 410 reference mz values of found calibration points. 411 order : int, optional 412 order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1. 413 414 Returns 415 ------- 416 mass_spectrum : CoreMS mass spectrum object 417 Calibrated mass spectrum object 418 419 420 Notes 421 ----- 422 This function is adapted, in part, from the SPIKE project [1,2] and is based on the robust linear regression method. 423 424 References 425 ---------- 426 1. Chiron L., Coutouly M-A., Starck J-P., Rolando C., Delsuc M-A. 427 SPIKE a Processing Software dedicated to Fourier Spectroscopies 428 https://arxiv.org/abs/1608.06777 (2016) 429 2. SPIKE - https://github.com/spike-project/spike 430 431 """ 432 # initialise parameters for recalibration 433 # these are the 'Aterm, Bterm, Cterm' 434 # as spectra are already freq->mz calibrated, these terms are very small 435 # may be beneficial to formally separate them from the freq->mz terms 436 if order == 1: 437 Po = [1, 0] 438 elif order == 2: 439 Po = [1, 0, 0] 440 441 if len(cal_peaks_mz) >= 2: 442 if self.mzsegment: # If only part of the spectrum is to be recalibrated 443 mz_exp_peaks = np.array( 444 [mspeak.mz_exp for mspeak in self.mass_spectrum] 445 ) 446 # Split the array into two parts - one to recailbrate, one to keep unchanged. 447 mz_exp_peaks_tocal = mz_exp_peaks[ 448 (mz_exp_peaks >= min(self.mzsegment)) 449 & (mz_exp_peaks <= max(self.mzsegment)) 450 ] 451 mz_exp_peaks_unchanged = mz_exp_peaks[ 452 ~(mz_exp_peaks >= min(self.mzsegment)) 453 | ~(mz_exp_peaks <= max(self.mzsegment)) 454 ] 455 # TODO: - segmented calibration needs a way to better track the calibration args/values... 456 if not self.mass_spectrum.is_centroid: 457 mz_exp_profile = np.array(self.mass_spectrum.mz_exp_profile) 458 # Split the array into two parts - one to recailbrate, one to keep unchanged. 459 mz_exp_profile_tocal = mz_exp_profile[ 460 (mz_exp_profile >= min(self.mzsegment)) 461 & (mz_exp_profile <= max(self.mzsegment)) 462 ] 463 mz_exp_profile_unchanged = mz_exp_profile[ 464 ~(mz_exp_profile >= min(self.mzsegment)) 465 | ~(mz_exp_profile <= max(self.mzsegment)) 466 ] 467 else: # if just recalibrating the whole spectrum 468 mz_exp_peaks_tocal = np.array( 469 [mspeak.mz_exp for mspeak in self.mass_spectrum] 470 ) 471 if not self.mass_spectrum.is_centroid: 472 mz_exp_profile_tocal = np.array(self.mass_spectrum.mz_exp_profile) 473 474 minimize_method = self.mass_spectrum.settings.calib_minimize_method 475 res = minimize( 476 self.robust_calib, 477 Po, 478 args=(cal_peaks_mz, cal_refs_mz, order), 479 method=minimize_method, 480 ) 481 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 482 print( 483 "minimize function completed with RMS error of: {:0.3f} ppm".format( 484 res["fun"] 485 ) 486 ) 487 print( 488 "minimize function performed {:1d} fn evals and {:1d} iterations".format( 489 res["nfev"], res["nit"] 490 ) 491 ) 492 Pn = res.x 493 494 # mz_exp_ms = np.array([mspeak.mz_exp for mspeak in self.mass_spectrum]) 495 496 if order == 1: 497 mz_domain = (Pn[0] * mz_exp_peaks_tocal) + Pn[1] 498 if not self.mass_spectrum.is_centroid: 499 mz_profile_calc = (Pn[0] * mz_exp_profile_tocal) + Pn[1] 500 501 elif order == 2: 502 mz_domain = (Pn[0] * (mz_exp_peaks_tocal)) + ( 503 Pn[1] * np.power((mz_exp_peaks_tocal), 2) + Pn[2] 504 ) 505 506 if not self.mass_spectrum.is_centroid: 507 mz_profile_calc = (Pn[0] * (mz_exp_profile_tocal)) + ( 508 Pn[1] * np.power((mz_exp_profile_tocal), 2) + Pn[2] 509 ) 510 511 if self.mzsegment: 512 # Recombine the mass domains 513 mz_domain = np.concatenate([mz_domain, mz_exp_peaks_unchanged]) 514 mz_domain.sort() 515 if not self.mass_spectrum.is_centroid: 516 mz_profile_calc = np.concatenate( 517 [mz_profile_calc, mz_exp_profile_unchanged] 518 ) 519 mz_profile_calc.sort() 520 # Sort them 521 if ( 522 mz_exp_peaks[0] > mz_exp_peaks[1] 523 ): # If originally descending mass order 524 mz_domain = mz_domain[::-1] 525 if not self.mass_spectrum.is_centroid: 526 mz_profile_calc = mz_profile_calc[::-1] 527 528 self.mass_spectrum.mz_cal = mz_domain 529 if not self.mass_spectrum.is_centroid: 530 self.mass_spectrum.mz_cal_profile = mz_profile_calc 531 532 self.mass_spectrum.calibration_order = order 533 self.mass_spectrum.calibration_RMS = float(res["fun"]) 534 self.mass_spectrum.calibration_points = int(len(cal_refs_mz)) 535 self.mass_spectrum.calibration_ref_mzs = cal_refs_mz 536 self.mass_spectrum.calibration_meas_mzs = cal_peaks_mz 537 538 self.mass_spectrum.calibration_segment = self.mzsegment 539 540 if diagnostic: 541 return self.mass_spectrum, res 542 return self.mass_spectrum 543 else: 544 warnings.warn("Too few calibration points - aborting.") 545 return self.mass_spectrum
Main recalibration function which uses a robust linear regression
This function performs the recalibration of the mass spectrum object. It iteratively applies
Parameters
- cal_peaks_mz (list of float): masses of measured peaks to use in mass calibration.
- cal_refs_mz (list of float): reference mz values of found calibration points.
- order (int, optional): order of the recalibration function. 1 = linear, 2 = quadratic. The default is 1.
Returns
- mass_spectrum (CoreMS mass spectrum object): Calibrated mass spectrum object
Notes
This function is adapted, in part, from the SPIKE project [1,2] and is based on the robust linear regression method.
References
- Chiron L., Coutouly M-A., Starck J-P., Rolando C., Delsuc M-A. SPIKE a Processing Software dedicated to Fourier Spectroscopies https://arxiv.org/abs/1608.06777 (2016)
- SPIKE - https://github.com/spike-project/spike
547 def run(self): 548 """Run the calibration routine 549 550 This function runs the calibration routine. 551 552 """ 553 calib_ppm_error_threshold = self.mass_spectrum.settings.calib_sn_threshold 554 max_calib_ppm_error = self.mass_spectrum.settings.max_calib_ppm_error 555 min_calib_ppm_error = self.mass_spectrum.settings.min_calib_ppm_error 556 calib_pol_order = self.mass_spectrum.settings.calib_pol_order 557 calibration_ref_match_method = ( 558 self.mass_spectrum.settings.calibration_ref_match_method 559 ) 560 calibration_ref_match_tolerance = ( 561 self.mass_spectrum.settings.calibration_ref_match_tolerance 562 ) 563 calibration_ref_match_std_raw_error_limit = ( 564 self.mass_spectrum.settings.calibration_ref_match_std_raw_error_limit 565 ) 566 567 # load reference mass list 568 df_ref = self.load_ref_mass_list() 569 570 # find calibration points 571 cal_peaks_mz, cal_refs_mz = self.find_calibration_points( 572 df_ref, 573 calib_ppm_error_threshold=(min_calib_ppm_error, max_calib_ppm_error), 574 calib_snr_threshold=calib_ppm_error_threshold, 575 calibration_ref_match_method=calibration_ref_match_method, 576 calibration_ref_match_tolerance=calibration_ref_match_tolerance, 577 calibration_ref_match_std_raw_error_limit=calibration_ref_match_std_raw_error_limit, 578 ) 579 if len(cal_peaks_mz) == 2: 580 self.mass_spectrum.settings.calib_pol_order = 1 581 calib_pol_order = 1 582 if self.mass_spectrum.parameters.mass_spectrum.verbose_processing: 583 print("Only 2 calibration points found, forcing a linear recalibration") 584 elif len(cal_peaks_mz) < 2: 585 warnings.warn("Too few calibration points found, function will fail") 586 self.recalibrate_mass_spectrum(cal_peaks_mz, cal_refs_mz, order=calib_pol_order)
Run the calibration routine
This function runs the calibration routine.