corems.transient.input.brukerSolarix
__author__ = "Yuri E. Corilo"
__date__ = "Jun 12, 2019"
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from xml.dom import minidom

from numpy import dtype, float32, float64, frombuffer, fromfile, fromstring, genfromtxt
from s3path import S3Path

from corems.encapsulation.factory.parameters import default_parameters
from corems.transient.factory.TransientClasses import Transient


class ReadBrukerSolarix(object):
    """Read a single transient from a Bruker FT-MS acquisition folder (fid or ser).

    Parameters
    ----------
    d_directory_location : str or Path
        the full path of the .d folder

    Attributes
    ----------
    d_directory_location : Path
        the full path of the .d folder
    file_location : Path
        the full path of the .d folder
    parameter_filename_location : Path
        the full path of the apexAcquisition.method file
    transient_data_path : Path
        the full path of the fid or ser file
    scan_attr : Path
        the full path of the scan.xml file (only set for "ser" data)

    Methods
    -------
    * get_transient().
        Read the data and settings returning a Transient class
    * get_scan_attr().
        Read the scan retention times, TIC values and scan indices.
    * locate_file(folder, type_file_name).
        Find the full path of a specific file within the acquisition .d folder or subfolders
    * parse_parameters(parameters_filename).
        Open the given file and retrieve all parameters from apexAcquisition.method
    * fix_freq_limits(d_parameters).
        Read and set the correct frequency limits for the spectrum
    * get_excite_sweep_range(filename).
        Determine excitation sweep range from ExciteSweep file
    * parse_sqlite(sqlite_filename).
        Read the traces stored in the chromatography sqlite file
    """

    def __enter__(self):
        """Return the transient of the default scan when used in a ``with`` statement."""
        return self.get_transient()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Never suppress exceptions raised inside the ``with`` body."""
        return False

    def __init__(self, d_directory_location):
        """Locate the method file and the transient data inside the .d folder.

        Raises
        ------
        FileNotFoundError
            if the .d folder itself does not exist
        FileExistsError
            if the method file or the transient data (fid/ser) cannot be located
        """
        if isinstance(d_directory_location, str):
            d_directory_location = Path(d_directory_location)

        if not d_directory_location.exists():
            raise FileNotFoundError("File does not exist: " + str(d_directory_location))

        self.d_directory_location = d_directory_location
        self.file_location = d_directory_location

        try:
            self.parameter_filename_location = self.locate_file(
                d_directory_location, "apexAcquisition.method"
            )
            self.transient_data_path = d_directory_location / "fid"

            if not self.transient_data_path.exists():
                self.transient_data_path = d_directory_location / "ser"

                if not self.transient_data_path.exists():
                    raise FileNotFoundError("Could not locate transient data")

                # scan attributes are only looked up for "ser" data
                self.scan_attr = d_directory_location / "scan.xml"

        except Exception as err:
            # Exception (not a bare except) so Ctrl-C / SystemExit still propagate;
            # the original cause is chained for easier debugging.
            raise FileExistsError(
                "%s does not seem to be a valid Solarix Mass Spectrum"
                % (d_directory_location)
            ) from err

    def get_scan_attr(self):
        """Get the scan retention times, TIC values and scan indices.

        Reads the scan.xml file in the .d folder; every ``minutes``, ``tic`` and
        ``count`` tag found is collected, in document order.

        Returns
        -------
        dict_scan_rt_tic : dict
            a dictionary with scan number as key and a (rt, tic) tuple as value

        Raises
        ------
        FileNotFoundError
            if scan.xml cannot be opened or parsed
        """
        from bs4 import BeautifulSoup

        try:
            # the parser consumes the handle at construction, so it can be closed right away
            with self.scan_attr.open() as scan_file:
                soup = BeautifulSoup(scan_file, "xml")
        except Exception as err:
            raise FileNotFoundError(
                "Dataset does not appear to contain a 'scan.xml' file or it is misformated"
            ) from err

        list_rt = [float(rt.text) for rt in soup.find_all("minutes")]
        list_tic = [float(tic.text) for tic in soup.find_all("tic")]
        list_scan = [int(scan.text) for scan in soup.find_all("count")]

        return dict(zip(list_scan, zip(list_rt, list_tic)))

    def get_transient(self, scan_number=1):
        """Get the transient data and parameters from a Bruker Solarix .d folder.

        Parameters
        ----------
        scan_number : int
            the scan number to be read. Default is 1.

        Returns
        -------
        Transient
            a transient object

        Raises
        ------
        ValueError
            if scan.xml exists but does not list ``scan_number``
        """
        from io import BytesIO

        file_d_params = self.parse_parameters(self.parameter_filename_location)

        self.fix_freq_limits(file_d_params)

        # Samples are read as 32-bit signed integers; an explicit width replaces
        # the former platform switch between the "l" and "i" type codes.
        dt = dtype("int32")

        output_parameters = deepcopy(default_parameters(self.d_directory_location))

        is_ser = self.transient_data_path.name == "ser"

        # rt, scan and tic come from scan.xml when present, otherwise the
        # defaults supplied by default_parameters are kept.
        # NOTE(review): without scan.xml the requested scan_number is not
        # applied - confirm that this is intended.
        if is_ser and self.scan_attr.exists():
            scan_info = self.get_scan_attr().get(scan_number)
            if scan_info is None:
                raise ValueError(
                    "Scan %s is not listed in %s" % (scan_number, self.scan_attr)
                )

            output_parameters["scan_number"] = scan_number
            output_parameters["rt"] = scan_info[0]
            output_parameters["tic"] = scan_info[1]

        output_parameters["analyzer"] = "ICR"
        output_parameters["label"] = "Bruker_Frequency"
        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
        try:
            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
        except TypeError:  # for older datasets which dont have this variable
            output_parameters["qpd_enabled"] = 0

        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
        output_parameters["number_data_points"] = int(file_d_params.get("TD"))
        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")

        data_points = int(file_d_params.get("TD"))
        scan = output_parameters["scan_number"]
        on_s3 = isinstance(self.transient_data_path, S3Path)

        if is_ser:
            with self.transient_data_path.open("rb") as raw_file:
                # S3 objects are buffered in memory before seeking
                # (presumably because the remote stream is not seekable)
                databin = BytesIO(raw_file.read()) if on_s3 else raw_file

                # scans are stored back to back, 4 bytes per data point
                databin.seek((scan - 1) * 4 * data_points)
                data = frombuffer(databin.read(4 * data_points), dtype=dt)

        elif on_s3:
            with self.transient_data_path.open("rb") as raw_file:
                data = frombuffer(raw_file.read(), dtype=dt)

        else:
            data = fromfile(self.transient_data_path, dtype=dt)

        return Transient(data, output_parameters)

    def fix_freq_limits(self, d_parameters):
        """Read and set the correct frequency limits for the spectrum.

        Notes
        -----
        This is using the excitation limits from the apexAcquisition.method file,
        which may not match the intended detection limits in edge cases.
        In default acquisitions, excitation and detection are the same.
        But, they may not be in some cases with selective excitation, custom excite
        waveforms, or in 2DMS applications.

        Parameters
        ----------
        d_parameters : dict
            a dictionary with the parameters from the apexAcquisition.method file;
            updated in place when the limits are taken from the ExciteSweep file
        """
        highfreq = float(d_parameters.get("EXC_Freq_High"))
        lowfreq = float(d_parameters.get("EXC_Freq_Low"))

        # CR for compatibility with Apex format as there is no EXciteSweep file
        # NOTE(review): operator precedence makes this "(not highfreq) and lowfreq",
        # i.e. it only fires when the high limit is zero and the low limit is not.
        # Confirm whether "not (highfreq and lowfreq)" was intended.
        if (not highfreq) and lowfreq:
            excitation_sweep_filelocation = self.locate_file(
                self.d_directory_location, "ExciteSweep"
            )
            lowfreq, highfreq = self.get_excite_sweep_range(
                excitation_sweep_filelocation
            )
            d_parameters["EXC_Freq_High"] = highfreq
            d_parameters["EXC_Freq_Low"] = lowfreq

    @staticmethod
    def get_excite_sweep_range(filename):
        """Determine the excitation sweep range from an ExciteSweep file.

        The first data row is taken as the high frequency and the last data row
        as the low frequency; lines starting with ``*`` are treated as comments.
        This may be incorrect for custom excitation waveforms.

        Parameters
        ----------
        filename : str or Path
            the full path to the ExciteSweep file

        Returns
        -------
        tuple
            (lowfreq, highfreq)
        """
        # ravel() keeps a one-row file indexable (genfromtxt returns a 0-d array then)
        sweep = genfromtxt(filename, comments="*", delimiter="\n").ravel()

        # Direct indexing replaces the deprecated binary-mode numpy.fromstring call,
        # which merely re-read the bytes of the same float scalar.
        highfreq = sweep[0]
        lowfreq = sweep[-1]

        return lowfreq, highfreq

    @staticmethod
    def locate_file(folder, type_file_name="apexAcquisition.method"):
        """Find a specific file within the acquisition .d folder or its subfolders.

        Parameters
        ----------
        folder : str or Path
            the full path to the folder
        type_file_name : str
            the name (suffix) of the file to be located
            Expected options: ExciteSweep or apexAcquisition.method

        Returns
        -------
        Path
            the full path to the file

        Raises
        ------
        Exception
            if no file, or more than one file, matches

        Notes
        -----
        adapted from code from SPIKE library, https://github.com/spike-project/spike
        """
        if isinstance(folder, str):
            folder = Path(folder)

        result = list(folder.glob("**/*" + type_file_name))

        if len(result) > 1:
            # the previous message claimed the first match would be used, but this raises
            raise Exception(
                "You have more than 1 %s file in the %s folder, expected exactly one"
                % (type_file_name, folder)
            )

        if not result:
            raise Exception(
                "You don't have any %s file in the %s folder, please double check the path"
                % (type_file_name, folder)
            )

        return result[0]

    @staticmethod
    def parse_parameters(parameters_filename):
        """Parse the parameters from the apexAcquisition.method file.

        None is written when no value is found for a parameter.

        structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>

        Parameters
        ----------
        parameters_filename : str or Path
            the full path to the apexAcquisition.method file

        Returns
        -------
        dict
            parameter names mapped to their (string) values, plus
            "acquisition_time" (datetime) and "Polarity" (1 or -1) when present

        Notes
        -----
        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
        Code may not handle all possible parameters, but should be sufficient for most common use cases
        """
        # TODO: change to beautiful soup xml parsing
        if isinstance(parameters_filename, str):
            parameters_filename = Path(parameters_filename)

        with parameters_filename.open() as method_file:
            xmldoc = minidom.parse(method_file)

        parameter_dict = {}

        for child in xmldoc.documentElement.childNodes:
            if child.nodeName == "methodmetadata":
                for section in child.childNodes:
                    for element in section.childNodes:
                        if element.nodeName == "date":
                            date_time_str = element.childNodes[0].nodeValue
                            parameter_dict["acquisition_time"] = datetime.strptime(
                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
                            )

            elif child.nodeName == "reportinfo":
                for section in child.childNodes:
                    if (
                        section.nodeName != "section"
                        or section.getAttribute("title") != "Main"
                    ):
                        continue
                    for element in section.childNodes:
                        if (
                            element.nodeName == "section"
                            and element.getAttribute("title") == "Polarity"
                        ):
                            # the polarity is read from the second child node
                            polarity = str(element.childNodes[1].getAttribute("value"))
                            parameter_dict["Polarity"] = (
                                -1 if polarity == "Negative" else 1
                            )

            elif child.nodeName == "paramlist":
                for param in child.childNodes:
                    if param.nodeName != "param":
                        continue

                    parameter_label = str(param.getAttribute("name"))
                    # reset per parameter so a missing <value> can never pick up
                    # the value of the previous parameter
                    parameter_value = None
                    for element in param.childNodes:
                        if element.nodeName == "value":
                            try:
                                parameter_value = str(element.firstChild.toxml())
                            except AttributeError:  # empty <value/>: no first child
                                parameter_value = None

                    parameter_dict[parameter_label] = parameter_value

        return parameter_dict

    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
        """Read the traces stored in the sqlite file of the .d folder.

        Rows are read positionally: ``TraceSources`` column 0 is the trace id and
        column 1 the trace name; ``TraceChunks`` column 0 is the trace id, column 1
        a float64 blob of times and column 2 a float32 blob of values.

        Parameters
        ----------
        sqlite_filename : str
            the name of the sqlite file to locate inside the .d folder

        Returns
        -------
        dict
            trace name mapped to {"times": list, "values": list}
        """
        import sqlite3
        from contextlib import closing

        def read_sqlite_file(file_path, table_name):
            """Return every row of ``table_name`` as a list of tuples."""
            # table names cannot be bound as SQL parameters, so validate instead
            if not table_name.isidentifier():
                raise ValueError("Invalid table name: %r" % (table_name,))

            # closing() releases the connection and cursor even if the query fails
            with closing(sqlite3.connect(file_path)) as conn:
                with closing(conn.cursor()) as cursor:
                    cursor.execute(f'SELECT * FROM "{table_name}"')
                    return cursor.fetchall()

        def parse_binary(binary, value_type):
            """Decode a blob from the sqlite data streams into a numpy array."""
            if value_type == "double":
                return frombuffer(binary, dtype=float64)
            if value_type == "float":
                return frombuffer(binary, dtype=float32)
            raise ValueError("Unsupported binary type: %r" % (value_type,))

        sqlite_filelocation = self.locate_file(
            self.d_directory_location, sqlite_filename
        )
        trace_sources = read_sqlite_file(sqlite_filelocation, "TraceSources")
        trace_chunks = read_sqlite_file(sqlite_filelocation, "TraceChunks")

        # group the chunks once by trace id instead of re-decoding every blob
        # for every source
        chunks_by_id = {}
        for chunk in trace_chunks:
            chunks_by_id.setdefault(chunk[0], []).append(chunk)

        trace_type = {}
        for source in trace_sources:
            trace = {"times": [], "values": []}
            trace_type[source[1]] = trace
            for chunk in chunks_by_id.get(source[0], ()):
                times = parse_binary(chunk[1], "double")
                values = parse_binary(chunk[2], "float")
                # zip truncates to the shorter blob, as before
                for time, value in zip(times, values):
                    trace["times"].append(time)
                    trace["values"].append(value)

        return trace_type
class ReadBrukerSolarix(object):
    """Read a single transient from a Bruker FT-MS acquisition folder (fid or ser).

    Parameters
    ----------
    d_directory_location : str or Path
        the full path of the .d folder

    Attributes
    ----------
    d_directory_location : Path
        the full path of the .d folder
    file_location : Path
        the full path of the .d folder
    parameter_filename_location : Path
        the full path of the apexAcquisition.method file
    transient_data_path : Path
        the full path of the fid or ser file
    scan_attr : Path
        the full path of the scan.xml file (only set for "ser" data)

    Methods
    -------
    * get_transient().
        Read the data and settings returning a Transient class
    * get_scan_attr().
        Read the scan retention times, TIC values and scan indices.
    * locate_file(folder, type_file_name).
        Find the full path of a specific file within the acquisition .d folder or subfolders
    * parse_parameters(parameters_filename).
        Open the given file and retrieve all parameters from apexAcquisition.method
    * fix_freq_limits(d_parameters).
        Read and set the correct frequency limits for the spectrum
    * get_excite_sweep_range(filename).
        Determine excitation sweep range from ExciteSweep file
    * parse_sqlite(sqlite_filename).
        Read the traces stored in the chromatography sqlite file
    """

    def __enter__(self):
        """Return the transient of the default scan when used in a ``with`` statement."""
        return self.get_transient()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Never suppress exceptions raised inside the ``with`` body."""
        return False

    def __init__(self, d_directory_location):
        """Locate the method file and the transient data inside the .d folder.

        Raises
        ------
        FileNotFoundError
            if the .d folder itself does not exist
        FileExistsError
            if the method file or the transient data (fid/ser) cannot be located
        """
        if isinstance(d_directory_location, str):
            d_directory_location = Path(d_directory_location)

        if not d_directory_location.exists():
            raise FileNotFoundError("File does not exist: " + str(d_directory_location))

        self.d_directory_location = d_directory_location
        self.file_location = d_directory_location

        try:
            self.parameter_filename_location = self.locate_file(
                d_directory_location, "apexAcquisition.method"
            )
            self.transient_data_path = d_directory_location / "fid"

            if not self.transient_data_path.exists():
                self.transient_data_path = d_directory_location / "ser"

                if not self.transient_data_path.exists():
                    raise FileNotFoundError("Could not locate transient data")

                # scan attributes are only looked up for "ser" data
                self.scan_attr = d_directory_location / "scan.xml"

        except Exception as err:
            # Exception (not a bare except) so Ctrl-C / SystemExit still propagate;
            # the original cause is chained for easier debugging.
            raise FileExistsError(
                "%s does not seem to be a valid Solarix Mass Spectrum"
                % (d_directory_location)
            ) from err

    def get_scan_attr(self):
        """Get the scan retention times, TIC values and scan indices.

        Reads the scan.xml file in the .d folder; every ``minutes``, ``tic`` and
        ``count`` tag found is collected, in document order.

        Returns
        -------
        dict_scan_rt_tic : dict
            a dictionary with scan number as key and a (rt, tic) tuple as value

        Raises
        ------
        FileNotFoundError
            if scan.xml cannot be opened or parsed
        """
        from bs4 import BeautifulSoup

        try:
            # the parser consumes the handle at construction, so it can be closed right away
            with self.scan_attr.open() as scan_file:
                soup = BeautifulSoup(scan_file, "xml")
        except Exception as err:
            raise FileNotFoundError(
                "Dataset does not appear to contain a 'scan.xml' file or it is misformated"
            ) from err

        list_rt = [float(rt.text) for rt in soup.find_all("minutes")]
        list_tic = [float(tic.text) for tic in soup.find_all("tic")]
        list_scan = [int(scan.text) for scan in soup.find_all("count")]

        return dict(zip(list_scan, zip(list_rt, list_tic)))

    def get_transient(self, scan_number=1):
        """Get the transient data and parameters from a Bruker Solarix .d folder.

        Parameters
        ----------
        scan_number : int
            the scan number to be read. Default is 1.

        Returns
        -------
        Transient
            a transient object

        Raises
        ------
        ValueError
            if scan.xml exists but does not list ``scan_number``
        """
        from io import BytesIO

        file_d_params = self.parse_parameters(self.parameter_filename_location)

        self.fix_freq_limits(file_d_params)

        # Samples are read as 32-bit signed integers; an explicit width replaces
        # the former platform switch between the "l" and "i" type codes.
        dt = dtype("int32")

        output_parameters = deepcopy(default_parameters(self.d_directory_location))

        is_ser = self.transient_data_path.name == "ser"

        # rt, scan and tic come from scan.xml when present, otherwise the
        # defaults supplied by default_parameters are kept.
        # NOTE(review): without scan.xml the requested scan_number is not
        # applied - confirm that this is intended.
        if is_ser and self.scan_attr.exists():
            scan_info = self.get_scan_attr().get(scan_number)
            if scan_info is None:
                raise ValueError(
                    "Scan %s is not listed in %s" % (scan_number, self.scan_attr)
                )

            output_parameters["scan_number"] = scan_number
            output_parameters["rt"] = scan_info[0]
            output_parameters["tic"] = scan_info[1]

        output_parameters["analyzer"] = "ICR"
        output_parameters["label"] = "Bruker_Frequency"
        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
        try:
            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
        except TypeError:  # for older datasets which dont have this variable
            output_parameters["qpd_enabled"] = 0

        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
        output_parameters["number_data_points"] = int(file_d_params.get("TD"))
        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")

        data_points = int(file_d_params.get("TD"))
        scan = output_parameters["scan_number"]
        on_s3 = isinstance(self.transient_data_path, S3Path)

        if is_ser:
            with self.transient_data_path.open("rb") as raw_file:
                # S3 objects are buffered in memory before seeking
                # (presumably because the remote stream is not seekable)
                databin = BytesIO(raw_file.read()) if on_s3 else raw_file

                # scans are stored back to back, 4 bytes per data point
                databin.seek((scan - 1) * 4 * data_points)
                data = frombuffer(databin.read(4 * data_points), dtype=dt)

        elif on_s3:
            with self.transient_data_path.open("rb") as raw_file:
                data = frombuffer(raw_file.read(), dtype=dt)

        else:
            data = fromfile(self.transient_data_path, dtype=dt)

        return Transient(data, output_parameters)

    def fix_freq_limits(self, d_parameters):
        """Read and set the correct frequency limits for the spectrum.

        Notes
        -----
        This is using the excitation limits from the apexAcquisition.method file,
        which may not match the intended detection limits in edge cases.
        In default acquisitions, excitation and detection are the same.
        But, they may not be in some cases with selective excitation, custom excite
        waveforms, or in 2DMS applications.

        Parameters
        ----------
        d_parameters : dict
            a dictionary with the parameters from the apexAcquisition.method file;
            updated in place when the limits are taken from the ExciteSweep file
        """
        highfreq = float(d_parameters.get("EXC_Freq_High"))
        lowfreq = float(d_parameters.get("EXC_Freq_Low"))

        # CR for compatibility with Apex format as there is no EXciteSweep file
        # NOTE(review): operator precedence makes this "(not highfreq) and lowfreq",
        # i.e. it only fires when the high limit is zero and the low limit is not.
        # Confirm whether "not (highfreq and lowfreq)" was intended.
        if (not highfreq) and lowfreq:
            excitation_sweep_filelocation = self.locate_file(
                self.d_directory_location, "ExciteSweep"
            )
            lowfreq, highfreq = self.get_excite_sweep_range(
                excitation_sweep_filelocation
            )
            d_parameters["EXC_Freq_High"] = highfreq
            d_parameters["EXC_Freq_Low"] = lowfreq

    @staticmethod
    def get_excite_sweep_range(filename):
        """Determine the excitation sweep range from an ExciteSweep file.

        The first data row is taken as the high frequency and the last data row
        as the low frequency; lines starting with ``*`` are treated as comments.
        This may be incorrect for custom excitation waveforms.

        Parameters
        ----------
        filename : str or Path
            the full path to the ExciteSweep file

        Returns
        -------
        tuple
            (lowfreq, highfreq)
        """
        # ravel() keeps a one-row file indexable (genfromtxt returns a 0-d array then)
        sweep = genfromtxt(filename, comments="*", delimiter="\n").ravel()

        # Direct indexing replaces the deprecated binary-mode numpy.fromstring call,
        # which merely re-read the bytes of the same float scalar.
        highfreq = sweep[0]
        lowfreq = sweep[-1]

        return lowfreq, highfreq

    @staticmethod
    def locate_file(folder, type_file_name="apexAcquisition.method"):
        """Find a specific file within the acquisition .d folder or its subfolders.

        Parameters
        ----------
        folder : str or Path
            the full path to the folder
        type_file_name : str
            the name (suffix) of the file to be located
            Expected options: ExciteSweep or apexAcquisition.method

        Returns
        -------
        Path
            the full path to the file

        Raises
        ------
        Exception
            if no file, or more than one file, matches

        Notes
        -----
        adapted from code from SPIKE library, https://github.com/spike-project/spike
        """
        if isinstance(folder, str):
            folder = Path(folder)

        result = list(folder.glob("**/*" + type_file_name))

        if len(result) > 1:
            # the previous message claimed the first match would be used, but this raises
            raise Exception(
                "You have more than 1 %s file in the %s folder, expected exactly one"
                % (type_file_name, folder)
            )

        if not result:
            raise Exception(
                "You don't have any %s file in the %s folder, please double check the path"
                % (type_file_name, folder)
            )

        return result[0]

    @staticmethod
    def parse_parameters(parameters_filename):
        """Parse the parameters from the apexAcquisition.method file.

        None is written when no value is found for a parameter.

        structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>

        Parameters
        ----------
        parameters_filename : str or Path
            the full path to the apexAcquisition.method file

        Returns
        -------
        dict
            parameter names mapped to their (string) values, plus
            "acquisition_time" (datetime) and "Polarity" (1 or -1) when present

        Notes
        -----
        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
        Code may not handle all possible parameters, but should be sufficient for most common use cases
        """
        # TODO: change to beautiful soup xml parsing
        if isinstance(parameters_filename, str):
            parameters_filename = Path(parameters_filename)

        with parameters_filename.open() as method_file:
            xmldoc = minidom.parse(method_file)

        parameter_dict = {}

        for child in xmldoc.documentElement.childNodes:
            if child.nodeName == "methodmetadata":
                for section in child.childNodes:
                    for element in section.childNodes:
                        if element.nodeName == "date":
                            date_time_str = element.childNodes[0].nodeValue
                            parameter_dict["acquisition_time"] = datetime.strptime(
                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
                            )

            elif child.nodeName == "reportinfo":
                for section in child.childNodes:
                    if (
                        section.nodeName != "section"
                        or section.getAttribute("title") != "Main"
                    ):
                        continue
                    for element in section.childNodes:
                        if (
                            element.nodeName == "section"
                            and element.getAttribute("title") == "Polarity"
                        ):
                            # the polarity is read from the second child node
                            polarity = str(element.childNodes[1].getAttribute("value"))
                            parameter_dict["Polarity"] = (
                                -1 if polarity == "Negative" else 1
                            )

            elif child.nodeName == "paramlist":
                for param in child.childNodes:
                    if param.nodeName != "param":
                        continue

                    parameter_label = str(param.getAttribute("name"))
                    # reset per parameter so a missing <value> can never pick up
                    # the value of the previous parameter
                    parameter_value = None
                    for element in param.childNodes:
                        if element.nodeName == "value":
                            try:
                                parameter_value = str(element.firstChild.toxml())
                            except AttributeError:  # empty <value/>: no first child
                                parameter_value = None

                    parameter_dict[parameter_label] = parameter_value

        return parameter_dict

    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
        """Read the traces stored in the sqlite file of the .d folder.

        Rows are read positionally: ``TraceSources`` column 0 is the trace id and
        column 1 the trace name; ``TraceChunks`` column 0 is the trace id, column 1
        a float64 blob of times and column 2 a float32 blob of values.

        Parameters
        ----------
        sqlite_filename : str
            the name of the sqlite file to locate inside the .d folder

        Returns
        -------
        dict
            trace name mapped to {"times": list, "values": list}
        """
        import sqlite3
        from contextlib import closing

        def read_sqlite_file(file_path, table_name):
            """Return every row of ``table_name`` as a list of tuples."""
            # table names cannot be bound as SQL parameters, so validate instead
            if not table_name.isidentifier():
                raise ValueError("Invalid table name: %r" % (table_name,))

            # closing() releases the connection and cursor even if the query fails
            with closing(sqlite3.connect(file_path)) as conn:
                with closing(conn.cursor()) as cursor:
                    cursor.execute(f'SELECT * FROM "{table_name}"')
                    return cursor.fetchall()

        def parse_binary(binary, value_type):
            """Decode a blob from the sqlite data streams into a numpy array."""
            if value_type == "double":
                return frombuffer(binary, dtype=float64)
            if value_type == "float":
                return frombuffer(binary, dtype=float32)
            raise ValueError("Unsupported binary type: %r" % (value_type,))

        sqlite_filelocation = self.locate_file(
            self.d_directory_location, sqlite_filename
        )
        trace_sources = read_sqlite_file(sqlite_filelocation, "TraceSources")
        trace_chunks = read_sqlite_file(sqlite_filelocation, "TraceChunks")

        # group the chunks once by trace id instead of re-decoding every blob
        # for every source
        chunks_by_id = {}
        for chunk in trace_chunks:
            chunks_by_id.setdefault(chunk[0], []).append(chunk)

        trace_type = {}
        for source in trace_sources:
            trace = {"times": [], "values": []}
            trace_type[source[1]] = trace
            for chunk in chunks_by_id.get(source[0], ()):
                times = parse_binary(chunk[1], "double")
                values = parse_binary(chunk[2], "float")
                # zip truncates to the shorter blob, as before
                for time, value in zip(times, values):
                    trace["times"].append(time)
                    trace["values"].append(value)

        return trace_type
