corems.transient.input.brukerSolarix
__author__ = "Yuri E. Corilo"
__date__ = "Jun 12, 2019"
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from xml.dom import minidom

from numpy import dtype, float32, float64, frombuffer, fromfile, fromstring, genfromtxt
from s3path import S3Path

from corems.encapsulation.factory.parameters import default_parameters
from corems.transient.factory.TransientClasses import Transient
from corems.mass_spectra.input.brukerSolarix_utils import get_scan_attributes


class ReadBrukerSolarix(object):
    """Read a single transient from a Bruker FT-MS acquisition folder (fid or ser).

    Parameters
    ----------
    d_directory_location : str or path-like
        the full path of the .d folder; a str is converted to pathlib.Path

    Attributes
    ----------
    d_directory_location : path-like
        the full path of the .d folder
    file_location : path-like
        the full path of the .d folder
    parameter_filename_location : path-like
        the full path of the apexAcquisition.method file
    transient_data_path : path-like
        the full path of the fid or ser file
    scan_attr : path-like
        the full path of the scan.xml file (only set when the data file is ser)
    imaging_info_attr : path-like
        the full path of the ImagingInfo.xml file (only set when the data file is ser)

    Methods
    -------
    * get_transient().
        Read the data and settings returning a Transient class
    * get_scan_attr().
        Read the scan retention times, TIC values and scan indices.
    * locate_file(folder, type_file_name).
        Find the full path of a specific file within the acquisition .d folder or subfolders
    * parse_parameters(parameters_filename).
        Open the given file and retrieve all parameters from apexAcquisition.method
    * fix_freq_limits(d_parameters).
        Read and set the correct frequency limits for the spectrum
    * get_excite_sweep_range(filename).
        Determine excitation sweep range from ExciteSweep file
    * parse_sqlite(sqlite_filename).
        Read the traces stored in the SQLite file of the .d folder
    """

    def __enter__(self):
        """Return the transient of the default scan when used as a context manager."""
        return self.get_transient()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Never suppress an exception raised inside the ``with`` body."""
        return False

    def __init__(self, d_directory_location):
        """Locate the method file and the transient data file of a .d folder.

        Parameters
        ----------
        d_directory_location : str or path-like
            the full path of the .d folder

        Raises
        ------
        FileNotFoundError
            if the folder itself does not exist
        FileExistsError
            if the method file or the fid/ser file cannot be located; the
            underlying error is attached as ``__cause__``
        """
        if isinstance(d_directory_location, str):
            d_directory_location = Path(d_directory_location)

        if not d_directory_location.exists():
            raise FileNotFoundError("File does not exist: " + str(d_directory_location))

        self.d_directory_location = d_directory_location
        self.file_location = d_directory_location

        try:
            self.parameter_filename_location = self.locate_file(
                d_directory_location, "apexAcquisition.method"
            )
            self.transient_data_path = d_directory_location / "fid"

            if not self.transient_data_path.exists():
                # no single-transient file: fall back to the "ser" file
                self.transient_data_path = d_directory_location / "ser"

                if not self.transient_data_path.exists():
                    raise FileNotFoundError("Could not locate transient data")

                # scan attribute files are only recorded for "ser" data
                self.scan_attr = d_directory_location / "scan.xml"
                self.imaging_info_attr = d_directory_location / "ImagingInfo.xml"

        # Exception (not a bare except) so KeyboardInterrupt/SystemExit propagate
        except Exception as err:
            raise FileExistsError(
                "%s does not seem to be a valid Solarix Mass Spectrum"
                % (d_directory_location)
            ) from err

    def get_scan_attr(self):
        """Function to get the scan retention times, TIC values and scan indices.

        Delegates to get_scan_attributes with the scan.xml and ImagingInfo.xml
        paths stored on the instance; both attributes are only set when the
        data file is "ser".

        Returns
        -------
        dict_scan_rt_tic : dict
            a dictionary with scan number as key and rt and tic as values
        """
        return get_scan_attributes(self.scan_attr, self.imaging_info_attr)

    def get_transient(self, scan_number=1):
        """Function to get the transient data and parameters from a Bruker Solarix .d folder.

        Parameters
        ----------
        scan_number : int
            the scan number to be read. Default is 1. Only used for "ser" data.

        Returns
        -------
        Transient
            a transient object

        Raises
        ------
        TypeError
            if a required parameter is missing from the method file, or if
            scan_number is not present in the scan attributes of a "ser" file
        """
        from io import BytesIO

        file_d_params = self.parse_parameters(self.parameter_filename_location)
        self.fix_freq_limits(file_d_params)

        # The file is addressed in 4-byte samples below, so read them as
        # native-endian 32-bit integers on every platform.
        dt = dtype("int32")

        output_parameters = deepcopy(default_parameters(self.d_directory_location))

        is_ser = self.transient_data_path.name == "ser"
        if is_ser:
            # rt and tic come from the scan attributes; otherwise defaults are kept
            dict_scan_rt_tic = self.get_scan_attr()
            output_parameters["scan_number"] = scan_number
            scan_rt_tic = dict_scan_rt_tic.get(scan_number)
            output_parameters["rt"] = scan_rt_tic[0]
            output_parameters["tic"] = scan_rt_tic[1]

        data_points = int(file_d_params.get("TD"))

        output_parameters["analyzer"] = "ICR"
        output_parameters["label"] = "Bruker_Frequency"
        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
        try:
            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
        except TypeError:  # for older datasets which dont have this variable
            output_parameters["qpd_enabled"] = 0
        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
        output_parameters["number_data_points"] = data_points
        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")

        if is_ser:
            bytes_per_scan = dt.itemsize * data_points
            with self.transient_data_path.open("rb") as handle:
                if isinstance(self.transient_data_path, S3Path):
                    # S3 objects are read fully and then sliced from memory
                    databin = BytesIO(handle.read())
                else:
                    databin = handle
                databin.seek((scan_number - 1) * bytes_per_scan)
                data = frombuffer(databin.read(bytes_per_scan), dtype=dt)
        elif isinstance(self.transient_data_path, S3Path):
            with self.transient_data_path.open("rb") as handle:
                data = frombuffer(handle.read(), dtype=dt)
        else:
            data = fromfile(self.transient_data_path, dtype=dt)

        return Transient(data, output_parameters)

    def fix_freq_limits(self, d_parameters):
        """Function to read and set the correct frequency limits for the spectrum

        Notes
        --------
        This is using the excitation limits from the apexAcquisition.method file,
        which may not match the intended detection limits in edge cases.
        In default acquisitions, excitation and detection are the same.
        But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.

        Parameters
        ----------
        d_parameters : dict
            a dictionary with the parameters from the apexAcquisition.method file;
            updated in place when the limits are taken from the ExciteSweep file
        """
        highfreq = float(d_parameters.get("EXC_Freq_High"))
        lowfreq = float(d_parameters.get("EXC_Freq_Low"))

        # CR for compatibility with Apex format as there is no EXciteSweep file
        # NOTE(review): the original condition is `not highfreq and lowfreq`, i.e.
        # the sweep file is consulted only when the high limit is zero AND the low
        # limit is non-zero. Confirm whether "either limit is zero" was intended.
        needs_sweep_file = (not highfreq) and bool(lowfreq)
        if not needs_sweep_file:
            return

        excitation_sweep_filelocation = self.locate_file(
            self.d_directory_location, "ExciteSweep"
        )
        lowfreq, highfreq = self.get_excite_sweep_range(excitation_sweep_filelocation)
        d_parameters["EXC_Freq_High"] = highfreq
        d_parameters["EXC_Freq_Low"] = lowfreq

    @staticmethod
    def get_excite_sweep_range(filename):
        """Function to determine excitation sweep range from ExciteSweep file

        The first value of the file is returned as the high limit and the last
        value as the low limit. This assumes the sweep is monotonic, which may be
        incorrect for edge cases with custom excitation waveforms.

        Parameters
        ----------
        filename : str
            the full path to the ExciteSweep file

        Returns
        -------
        tuple
            (lowfreq, highfreq) taken from the last and first values of the file
        """
        # genfromtxt already yields floats, so no byte-level re-parsing is needed;
        # ravel() keeps indexing valid when the file holds a single value.
        sweep_values = genfromtxt(filename, comments="*", delimiter="\n").ravel()
        highfreq = sweep_values[0]
        lowfreq = sweep_values[-1]

        return lowfreq, highfreq

    @staticmethod
    def locate_file(folder, type_file_name="apexAcquisition.method"):
        """Function to locate a file in a folder

        Find the full path of a specific file within the acquisition .d folder or subfolders

        Parameters
        ----------
        folder : path-like
            the folder to search; it must provide a pathlib-style glob()
        type_file_name : str
            the name of the file to be located
            Expected options: ExciteSweep or apexAcquisition.method

        Returns
        -------
        path-like
            the full path to the file

        Raises
        ------
        Exception
            if more than one matching file is found
        FileNotFoundError
            if no matching file is found

        Notes
        -----
        adapted from code from SPIKE library, https://github.com/spike-project/spike

        """
        result = list(folder.glob("**/*" + type_file_name))

        if len(result) > 1:
            # the search is ambiguous, so no file is picked
            raise Exception(
                "You have more than 1 %s file in the %s folder"
                % (type_file_name, folder)
            )

        if not result:
            raise FileNotFoundError(
                "You don't have any %s file in the %s folder, please double check the path"
                % (type_file_name, folder)
            )

        return result[0]

    @staticmethod
    def parse_parameters(parameters_filename):
        """Function to parse the parameters from apexAcquisition.method file

        Open the given file and retrieve all parameters from apexAcquisition.method
        None is written when a value element has no content

        structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>

        Parameters
        ----------
        parameters_filename : path-like
            the full path to the apexAcquisition.method file

        Returns
        -------
        dict
            a dictionary with the parameters and values; parameter values are
            str (or None), "Polarity" is -1 or 1 and "acquisition_time" is a
            datetime

        Notes
        -----
        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
        Code may not handle all possible parameters, but should be sufficient for most common use cases
        """
        # TODO: change to beautiful soup xml parsing
        with parameters_filename.open() as method_file:
            xmldoc = minidom.parse(method_file)

        parameter_dict = {}
        for child in xmldoc.documentElement.childNodes:
            if child.nodeName == "methodmetadata":
                for section in child.childNodes:
                    for element in section.childNodes:
                        if element.nodeName == "date":
                            date_time_str = element.childNodes[0].nodeValue
                            parameter_dict["acquisition_time"] = datetime.strptime(
                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
                            )

            elif child.nodeName == "reportinfo":
                for section in child.childNodes:
                    if section.nodeName != "section":
                        continue
                    if section.getAttribute("title") != "Main":
                        continue
                    for element in section.childNodes:
                        if element.nodeName != "section":
                            continue
                        if element.getAttribute("title") != "Polarity":
                            continue
                        # the second child node carries the polarity value
                        polarity = str(element.childNodes[1].getAttribute("value"))
                        parameter_dict["Polarity"] = -1 if polarity == "Negative" else 1

            elif child.nodeName == "paramlist":
                for param in child.childNodes:
                    if param.nodeName != "param":
                        continue
                    parameter_label = str(param.getAttribute("name"))
                    for element in param.childNodes:
                        if element.nodeName != "value":
                            continue
                        content = element.firstChild
                        # an empty <value/> has no child node to serialise
                        parameter_dict[parameter_label] = (
                            None if content is None else str(content.toxml())
                        )

        return parameter_dict

    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
        """Read the traces stored in the SQLite file of the .d folder.

        Rows of the TraceSources table are matched to rows of the TraceChunks
        table through their first column. For every source, the second and
        third columns of its chunks are decoded as float64 times and float32
        values respectively.

        Parameters
        ----------
        sqlite_filename : str
            the name of the SQLite file to locate inside the .d folder

        Returns
        -------
        dict
            a dictionary keyed by the second column of TraceSources whose
            values are dictionaries with "times" and "values" lists
        """
        import sqlite3
        from contextlib import closing

        def read_sqlite_file(file_path, table_name):
            """Return every row of table_name as a list of tuples."""
            # table_name is only ever one of the constants used below, never
            # external input, so it is interpolated directly.
            with closing(sqlite3.connect(file_path)) as conn:
                with closing(conn.cursor()) as cursor:
                    cursor.execute(f"SELECT * FROM {table_name}")
                    return cursor.fetchall()

        sqlite_filelocation = self.locate_file(
            self.d_directory_location, sqlite_filename
        )
        trace_sources = read_sqlite_file(sqlite_filelocation, "TraceSources")
        trace_chunks = read_sqlite_file(sqlite_filelocation, "TraceChunks")

        trace_type = {}
        for source in trace_sources:
            trace_id, trace_name = source[0], source[1]
            trace = {"times": [], "values": []}
            trace_type[trace_name] = trace
            for chunk in trace_chunks:
                # decode only the chunks that belong to this source
                if chunk[0] != trace_id:
                    continue
                times = frombuffer(chunk[1], dtype=float64)
                values = frombuffer(chunk[2], dtype=float32)
                # keep times and values paired, as zip() would
                paired = min(len(times), len(values))
                trace["times"].extend(times[:paired])
                trace["values"].extend(values[:paired])

        return trace_type
class ReadBrukerSolarix(object):
    """Read a single transient from a Bruker FT-MS acquisition folder (fid or ser).

    Parameters
    ----------
    d_directory_location : str or path-like
        the full path of the .d folder; a str is converted to pathlib.Path

    Attributes
    ----------
    d_directory_location : path-like
        the full path of the .d folder
    file_location : path-like
        the full path of the .d folder
    parameter_filename_location : path-like
        the full path of the apexAcquisition.method file
    transient_data_path : path-like
        the full path of the fid or ser file
    scan_attr : path-like
        the full path of the scan.xml file (only set when the data file is ser)
    imaging_info_attr : path-like
        the full path of the ImagingInfo.xml file (only set when the data file is ser)

    Methods
    -------
    * get_transient().
        Read the data and settings returning a Transient class
    * get_scan_attr().
        Read the scan retention times, TIC values and scan indices.
    * locate_file(folder, type_file_name).
        Find the full path of a specific file within the acquisition .d folder or subfolders
    * parse_parameters(parameters_filename).
        Open the given file and retrieve all parameters from apexAcquisition.method
    * fix_freq_limits(d_parameters).
        Read and set the correct frequency limits for the spectrum
    * get_excite_sweep_range(filename).
        Determine excitation sweep range from ExciteSweep file
    * parse_sqlite(sqlite_filename).
        Read the traces stored in the SQLite file of the .d folder
    """

    def __enter__(self):
        """Return the transient of the default scan when used as a context manager."""
        return self.get_transient()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Never suppress an exception raised inside the ``with`` body."""
        return False

    def __init__(self, d_directory_location):
        """Locate the method file and the transient data file of a .d folder.

        Parameters
        ----------
        d_directory_location : str or path-like
            the full path of the .d folder

        Raises
        ------
        FileNotFoundError
            if the folder itself does not exist
        FileExistsError
            if the method file or the fid/ser file cannot be located; the
            underlying error is attached as ``__cause__``
        """
        if isinstance(d_directory_location, str):
            d_directory_location = Path(d_directory_location)

        if not d_directory_location.exists():
            raise FileNotFoundError("File does not exist: " + str(d_directory_location))

        self.d_directory_location = d_directory_location
        self.file_location = d_directory_location

        try:
            self.parameter_filename_location = self.locate_file(
                d_directory_location, "apexAcquisition.method"
            )
            self.transient_data_path = d_directory_location / "fid"

            if not self.transient_data_path.exists():
                # no single-transient file: fall back to the "ser" file
                self.transient_data_path = d_directory_location / "ser"

                if not self.transient_data_path.exists():
                    raise FileNotFoundError("Could not locate transient data")

                # scan attribute files are only recorded for "ser" data
                self.scan_attr = d_directory_location / "scan.xml"
                self.imaging_info_attr = d_directory_location / "ImagingInfo.xml"

        # Exception (not a bare except) so KeyboardInterrupt/SystemExit propagate
        except Exception as err:
            raise FileExistsError(
                "%s does not seem to be a valid Solarix Mass Spectrum"
                % (d_directory_location)
            ) from err

    def get_scan_attr(self):
        """Function to get the scan retention times, TIC values and scan indices.

        Delegates to get_scan_attributes with the scan.xml and ImagingInfo.xml
        paths stored on the instance; both attributes are only set when the
        data file is "ser".

        Returns
        -------
        dict_scan_rt_tic : dict
            a dictionary with scan number as key and rt and tic as values
        """
        return get_scan_attributes(self.scan_attr, self.imaging_info_attr)

    def get_transient(self, scan_number=1):
        """Function to get the transient data and parameters from a Bruker Solarix .d folder.

        Parameters
        ----------
        scan_number : int
            the scan number to be read. Default is 1. Only used for "ser" data.

        Returns
        -------
        Transient
            a transient object

        Raises
        ------
        TypeError
            if a required parameter is missing from the method file, or if
            scan_number is not present in the scan attributes of a "ser" file
        """
        from io import BytesIO

        file_d_params = self.parse_parameters(self.parameter_filename_location)
        self.fix_freq_limits(file_d_params)

        # The file is addressed in 4-byte samples below, so read them as
        # native-endian 32-bit integers on every platform.
        dt = dtype("int32")

        output_parameters = deepcopy(default_parameters(self.d_directory_location))

        is_ser = self.transient_data_path.name == "ser"
        if is_ser:
            # rt and tic come from the scan attributes; otherwise defaults are kept
            dict_scan_rt_tic = self.get_scan_attr()
            output_parameters["scan_number"] = scan_number
            scan_rt_tic = dict_scan_rt_tic.get(scan_number)
            output_parameters["rt"] = scan_rt_tic[0]
            output_parameters["tic"] = scan_rt_tic[1]

        data_points = int(file_d_params.get("TD"))

        output_parameters["analyzer"] = "ICR"
        output_parameters["label"] = "Bruker_Frequency"
        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
        try:
            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
        except TypeError:  # for older datasets which dont have this variable
            output_parameters["qpd_enabled"] = 0
        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
        output_parameters["number_data_points"] = data_points
        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")

        if is_ser:
            bytes_per_scan = dt.itemsize * data_points
            with self.transient_data_path.open("rb") as handle:
                if isinstance(self.transient_data_path, S3Path):
                    # S3 objects are read fully and then sliced from memory
                    databin = BytesIO(handle.read())
                else:
                    databin = handle
                databin.seek((scan_number - 1) * bytes_per_scan)
                data = frombuffer(databin.read(bytes_per_scan), dtype=dt)
        elif isinstance(self.transient_data_path, S3Path):
            with self.transient_data_path.open("rb") as handle:
                data = frombuffer(handle.read(), dtype=dt)
        else:
            data = fromfile(self.transient_data_path, dtype=dt)

        return Transient(data, output_parameters)

    def fix_freq_limits(self, d_parameters):
        """Function to read and set the correct frequency limits for the spectrum

        Notes
        --------
        This is using the excitation limits from the apexAcquisition.method file,
        which may not match the intended detection limits in edge cases.
        In default acquisitions, excitation and detection are the same.
        But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.

        Parameters
        ----------
        d_parameters : dict
            a dictionary with the parameters from the apexAcquisition.method file;
            updated in place when the limits are taken from the ExciteSweep file
        """
        highfreq = float(d_parameters.get("EXC_Freq_High"))
        lowfreq = float(d_parameters.get("EXC_Freq_Low"))

        # CR for compatibility with Apex format as there is no EXciteSweep file
        # NOTE(review): the original condition is `not highfreq and lowfreq`, i.e.
        # the sweep file is consulted only when the high limit is zero AND the low
        # limit is non-zero. Confirm whether "either limit is zero" was intended.
        needs_sweep_file = (not highfreq) and bool(lowfreq)
        if not needs_sweep_file:
            return

        excitation_sweep_filelocation = self.locate_file(
            self.d_directory_location, "ExciteSweep"
        )
        lowfreq, highfreq = self.get_excite_sweep_range(excitation_sweep_filelocation)
        d_parameters["EXC_Freq_High"] = highfreq
        d_parameters["EXC_Freq_Low"] = lowfreq

    @staticmethod
    def get_excite_sweep_range(filename):
        """Function to determine excitation sweep range from ExciteSweep file

        The first value of the file is returned as the high limit and the last
        value as the low limit. This assumes the sweep is monotonic, which may be
        incorrect for edge cases with custom excitation waveforms.

        Parameters
        ----------
        filename : str
            the full path to the ExciteSweep file

        Returns
        -------
        tuple
            (lowfreq, highfreq) taken from the last and first values of the file
        """
        # genfromtxt already yields floats, so no byte-level re-parsing is needed;
        # ravel() keeps indexing valid when the file holds a single value.
        sweep_values = genfromtxt(filename, comments="*", delimiter="\n").ravel()
        highfreq = sweep_values[0]
        lowfreq = sweep_values[-1]

        return lowfreq, highfreq

    @staticmethod
    def locate_file(folder, type_file_name="apexAcquisition.method"):
        """Function to locate a file in a folder

        Find the full path of a specific file within the acquisition .d folder or subfolders

        Parameters
        ----------
        folder : path-like
            the folder to search; it must provide a pathlib-style glob()
        type_file_name : str
            the name of the file to be located
            Expected options: ExciteSweep or apexAcquisition.method

        Returns
        -------
        path-like
            the full path to the file

        Raises
        ------
        Exception
            if more than one matching file is found
        FileNotFoundError
            if no matching file is found

        Notes
        -----
        adapted from code from SPIKE library, https://github.com/spike-project/spike

        """
        result = list(folder.glob("**/*" + type_file_name))

        if len(result) > 1:
            # the search is ambiguous, so no file is picked
            raise Exception(
                "You have more than 1 %s file in the %s folder"
                % (type_file_name, folder)
            )

        if not result:
            raise FileNotFoundError(
                "You don't have any %s file in the %s folder, please double check the path"
                % (type_file_name, folder)
            )

        return result[0]

    @staticmethod
    def parse_parameters(parameters_filename):
        """Function to parse the parameters from apexAcquisition.method file

        Open the given file and retrieve all parameters from apexAcquisition.method
        None is written when a value element has no content

        structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>

        Parameters
        ----------
        parameters_filename : path-like
            the full path to the apexAcquisition.method file

        Returns
        -------
        dict
            a dictionary with the parameters and values; parameter values are
            str (or None), "Polarity" is -1 or 1 and "acquisition_time" is a
            datetime

        Notes
        -----
        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
        Code may not handle all possible parameters, but should be sufficient for most common use cases
        """
        # TODO: change to beautiful soup xml parsing
        with parameters_filename.open() as method_file:
            xmldoc = minidom.parse(method_file)

        parameter_dict = {}
        for child in xmldoc.documentElement.childNodes:
            if child.nodeName == "methodmetadata":
                for section in child.childNodes:
                    for element in section.childNodes:
                        if element.nodeName == "date":
                            date_time_str = element.childNodes[0].nodeValue
                            parameter_dict["acquisition_time"] = datetime.strptime(
                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
                            )

            elif child.nodeName == "reportinfo":
                for section in child.childNodes:
                    if section.nodeName != "section":
                        continue
                    if section.getAttribute("title") != "Main":
                        continue
                    for element in section.childNodes:
                        if element.nodeName != "section":
                            continue
                        if element.getAttribute("title") != "Polarity":
                            continue
                        # the second child node carries the polarity value
                        polarity = str(element.childNodes[1].getAttribute("value"))
                        parameter_dict["Polarity"] = -1 if polarity == "Negative" else 1

            elif child.nodeName == "paramlist":
                for param in child.childNodes:
                    if param.nodeName != "param":
                        continue
                    parameter_label = str(param.getAttribute("name"))
                    for element in param.childNodes:
                        if element.nodeName != "value":
                            continue
                        content = element.firstChild
                        # an empty <value/> has no child node to serialise
                        parameter_dict[parameter_label] = (
                            None if content is None else str(content.toxml())
                        )

        return parameter_dict

    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
        """Read the traces stored in the SQLite file of the .d folder.

        Rows of the TraceSources table are matched to rows of the TraceChunks
        table through their first column. For every source, the second and
        third columns of its chunks are decoded as float64 times and float32
        values respectively.

        Parameters
        ----------
        sqlite_filename : str
            the name of the SQLite file to locate inside the .d folder

        Returns
        -------
        dict
            a dictionary keyed by the second column of TraceSources whose
            values are dictionaries with "times" and "values" lists
        """
        import sqlite3
        from contextlib import closing

        def read_sqlite_file(file_path, table_name):
            """Return every row of table_name as a list of tuples."""
            # table_name is only ever one of the constants used below, never
            # external input, so it is interpolated directly.
            with closing(sqlite3.connect(file_path)) as conn:
                with closing(conn.cursor()) as cursor:
                    cursor.execute(f"SELECT * FROM {table_name}")
                    return cursor.fetchall()

        sqlite_filelocation = self.locate_file(
            self.d_directory_location, sqlite_filename
        )
        trace_sources = read_sqlite_file(sqlite_filelocation, "TraceSources")
        trace_chunks = read_sqlite_file(sqlite_filelocation, "TraceChunks")

        trace_type = {}
        for source in trace_sources:
            trace_id, trace_name = source[0], source[1]
            trace = {"times": [], "values": []}
            trace_type[trace_name] = trace
            for chunk in trace_chunks:
                # decode only the chunks that belong to this source
                if chunk[0] != trace_id:
                    continue
                times = frombuffer(chunk[1], dtype=float64)
                values = frombuffer(chunk[2], dtype=float32)
                # keep times and values paired, as zip() would
                paired = min(len(times), len(values))
                trace["times"].extend(times[:paired])
                trace["values"].extend(values[:paired])

        return trace_type
