corems.transient.input.brukerSolarix

  1__author__ = "Yuri E. Corilo"
  2__date__ = "Jun 12, 2019"
  3from copy import deepcopy
  4from datetime import datetime
  5from pathlib import Path
  6from xml.dom import minidom
  7
  8from numpy import dtype, float32, float64, frombuffer, fromfile, fromstring, genfromtxt
  9from s3path import S3Path
 10
 11from corems.encapsulation.factory.parameters import default_parameters
 12from corems.transient.factory.TransientClasses import Transient
 13
 14
 15class ReadBrukerSolarix(object):
 16    """A class used to Read a single Transient from Bruker's FT-MS acquisition station (fid, or ser)
 17
 18    Parameters
 19    ----------
 20    d_directory_location : str
 21        the full path of the .d folder
 22
 23    Attributes
 24    --------
 25    d_directory_location : str
 26        the full path of the .d folder
 27    file_location : str
 28        the full path of the .d folder
 29    parameter_filename_location : str
 30        the full path of the apexAcquisition.method file
 31    transient_data_path : str
 32        the full path of the fid or ser file
 33    scan_attr : str
 34        the full path of the scan.xml file
 35
 36
 37    Methods
 38    -------
 39    * get_transient().
 40        Read the data and settings returning a Transient class
 41    * get_scan_attr().
 42        Read the scan retention times, TIC values and scan indices.
 43    * locate_file(folder, type_file_name).
 44        Find the full path of a specific file within the acquisition .d folder or subfolders
 45    * parse_parameters(parameters_filename).
 46        Open the given file and retrieve all parameters from apexAcquisition.method
 47    * fix_freq_limits(d_parameters).
 48        Read and set the correct frequency limits for the spectrum
 49    * get_excite_sweep_range(filename).
 50        Determine excitation sweep range from ExciteSweep file
 51
 52    """
 53
 54    def __enter__(self):
 55        return self.get_transient()
 56
 57    def __exit__(self, exc_type, exc_val, exc_tb):
 58        return False
 59
 60    def __init__(self, d_directory_location):
 61        if isinstance(d_directory_location, str):
 62            d_directory_location = Path(d_directory_location)
 63
 64        if not d_directory_location.exists():
 65            raise FileNotFoundError("File does not exist: " + str(d_directory_location))
 66
 67        self.d_directory_location = d_directory_location
 68
 69        self.file_location = d_directory_location
 70
 71        try:
 72            self.parameter_filename_location = self.locate_file(
 73                d_directory_location, "apexAcquisition.method"
 74            )
 75            self.transient_data_path = d_directory_location / "fid"
 76
 77            if not self.transient_data_path.exists():
 78                self.transient_data_path = d_directory_location / "ser"
 79
 80                if not self.transient_data_path.exists():
 81                    raise FileNotFoundError("Could not locate transient data")
 82
 83                else:
 84                    # get scan attributes
 85                    self.scan_attr = d_directory_location / "scan.xml"
 86
 87        except:
 88            raise FileExistsError(
 89                "%s does not seem to be a valid Solarix Mass Spectrum"
 90                % (d_directory_location)
 91            )
 92
 93    def get_scan_attr(self):
 94        """Function to get the scan retention times, TIC values and scan indices.
 95
 96        Gets information from scan.xml file in the bruker .d folder.
 97        Note this file is only present in some .d format - e.g. for imaging mode data, it is not present.
 98
 99        Returns
100        -------
101        dict_scan_rt_tic : dict
102            a dictionary with scan number as key and rt and tic as values
103        """
104
105        from bs4 import BeautifulSoup
106
107        try:
108            soup = BeautifulSoup(self.scan_attr.open(), "xml")
109        except:
110            raise FileNotFoundError(
111                "Dataset does not appear to contain a 'scan.xml' file or it is misformated"
112            )
113
114        list_rt = [float(rt.text) for rt in soup.find_all("minutes")]
115        list_tic = [float(tic.text) for tic in soup.find_all("tic")]
116        list_scan = [int(scan.text) for scan in soup.find_all("count")]
117
118        dict_scan_rt_tic = dict(zip(list_scan, zip(list_rt, list_tic)))
119
120        return dict_scan_rt_tic
121
122    def get_transient(self, scan_number=1):
123        """Function to get the transient data and parameters from a Bruker Solarix .d folder.
124
125        Parameters
126        ----------
127        scan_number : int
128            the scan number to be read. Default is 1.
129
130        Returns
131        -------
132        Transient
133            a transient object
134        """
135
136        file_d_params = self.parse_parameters(self.parameter_filename_location)
137
138        self.fix_freq_limits(file_d_params)
139
140        from sys import platform
141
142        if platform == "win32":
143            # Windows...
144            dt = dtype("l")
145        else:
146            dt = dtype("i")
147
148        # get rt, scan, and tic from scan.xml file, otherwise  using 0 defaults values
149
150        output_parameters = deepcopy(default_parameters(self.d_directory_location))
151
152        if self.transient_data_path.name == "ser":
153            if self.scan_attr.exists():
154                dict_scan_rt_tic = self.get_scan_attr()
155
156                output_parameters["scan_number"] = scan_number
157
158                output_parameters["rt"] = dict_scan_rt_tic.get(scan_number)[0]
159
160                output_parameters["tic"] = dict_scan_rt_tic.get(scan_number)[1]
161
162        output_parameters["analyzer"] = "ICR"
163
164        output_parameters["label"] = "Bruker_Frequency"
165
166        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
167
168        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
169
170        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
171
172        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
173
174        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
175        try:
176            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
177        except TypeError:  # for older datasets which dont have this variable
178            output_parameters["qpd_enabled"] = 0
179
180        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
181
182        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
183
184        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
185
186        output_parameters["number_data_points"] = int(file_d_params.get("TD"))
187
188        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
189
190        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")
191
192        data_points = int(file_d_params.get("TD"))
193
194        scan = output_parameters["scan_number"]
195        from io import BytesIO
196
197        if self.transient_data_path.name == "ser":
198            if isinstance(self.transient_data_path, S3Path):
199                databin = BytesIO(self.transient_data_path.open("rb").read())
200
201            else:
202                databin = self.transient_data_path.open("rb")
203
204            databin.seek((scan - 1) * 4 * data_points)
205            # read scan data and parse to 32int struct
206            data = frombuffer(databin.read(4 * data_points), dtype=dt)
207
208        else:
209            if isinstance(self.transient_data_path, S3Path):
210                data = frombuffer(self.transient_data_path.open("rb").read(), dtype=dt)
211            else:
212                data = fromfile(self.transient_data_path, dtype=dt)
213
214        return Transient(data, output_parameters)
215
216    #    for key, values in default_parameters.items():
217    #        print(key, values)
218    def fix_freq_limits(self, d_parameters):
219        """Function to read and set the correct frequency limits for the spectrum
220
221        Notes
222        --------
223        This is using the excitation limits from the apexAcquisition.method file,
224        which may not match the intended detection limits in edge cases.
225        In default acquisitions, excitation and detection are the same.
226        But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.
227
228        Parameters
229        ----------
230        d_parameters : dict
231            a dictionary with the parameters from the apexAcquisition.method file
232        """
233
234        highfreq = float(d_parameters.get("EXC_Freq_High"))
235
236        lowfreq = float(d_parameters.get("EXC_Freq_Low"))
237
238        # CR for compatibility with Apex format as there is no EXciteSweep file
239        if not highfreq and lowfreq:
240            excitation_sweep_filelocation = self.locate_file(
241                self.d_directory_location, "ExciteSweep"
242            )
243            lowfreq, highfreq = self.get_excite_sweep_range(
244                excitation_sweep_filelocation
245            )
246            d_parameters["EXC_Freq_High"] = highfreq
247            d_parameters["EXC_Freq_Low"] = lowfreq
248
249    @staticmethod
250    def get_excite_sweep_range(filename):
251        """Function to determine excitation sweep range from ExciteSweep file
252
253        This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range.
254        Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies.
255        This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.
256
257        Parameters
258        ----------
259        filename : str
260            the full path to the ExciteSweep file
261
262        """
263        ExciteSweep_lines = genfromtxt(filename, comments="*", delimiter="\n")
264        # CR ready if we need the full array
265        highfreq = fromstring(ExciteSweep_lines[0])
266        lowfreq = fromstring(ExciteSweep_lines[-1])
267
268        return lowfreq[0], highfreq[0]
269
270    @staticmethod
271    def locate_file(folder, type_file_name="apexAcquisition.method"):
272        """Function to locate a file in a folder
273
274        Find the full path of a specific file within the acquisition .d folder or subfolders
275
276        Parameters
277        ----------
278        folder : str
279            the full path to the folder
280        type_file_name : str
281            the name of the file to be located
282            Expected options: ExciteSweep or apexAcquisition.method
283
284        Returns
285        -------
286        str
287            the full path to the file
288
289        Notes
290        -----
291        adapted from code from SPIKE library, https://github.com/spike-project/spike
292
293        """
294
295        from pathlib import Path
296
297        # directory_location = folder.glob( '**/*apexAcquisition.method')
298        directory_location = folder.glob("**/*" + type_file_name)
299        result = list(directory_location)
300        if len(result) > 1:
301            raise Exception(
302                "You have more than 1 %s file in the %s folder, using the first one"
303                % (type_file_name, folder)
304            )
305
306        elif len(result) == 0:
307            raise Exception(
308                "You don't have any %s file in the  %s folder, please double check the path"
309                % (type_file_name, folder)
310            )
311
312        return result[0]
313
314    @staticmethod
315    def parse_parameters(parameters_filename):
316        """Function to parse the parameters from apexAcquisition.method file
317
318        Open the given file and retrieve all parameters from apexAcquisition.method
319            None is written when no value for value is found
320
321            structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
322
323        Parameters
324        ----------
325        parameters_filename : str
326            the full path to the apexAcquisition.method file
327
328        Returns
329        -------
330        dict
331            a dictionary with the parameters and values
332
333        Notes
334        -----
335        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
336        Code may not handle all possible parameters, but should be sufficient for most common use cases
337        """
338
339        # TODO: change to beautiful soup xml parsing
340
341        xmldoc = minidom.parse(parameters_filename.open())
342
343        x = xmldoc.documentElement
344        parameter_dict = {}
345        children = x.childNodes
346        for child in children:
347            # print( child.node)
348            if child.nodeName == "methodmetadata":
349                sections = child.childNodes
350                for section in sections:
351                    for element in section.childNodes:
352                        if element.nodeName == "date":
353                            # if element.nodeName == "primarykey":
354
355                            date_time_str = element.childNodes[0].nodeValue
356                            # parameter_dict["acquisition_time"] = pd.to_datetime(date_time_str, infer_datetime_format=True).to_pydatetime()
357                            parameter_dict["acquisition_time"] = datetime.strptime(
358                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
359                            )
360
361            if child.nodeName == "reportinfo":
362                sections = child.childNodes
363                for section in sections:
364                    if section.nodeName == "section":
365                        if section.getAttribute("title") == "Main":
366                            for element in section.childNodes:
367                                if element.nodeName == "section":
368                                    if element.getAttribute("title") == "Polarity":
369                                        if (
370                                            str(
371                                                element.childNodes[1].getAttribute(
372                                                    "value"
373                                                )
374                                            )
375                                            == "Negative"
376                                        ):
377                                            parameter_dict["Polarity"] = -1
378                                        else:
379                                            parameter_dict["Polarity"] = 1
380
381            if child.nodeName == "paramlist":
382                params = child.childNodes
383                for param in params:
384                    # print( param.nodeName)
385                    if param.nodeName == "param":
386                        paramenter_label = str(param.getAttribute("name"))
387                        for element in param.childNodes:
388                            if element.nodeName == "value":
389                                try:
390                                    parameter_value = str(element.firstChild.toxml())
391                                    # print v
392                                except:
393                                    parameter_value = None
394
395                            parameter_dict[paramenter_label] = parameter_value
396
397        return parameter_dict
398
399    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
400        """ """
401        import sqlite3
402
403        def read_sqlite_file(file_path, table_name):
404            """
405            Read data from a SQLite database file and return it as a list of tuples
406
407            Parameters
408            ----------
409            file_path : str
410                the full path to the SQLite database file
411            table_name : str
412                the name of the table to be read
413
414            Returns
415            -------
416            list
417                a list of tuples with the data from the table
418            """
419            # Connect to the SQLite database file
420            conn = sqlite3.connect(file_path)
421            cursor = conn.cursor()
422
423            # Execute a query to select data from a table (replace 'table_name' with your table's name)
424            query = f"SELECT * FROM {table_name}"
425            cursor.execute(query)
426
427            # Fetch all rows from the result set
428            rows = cursor.fetchall()
429            stream = []
430            # Print or process the fetched rows
431            for row in rows:
432                stream.append(row)
433                # print(row)  # Print each row, you can also process it differently
434
435            # Close the cursor and the connection
436            cursor.close()
437            conn.close()
438            return stream
439
440        def parse_binary(binary, type):
441            """
442            Parse binary data from the sqlite data streams
443            """
444            if type == "double":
445                data = frombuffer(binary, dtype=float64)
446            elif type == "float":
447                data = frombuffer(binary, dtype=float32)
448            return data
449
450        sqlite_filelocation = self.locate_file(
451            self.d_directory_location, sqlite_filename
452        )
453        table_name = "TraceSources"
454        trace_sources = read_sqlite_file(sqlite_filelocation, table_name)
455        table_name = "TraceChunks"
456        trace_chunks = read_sqlite_file(sqlite_filelocation, table_name)
457        times = []
458        values = []
459        trace_type = {}
460
461        for index, source in enumerate(trace_sources):
462            trace_id = source[0]
463            trace_type[source[1]] = {"times": [], "values": []}
464            for index, chunk in enumerate(trace_chunks):
465                id = chunk[0]
466                times = parse_binary(chunk[1], "double")
467                values = parse_binary(chunk[2], "float")
468                for time, value in zip(times, values):
469                    if source[0] == id:
470                        trace_type[source[1]]["times"].append(time)
471                        trace_type[source[1]]["values"].append(value)
472
473        return trace_type
class ReadBrukerSolarix:
 16class ReadBrukerSolarix(object):
 17    """A class used to Read a single Transient from Bruker's FT-MS acquisition station (fid, or ser)
 18
 19    Parameters
 20    ----------
 21    d_directory_location : str
 22        the full path of the .d folder
 23
 24    Attributes
 25    --------
 26    d_directory_location : str
 27        the full path of the .d folder
 28    file_location : str
 29        the full path of the .d folder
 30    parameter_filename_location : str
 31        the full path of the apexAcquisition.method file
 32    transient_data_path : str
 33        the full path of the fid or ser file
 34    scan_attr : str
 35        the full path of the scan.xml file
 36
 37
 38    Methods
 39    -------
 40    * get_transient().
 41        Read the data and settings returning a Transient class
 42    * get_scan_attr().
 43        Read the scan retention times, TIC values and scan indices.
 44    * locate_file(folder, type_file_name).
 45        Find the full path of a specific file within the acquisition .d folder or subfolders
 46    * parse_parameters(parameters_filename).
 47        Open the given file and retrieve all parameters from apexAcquisition.method
 48    * fix_freq_limits(d_parameters).
 49        Read and set the correct frequency limits for the spectrum
 50    * get_excite_sweep_range(filename).
 51        Determine excitation sweep range from ExciteSweep file
 52
 53    """
 54
 55    def __enter__(self):
 56        return self.get_transient()
 57
 58    def __exit__(self, exc_type, exc_val, exc_tb):
 59        return False
 60
 61    def __init__(self, d_directory_location):
 62        if isinstance(d_directory_location, str):
 63            d_directory_location = Path(d_directory_location)
 64
 65        if not d_directory_location.exists():
 66            raise FileNotFoundError("File does not exist: " + str(d_directory_location))
 67
 68        self.d_directory_location = d_directory_location
 69
 70        self.file_location = d_directory_location
 71
 72        try:
 73            self.parameter_filename_location = self.locate_file(
 74                d_directory_location, "apexAcquisition.method"
 75            )
 76            self.transient_data_path = d_directory_location / "fid"
 77
 78            if not self.transient_data_path.exists():
 79                self.transient_data_path = d_directory_location / "ser"
 80
 81                if not self.transient_data_path.exists():
 82                    raise FileNotFoundError("Could not locate transient data")
 83
 84                else:
 85                    # get scan attributes
 86                    self.scan_attr = d_directory_location / "scan.xml"
 87
 88        except:
 89            raise FileExistsError(
 90                "%s does not seem to be a valid Solarix Mass Spectrum"
 91                % (d_directory_location)
 92            )
 93
 94    def get_scan_attr(self):
 95        """Function to get the scan retention times, TIC values and scan indices.
 96
 97        Gets information from scan.xml file in the bruker .d folder.
 98        Note this file is only present in some .d format - e.g. for imaging mode data, it is not present.
 99
100        Returns
101        -------
102        dict_scan_rt_tic : dict
103            a dictionary with scan number as key and rt and tic as values
104        """
105
106        from bs4 import BeautifulSoup
107
108        try:
109            soup = BeautifulSoup(self.scan_attr.open(), "xml")
110        except:
111            raise FileNotFoundError(
112                "Dataset does not appear to contain a 'scan.xml' file or it is misformated"
113            )
114
115        list_rt = [float(rt.text) for rt in soup.find_all("minutes")]
116        list_tic = [float(tic.text) for tic in soup.find_all("tic")]
117        list_scan = [int(scan.text) for scan in soup.find_all("count")]
118
119        dict_scan_rt_tic = dict(zip(list_scan, zip(list_rt, list_tic)))
120
121        return dict_scan_rt_tic
122
123    def get_transient(self, scan_number=1):
124        """Function to get the transient data and parameters from a Bruker Solarix .d folder.
125
126        Parameters
127        ----------
128        scan_number : int
129            the scan number to be read. Default is 1.
130
131        Returns
132        -------
133        Transient
134            a transient object
135        """
136
137        file_d_params = self.parse_parameters(self.parameter_filename_location)
138
139        self.fix_freq_limits(file_d_params)
140
141        from sys import platform
142
143        if platform == "win32":
144            # Windows...
145            dt = dtype("l")
146        else:
147            dt = dtype("i")
148
149        # get rt, scan, and tic from scan.xml file, otherwise  using 0 defaults values
150
151        output_parameters = deepcopy(default_parameters(self.d_directory_location))
152
153        if self.transient_data_path.name == "ser":
154            if self.scan_attr.exists():
155                dict_scan_rt_tic = self.get_scan_attr()
156
157                output_parameters["scan_number"] = scan_number
158
159                output_parameters["rt"] = dict_scan_rt_tic.get(scan_number)[0]
160
161                output_parameters["tic"] = dict_scan_rt_tic.get(scan_number)[1]
162
163        output_parameters["analyzer"] = "ICR"
164
165        output_parameters["label"] = "Bruker_Frequency"
166
167        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
168
169        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
170
171        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
172
173        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
174
175        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
176        try:
177            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
178        except TypeError:  # for older datasets which dont have this variable
179            output_parameters["qpd_enabled"] = 0
180
181        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
182
183        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
184
185        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
186
187        output_parameters["number_data_points"] = int(file_d_params.get("TD"))
188
189        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
190
191        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")
192
193        data_points = int(file_d_params.get("TD"))
194
195        scan = output_parameters["scan_number"]
196        from io import BytesIO
197
198        if self.transient_data_path.name == "ser":
199            if isinstance(self.transient_data_path, S3Path):
200                databin = BytesIO(self.transient_data_path.open("rb").read())
201
202            else:
203                databin = self.transient_data_path.open("rb")
204
205            databin.seek((scan - 1) * 4 * data_points)
206            # read scan data and parse to 32int struct
207            data = frombuffer(databin.read(4 * data_points), dtype=dt)
208
209        else:
210            if isinstance(self.transient_data_path, S3Path):
211                data = frombuffer(self.transient_data_path.open("rb").read(), dtype=dt)
212            else:
213                data = fromfile(self.transient_data_path, dtype=dt)
214
215        return Transient(data, output_parameters)
216
217    #    for key, values in default_parameters.items():
218    #        print(key, values)
219    def fix_freq_limits(self, d_parameters):
220        """Function to read and set the correct frequency limits for the spectrum
221
222        Notes
223        --------
224        This is using the excitation limits from the apexAcquisition.method file,
225        which may not match the intended detection limits in edge cases.
226        In default acquisitions, excitation and detection are the same.
227        But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.
228
229        Parameters
230        ----------
231        d_parameters : dict
232            a dictionary with the parameters from the apexAcquisition.method file
233        """
234
235        highfreq = float(d_parameters.get("EXC_Freq_High"))
236
237        lowfreq = float(d_parameters.get("EXC_Freq_Low"))
238
239        # CR for compatibility with Apex format as there is no EXciteSweep file
240        if not highfreq and lowfreq:
241            excitation_sweep_filelocation = self.locate_file(
242                self.d_directory_location, "ExciteSweep"
243            )
244            lowfreq, highfreq = self.get_excite_sweep_range(
245                excitation_sweep_filelocation
246            )
247            d_parameters["EXC_Freq_High"] = highfreq
248            d_parameters["EXC_Freq_Low"] = lowfreq
249
250    @staticmethod
251    def get_excite_sweep_range(filename):
252        """Function to determine excitation sweep range from ExciteSweep file
253
254        This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range.
255        Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies.
256        This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.
257
258        Parameters
259        ----------
260        filename : str
261            the full path to the ExciteSweep file
262
263        """
264        ExciteSweep_lines = genfromtxt(filename, comments="*", delimiter="\n")
265        # CR ready if we need the full array
266        highfreq = fromstring(ExciteSweep_lines[0])
267        lowfreq = fromstring(ExciteSweep_lines[-1])
268
269        return lowfreq[0], highfreq[0]
270
271    @staticmethod
272    def locate_file(folder, type_file_name="apexAcquisition.method"):
273        """Function to locate a file in a folder
274
275        Find the full path of a specific file within the acquisition .d folder or subfolders
276
277        Parameters
278        ----------
279        folder : str
280            the full path to the folder
281        type_file_name : str
282            the name of the file to be located
283            Expected options: ExciteSweep or apexAcquisition.method
284
285        Returns
286        -------
287        str
288            the full path to the file
289
290        Notes
291        -----
292        adapted from code from SPIKE library, https://github.com/spike-project/spike
293
294        """
295
296        from pathlib import Path
297
298        # directory_location = folder.glob( '**/*apexAcquisition.method')
299        directory_location = folder.glob("**/*" + type_file_name)
300        result = list(directory_location)
301        if len(result) > 1:
302            raise Exception(
303                "You have more than 1 %s file in the %s folder, using the first one"
304                % (type_file_name, folder)
305            )
306
307        elif len(result) == 0:
308            raise Exception(
309                "You don't have any %s file in the  %s folder, please double check the path"
310                % (type_file_name, folder)
311            )
312
313        return result[0]
314
315    @staticmethod
316    def parse_parameters(parameters_filename):
317        """Function to parse the parameters from apexAcquisition.method file
318
319        Open the given file and retrieve all parameters from apexAcquisition.method
320            None is written when no value for value is found
321
322            structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
323
324        Parameters
325        ----------
326        parameters_filename : str
327            the full path to the apexAcquisition.method file
328
329        Returns
330        -------
331        dict
332            a dictionary with the parameters and values
333
334        Notes
335        -----
336        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
337        Code may not handle all possible parameters, but should be sufficient for most common use cases
338        """
339
340        # TODO: change to beautiful soup xml parsing
341
342        xmldoc = minidom.parse(parameters_filename.open())
343
344        x = xmldoc.documentElement
345        parameter_dict = {}
346        children = x.childNodes
347        for child in children:
348            # print( child.node)
349            if child.nodeName == "methodmetadata":
350                sections = child.childNodes
351                for section in sections:
352                    for element in section.childNodes:
353                        if element.nodeName == "date":
354                            # if element.nodeName == "primarykey":
355
356                            date_time_str = element.childNodes[0].nodeValue
357                            # parameter_dict["acquisition_time"] = pd.to_datetime(date_time_str, infer_datetime_format=True).to_pydatetime()
358                            parameter_dict["acquisition_time"] = datetime.strptime(
359                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
360                            )
361
362            if child.nodeName == "reportinfo":
363                sections = child.childNodes
364                for section in sections:
365                    if section.nodeName == "section":
366                        if section.getAttribute("title") == "Main":
367                            for element in section.childNodes:
368                                if element.nodeName == "section":
369                                    if element.getAttribute("title") == "Polarity":
370                                        if (
371                                            str(
372                                                element.childNodes[1].getAttribute(
373                                                    "value"
374                                                )
375                                            )
376                                            == "Negative"
377                                        ):
378                                            parameter_dict["Polarity"] = -1
379                                        else:
380                                            parameter_dict["Polarity"] = 1
381
382            if child.nodeName == "paramlist":
383                params = child.childNodes
384                for param in params:
385                    # print( param.nodeName)
386                    if param.nodeName == "param":
387                        paramenter_label = str(param.getAttribute("name"))
388                        for element in param.childNodes:
389                            if element.nodeName == "value":
390                                try:
391                                    parameter_value = str(element.firstChild.toxml())
392                                    # print v
393                                except:
394                                    parameter_value = None
395
396                            parameter_dict[paramenter_label] = parameter_value
397
398        return parameter_dict
399
400    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
401        """ """
402        import sqlite3
403
404        def read_sqlite_file(file_path, table_name):
405            """
406            Read data from a SQLite database file and return it as a list of tuples
407
408            Parameters
409            ----------
410            file_path : str
411                the full path to the SQLite database file
412            table_name : str
413                the name of the table to be read
414
415            Returns
416            -------
417            list
418                a list of tuples with the data from the table
419            """
420            # Connect to the SQLite database file
421            conn = sqlite3.connect(file_path)
422            cursor = conn.cursor()
423
424            # Execute a query to select data from a table (replace 'table_name' with your table's name)
425            query = f"SELECT * FROM {table_name}"
426            cursor.execute(query)
427
428            # Fetch all rows from the result set
429            rows = cursor.fetchall()
430            stream = []
431            # Print or process the fetched rows
432            for row in rows:
433                stream.append(row)
434                # print(row)  # Print each row, you can also process it differently
435
436            # Close the cursor and the connection
437            cursor.close()
438            conn.close()
439            return stream
440
441        def parse_binary(binary, type):
442            """
443            Parse binary data from the sqlite data streams
444            """
445            if type == "double":
446                data = frombuffer(binary, dtype=float64)
447            elif type == "float":
448                data = frombuffer(binary, dtype=float32)
449            return data
450
451        sqlite_filelocation = self.locate_file(
452            self.d_directory_location, sqlite_filename
453        )
454        table_name = "TraceSources"
455        trace_sources = read_sqlite_file(sqlite_filelocation, table_name)
456        table_name = "TraceChunks"
457        trace_chunks = read_sqlite_file(sqlite_filelocation, table_name)
458        times = []
459        values = []
460        trace_type = {}
461
462        for index, source in enumerate(trace_sources):
463            trace_id = source[0]
464            trace_type[source[1]] = {"times": [], "values": []}
465            for index, chunk in enumerate(trace_chunks):
466                id = chunk[0]
467                times = parse_binary(chunk[1], "double")
468                values = parse_binary(chunk[2], "float")
469                for time, value in zip(times, values):
470                    if source[0] == id:
471                        trace_type[source[1]]["times"].append(time)
472                        trace_type[source[1]]["values"].append(value)
473
474        return trace_type