A class used to read a single transient from Bruker's FT-MS acquisition station (fid or ser)
Parameters
- d_directory_location (str): the full path of the .d folder
Attributes
- d_directory_location (str): the full path of the .d folder
- file_location (str): the full path of the .d folder
- parameter_filename_location (str): the full path of the apexAcquisition.method file
- transient_data_path (str): the full path of the fid or ser file
- scan_attr (str): the full path of the scan.xml file
Methods
- get_transient(). Read the data and settings returning a Transient class
- get_scan_attr(). Read the scan retention times, TIC values and scan indices.
- locate_file(folder, type_file_name). Find the full path of a specific file within the acquisition .d folder or subfolders
- parse_parameters(parameters_filename). Open the given file and retrieve all parameters from apexAcquisition.method
- fix_freq_limits(d_parameters). Read and set the correct frequency limits for the spectrum
- get_excite_sweep_range(filename). Determine excitation sweep range from ExciteSweep file
def __init__(self, d_directory_location):
    """Locate the method file and the transient data inside the .d folder.

    Parameters
    ----------
    d_directory_location : str or Path
        the full path of the .d folder

    Raises
    ------
    FileNotFoundError
        if the .d folder itself does not exist
    FileExistsError
        if the method file or the transient data (fid/ser) cannot be located
    """
    if isinstance(d_directory_location, str):
        d_directory_location = Path(d_directory_location)

    if not d_directory_location.exists():
        raise FileNotFoundError("File does not exist: " + str(d_directory_location))

    self.d_directory_location = d_directory_location
    self.file_location = d_directory_location

    try:
        self.parameter_filename_location = self.locate_file(
            d_directory_location, "apexAcquisition.method"
        )
        # single-transient data lives in "fid"; fall back to "ser" otherwise
        self.transient_data_path = d_directory_location / "fid"

        if not self.transient_data_path.exists():
            self.transient_data_path = d_directory_location / "ser"

            if not self.transient_data_path.exists():
                raise FileNotFoundError("Could not locate transient data")

            # scan attributes are only looked up for "ser" data
            self.scan_attr = d_directory_location / "scan.xml"

    except Exception as err:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate;
        # the original cause is chained for easier debugging.
        raise FileExistsError(
            "%s does not seem to be a valid Solarix Mass Spectrum"
            % (d_directory_location)
        ) from err