A class used to read a single transient from Bruker's FT-MS acquisition station (fid or ser)
Parameters
- d_directory_location (str): the full path of the .d folder
Attributes
- d_directory_location (str): the full path of the .d folder
- file_location (str): the full path of the .d folder
- parameter_filename_location (str): the full path of the apexAcquisition.method file
- transient_data_path (str): the full path of the fid or ser file
- scan_attr (str): the full path of the scan.xml file
Methods
- get_transient(). Read the data and settings returning a Transient class
- get_scan_attr(). Read the scan retention times, TIC values and scan indices.
- locate_file(folder, type_file_name). Find the full path of a specific file within the acquisition .d folder or subfolders
- parse_parameters(parameters_filename). Open the given file and retrieve all parameters from apexAcquisition.method
- fix_freq_limits(d_parameters). Read and set the correct frequency limits for the spectrum
- get_excite_sweep_range(filename). Determine excitation sweep range from ExciteSweep file
def __init__(self, d_directory_location):
    """Locate the method file and the transient data file of a .d folder.

    Parameters
    ----------
    d_directory_location : str or path-like
        the full path of the .d folder; a str is converted to pathlib.Path

    Raises
    ------
    FileNotFoundError
        if the folder itself does not exist
    FileExistsError
        if the method file or the fid/ser file cannot be located; the
        underlying error is attached as ``__cause__``
    """
    if isinstance(d_directory_location, str):
        d_directory_location = Path(d_directory_location)

    if not d_directory_location.exists():
        raise FileNotFoundError("File does not exist: " + str(d_directory_location))

    self.d_directory_location = d_directory_location
    self.file_location = d_directory_location

    try:
        self.parameter_filename_location = self.locate_file(
            d_directory_location, "apexAcquisition.method"
        )
        self.transient_data_path = d_directory_location / "fid"

        if not self.transient_data_path.exists():
            # no single-transient file: fall back to the "ser" file
            self.transient_data_path = d_directory_location / "ser"

            if not self.transient_data_path.exists():
                raise FileNotFoundError("Could not locate transient data")

            # scan attribute files are only recorded for "ser" data
            self.scan_attr = d_directory_location / "scan.xml"
            self.imaging_info_attr = d_directory_location / "ImagingInfo.xml"

    # Exception (not a bare except) so KeyboardInterrupt/SystemExit propagate
    except Exception as err:
        raise FileExistsError(
            "%s does not seem to be a valid Solarix Mass Spectrum"
            % (d_directory_location)
        ) from err