A class used to read a single transient from Bruker's FT-MS acquisition station (fid or ser).

Parameters
  • d_directory_location (str): the full path of the .d folder
Attributes
  • d_directory_location (str): the full path of the .d folder
  • file_location (str): the full path of the .d folder
  • parameter_filename_location (str): the full path of the apexAcquisition.method file
  • transient_data_path (str): the full path of the fid or ser file
  • scan_attr (str): the full path of the scan.xml file
Methods
  • get_transient(). Read the data and settings returning a Transient class
  • get_scan_attr(). Read the scan retention times, TIC values and scan indices.
  • locate_file(folder, type_file_name). Find the full path of a specific file within the acquisition .d folder or subfolders
  • parse_parameters(parameters_filename). Open the given file and retrieve all parameters from apexAcquisition.method
  • fix_freq_limits(d_parameters). Read and set the correct frequency limits for the spectrum
  • get_excite_sweep_range(filename). Determine excitation sweep range from ExciteSweep file
ReadBrukerSolarix(d_directory_location)
61    def __init__(self, d_directory_location):
62        if isinstance(d_directory_location, str):
63            d_directory_location = Path(d_directory_location)
64
65        if not d_directory_location.exists():
66            raise FileNotFoundError("File does not exist: " + str(d_directory_location))
67
68        self.d_directory_location = d_directory_location
69
70        self.file_location = d_directory_location
71
72        try:
73            self.parameter_filename_location = self.locate_file(
74                d_directory_location, "apexAcquisition.method"
75            )
76            self.transient_data_path = d_directory_location / "fid"
77
78            if not self.transient_data_path.exists():
79                self.transient_data_path = d_directory_location / "ser"
80
81                if not self.transient_data_path.exists():
82                    raise FileNotFoundError("Could not locate transient data")
83
84                else:
85                    # get scan attributes
86                    self.scan_attr = d_directory_location / "scan.xml"
87
88        except:
89            raise FileExistsError(
90                "%s does not seem to be a valid Solarix Mass Spectrum"
91                % (d_directory_location)
92            )
d_directory_location
file_location
def get_scan_attr(self):
 94    def get_scan_attr(self):
 95        """Function to get the scan retention times, TIC values and scan indices.
 96
 97        Gets information from scan.xml file in the bruker .d folder.
 98        Note this file is only present in some .d format - e.g. for imaging mode data, it is not present.
 99
100        Returns
101        -------
102        dict_scan_rt_tic : dict
103            a dictionary with scan number as key and rt and tic as values
104        """
105
106        from bs4 import BeautifulSoup
107
108        try:
109            soup = BeautifulSoup(self.scan_attr.open(), "xml")
110        except:
111            raise FileNotFoundError(
112                "Dataset does not appear to contain a 'scan.xml' file or it is misformated"
113            )
114
115        list_rt = [float(rt.text) for rt in soup.find_all("minutes")]
116        list_tic = [float(tic.text) for tic in soup.find_all("tic")]
117        list_scan = [int(scan.text) for scan in soup.find_all("count")]
118
119        dict_scan_rt_tic = dict(zip(list_scan, zip(list_rt, list_tic)))
120
121        return dict_scan_rt_tic