def get_scan_attr(self):
    """Get the scan retention times, TIC values and scan indices.

    Reads the scan.xml file referenced by ``self.scan_attr``; every ``minutes``,
    ``tic`` and ``count`` tag found is collected, in document order.

    Returns
    -------
    dict_scan_rt_tic : dict
        a dictionary with scan number as key and a (rt, tic) tuple as value

    Raises
    ------
    FileNotFoundError
        if scan.xml cannot be opened or parsed
    """
    from bs4 import BeautifulSoup

    try:
        # the parser consumes the handle at construction, so it can be closed right away
        with self.scan_attr.open() as scan_file:
            soup = BeautifulSoup(scan_file, "xml")
    except Exception as err:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate
        raise FileNotFoundError(
            "Dataset does not appear to contain a 'scan.xml' file or it is misformated"
        ) from err

    list_rt = [float(rt.text) for rt in soup.find_all("minutes")]
    list_tic = [float(tic.text) for tic in soup.find_all("tic")]
    list_scan = [int(scan.text) for scan in soup.find_all("count")]

    # pair each scan index with its (rt, tic) tuple
    return dict(zip(list_scan, zip(list_rt, list_tic)))
Function to get the scan retention times, TIC values and scan indices.
Gets information from scan.xml file in the bruker .d folder. Note this file is only present in some .d format - e.g. for imaging mode data, it is not present.
Returns
- dict_scan_rt_tic (dict): a dictionary with scan number as key and rt and tic as values
def get_transient(self, scan_number=1):
    """Function to get the transient data and parameters from a Bruker Solarix .d folder.

    Parameters
    ----------
    scan_number : int
        the scan number to be read. Default is 1.

    Returns
    -------
    Transient
        a transient object
    """
    from io import BytesIO
    from sys import platform

    file_d_params = self.parse_parameters(self.parameter_filename_location)

    self.fix_freq_limits(file_d_params)

    # samples are read as 4-byte integers (see the 4 * data_points stride below);
    # the "l" type code is used on Windows and "i" everywhere else
    dt = dtype("l") if platform == "win32" else dtype("i")

    output_parameters = deepcopy(default_parameters(self.d_directory_location))

    # get rt, scan, and tic from scan.xml file, otherwise the defaults are kept
    if self.transient_data_path.name == "ser" and self.scan_attr.exists():
        rt_and_tic = self.get_scan_attr().get(scan_number)

        output_parameters["scan_number"] = scan_number
        output_parameters["rt"] = rt_and_tic[0]
        output_parameters["tic"] = rt_and_tic[1]

    output_parameters["analyzer"] = "ICR"
    output_parameters["label"] = "Bruker_Frequency"

    output_parameters["Aterm"] = float(file_d_params.get("ML1"))
    output_parameters["Bterm"] = float(file_d_params.get("ML2"))
    output_parameters["Cterm"] = float(file_d_params.get("ML3"))

    output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
    output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))

    try:
        output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
    except TypeError:  # for older datasets which dont have this variable
        output_parameters["qpd_enabled"] = 0

    output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
    output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
    output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))

    data_points = int(file_d_params.get("TD"))
    output_parameters["number_data_points"] = data_points

    output_parameters["polarity"] = str(file_d_params.get("Polarity"))
    output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")

    # NOTE(review): when scan.xml is absent the scan index comes from the
    # default parameters rather than from ``scan_number`` - confirm intended.
    scan = output_parameters["scan_number"]

    if self.transient_data_path.name == "ser":
        with self.transient_data_path.open("rb") as raw:
            # S3 objects are buffered in memory before seeking
            if isinstance(self.transient_data_path, S3Path):
                databin = BytesIO(raw.read())
            else:
                databin = raw

            databin.seek((scan - 1) * 4 * data_points)
            # read scan data and parse to 32int struct
            data = frombuffer(databin.read(4 * data_points), dtype=dt)

    elif isinstance(self.transient_data_path, S3Path):
        with self.transient_data_path.open("rb") as raw:
            data = frombuffer(raw.read(), dtype=dt)
    else:
        data = fromfile(self.transient_data_path, dtype=dt)

    return Transient(data, output_parameters)