def get_scan_attr(self):
    """Return the scan retention times, TIC values and scan indices.

    Hands the scan.xml and ImagingInfo.xml paths stored on the instance to
    get_scan_attributes and returns its result unchanged.

    Returns
    -------
    dict_scan_rt_tic : dict
        a dictionary with scan number as key and rt and tic as values
    """
    scan_xml_path = self.scan_attr
    imaging_info_path = self.imaging_info_attr
    dict_scan_rt_tic = get_scan_attributes(scan_xml_path, imaging_info_path)
    return dict_scan_rt_tic
Function to get the scan retention times, TIC values and scan indices.
Gets information from the scan.xml file in the Bruker .d folder. Note that this file is only present in some .d formats; for imaging-mode data, for example, it is not present.
Returns
- dict_scan_rt_tic (dict): a dictionary with scan number as key and rt and tic as values
def get_transient(self, scan_number=1):
    """Function to get the transient data and parameters from a Bruker Solarix .d folder.

    Parameters
    ----------
    scan_number : int
        the scan number to be read. Default is 1. Only used for "ser" data.

    Returns
    -------
    Transient
        a transient object

    Raises
    ------
    TypeError
        if a required parameter is missing from the method file, or if
        scan_number is not present in the scan attributes of a "ser" file
    """
    from io import BytesIO

    file_d_params = self.parse_parameters(self.parameter_filename_location)
    self.fix_freq_limits(file_d_params)

    # The file is addressed in 4-byte samples below, so read them as
    # native-endian 32-bit integers on every platform.
    dt = dtype("int32")

    output_parameters = deepcopy(default_parameters(self.d_directory_location))

    is_ser = self.transient_data_path.name == "ser"
    if is_ser:
        # rt and tic come from the scan attributes; otherwise defaults are kept
        dict_scan_rt_tic = self.get_scan_attr()
        output_parameters["scan_number"] = scan_number
        scan_rt_tic = dict_scan_rt_tic.get(scan_number)
        output_parameters["rt"] = scan_rt_tic[0]
        output_parameters["tic"] = scan_rt_tic[1]

    data_points = int(file_d_params.get("TD"))

    output_parameters["analyzer"] = "ICR"
    output_parameters["label"] = "Bruker_Frequency"
    output_parameters["Aterm"] = float(file_d_params.get("ML1"))
    output_parameters["Bterm"] = float(file_d_params.get("ML2"))
    output_parameters["Cterm"] = float(file_d_params.get("ML3"))
    output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
    output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
    try:
        output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
    except TypeError:  # for older datasets which dont have this variable
        output_parameters["qpd_enabled"] = 0
    output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
    output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
    output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
    output_parameters["number_data_points"] = data_points
    output_parameters["polarity"] = str(file_d_params.get("Polarity"))
    output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")

    if is_ser:
        bytes_per_scan = dt.itemsize * data_points
        with self.transient_data_path.open("rb") as handle:
            if isinstance(self.transient_data_path, S3Path):
                # S3 objects are read fully and then sliced from memory
                databin = BytesIO(handle.read())
            else:
                databin = handle
            databin.seek((scan_number - 1) * bytes_per_scan)
            data = frombuffer(databin.read(bytes_per_scan), dtype=dt)
    elif isinstance(self.transient_data_path, S3Path):
        with self.transient_data_path.open("rb") as handle:
            data = frombuffer(handle.read(), dtype=dt)
    else:
        data = fromfile(self.transient_data_path, dtype=dt)

    return Transient(data, output_parameters)