Function to get the scan retention times, TIC values and scan indices.

Gets information from the scan.xml file in the Bruker .d folder. Note that this file is only present in some .d formats; e.g. it is not present for imaging mode data.

Returns
  • dict_scan_rt_tic (dict): a dictionary with scan number as key and rt and tic as values
def get_transient(self, scan_number=1):
123    def get_transient(self, scan_number=1):
124        """Function to get the transient data and parameters from a Bruker Solarix .d folder.
125
126        Parameters
127        ----------
128        scan_number : int
129            the scan number to be read. Default is 1.
130
131        Returns
132        -------
133        Transient
134            a transient object
135        """
136
137        file_d_params = self.parse_parameters(self.parameter_filename_location)
138
139        self.fix_freq_limits(file_d_params)
140
141        from sys import platform
142
143        if platform == "win32":
144            # Windows...
145            dt = dtype("l")
146        else:
147            dt = dtype("i")
148
149        # get rt, scan, and tic from scan.xml file, otherwise  using 0 defaults values
150
151        output_parameters = deepcopy(default_parameters(self.d_directory_location))
152
153        if self.transient_data_path.name == "ser":
154            if self.scan_attr.exists():
155                dict_scan_rt_tic = self.get_scan_attr()
156
157                output_parameters["scan_number"] = scan_number
158
159                output_parameters["rt"] = dict_scan_rt_tic.get(scan_number)[0]
160
161                output_parameters["tic"] = dict_scan_rt_tic.get(scan_number)[1]
162
163        output_parameters["analyzer"] = "ICR"
164
165        output_parameters["label"] = "Bruker_Frequency"
166
167        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
168
169        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
170
171        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
172
173        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
174
175        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
176        try:
177            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
178        except TypeError:  # for older datasets which dont have this variable
179            output_parameters["qpd_enabled"] = 0
180
181        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
182
183        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
184
185        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
186
187        output_parameters["number_data_points"] = int(file_d_params.get("TD"))
188
189        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
190
191        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")
192
193        data_points = int(file_d_params.get("TD"))
194
195        scan = output_parameters["scan_number"]
196        from io import BytesIO
197
198        if self.transient_data_path.name == "ser":
199            if isinstance(self.transient_data_path, S3Path):
200                databin = BytesIO(self.transient_data_path.open("rb").read())
201
202            else:
203                databin = self.transient_data_path.open("rb")
204
205            databin.seek((scan - 1) * 4 * data_points)
206            # read scan data and parse to 32int struct
207            data = frombuffer(databin.read(4 * data_points), dtype=dt)
208
209        else:
210            if isinstance(self.transient_data_path, S3Path):
211                data = frombuffer(self.transient_data_path.open("rb").read(), dtype=dt)
212            else:
213                data = fromfile(self.transient_data_path, dtype=dt)
214
215        return Transient(data, output_parameters)