Function to get the transient data and parameters from a Bruker Solarix .d folder.
Parameters
- scan_number (int): the scan number to be read. Default is 1.
Returns
- Transient: a transient object
def fix_freq_limits(self, d_parameters):
    """Read and set the frequency limits of the spectrum in place.

    Notes
    --------
    The limits are the excitation limits from the apexAcquisition.method file,
    which may not match the intended detection limits in edge cases.
    In default acquisitions, excitation and detection are the same.
    But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.

    Parameters
    ----------
    d_parameters : dict
        a dictionary with the parameters from the apexAcquisition.method file;
        its "EXC_Freq_High" and "EXC_Freq_Low" entries are overwritten with floats
    """
    exc_high = float(d_parameters.get("EXC_Freq_High"))
    exc_low = float(d_parameters.get("EXC_Freq_Low"))

    # CR for compatibility with Apex format as there is no EXciteSweep file.
    # The ExciteSweep file is consulted only when the high limit is zero
    # while the low limit is non-zero: (not high) and low.
    use_sweep_file = (not exc_high) and exc_low
    if use_sweep_file:
        sweep_path = self.locate_file(self.d_directory_location, "ExciteSweep")
        exc_low, exc_high = self.get_excite_sweep_range(sweep_path)

    d_parameters.update({"EXC_Freq_High": exc_high, "EXC_Freq_Low": exc_low})