Function to get the transient data and parameters from a Bruker Solarix .d folder.
Parameters
- scan_number (int): the scan number to be read. Default is 1.
Returns
- Transient: a transient object
def fix_freq_limits(self, d_parameters):
    """Set the frequency limits of the spectrum in the parameters dictionary.

    Notes
    -----
    The limits are the excitation limits from the apexAcquisition.method file,
    which may not match the intended detection limits in edge cases.
    In default acquisitions, excitation and detection are the same, but they may
    differ with selective excitation, custom excite waveforms, or in 2DMS applications.

    Parameters
    ----------
    d_parameters : dict
        a dictionary with the parameters from the apexAcquisition.method file.
        "EXC_Freq_High" and "EXC_Freq_Low" are overwritten in place with floats.
    """
    freq_high = float(d_parameters.get("EXC_Freq_High"))
    freq_low = float(d_parameters.get("EXC_Freq_Low"))

    # CR for compatibility with Apex format as there is no ExciteSweep file
    # NOTE(review): the original condition `not highfreq and lowfreq` parses as
    # `(not highfreq) and lowfreq`, so the ExciteSweep file is consulted only when
    # the high limit is zero while the low limit is not. Confirm this is intended
    # rather than `not (highfreq and lowfreq)`.
    if freq_low and not freq_high:
        sweep_path = self.locate_file(self.d_directory_location, "ExciteSweep")
        freq_low, freq_high = self.get_excite_sweep_range(sweep_path)

    d_parameters.update({"EXC_Freq_High": freq_high, "EXC_Freq_Low": freq_low})
