corems.mass_spectrum.input.baseClass
__author__ = "Yuri E. Corilo"
__date__ = "Nov 11, 2019"

from copy import deepcopy
from io import BytesIO
from pathlib import Path

import chardet
from bs4 import BeautifulSoup
from pandas import read_csv, read_excel, read_pickle
from pandas.core.frame import DataFrame
from s3path import S3Path

from corems.encapsulation.constant import Labels
from corems.encapsulation.factory.parameters import default_parameters
from corems.encapsulation.factory.processingSetting import DataInputSetting
from corems.encapsulation.input.parameter_from_json import (
    load_and_set_parameters_class,
    load_and_set_parameters_ms,
    load_and_set_toml_parameters_class,
)


class MassListBaseClass:
    """The MassListBaseClass object reads mass list data types and returns the mass spectrum obj

    Parameters
    ----------
    file_location : Path or S3Path
        Full data path.
    isCentroid : bool, optional
        Determines the mass spectrum data structure. If set to True, it assumes centroid mode. If set to False, it assumes profile mode and attempts to peak pick. Default is True.
    analyzer : str, optional
        The analyzer used for the mass spectrum. Default is 'Unknown'.
    instrument_label : str, optional
        The label of the instrument used for the mass spectrum. Default is 'Unknown'.
    sample_name : str, optional
        The name of the sample. Default is None.
    header_lines : int, optional
        The number of lines to skip in the file, including the column labels line. Default is 0.
    isThermoProfile : bool, optional
        Determines the number of expected columns in the file. If set to True, only m/z and intensity columns are expected. Signal-to-noise ratio (S/N) and resolving power (RP) will be calculated based on the data. Default is False.
    headerless : bool, optional
        If True, assumes that there are no headers present in the file (e.g., a .xy file from Bruker) and assumes two columns: m/z and intensity. Default is False.

    Attributes
    ----------
    parameters : DataInputSetting
        The data input settings for the mass spectrum.
    data_type : str
        The type of data in the file.
    delimiter : str
        The delimiter used to read text-based files.

    Methods
    -------
    * set_parameter_from_toml(parameters_path). Sets the data input settings from a TOML file.
    * set_parameter_from_json(parameters_path). Sets the data input settings from a JSON file.
    * get_dataframe(). Reads the file and returns the data as a pandas DataFrame.
    * load_settings(mass_spec_obj, output_parameters). Loads the settings for the mass spectrum.
    * get_output_parameters(polarity, scan_index=0). Returns the output parameters for the mass spectrum.
    * clean_data_frame(dataframe). Cleans the data frame by removing columns that are not in the expected columns set.

    """

    def __init__(
        self,
        file_location: Path | S3Path,
        isCentroid: bool = True,
        analyzer: str = "Unknown",
        instrument_label: str = "Unknown",
        sample_name: str = None,
        header_lines: int = 0,
        isThermoProfile: bool = False,
        headerless: bool = False,
    ):
        self.file_location = (
            Path(file_location) if isinstance(file_location, str) else file_location
        )

        if not self.file_location.exists():
            # Bug fix: FileNotFoundError is the correct exception for a
            # missing file; FileExistsError signals the *opposite* condition
            # (a file unexpectedly already existing). Both subclass OSError,
            # so callers catching OSError are unaffected.
            raise FileNotFoundError("File does not exist: %s" % file_location)

        self.header_lines = header_lines

        # Thermo profile exports only carry m/z and intensity; S/N and
        # resolving power are computed downstream from the data.
        if isThermoProfile:
            self._expected_columns = {Labels.mz, Labels.abundance}
        else:
            self._expected_columns = {
                Labels.mz,
                Labels.abundance,
                Labels.s2n,
                Labels.rp,
            }

        self._delimiter = None

        self.isCentroid = isCentroid

        self.isThermoProfile = isThermoProfile

        self.headerless = headerless

        self._data_type = None

        self.analyzer = analyzer

        self.instrument_label = instrument_label

        self.sample_name = sample_name

        # deepcopy so per-instance parameter edits never leak into the
        # shared defaults.
        self._parameters = deepcopy(DataInputSetting())

    @property
    def parameters(self):
        """DataInputSetting : the data input settings for the mass spectrum."""
        return self._parameters

    @parameters.setter
    def parameters(self, instance_DataInputSetting):
        self._parameters = instance_DataInputSetting

    def set_parameter_from_toml(self, parameters_path):
        """Set the data input settings from a TOML file."""
        self._parameters = load_and_set_toml_parameters_class(
            "DataInput", self.parameters, parameters_path=parameters_path
        )

    def set_parameter_from_json(self, parameters_path):
        """Set the data input settings from a JSON file."""
        self._parameters = load_and_set_parameters_class(
            "DataInput", self.parameters, parameters_path=parameters_path
        )

    @property
    def data_type(self):
        """str : the type of data in the file ('txt', 'excel', 'pks', 'dataframe', 'xml')."""
        return self._data_type

    @data_type.setter
    def data_type(self, data_type):
        self._data_type = data_type

    @property
    def delimiter(self):
        """str : the delimiter used to read text-based files."""
        return self._delimiter

    @delimiter.setter
    def delimiter(self, delimiter):
        self._delimiter = delimiter

    def encoding_detector(self, file_location) -> str:
        """
        Detect the encoding of a file.

        Parameters
        ----------
        file_location : Path or S3Path
            The location of the file to be analyzed.

        Returns
        -------
        str
            The detected encoding of the file.
        """
        # Sample only the first 10 kB: enough for chardet to decide and
        # avoids reading very large peak lists twice.
        with file_location.open("rb") as rawdata:
            result = chardet.detect(rawdata.read(10000))
        return result["encoding"]

    def set_data_type(self):
        """
        Set the data type and delimiter based on the file extension.

        Raises
        ------
        TypeError
            If the data type could not be automatically recognized.
        """
        suffix = self.file_location.suffix

        if suffix == ".csv":
            self.data_type = "txt"
            self.delimiter = ","
        elif suffix == ".txt":
            self.data_type = "txt"
            self.delimiter = "\t"
        elif suffix == ".tsv":
            self.data_type = "txt"
            self.delimiter = "\t"
        elif suffix == ".xlsx":
            self.data_type = "excel"
        elif suffix == ".ascii":
            self.data_type = "txt"
            self.delimiter = "  "
        elif suffix == ".pkl":
            self.data_type = "dataframe"
        elif suffix == ".pks":
            # MagLab .pks peak lists have a fixed 9-line preamble.
            self.data_type = "pks"
            self.delimiter = "          "
            self.header_lines = 9
        elif suffix == ".xml":
            self.data_type = "xml"
        elif suffix == ".xy":
            # Bruker .xy files are headerless whitespace-separated pairs.
            self.data_type = "txt"
            self.delimiter = " "
            self.header_lines = None
        else:
            raise TypeError(
                "Data type could not be automatically recognized for %s; please set data type and delimiter manually."
                % self.file_location.name
            )

    def get_dataframe(self) -> DataFrame:
        """
        Get the data as a pandas DataFrame.

        Returns
        -------
        pandas.DataFrame
            The data as a pandas DataFrame.

        Raises
        ------
        TypeError
            If the data type is not supported.
        """
        # Lazily infer type/delimiter if the caller has not set them.
        if not self.data_type or not self.delimiter:
            self.set_data_type()

        # S3 objects are pulled into memory once; local paths are handed
        # to pandas directly.
        if isinstance(self.file_location, S3Path):
            data = BytesIO(self.file_location.open("rb").read())
        else:
            data = self.file_location

        if self.data_type == "txt":
            if self.headerless:
                dataframe = read_csv(
                    data,
                    skiprows=self.header_lines,
                    delimiter=self.delimiter,
                    header=None,
                    names=["m/z", "I"],
                    encoding=self.encoding_detector(self.file_location),
                    engine="python",
                )
            else:
                dataframe = read_csv(
                    data,
                    skiprows=self.header_lines,
                    delimiter=self.delimiter,
                    encoding=self.encoding_detector(self.file_location),
                    engine="python",
                )

        elif self.data_type == "pks":
            names = [
                "m/z",
                "I",
                "Scaled Peak Height",
                "Resolving Power",
                "Frequency",
                "S/N",
            ]
            clean_data = []
            # Skip the 8-line preamble and the trailing footer line of the
            # MagLab .pks format.
            with self.file_location.open() as maglabfile:
                for i in maglabfile.readlines()[8:-1]:
                    clean_data.append(i.split())
            dataframe = DataFrame(clean_data, columns=names)

        elif self.data_type == "dataframe":
            dataframe = read_pickle(data)

        elif self.data_type == "excel":
            dataframe = read_excel(data)

        elif self.data_type == "xml":
            dataframe = self.read_xml_peaks(data)

        else:
            raise TypeError("Data type %s is not supported" % self.data_type)

        return dataframe

    def load_settings(self, mass_spec_obj, output_parameters):
        """
        #TODO loading output parameters from json file is not functional
        Load settings from a sibling JSON file and apply them to the given mass_spec_obj.

        Looks for a file with the same stem as ``file_location`` and a
        ``.json`` suffix; warns (without raising) when it is absent.

        Parameters
        ----------
        mass_spec_obj : MassSpec
            The mass spectrum object to apply the settings to.
        output_parameters : dict
            Currently unused; kept for interface compatibility.

        """
        import warnings

        settings_file_path = self.file_location.with_suffix(".json")

        if settings_file_path.exists():
            self._parameters = load_and_set_parameters_class(
                "DataInput", self._parameters, parameters_path=settings_file_path
            )

            load_and_set_parameters_ms(
                mass_spec_obj, parameters_path=settings_file_path
            )

        else:
            warnings.warn(
                "auto settings loading is enabled but could not locate the file: %s. Please load the settings manually"
                % settings_file_path
            )

        # TODO this will load the setting from SettingCoreMS.json
        # coreMSHFD5 overrides this function to import the attrs stored in the h5 file
        # loaded_settings = {}
        # loaded_settings['MoleculaSearch'] = self.get_scan_group_attr_data(scan_index, time_index, 'MoleculaSearchSetting')
        # loaded_settings['MassSpecPeak'] = self.get_scan_group_attr_data(scan_index, time_index, 'MassSpecPeakSetting')

        # loaded_settings['MassSpectrum'] = self.get_scan_group_attr_data(scan_index, time_index, 'MassSpectrumSetting')
        # loaded_settings['Transient'] = self.get_scan_group_attr_data(scan_index, time_index, 'TransientSetting')

    def get_output_parameters(self, polarity: int, scan_index: int = 0) -> dict:
        """
        Get the output parameters for the mass spectrum.

        Parameters
        ----------
        polarity : int
            The polarity of the mass spectrum +1 or -1.
        scan_index : int, optional
            The index of the scan. Default is 0.

        Returns
        -------
        dict
            A dictionary containing the output parameters.

        """
        output_parameters = default_parameters(self.file_location)

        if self.isCentroid:
            output_parameters["label"] = Labels.corems_centroid
        else:
            output_parameters["label"] = Labels.bruker_profile

        output_parameters["analyzer"] = self.analyzer

        output_parameters["instrument_label"] = self.instrument_label

        output_parameters["sample_name"] = self.sample_name

        # Calibration terms are unknown for plain mass lists.
        output_parameters["Aterm"] = None

        output_parameters["Bterm"] = None

        output_parameters["Cterm"] = None

        output_parameters["polarity"] = polarity

        # scan_number and rt will be need to lc ms====

        output_parameters["mobility_scan"] = 0

        output_parameters["mobility_rt"] = 0

        output_parameters["scan_number"] = scan_index

        output_parameters["rt"] = 0

        return output_parameters

    def clean_data_frame(self, dataframe):
        """
        Clean the input dataframe by removing columns that are not expected.

        Parameters
        ----------
        dataframe : pandas.DataFrame
            The input dataframe to be cleaned (modified in place).

        """
        # Iterate over a snapshot of the column labels: deleting columns
        # while iterating dataframe.columns mutates the index mid-loop.
        for column_name in list(dataframe.columns):
            expected_column_name = self.parameters.header_translate.get(column_name)
            if expected_column_name not in self._expected_columns:
                del dataframe[column_name]

    def check_columns(self, header_labels: list[str]):
        """
        Check if the given header labels match the expected columns.

        Parameters
        ----------
        header_labels : list
            The header labels to be checked.

        Raises
        ------
        Exception
            If any expected column is not found in the header labels.
        """
        found_label = set()

        for label in header_labels:
            if label not in self._expected_columns:
                # Try the user-configured header translation before giving up.
                user_column_name = self.parameters.header_translate.get(label)
                if user_column_name in self._expected_columns:
                    found_label.add(user_column_name)
            else:
                found_label.add(label)

        not_found = self._expected_columns - found_label

        if len(not_found) > 0:
            raise Exception(
                "Please make sure to include the columns %s" % ", ".join(not_found)
            )

    def read_xml_peaks(self, data) -> DataFrame:
        """
        Read peaks from a Bruker .xml file and return a pandas DataFrame.

        Parameters
        ----------
        data : str, Path, or BytesIO
            The path to the .xml file, or an in-memory buffer (S3 case).

        Returns
        -------
        pandas.DataFrame
            A DataFrame containing the peak data with columns: 'm/z', 'I', 'Resolving Power', 'Area', 'S/N', 'fwhm'.
        """
        from numpy import nan

        # Bug fix: get_dataframe passes a BytesIO for S3 files, and
        # open(BytesIO) raises TypeError. Read the buffer directly instead.
        # NOTE(review): assumes UTF-8 for the S3 buffer — confirm against
        # actual Bruker exports.
        if isinstance(data, BytesIO):
            content = data.read().decode("utf-8")
        else:
            with open(data, "r") as file:
                content = file.read()
        bs_content = BeautifulSoup(content, features="xml")
        peaks_xml = bs_content.find_all("pk")

        # initialise lists of the peak variables
        areas = []
        fwhms = []
        intensities = []
        mzs = []
        res = []
        sn = []
        # iterate through the peaks appending to each list; missing
        # attributes fall back to NaN so one bad peak can't abort the parse
        for peak in peaks_xml:
            areas.append(float(peak.get("a", nan)))
            fwhms.append(float(peak.get("fwhm", nan)))
            intensities.append(float(peak.get("i", nan)))
            mzs.append(float(peak.get("mz", nan)))
            res.append(float(peak.get("res", nan)))
            sn.append(float(peak.get("sn", nan)))

        # Compile pandas dataframe of these values
        names = ["m/z", "I", "Resolving Power", "Area", "S/N", "fwhm"]
        df = DataFrame(columns=names, dtype=float)
        df["m/z"] = mzs
        df["I"] = intensities
        df["Resolving Power"] = res
        df["Area"] = areas
        df["S/N"] = sn
        df["fwhm"] = fwhms
        return df

    def get_xml_polarity(self):
        """
        Get the polarity from an XML peaklist.

        Returns
        -------
        int
            The polarity of the XML peaklist. Returns -1 for negative polarity, +1 for positive polarity.

        Raises
        ------
        Exception
            If the data type is not XML peaklist in Bruker format or if the polarity is unhandled.
        """

        # Check its an actual xml
        if not self.data_type or not self.delimiter:
            self.set_data_type()

        if isinstance(self.file_location, S3Path):
            data = BytesIO(self.file_location.open("rb").read())
        else:
            data = self.file_location

        if self.data_type != "xml":
            raise Exception("This function is only for XML peaklists (Bruker format)")

        # Bug fix: handle the S3 BytesIO case — open(BytesIO) raises
        # TypeError (same issue as read_xml_peaks).
        if isinstance(data, BytesIO):
            content = data.read().decode("utf-8")
        else:
            with open(data, "r") as file:
                content = file.read()
        bs_content = BeautifulSoup(content, features="xml")
        polarity = bs_content.find_all("ms_spectrum")[0]["polarity"]
        if polarity == "-":
            return -1
        elif polarity == "+":
            return +1
        else:
            raise Exception("Polarity %s unhandled" % polarity)