Function to read and set the correct frequency limits for the spectrum
Notes
This is using the excitation limits from the apexAcquisition.method file, which may not match the intended detection limits in edge cases. In default acquisitions, excitation and detection are the same. But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.
Parameters
- d_parameters (dict): a dictionary with the parameters from the apexAcquisition.method file
@staticmethod
def get_excite_sweep_range(filename):
    """Function to determine excitation sweep range from ExciteSweep file

    This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range.
    Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies.
    This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.

    Parameters
    ----------
    filename : str
        the full path to the ExciteSweep file

    Returns
    -------
    tuple of float
        (lowfreq, highfreq), taken from the last and the first row respectively
    """
    from numpy import atleast_1d

    # one value per line; anything after "*" is treated as a comment.
    # atleast_1d keeps a single-row file indexable (genfromtxt returns a 0-d array)
    sweep_frequencies = atleast_1d(
        genfromtxt(filename, comments="*", delimiter="\n")
    )

    # index the parsed values directly instead of round-tripping each scalar
    # through the deprecated binary mode of numpy.fromstring
    highfreq = float(sweep_frequencies[0])
    lowfreq = float(sweep_frequencies[-1])

    return lowfreq, highfreq
Function to determine excitation sweep range from ExciteSweep file
This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range. Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies. This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.
Parameters
- filename (str): the full path to the ExciteSweep file
@staticmethod
def locate_file(folder, type_file_name="apexAcquisition.method"):
    """Function to locate a file in a folder

    Find the full path of a specific file within the acquisition .d folder or subfolders

    Parameters
    ----------
    folder : str or Path
        the full path to the folder
    type_file_name : str
        the name of the file to be located.
        Expected options: ExciteSweep or apexAcquisition.method

    Returns
    -------
    Path
        the full path to the file

    Raises
    ------
    FileNotFoundError
        if no matching file exists in the folder or its subfolders
    Exception
        if more than one matching file is found

    Notes
    -----
    adapted from code from SPIKE library, https://github.com/spike-project/spike

    """

    from pathlib import Path

    # the documented input is a string path; path-like objects are used as given
    if isinstance(folder, str):
        folder = Path(folder)

    # recursive search for any file whose name ends with type_file_name
    matches = list(folder.glob("**/*" + type_file_name))

    if len(matches) > 1:
        raise Exception(
            "You have more than 1 %s file in the %s folder, expected exactly one"
            % (type_file_name, folder)
        )

    if not matches:
        raise FileNotFoundError(
            "You don't have any %s file in the %s folder, please double check the path"
            % (type_file_name, folder)
        )

    return matches[0]