Function to read and set the correct frequency limits for the spectrum
Notes
This uses the excitation limits from the apexAcquisition.method file, which may not match the intended detection limits in edge cases. In default acquisitions, the excitation and detection limits are the same, but they may differ with selective excitation, custom excite waveforms, or in 2DMS applications.
Parameters
- d_parameters (dict): a dictionary with the parameters from the apexAcquisition.method file
@staticmethod
def get_excite_sweep_range(filename):
    """Determine the excitation sweep range from an ExciteSweep file.

    The first and last data rows of the file are taken as the highest and lowest
    excitation frequencies. This assumes the excitation sweep was linear and ran
    from the highest to the lowest frequency, which may be incorrect for edge
    cases with custom excitation waveforms.

    Parameters
    ----------
    filename : str
        the full path to the ExciteSweep file

    Returns
    -------
    tuple
        (lowfreq, highfreq), i.e. the last and the first value of the file

    Raises
    ------
    ValueError
        if the file contains no data rows
    """
    from numpy import atleast_1d

    # one value per row; text after "*" on a row is treated as a comment.
    # atleast_1d keeps a single-row file indexable (genfromtxt returns 0-d for it)
    sweep_frequencies = atleast_1d(
        genfromtxt(filename, comments="*", delimiter="\n")
    )

    if sweep_frequencies.size == 0:
        raise ValueError("No excitation frequencies found in %s" % filename)

    # the rows are already parsed floats, so they are indexed directly instead of
    # being round-tripped through the deprecated binary mode of numpy.fromstring
    return sweep_frequencies[-1], sweep_frequencies[0]