class
MassListBaseClass:
25class MassListBaseClass: 26 """The MassListBaseClass object reads mass list data types and returns the mass spectrum obj 27 28 Parameters 29 ---------- 30 file_location : Path or S3Path 31 Full data path. 32 isCentroid : bool, optional 33 Determines the mass spectrum data structure. If set to True, it assumes centroid mode. If set to False, it assumes profile mode and attempts to peak pick. Default is True. 34 analyzer : str, optional 35 The analyzer used for the mass spectrum. Default is 'Unknown'. 36 instrument_label : str, optional 37 The label of the instrument used for the mass spectrum. Default is 'Unknown'. 38 sample_name : str, optional 39 The name of the sample. Default is None. 40 header_lines : int, optional 41 The number of lines to skip in the file, including the column labels line. Default is 0. 42 isThermoProfile : bool, optional 43 Determines the number of expected columns in the file. If set to True, only m/z and intensity columns are expected. Signal-to-noise ratio (S/N) and resolving power (RP) will be calculated based on the data. Default is False. 44 headerless : bool, optional 45 If True, assumes that there are no headers present in the file (e.g., a .xy file from Bruker) and assumes two columns: m/z and intensity. Default is False. 46 47 Attributes 48 ---------- 49 parameters : DataInputSetting 50 The data input settings for the mass spectrum. 51 data_type : str 52 The type of data in the file. 53 delimiter : str 54 The delimiter used to read text-based files. 55 56 Methods 57 ------- 58 * set_parameter_from_toml(parameters_path). Sets the data input settings from a TOML file. 59 * set_parameter_from_json(parameters_path). Sets the data input settings from a JSON file. 60 * get_dataframe(). Reads the file and returns the data as a pandas DataFrame. 61 * load_settings(mass_spec_obj, output_parameters). Loads the settings for the mass spectrum. 62 * get_output_parameters(polarity, scan_index=0). 
Returns the output parameters for the mass spectrum. 63 * clean_data_frame(dataframe). Cleans the data frame by removing columns that are not in the expected columns set. 64 65 """ 66 67 def __init__( 68 self, 69 file_location: Path | S3Path, 70 isCentroid: bool = True, 71 analyzer: str = "Unknown", 72 instrument_label: str = "Unknown", 73 sample_name: str = None, 74 header_lines: int = 0, 75 isThermoProfile: bool = False, 76 headerless: bool = False, 77 ): 78 self.file_location = ( 79 Path(file_location) if isinstance(file_location, str) else file_location 80 ) 81 82 if not self.file_location.exists(): 83 raise FileExistsError("File does not exist: %s" % file_location) 84 85 # (newline="\n") 86 87 self.header_lines = header_lines 88 89 if isThermoProfile: 90 self._expected_columns = {Labels.mz, Labels.abundance} 91 92 else: 93 self._expected_columns = { 94 Labels.mz, 95 Labels.abundance, 96 Labels.s2n, 97 Labels.rp, 98 } 99 100 self._delimiter = None 101 102 self.isCentroid = isCentroid 103 104 self.isThermoProfile = isThermoProfile 105 106 self.headerless = headerless 107 108 self._data_type = None 109 110 self.analyzer = analyzer 111 112 self.instrument_label = instrument_label 113 114 self.sample_name = sample_name 115 116 self._parameters = deepcopy(DataInputSetting()) 117 118 @property 119 def parameters(self): 120 return self._parameters 121 122 @parameters.setter 123 def parameters(self, instance_DataInputSetting): 124 self._parameters = instance_DataInputSetting 125 126 def set_parameter_from_toml(self, parameters_path): 127 self._parameters = load_and_set_toml_parameters_class( 128 "DataInput", self.parameters, parameters_path=parameters_path 129 ) 130 131 def set_parameter_from_json(self, parameters_path): 132 self._parameters = load_and_set_parameters_class( 133 "DataInput", self.parameters, parameters_path=parameters_path 134 ) 135 136 @property 137 def data_type(self): 138 return self._data_type 139 140 @data_type.setter 141 def data_type(self, 
data_type): 142 self._data_type = data_type 143 144 @property 145 def delimiter(self): 146 return self._delimiter 147 148 @delimiter.setter 149 def delimiter(self, delimiter): 150 self._delimiter = delimiter 151 152 def encoding_detector(self, file_location) -> str: 153 """ 154 Detects the encoding of a file. 155 156 Parameters 157 -------- 158 file_location : str 159 The location of the file to be analyzed. 160 161 Returns 162 -------- 163 str 164 The detected encoding of the file. 165 """ 166 167 with file_location.open("rb") as rawdata: 168 result = chardet.detect(rawdata.read(10000)) 169 return result["encoding"] 170 171 def set_data_type(self): 172 """ 173 Set the data type and delimiter based on the file extension. 174 175 Raises 176 ------ 177 TypeError 178 If the data type could not be automatically recognized. 179 """ 180 if self.file_location.suffix == ".csv": 181 self.data_type = "txt" 182 self.delimiter = "," 183 elif self.file_location.suffix == ".txt": 184 self.data_type = "txt" 185 self.delimiter = "\t" 186 elif self.file_location.suffix == ".tsv": 187 self.data_type = "txt" 188 self.delimiter = "\t" 189 elif self.file_location.suffix == ".xlsx": 190 self.data_type = "excel" 191 elif self.file_location.suffix == ".ascii": 192 self.data_type = "txt" 193 self.delimiter = " " 194 elif self.file_location.suffix == ".pkl": 195 self.data_type = "dataframe" 196 elif self.file_location.suffix == ".pks": 197 self.data_type = "pks" 198 self.delimiter = " " 199 self.header_lines = 9 200 elif self.file_location.suffix == ".xml": 201 self.data_type = "xml" 202 # self.delimiter = None 203 # self.header_lines = None 204 elif self.file_location.suffix == ".xy": 205 self.data_type = "txt" 206 self.delimiter = " " 207 self.header_lines = None 208 else: 209 raise TypeError( 210 "Data type could not be automatically recognized for %s; please set data type and delimiter manually." 
211 % self.file_location.name 212 ) 213 214 def get_dataframe(self) -> DataFrame: 215 """ 216 Get the data as a pandas DataFrame. 217 218 Returns 219 ------- 220 pandas.DataFrame 221 The data as a pandas DataFrame. 222 223 Raises 224 ------ 225 TypeError 226 If the data type is not supported. 227 """ 228 229 if not self.data_type or not self.delimiter: 230 self.set_data_type() 231 232 if isinstance(self.file_location, S3Path): 233 data = BytesIO(self.file_location.open("rb").read()) 234 else: 235 data = self.file_location 236 237 if self.data_type == "txt": 238 if self.headerless: 239 dataframe = read_csv( 240 data, 241 skiprows=self.header_lines, 242 delimiter=self.delimiter, 243 header=None, 244 names=["m/z", "I"], 245 encoding=self.encoding_detector(self.file_location), 246 engine="python", 247 ) 248 else: 249 dataframe = read_csv( 250 data, 251 skiprows=self.header_lines, 252 delimiter=self.delimiter, 253 encoding=self.encoding_detector(self.file_location), 254 engine="python", 255 ) 256 257 elif self.data_type == "pks": 258 names = [ 259 "m/z", 260 "I", 261 "Scaled Peak Height", 262 "Resolving Power", 263 "Frequency", 264 "S/N", 265 ] 266 clean_data = [] 267 with self.file_location.open() as maglabfile: 268 for i in maglabfile.readlines()[8:-1]: 269 clean_data.append(i.split()) 270 dataframe = DataFrame(clean_data, columns=names) 271 272 elif self.data_type == "dataframe": 273 dataframe = read_pickle(data) 274 275 elif self.data_type == "excel": 276 dataframe = read_excel(data) 277 278 elif self.data_type == "xml": 279 dataframe = self.read_xml_peaks(data) 280 281 else: 282 raise TypeError("Data type %s is not supported" % self.data_type) 283 284 return dataframe 285 286 def load_settings(self, mass_spec_obj, output_parameters): 287 """ 288 #TODO loading output parameters from json file is not functional 289 Load settings from a JSON file and apply them to the given mass_spec_obj. 
290 291 Parameters 292 ---------- 293 mass_spec_obj : MassSpec 294 The mass spectrum object to apply the settings to. 295 296 """ 297 import json 298 import warnings 299 300 settings_file_path = self.file_location.with_suffix(".json") 301 302 if settings_file_path.exists(): 303 self._parameters = load_and_set_parameters_class( 304 "DataInput", self._parameters, parameters_path=settings_file_path 305 ) 306 307 load_and_set_parameters_ms( 308 mass_spec_obj, parameters_path=settings_file_path 309 ) 310 311 else: 312 warnings.warn( 313 "auto settings loading is enabled but could not locate the file: %s. Please load the settings manually" 314 % settings_file_path 315 ) 316 317 # TODO this will load the setting from SettingCoreMS.json 318 # coreMSHFD5 overrides this function to import the attrs stored in the h5 file 319 # loaded_settings = {} 320 # loaded_settings['MoleculaSearch'] = self.get_scan_group_attr_data(scan_index, time_index, 'MoleculaSearchSetting') 321 # loaded_settings['MassSpecPeak'] = self.get_scan_group_attr_data(scan_index, time_index, 'MassSpecPeakSetting') 322 323 # loaded_settings['MassSpectrum'] = self.get_scan_group_attr_data(scan_index, time_index, 'MassSpectrumSetting') 324 # loaded_settings['Transient'] = self.get_scan_group_attr_data(scan_index, time_index, 'TransientSetting') 325 326 def get_output_parameters(self, polarity: int, scan_index: int = 0) -> dict: 327 """ 328 Get the output parameters for the mass spectrum. 329 330 Parameters 331 ---------- 332 polarity : int 333 The polarity of the mass spectrum +1 or -1. 334 scan_index : int, optional 335 The index of the scan. Default is 0. 336 337 Returns 338 ------- 339 dict 340 A dictionary containing the output parameters. 
341 342 """ 343 from copy import deepcopy 344 345 output_parameters = default_parameters(self.file_location) 346 347 if self.isCentroid: 348 output_parameters["label"] = Labels.corems_centroid 349 else: 350 output_parameters["label"] = Labels.bruker_profile 351 352 output_parameters["analyzer"] = self.analyzer 353 354 output_parameters["instrument_label"] = self.instrument_label 355 356 output_parameters["sample_name"] = self.sample_name 357 358 output_parameters["Aterm"] = None 359 360 output_parameters["Bterm"] = None 361 362 output_parameters["Cterm"] = None 363 364 output_parameters["polarity"] = polarity 365 366 # scan_number and rt will be need to lc ms==== 367 368 output_parameters["mobility_scan"] = 0 369 370 output_parameters["mobility_rt"] = 0 371 372 output_parameters["scan_number"] = scan_index 373 374 output_parameters["rt"] = 0 375 376 return output_parameters 377 378 def clean_data_frame(self, dataframe): 379 """ 380 Clean the input dataframe by removing columns that are not expected. 381 382 Parameters 383 ---------- 384 pandas.DataFrame 385 The input dataframe to be cleaned. 386 387 """ 388 389 for column_name in dataframe.columns: 390 expected_column_name = self.parameters.header_translate.get(column_name) 391 if expected_column_name not in self._expected_columns: 392 del dataframe[column_name] 393 394 def check_columns(self, header_labels: list[str]): 395 """ 396 Check if the given header labels match the expected columns. 397 398 Parameters 399 ---------- 400 header_labels : list 401 The header labels to be checked. 402 403 Raises 404 ------ 405 Exception 406 If any expected column is not found in the header labels. 
407 """ 408 found_label = set() 409 410 for label in header_labels: 411 if not label in self._expected_columns: 412 user_column_name = self.parameters.header_translate.get(label) 413 if user_column_name in self._expected_columns: 414 found_label.add(user_column_name) 415 else: 416 found_label.add(label) 417 418 not_found = self._expected_columns - found_label 419 420 if len(not_found) > 0: 421 raise Exception( 422 "Please make sure to include the columns %s" % ", ".join(not_found) 423 ) 424 425 def read_xml_peaks(self, data: str) -> DataFrame: 426 """ 427 Read peaks from a Bruker .xml file and return a pandas DataFrame. 428 429 Parameters 430 ---------- 431 data : str 432 The path to the .xml file. 433 434 Returns 435 ------- 436 pandas.DataFrame 437 A DataFrame containing the peak data with columns: 'm/z', 'I', 'Resolving Power', 'Area', 'S/N', 'fwhm'. 438 """ 439 from numpy import nan 440 441 with open(data, "r") as file: 442 content = file.readlines() 443 content = "".join(content) 444 bs_content = BeautifulSoup(content, features="xml") 445 peaks_xml = bs_content.find_all("pk") 446 447 # initialise lists of the peak variables 448 areas = [] 449 fwhms = [] 450 intensities = [] 451 mzs = [] 452 res = [] 453 sn = [] 454 # iterate through the peaks appending to each list 455 for peak in peaks_xml: 456 areas.append( 457 float(peak.get("a", nan)) 458 ) # Use a default value if key 'a' is missing 459 fwhms.append( 460 float(peak.get("fwhm", nan)) 461 ) # Use a default value if key 'fwhm' is missing 462 intensities.append( 463 float(peak.get("i", nan)) 464 ) # Use a default value if key 'i' is missing 465 mzs.append( 466 float(peak.get("mz", nan)) 467 ) # Use a default value if key 'mz' is missing 468 res.append( 469 float(peak.get("res", nan)) 470 ) # Use a default value if key 'res' is missing 471 sn.append( 472 float(peak.get("sn", nan)) 473 ) # Use a default value if key 'sn' is missing 474 475 # Compile pandas dataframe of these values 476 names = ["m/z", "I", 
"Resolving Power", "Area", "S/N", "fwhm"] 477 df = DataFrame(columns=names, dtype=float) 478 df["m/z"] = mzs 479 df["I"] = intensities 480 df["Resolving Power"] = res 481 df["Area"] = areas 482 df["S/N"] = sn 483 df["fwhm"] = fwhms 484 return df 485 486 def get_xml_polarity(self): 487 """ 488 Get the polarity from an XML peaklist. 489 490 Returns 491 ------- 492 int 493 The polarity of the XML peaklist. Returns -1 for negative polarity, +1 for positive polarity. 494 495 Raises 496 ------ 497 Exception 498 If the data type is not XML peaklist in Bruker format or if the polarity is unhandled. 499 """ 500 501 # Check its an actual xml 502 if not self.data_type or not self.delimiter: 503 self.set_data_type() 504 505 if isinstance(self.file_location, S3Path): 506 # data = self.file_location.open('rb').read() 507 data = BytesIO(self.file_location.open("rb").read()) 508 509 else: 510 data = self.file_location 511 512 if self.data_type != "xml": 513 raise Exception("This function is only for XML peaklists (Bruker format)") 514 515 with open(data, "r") as file: 516 content = file.readlines() 517 content = "".join(content) 518 bs_content = BeautifulSoup(content, features="xml") 519 polarity = bs_content.find_all("ms_spectrum")[0]["polarity"] 520 if polarity == "-": 521 return -1 522 elif polarity == "+": 523 return +1 524 else: 525 raise Exception("Polarity %s unhandled" % polarity)
The MassListBaseClass object reads mass list data types and returns the mass spectrum obj
Parameters
- file_location (Path or S3Path): Full data path.
- isCentroid (bool, optional): Determines the mass spectrum data structure. If set to True, it assumes centroid mode. If set to False, it assumes profile mode and attempts to peak pick. Default is True.
- analyzer (str, optional): The analyzer used for the mass spectrum. Default is 'Unknown'.
- instrument_label (str, optional): The label of the instrument used for the mass spectrum. Default is 'Unknown'.
- sample_name (str, optional): The name of the sample. Default is None.
- header_lines (int, optional): The number of lines to skip in the file, including the column labels line. Default is 0.
- isThermoProfile (bool, optional): Determines the number of expected columns in the file. If set to True, only m/z and intensity columns are expected. Signal-to-noise ratio (S/N) and resolving power (RP) will be calculated based on the data. Default is False.
- headerless (bool, optional): If True, assumes that there are no headers present in the file (e.g., a .xy file from Bruker) and assumes two columns: m/z and intensity. Default is False.
Attributes
- parameters (DataInputSetting): The data input settings for the mass spectrum.
- data_type (str): The type of data in the file.
- delimiter (str): The delimiter used to read text-based files.
Methods
- set_parameter_from_toml(parameters_path). Sets the data input settings from a TOML file.
- set_parameter_from_json(parameters_path). Sets the data input settings from a JSON file.
- get_dataframe(). Reads the file and returns the data as a pandas DataFrame.
- load_settings(mass_spec_obj, output_parameters). Loads the settings for the mass spectrum.
- get_output_parameters(polarity, scan_index=0). Returns the output parameters for the mass spectrum.
- clean_data_frame(dataframe). Cleans the data frame by removing columns that are not in the expected columns set.
MassListBaseClass( file_location: pathlib.Path | s3path.S3Path, isCentroid: bool = True, analyzer: str = 'Unknown', instrument_label: str = 'Unknown', sample_name: str = None, header_lines: int = 0, isThermoProfile: bool = False, headerless: bool = False)
67 def __init__( 68 self, 69 file_location: Path | S3Path, 70 isCentroid: bool = True, 71 analyzer: str = "Unknown", 72 instrument_label: str = "Unknown", 73 sample_name: str = None, 74 header_lines: int = 0, 75 isThermoProfile: bool = False, 76 headerless: bool = False, 77 ): 78 self.file_location = ( 79 Path(file_location) if isinstance(file_location, str) else file_location 80 ) 81 82 if not self.file_location.exists(): 83 raise FileExistsError("File does not exist: %s" % file_location) 84 85 # (newline="\n") 86 87 self.header_lines = header_lines 88 89 if isThermoProfile: 90 self._expected_columns = {Labels.mz, Labels.abundance} 91 92 else: 93 self._expected_columns = { 94 Labels.mz, 95 Labels.abundance, 96 Labels.s2n, 97 Labels.rp, 98 } 99 100 self._delimiter = None 101 102 self.isCentroid = isCentroid 103 104 self.isThermoProfile = isThermoProfile 105 106 self.headerless = headerless 107 108 self._data_type = None 109 110 self.analyzer = analyzer 111 112 self.instrument_label = instrument_label 113 114 self.sample_name = sample_name 115 116 self._parameters = deepcopy(DataInputSetting())
def
encoding_detector(self, file_location) -> str:
152 def encoding_detector(self, file_location) -> str: 153 """ 154 Detects the encoding of a file. 155 156 Parameters 157 -------- 158 file_location : str 159 The location of the file to be analyzed. 160 161 Returns 162 -------- 163 str 164 The detected encoding of the file. 165 """ 166 167 with file_location.open("rb") as rawdata: 168 result = chardet.detect(rawdata.read(10000)) 169 return result["encoding"]
Detects the encoding of a file.
Parameters
- file_location (str): The location of the file to be analyzed.
Returns
- str: The detected encoding of the file.
def
set_data_type(self):
171 def set_data_type(self): 172 """ 173 Set the data type and delimiter based on the file extension. 174 175 Raises 176 ------ 177 TypeError 178 If the data type could not be automatically recognized. 179 """ 180 if self.file_location.suffix == ".csv": 181 self.data_type = "txt" 182 self.delimiter = "," 183 elif self.file_location.suffix == ".txt": 184 self.data_type = "txt" 185 self.delimiter = "\t" 186 elif self.file_location.suffix == ".tsv": 187 self.data_type = "txt" 188 self.delimiter = "\t" 189 elif self.file_location.suffix == ".xlsx": 190 self.data_type = "excel" 191 elif self.file_location.suffix == ".ascii": 192 self.data_type = "txt" 193 self.delimiter = " " 194 elif self.file_location.suffix == ".pkl": 195 self.data_type = "dataframe" 196 elif self.file_location.suffix == ".pks": 197 self.data_type = "pks" 198 self.delimiter = " " 199 self.header_lines = 9 200 elif self.file_location.suffix == ".xml": 201 self.data_type = "xml" 202 # self.delimiter = None 203 # self.header_lines = None 204 elif self.file_location.suffix == ".xy": 205 self.data_type = "txt" 206 self.delimiter = " " 207 self.header_lines = None 208 else: 209 raise TypeError( 210 "Data type could not be automatically recognized for %s; please set data type and delimiter manually." 211 % self.file_location.name 212 )
Set the data type and delimiter based on the file extension.
Raises
- TypeError: If the data type could not be automatically recognized.
def
get_dataframe(self) -> pandas.core.frame.DataFrame:
214 def get_dataframe(self) -> DataFrame: 215 """ 216 Get the data as a pandas DataFrame. 217 218 Returns 219 ------- 220 pandas.DataFrame 221 The data as a pandas DataFrame. 222 223 Raises 224 ------ 225 TypeError 226 If the data type is not supported. 227 """ 228 229 if not self.data_type or not self.delimiter: 230 self.set_data_type() 231 232 if isinstance(self.file_location, S3Path): 233 data = BytesIO(self.file_location.open("rb").read()) 234 else: 235 data = self.file_location 236 237 if self.data_type == "txt": 238 if self.headerless: 239 dataframe = read_csv( 240 data, 241 skiprows=self.header_lines, 242 delimiter=self.delimiter, 243 header=None, 244 names=["m/z", "I"], 245 encoding=self.encoding_detector(self.file_location), 246 engine="python", 247 ) 248 else: 249 dataframe = read_csv( 250 data, 251 skiprows=self.header_lines, 252 delimiter=self.delimiter, 253 encoding=self.encoding_detector(self.file_location), 254 engine="python", 255 ) 256 257 elif self.data_type == "pks": 258 names = [ 259 "m/z", 260 "I", 261 "Scaled Peak Height", 262 "Resolving Power", 263 "Frequency", 264 "S/N", 265 ] 266 clean_data = [] 267 with self.file_location.open() as maglabfile: 268 for i in maglabfile.readlines()[8:-1]: 269 clean_data.append(i.split()) 270 dataframe = DataFrame(clean_data, columns=names) 271 272 elif self.data_type == "dataframe": 273 dataframe = read_pickle(data) 274 275 elif self.data_type == "excel": 276 dataframe = read_excel(data) 277 278 elif self.data_type == "xml": 279 dataframe = self.read_xml_peaks(data) 280 281 else: 282 raise TypeError("Data type %s is not supported" % self.data_type) 283 284 return dataframe
Get the data as a pandas DataFrame.
Returns
- pandas.DataFrame: The data as a pandas DataFrame.
Raises
- TypeError: If the data type is not supported.
def
load_settings(self, mass_spec_obj, output_parameters):
286 def load_settings(self, mass_spec_obj, output_parameters): 287 """ 288 #TODO loading output parameters from json file is not functional 289 Load settings from a JSON file and apply them to the given mass_spec_obj. 290 291 Parameters 292 ---------- 293 mass_spec_obj : MassSpec 294 The mass spectrum object to apply the settings to. 295 296 """ 297 import json 298 import warnings 299 300 settings_file_path = self.file_location.with_suffix(".json") 301 302 if settings_file_path.exists(): 303 self._parameters = load_and_set_parameters_class( 304 "DataInput", self._parameters, parameters_path=settings_file_path 305 ) 306 307 load_and_set_parameters_ms( 308 mass_spec_obj, parameters_path=settings_file_path 309 ) 310 311 else: 312 warnings.warn( 313 "auto settings loading is enabled but could not locate the file: %s. Please load the settings manually" 314 % settings_file_path 315 ) 316 317 # TODO this will load the setting from SettingCoreMS.json 318 # coreMSHFD5 overrides this function to import the attrs stored in the h5 file 319 # loaded_settings = {} 320 # loaded_settings['MoleculaSearch'] = self.get_scan_group_attr_data(scan_index, time_index, 'MoleculaSearchSetting') 321 # loaded_settings['MassSpecPeak'] = self.get_scan_group_attr_data(scan_index, time_index, 'MassSpecPeakSetting') 322 323 # loaded_settings['MassSpectrum'] = self.get_scan_group_attr_data(scan_index, time_index, 'MassSpectrumSetting') 324 # loaded_settings['Transient'] = self.get_scan_group_attr_data(scan_index, time_index, 'TransientSetting')
TODO loading output parameters from json file is not functional
Load settings from a JSON file and apply them to the given mass_spec_obj.
Parameters
- mass_spec_obj (MassSpec): The mass spectrum object to apply the settings to.
def
get_output_parameters(self, polarity: int, scan_index: int = 0) -> dict:
326 def get_output_parameters(self, polarity: int, scan_index: int = 0) -> dict: 327 """ 328 Get the output parameters for the mass spectrum. 329 330 Parameters 331 ---------- 332 polarity : int 333 The polarity of the mass spectrum +1 or -1. 334 scan_index : int, optional 335 The index of the scan. Default is 0. 336 337 Returns 338 ------- 339 dict 340 A dictionary containing the output parameters. 341 342 """ 343 from copy import deepcopy 344 345 output_parameters = default_parameters(self.file_location) 346 347 if self.isCentroid: 348 output_parameters["label"] = Labels.corems_centroid 349 else: 350 output_parameters["label"] = Labels.bruker_profile 351 352 output_parameters["analyzer"] = self.analyzer 353 354 output_parameters["instrument_label"] = self.instrument_label 355 356 output_parameters["sample_name"] = self.sample_name 357 358 output_parameters["Aterm"] = None 359 360 output_parameters["Bterm"] = None 361 362 output_parameters["Cterm"] = None 363 364 output_parameters["polarity"] = polarity 365 366 # scan_number and rt will be need to lc ms==== 367 368 output_parameters["mobility_scan"] = 0 369 370 output_parameters["mobility_rt"] = 0 371 372 output_parameters["scan_number"] = scan_index 373 374 output_parameters["rt"] = 0 375 376 return output_parameters
Get the output parameters for the mass spectrum.
Parameters
- polarity (int): The polarity of the mass spectrum +1 or -1.
- scan_index (int, optional): The index of the scan. Default is 0.
Returns
- dict: A dictionary containing the output parameters.
def
clean_data_frame(self, dataframe):
378 def clean_data_frame(self, dataframe): 379 """ 380 Clean the input dataframe by removing columns that are not expected. 381 382 Parameters 383 ---------- 384 pandas.DataFrame 385 The input dataframe to be cleaned. 386 387 """ 388 389 for column_name in dataframe.columns: 390 expected_column_name = self.parameters.header_translate.get(column_name) 391 if expected_column_name not in self._expected_columns: 392 del dataframe[column_name]
Clean the input dataframe by removing columns that are not expected.
Parameters
- pandas.DataFrame: The input dataframe to be cleaned.
def
check_columns(self, header_labels: list[str]):
394 def check_columns(self, header_labels: list[str]): 395 """ 396 Check if the given header labels match the expected columns. 397 398 Parameters 399 ---------- 400 header_labels : list 401 The header labels to be checked. 402 403 Raises 404 ------ 405 Exception 406 If any expected column is not found in the header labels. 407 """ 408 found_label = set() 409 410 for label in header_labels: 411 if not label in self._expected_columns: 412 user_column_name = self.parameters.header_translate.get(label) 413 if user_column_name in self._expected_columns: 414 found_label.add(user_column_name) 415 else: 416 found_label.add(label) 417 418 not_found = self._expected_columns - found_label 419 420 if len(not_found) > 0: 421 raise Exception( 422 "Please make sure to include the columns %s" % ", ".join(not_found) 423 )
Check if the given header labels match the expected columns.
Parameters
- header_labels (list): The header labels to be checked.
Raises
- Exception: If any expected column is not found in the header labels.
def
read_xml_peaks(self, data: str) -> pandas.core.frame.DataFrame:
425 def read_xml_peaks(self, data: str) -> DataFrame: 426 """ 427 Read peaks from a Bruker .xml file and return a pandas DataFrame. 428 429 Parameters 430 ---------- 431 data : str 432 The path to the .xml file. 433 434 Returns 435 ------- 436 pandas.DataFrame 437 A DataFrame containing the peak data with columns: 'm/z', 'I', 'Resolving Power', 'Area', 'S/N', 'fwhm'. 438 """ 439 from numpy import nan 440 441 with open(data, "r") as file: 442 content = file.readlines() 443 content = "".join(content) 444 bs_content = BeautifulSoup(content, features="xml") 445 peaks_xml = bs_content.find_all("pk") 446 447 # initialise lists of the peak variables 448 areas = [] 449 fwhms = [] 450 intensities = [] 451 mzs = [] 452 res = [] 453 sn = [] 454 # iterate through the peaks appending to each list 455 for peak in peaks_xml: 456 areas.append( 457 float(peak.get("a", nan)) 458 ) # Use a default value if key 'a' is missing 459 fwhms.append( 460 float(peak.get("fwhm", nan)) 461 ) # Use a default value if key 'fwhm' is missing 462 intensities.append( 463 float(peak.get("i", nan)) 464 ) # Use a default value if key 'i' is missing 465 mzs.append( 466 float(peak.get("mz", nan)) 467 ) # Use a default value if key 'mz' is missing 468 res.append( 469 float(peak.get("res", nan)) 470 ) # Use a default value if key 'res' is missing 471 sn.append( 472 float(peak.get("sn", nan)) 473 ) # Use a default value if key 'sn' is missing 474 475 # Compile pandas dataframe of these values 476 names = ["m/z", "I", "Resolving Power", "Area", "S/N", "fwhm"] 477 df = DataFrame(columns=names, dtype=float) 478 df["m/z"] = mzs 479 df["I"] = intensities 480 df["Resolving Power"] = res 481 df["Area"] = areas 482 df["S/N"] = sn 483 df["fwhm"] = fwhms 484 return df
Read peaks from a Bruker .xml file and return a pandas DataFrame.
Parameters
- data (str): The path to the .xml file.
Returns
- pandas.DataFrame: A DataFrame containing the peak data with columns: 'm/z', 'I', 'Resolving Power', 'Area', 'S/N', 'fwhm'.
def
get_xml_polarity(self):
486 def get_xml_polarity(self): 487 """ 488 Get the polarity from an XML peaklist. 489 490 Returns 491 ------- 492 int 493 The polarity of the XML peaklist. Returns -1 for negative polarity, +1 for positive polarity. 494 495 Raises 496 ------ 497 Exception 498 If the data type is not XML peaklist in Bruker format or if the polarity is unhandled. 499 """ 500 501 # Check its an actual xml 502 if not self.data_type or not self.delimiter: 503 self.set_data_type() 504 505 if isinstance(self.file_location, S3Path): 506 # data = self.file_location.open('rb').read() 507 data = BytesIO(self.file_location.open("rb").read()) 508 509 else: 510 data = self.file_location 511 512 if self.data_type != "xml": 513 raise Exception("This function is only for XML peaklists (Bruker format)") 514 515 with open(data, "r") as file: 516 content = file.readlines() 517 content = "".join(content) 518 bs_content = BeautifulSoup(content, features="xml") 519 polarity = bs_content.find_all("ms_spectrum")[0]["polarity"] 520 if polarity == "-": 521 return -1 522 elif polarity == "+": 523 return +1 524 else: 525 raise Exception("Polarity %s unhandled" % polarity)
Get the polarity from an XML peaklist.
Returns
- int: The polarity of the XML peaklist. Returns -1 for negative polarity, +1 for positive polarity.
Raises
- Exception: If the data type is not XML peaklist in Bruker format or if the polarity is unhandled.