Function to get the transient data and parameters from a Bruker Solarix .d folder.

Parameters
  • scan_number (int): the scan number to be read. Default is 1.
Returns
  • Transient: a transient object
def fix_freq_limits(self, d_parameters):
219    def fix_freq_limits(self, d_parameters):
220        """Function to read and set the correct frequency limits for the spectrum
221
222        Notes
223        --------
224        This is using the excitation limits from the apexAcquisition.method file,
225        which may not match the intended detection limits in edge cases.
226        In default acquisitions, excitation and detection are the same.
227        But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.
228
229        Parameters
230        ----------
231        d_parameters : dict
232            a dictionary with the parameters from the apexAcquisition.method file
233        """
234
235        highfreq = float(d_parameters.get("EXC_Freq_High"))
236
237        lowfreq = float(d_parameters.get("EXC_Freq_Low"))
238
239        # CR for compatibility with Apex format as there is no EXciteSweep file
240        if not highfreq and lowfreq:
241            excitation_sweep_filelocation = self.locate_file(
242                self.d_directory_location, "ExciteSweep"
243            )
244            lowfreq, highfreq = self.get_excite_sweep_range(
245                excitation_sweep_filelocation
246            )
247            d_parameters["EXC_Freq_High"] = highfreq
248            d_parameters["EXC_Freq_Low"] = lowfreq