Function to determine excitation sweep range from ExciteSweep file
This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range. Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies. This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.
Parameters
- filename (str): the full path to the ExciteSweep file
@staticmethod
def locate_file(folder, type_file_name="apexAcquisition.method"):
    """Locate a single file inside the acquisition .d folder or its subfolders.

    Parameters
    ----------
    folder : str or path-like object with a ``glob`` method
        the full path to the folder; a str is converted to ``pathlib.Path``
    type_file_name : str
        the name of the file to be located. Every file whose name ends with
        this text matches.
        Expected options: ExciteSweep or apexAcquisition.method

    Returns
    -------
    Path
        the full path to the file (same path type as ``folder``)

    Raises
    ------
    FileNotFoundError
        if no matching file exists in the folder or its subfolders
    RuntimeError
        if more than one matching file exists

    Notes
    -----
    Adapted from code from SPIKE library, https://github.com/spike-project/spike
    """
    from pathlib import Path

    # the parameter is documented as str, but glob needs a path object
    if isinstance(folder, str):
        folder = Path(folder)

    matches = list(folder.glob("**/*" + type_file_name))

    if not matches:
        raise FileNotFoundError(
            "You don't have any %s file in the %s folder, please double check the path"
            % (type_file_name, folder)
        )

    if len(matches) > 1:
        # the search is ambiguous, so nothing is returned
        raise RuntimeError(
            "You have more than 1 %s file in the %s folder, expected exactly one"
            % (type_file_name, folder)
        )

    return matches[0]