Function to locate a file in a folder
Find the full path of a specific file within the acquisition .d folder or subfolders
Parameters
- folder (str): the full path to the folder
- type_file_name (str): the name of the file to be located. Expected options: ExciteSweep or apexAcquisition.method
Returns
- str: the full path to the file
Notes
adapted from code from SPIKE library, https://github.com/spike-project/spike
@staticmethod
def parse_parameters(parameters_filename):
    """Function to parse the parameters from apexAcquisition.method file

    Open the given file and retrieve all parameters from apexAcquisition.method.
    None is written when a parameter's value element is empty.

    structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>

    Parameters
    ----------
    parameters_filename : Path
        the full path to the apexAcquisition.method file (any object exposing
        ``open()`` that returns a readable file works)

    Returns
    -------
    dict
        a dictionary with the parameters and values; parameter values are kept
        as strings, "Polarity" is -1 or 1 and "acquisition_time" is a datetime

    Notes
    -----
    Adapted from code from SPIKE library, https://github.com/spike-project/spike.
    Code may not handle all possible parameters, but should be sufficient for most common use cases
    """

    # TODO: change to beautiful soup xml parsing

    # close the method file as soon as the DOM has been built
    with parameters_filename.open() as method_file:
        xmldoc = minidom.parse(method_file)

    parameter_dict = {}

    for child in xmldoc.documentElement.childNodes:
        if child.nodeName == "methodmetadata":
            for section in child.childNodes:
                for element in section.childNodes:
                    if element.nodeName == "date":
                        date_time_str = element.childNodes[0].nodeValue
                        parameter_dict["acquisition_time"] = datetime.strptime(
                            date_time_str, "%b_%d_%Y %H:%M:%S.%f"
                        )

        elif child.nodeName == "reportinfo":
            for section in child.childNodes:
                if section.nodeName != "section":
                    continue
                if section.getAttribute("title") != "Main":
                    continue
                for element in section.childNodes:
                    if element.nodeName != "section":
                        continue
                    if element.getAttribute("title") != "Polarity":
                        continue
                    # the polarity is read from the "value" attribute of the
                    # second child node of the Polarity section
                    polarity = str(element.childNodes[1].getAttribute("value"))
                    parameter_dict["Polarity"] = -1 if polarity == "Negative" else 1

        elif child.nodeName == "paramlist":
            for param in child.childNodes:
                if param.nodeName != "param":
                    continue
                parameter_label = str(param.getAttribute("name"))
                for element in param.childNodes:
                    if element.nodeName != "value":
                        continue
                    try:
                        parameter_value = str(element.firstChild.toxml())
                    except AttributeError:
                        # empty <value/> element: firstChild is None
                        parameter_value = None
                    # store only once a <value> element has been seen, so a
                    # stale value from a previous parameter is never written
                    parameter_dict[parameter_label] = parameter_value

    return parameter_dict