Function to read and set the correct frequency limits for the spectrum

Notes

This is using the excitation limits from the apexAcquisition.method file, which may not match the intended detection limits in edge cases. In default acquisitions, excitation and detection are the same. But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.

Parameters
  • d_parameters (dict): a dictionary with the parameters from the apexAcquisition.method file
@staticmethod
def get_excite_sweep_range(filename):
250    @staticmethod
251    def get_excite_sweep_range(filename):
252        """Function to determine excitation sweep range from ExciteSweep file
253
254        This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range.
255        Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies.
256        This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.
257
258        Parameters
259        ----------
260        filename : str
261            the full path to the ExciteSweep file
262
263        """
264        ExciteSweep_lines = genfromtxt(filename, comments="*", delimiter="\n")
265        # CR ready if we need the full array
266        highfreq = fromstring(ExciteSweep_lines[0])
267        lowfreq = fromstring(ExciteSweep_lines[-1])
268
269        return lowfreq[0], highfreq[0]

Function to determine excitation sweep range from ExciteSweep file

This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range. Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies. This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.

Parameters
  • filename (str): the full path to the ExciteSweep file
@staticmethod
def locate_file(folder, type_file_name='apexAcquisition.method'):
271    @staticmethod
272    def locate_file(folder, type_file_name="apexAcquisition.method"):
273        """Function to locate a file in a folder
274
275        Find the full path of a specific file within the acquisition .d folder or subfolders
276
277        Parameters
278        ----------
279        folder : str
280            the full path to the folder
281        type_file_name : str
282            the name of the file to be located
283            Expected options: ExciteSweep or apexAcquisition.method
284
285        Returns
286        -------
287        str
288            the full path to the file
289
290        Notes
291        -----
292        adapted from code from SPIKE library, https://github.com/spike-project/spike
293
294        """
295
296        from pathlib import Path
297
298        # directory_location = folder.glob( '**/*apexAcquisition.method')
299        directory_location = folder.glob("**/*" + type_file_name)
300        result = list(directory_location)
301        if len(result) > 1:
302            raise Exception(
303                "You have more than 1 %s file in the %s folder, using the first one"
304                % (type_file_name, folder)
305            )
306
307        elif len(result) == 0:
308            raise Exception(
309                "You don't have any %s file in the  %s folder, please double check the path"
310                % (type_file_name, folder)
311            )
312
313        return result[0]