Function to locate a file in a folder
Find the full path of a specific file within the acquisition .d folder or subfolders
Parameters
- folder (str): the full path to the folder
- type_file_name (str): the name of the file to be located. Expected options: ExciteSweep or apexAcquisition.method
Returns
- str: the full path to the file
Notes
adapted from code from SPIKE library, https://github.com/spike-project/spike
@staticmethod
def parse_parameters(parameters_filename):
    """Parse the parameters from an apexAcquisition.method file.

    Three children of the document element are read:

    * ``methodmetadata``: the text of a ``date`` element is parsed into
      "acquisition_time"
    * ``reportinfo``: inside the section titled "Main", the section titled
      "Polarity" sets "Polarity" to -1 when the "value" attribute of its
      second child node is "Negative", and to 1 otherwise
    * ``paramlist``: every ``param`` element is stored under its "name"
      attribute; None is stored when its ``value`` element is empty

    structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>

    Parameters
    ----------
    parameters_filename : Path
        the full path to the apexAcquisition.method file; it must provide an
        ``open()`` method

    Returns
    -------
    dict
        a dictionary with the parameters and values; paramlist values are
        kept as strings

    Notes
    -----
    Adapted from code from SPIKE library, https://github.com/spike-project/spike.
    Code may not handle all possible parameters, but should be sufficient for most common use cases
    """

    # TODO: change to beautiful soup xml parsing

    # the document is fully loaded by minidom, so the handle can be closed right away
    with parameters_filename.open() as method_file:
        xmldoc = minidom.parse(method_file)

    parameter_dict = {}

    for child in xmldoc.documentElement.childNodes:
        if child.nodeName == "methodmetadata":
            for section in child.childNodes:
                for element in section.childNodes:
                    if element.nodeName != "date":
                        continue
                    date_time_str = element.childNodes[0].nodeValue
                    parameter_dict["acquisition_time"] = datetime.strptime(
                        date_time_str, "%b_%d_%Y %H:%M:%S.%f"
                    )

        elif child.nodeName == "reportinfo":
            for section in child.childNodes:
                if section.nodeName != "section":
                    continue
                if section.getAttribute("title") != "Main":
                    continue
                for element in section.childNodes:
                    if element.nodeName != "section":
                        continue
                    if element.getAttribute("title") != "Polarity":
                        continue
                    # the polarity is carried by the second child node
                    polarity = str(element.childNodes[1].getAttribute("value"))
                    parameter_dict["Polarity"] = -1 if polarity == "Negative" else 1

        elif child.nodeName == "paramlist":
            for param in child.childNodes:
                if param.nodeName != "param":
                    continue
                parameter_label = str(param.getAttribute("name"))
                for element in param.childNodes:
                    if element.nodeName != "value":
                        continue
                    try:
                        parameter_value = str(element.firstChild.toxml())
                    except AttributeError:
                        # an empty <value/> has no first child
                        parameter_value = None
                    parameter_dict[parameter_label] = parameter_value

    return parameter_dict