Function to parse the parameters from apexAcquisition.method file
Open the given file and retrieve all parameters from apexAcquisition.method. None is written when no value is found for a parameter.
structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
Parameters
- parameters_filename (str): the full path to the apexAcquisition.method file
Returns
- dict: a dictionary with the parameters and values
Notes
Adapted from code from SPIKE library, https://github.com/spike-project/spike. Code may not handle all possible parameters, but should be sufficient for most common use cases
def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
    """Read the traces stored in the SQLite file of the .d folder.

    The "TraceSources" and "TraceChunks" tables are read in full. For every
    source row, the chunks whose first column equals the source's first column
    are decoded and concatenated in row order.

    Parameters
    ----------
    sqlite_filename : str
        the name of the SQLite file to locate inside the .d folder

    Returns
    -------
    dict
        maps the second column of each TraceSources row to a dictionary with
        "times" (decoded as float64) and "values" (decoded as float32) lists
    """
    import sqlite3
    from contextlib import closing

    def read_sqlite_file(file_path, table_name):
        """
        Read data from a SQLite database file and return it as a list of tuples

        Parameters
        ----------
        file_path : str
            the full path to the SQLite database file
        table_name : str
            the name of the table to be read

        Returns
        -------
        list
            a list of tuples with the data from the table
        """
        # closing() releases the cursor and the connection even if the query fails
        with closing(sqlite3.connect(file_path)) as conn:
            with closing(conn.cursor()) as cursor:
                # table names cannot be bound as SQL parameters; only the
                # fixed names used below are ever passed in
                cursor.execute(f"SELECT * FROM {table_name}")
                return cursor.fetchall()

    def parse_binary(binary, value_type):
        """
        Parse binary data from the sqlite data streams
        """
        if value_type == "double":
            return frombuffer(binary, dtype=float64)
        if value_type == "float":
            return frombuffer(binary, dtype=float32)
        raise ValueError("Unsupported binary type: %s" % value_type)

    sqlite_filelocation = self.locate_file(
        self.d_directory_location, sqlite_filename
    )
    trace_sources = read_sqlite_file(sqlite_filelocation, "TraceSources")
    trace_chunks = read_sqlite_file(sqlite_filelocation, "TraceChunks")

    # decode every chunk exactly once and group the arrays by trace id
    chunks_by_trace = {}
    for chunk in trace_chunks:
        chunk_times = parse_binary(chunk[1], "double")
        chunk_values = parse_binary(chunk[2], "float")
        # times and values are consumed pairwise, so trim to the shorter one
        n_points = min(len(chunk_times), len(chunk_values))
        chunks_by_trace.setdefault(chunk[0], []).append(
            (chunk_times[:n_points], chunk_values[:n_points])
        )

    trace_type = {}
    for source in trace_sources:
        trace = {"times": [], "values": []}
        trace_type[source[1]] = trace
        for chunk_times, chunk_values in chunks_by_trace.get(source[0], ()):
            trace["times"].extend(chunk_times)
            trace["values"].extend(chunk_values)

    return trace_type