Function to locate a file in a folder

Find the full path of a specific file within the acquisition .d folder or subfolders

Parameters
  • folder (str): the full path to the folder
  • type_file_name (str): the name of the file to be located. Expected options: ExciteSweep or apexAcquisition.method
Returns
  • str: the full path to the file
Notes

adapted from code from SPIKE library, https://github.com/spike-project/spike

@staticmethod
def parse_parameters(parameters_filename):
315    @staticmethod
316    def parse_parameters(parameters_filename):
317        """Function to parse the parameters from apexAcquisition.method file
318
319        Open the given file and retrieve all parameters from apexAcquisition.method
320            None is written when no value for value is found
321
322            structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
323
324        Parameters
325        ----------
326        parameters_filename : str
327            the full path to the apexAcquisition.method file
328
329        Returns
330        -------
331        dict
332            a dictionary with the parameters and values
333
334        Notes
335        -----
336        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
337        Code may not handle all possible parameters, but should be sufficient for most common use cases
338        """
339
340        # TODO: change to beautiful soup xml parsing
341
342        xmldoc = minidom.parse(parameters_filename.open())
343
344        x = xmldoc.documentElement
345        parameter_dict = {}
346        children = x.childNodes
347        for child in children:
348            # print( child.node)
349            if child.nodeName == "methodmetadata":
350                sections = child.childNodes
351                for section in sections:
352                    for element in section.childNodes:
353                        if element.nodeName == "date":
354                            # if element.nodeName == "primarykey":
355
356                            date_time_str = element.childNodes[0].nodeValue
357                            # parameter_dict["acquisition_time"] = pd.to_datetime(date_time_str, infer_datetime_format=True).to_pydatetime()
358                            parameter_dict["acquisition_time"] = datetime.strptime(
359                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
360                            )
361
362            if child.nodeName == "reportinfo":
363                sections = child.childNodes
364                for section in sections:
365                    if section.nodeName == "section":
366                        if section.getAttribute("title") == "Main":
367                            for element in section.childNodes:
368                                if element.nodeName == "section":
369                                    if element.getAttribute("title") == "Polarity":
370                                        if (
371                                            str(
372                                                element.childNodes[1].getAttribute(
373                                                    "value"
374                                                )
375                                            )
376                                            == "Negative"
377                                        ):
378                                            parameter_dict["Polarity"] = -1
379                                        else:
380                                            parameter_dict["Polarity"] = 1
381
382            if child.nodeName == "paramlist":
383                params = child.childNodes
384                for param in params:
385                    # print( param.nodeName)
386                    if param.nodeName == "param":
387                        paramenter_label = str(param.getAttribute("name"))
388                        for element in param.childNodes:
389                            if element.nodeName == "value":
390                                try:
391                                    parameter_value = str(element.firstChild.toxml())
392                                    # print v
393                                except:
394                                    parameter_value = None
395
396                            parameter_dict[paramenter_label] = parameter_value
397
398        return parameter_dict