Function to parse the parameters from apexAcquisition.method file
Open the given file and retrieve all parameters from apexAcquisition.method. None is written when no value is found for a parameter.
structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
Parameters
- parameters_filename (str): the full path to the apexAcquisition.method file
Returns
- dict: a dictionary with the parameters and values
Notes
Adapted from code from SPIKE library, https://github.com/spike-project/spike. Code may not handle all possible parameters, but should be sufficient for most common use cases
def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
    """Read the traces stored in the SQLite file of the .d folder.

    Every row of the "TraceSources" table defines one trace: its first column
    is the trace id and its second column is used as the key of the result.
    Every row of the "TraceChunks" table whose first column equals that id
    contributes its second column (decoded as float64 times) and its third
    column (decoded as float32 values) to the trace, in table order.

    Parameters
    ----------
    sqlite_filename : str
        the name of the SQLite file to be located inside the .d folder.
        Default is "chromatography-data.sqlite".

    Returns
    -------
    dict
        a dictionary mapping each trace key to {"times": list, "values": list}
    """
    import sqlite3
    from collections import defaultdict
    from contextlib import closing

    binary_dtypes = {"double": float64, "float": float32}

    def read_sqlite_file(file_path, table_name):
        """
        Read data from a SQLite database file and return it as a list of tuples

        Parameters
        ----------
        file_path : str
            the full path to the SQLite database file
        table_name : str
            the name of the table to be read

        Returns
        -------
        list
            a list of tuples with the data from the table
        """
        # closing() releases the cursor and the connection even if the query fails
        with closing(sqlite3.connect(file_path)) as conn:
            with closing(conn.cursor()) as cursor:
                # table_name only ever comes from the constants used below
                cursor.execute(f"SELECT * FROM {table_name}")
                return cursor.fetchall()

    def parse_binary(binary, value_type):
        """
        Parse binary data from the sqlite data streams
        """
        if value_type not in binary_dtypes:
            raise ValueError("Unsupported binary type: %s" % value_type)
        return frombuffer(binary, dtype=binary_dtypes[value_type])

    sqlite_filelocation = self.locate_file(
        self.d_directory_location, sqlite_filename
    )
    trace_sources = read_sqlite_file(sqlite_filelocation, "TraceSources")
    trace_chunks = read_sqlite_file(sqlite_filelocation, "TraceChunks")

    # group the chunks by trace id once instead of rescanning them for every source
    chunks_by_trace = defaultdict(list)
    for chunk in trace_chunks:
        chunks_by_trace[chunk[0]].append(chunk)

    trace_type = {}
    for source in trace_sources:
        trace = {"times": [], "values": []}
        for chunk in chunks_by_trace.get(source[0], ()):
            times = parse_binary(chunk[1], "double")
            values = parse_binary(chunk[2], "float")
            # times and values are paired, so the longer array is truncated
            n_points = min(len(times), len(values))
            trace["times"].extend(times[:n_points])
            trace["values"].extend(values[:n_points])
        trace_type[source[1]] = trace

    return trace_type