Function to parse the parameters from apexAcquisition.method file

Open the given file and retrieve all parameters from apexAcquisition.method. None is written when no value is found for a parameter.

structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
Parameters
  • parameters_filename (str): the full path to the apexAcquisition.method file
Returns
  • dict: a dictionary with the parameters and values
Notes

Adapted from code from SPIKE library, https://github.com/spike-project/spike. Code may not handle all possible parameters, but should be sufficient for most common use cases

def parse_sqlite(self, sqlite_filename='chromatography-data.sqlite'):
400    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
401        """ """
402        import sqlite3
403
404        def read_sqlite_file(file_path, table_name):
405            """
406            Read data from a SQLite database file and return it as a list of tuples
407
408            Parameters
409            ----------
410            file_path : str
411                the full path to the SQLite database file
412            table_name : str
413                the name of the table to be read
414
415            Returns
416            -------
417            list
418                a list of tuples with the data from the table
419            """
420            # Connect to the SQLite database file
421            conn = sqlite3.connect(file_path)
422            cursor = conn.cursor()
423
424            # Execute a query to select data from a table (replace 'table_name' with your table's name)
425            query = f"SELECT * FROM {table_name}"
426            cursor.execute(query)
427
428            # Fetch all rows from the result set
429            rows = cursor.fetchall()
430            stream = []
431            # Print or process the fetched rows
432            for row in rows:
433                stream.append(row)
434                # print(row)  # Print each row, you can also process it differently
435
436            # Close the cursor and the connection
437            cursor.close()
438            conn.close()
439            return stream
440
441        def parse_binary(binary, type):
442            """
443            Parse binary data from the sqlite data streams
444            """
445            if type == "double":
446                data = frombuffer(binary, dtype=float64)
447            elif type == "float":
448                data = frombuffer(binary, dtype=float32)
449            return data
450
451        sqlite_filelocation = self.locate_file(
452            self.d_directory_location, sqlite_filename
453        )
454        table_name = "TraceSources"
455        trace_sources = read_sqlite_file(sqlite_filelocation, table_name)
456        table_name = "TraceChunks"
457        trace_chunks = read_sqlite_file(sqlite_filelocation, table_name)
458        times = []
459        values = []
460        trace_type = {}
461
462        for index, source in enumerate(trace_sources):
463            trace_id = source[0]
464            trace_type[source[1]] = {"times": [], "values": []}
465            for index, chunk in enumerate(trace_chunks):
466                id = chunk[0]
467                times = parse_binary(chunk[1], "double")
468                values = parse_binary(chunk[2], "float")
469                for time, value in zip(times, values):
470                    if source[0] == id:
471                        trace_type[source[1]]["times"].append(time)
472                        trace_type[source[1]]["values"].append(value)
473
474        return trace_type