corems.transient.input.brukerSolarix

  1__author__ = "Yuri E. Corilo"
  2__date__ = "Jun 12, 2019"
  3from copy import deepcopy
  4from datetime import datetime
  5from pathlib import Path
  6from xml.dom import minidom
  7
  8from numpy import dtype, float32, float64, frombuffer, fromfile, fromstring, genfromtxt
  9from s3path import S3Path
 10
 11from corems.encapsulation.factory.parameters import default_parameters
 12from corems.transient.factory.TransientClasses import Transient
 13from corems.mass_spectra.input.brukerSolarix_utils import get_scan_attributes
 14
 15
 16class ReadBrukerSolarix(object):
 17    """A class used to Read a single Transient from Bruker's FT-MS acquisition station (fid, or ser)
 18
 19    Parameters
 20    ----------
 21    d_directory_location : str
 22        the full path of the .d folder
 23
 24    Attributes
 25    --------
 26    d_directory_location : str
 27        the full path of the .d folder
 28    file_location : str
 29        the full path of the .d folder
 30    parameter_filename_location : str
 31        the full path of the apexAcquisition.method file
 32    transient_data_path : str
 33        the full path of the fid or ser file
 34    scan_attr : str
 35        the full path of the scan.xml file
 36
 37
 38    Methods
 39    -------
 40    * get_transient().
 41        Read the data and settings returning a Transient class
 42    * get_scan_attr().
 43        Read the scan retention times, TIC values and scan indices.
 44    * locate_file(folder, type_file_name).
 45        Find the full path of a specific file within the acquisition .d folder or subfolders
 46    * parse_parameters(parameters_filename).
 47        Open the given file and retrieve all parameters from apexAcquisition.method
 48    * fix_freq_limits(d_parameters).
 49        Read and set the correct frequency limits for the spectrum
 50    * get_excite_sweep_range(filename).
 51        Determine excitation sweep range from ExciteSweep file
 52
 53    """
 54
 55    def __enter__(self):
 56        return self.get_transient()
 57
 58    def __exit__(self, exc_type, exc_val, exc_tb):
 59        return False
 60
 61    def __init__(self, d_directory_location):
 62        if isinstance(d_directory_location, str):
 63            d_directory_location = Path(d_directory_location)
 64
 65        if not d_directory_location.exists():
 66            raise FileNotFoundError("File does not exist: " + str(d_directory_location))
 67
 68        self.d_directory_location = d_directory_location
 69
 70        self.file_location = d_directory_location
 71
 72        try:
 73            self.parameter_filename_location = self.locate_file(
 74                d_directory_location, "apexAcquisition.method"
 75            )
 76            self.transient_data_path = d_directory_location / "fid"
 77
 78            if not self.transient_data_path.exists():
 79                self.transient_data_path = d_directory_location / "ser"
 80
 81                if not self.transient_data_path.exists():
 82                    raise FileNotFoundError("Could not locate transient data")
 83
 84                else:
 85                    # get scan attributes
 86                    self.scan_attr = d_directory_location / "scan.xml"
 87                    self.imaging_info_attr = d_directory_location / "ImagingInfo.xml"
 88    
 89
 90        except:
 91            raise FileExistsError(
 92                "%s does not seem to be a valid Solarix Mass Spectrum"
 93                % (d_directory_location)
 94            )
 95
 96    def get_scan_attr(self):
 97        """Function to get the scan retention times, TIC values and scan indices.
 98
 99        Gets information from scan.xml file in the bruker .d folder.
100        Note this file is only present in some .d format - e.g. for imaging mode data, it is not present.
101
102        Returns
103        -------
104        dict_scan_rt_tic : dict
105            a dictionary with scan number as key and rt and tic as values
106        """
107
108        return get_scan_attributes(self.scan_attr, self.imaging_info_attr)
109
110
111    def get_transient(self, scan_number=1):
112        """Function to get the transient data and parameters from a Bruker Solarix .d folder.
113
114        Parameters
115        ----------
116        scan_number : int
117            the scan number to be read. Default is 1.
118
119        Returns
120        -------
121        Transient
122            a transient object
123        """
124
125        file_d_params = self.parse_parameters(self.parameter_filename_location)
126
127        self.fix_freq_limits(file_d_params)
128
129        from sys import platform
130
131        if platform == "win32":
132            # Windows...
133            dt = dtype("l")
134        else:
135            dt = dtype("i")
136
137        # get rt, scan, and tic from scan.xml file, otherwise  using 0 defaults values
138
139        output_parameters = deepcopy(default_parameters(self.d_directory_location))
140
141        if self.transient_data_path.name == "ser":
142            dict_scan_rt_tic = self.get_scan_attr()
143
144            output_parameters["scan_number"] = scan_number
145
146            output_parameters["rt"] = dict_scan_rt_tic.get(scan_number)[0]
147
148            output_parameters["tic"] = dict_scan_rt_tic.get(scan_number)[1]
149
150        output_parameters["analyzer"] = "ICR"
151
152        output_parameters["label"] = "Bruker_Frequency"
153
154        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
155
156        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
157
158        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
159
160        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
161
162        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
163        try:
164            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
165        except TypeError:  # for older datasets which dont have this variable
166            output_parameters["qpd_enabled"] = 0
167
168        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
169
170        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
171
172        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
173
174        output_parameters["number_data_points"] = int(file_d_params.get("TD"))
175
176        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
177
178        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")
179
180        data_points = int(file_d_params.get("TD"))
181
182        scan = output_parameters["scan_number"]
183        from io import BytesIO
184
185        if self.transient_data_path.name == "ser":
186            if isinstance(self.transient_data_path, S3Path):
187                databin = BytesIO(self.transient_data_path.open("rb").read())
188
189            else:
190                databin = self.transient_data_path.open("rb")
191                
192            databin.seek((scan - 1) * 4 * data_points)
193            # read scan data and parse to 32int struct
194            data = frombuffer(databin.read(4 * data_points), dtype=dt)
195
196        else:
197            if isinstance(self.transient_data_path, S3Path):
198                data = frombuffer(self.transient_data_path.open("rb").read(), dtype=dt)
199            else:
200                data = fromfile(self.transient_data_path, dtype=dt)
201
202        return Transient(data, output_parameters)
203
204    #    for key, values in default_parameters.items():
205    #        print(key, values)
206    def fix_freq_limits(self, d_parameters):
207        """Function to read and set the correct frequency limits for the spectrum
208
209        Notes
210        --------
211        This is using the excitation limits from the apexAcquisition.method file,
212        which may not match the intended detection limits in edge cases.
213        In default acquisitions, excitation and detection are the same.
214        But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.
215
216        Parameters
217        ----------
218        d_parameters : dict
219            a dictionary with the parameters from the apexAcquisition.method file
220        """
221
222        highfreq = float(d_parameters.get("EXC_Freq_High"))
223
224        lowfreq = float(d_parameters.get("EXC_Freq_Low"))
225
226        # CR for compatibility with Apex format as there is no EXciteSweep file
227        if not highfreq and lowfreq:
228            excitation_sweep_filelocation = self.locate_file(
229                self.d_directory_location, "ExciteSweep"
230            )
231            lowfreq, highfreq = self.get_excite_sweep_range(
232                excitation_sweep_filelocation
233            )
234            d_parameters["EXC_Freq_High"] = highfreq
235            d_parameters["EXC_Freq_Low"] = lowfreq
236
237    @staticmethod
238    def get_excite_sweep_range(filename):
239        """Function to determine excitation sweep range from ExciteSweep file
240
241        This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range.
242        Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies.
243        This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.
244
245        Parameters
246        ----------
247        filename : str
248            the full path to the ExciteSweep file
249
250        """
251        ExciteSweep_lines = genfromtxt(filename, comments="*", delimiter="\n")
252        # CR ready if we need the full array
253        highfreq = fromstring(ExciteSweep_lines[0])
254        lowfreq = fromstring(ExciteSweep_lines[-1])
255
256        return lowfreq[0], highfreq[0]
257
258    @staticmethod
259    def locate_file(folder, type_file_name="apexAcquisition.method"):
260        """Function to locate a file in a folder
261
262        Find the full path of a specific file within the acquisition .d folder or subfolders
263
264        Parameters
265        ----------
266        folder : str
267            the full path to the folder
268        type_file_name : str
269            the name of the file to be located
270            Expected options: ExciteSweep or apexAcquisition.method
271
272        Returns
273        -------
274        str
275            the full path to the file
276
277        Notes
278        -----
279        adapted from code from SPIKE library, https://github.com/spike-project/spike
280
281        """
282
283        from pathlib import Path
284
285        # directory_location = folder.glob( '**/*apexAcquisition.method')
286        directory_location = folder.glob("**/*" + type_file_name)
287        result = list(directory_location)
288        if len(result) > 1:
289            raise Exception(
290                "You have more than 1 %s file in the %s folder, using the first one"
291                % (type_file_name, folder)
292            )
293
294        elif len(result) == 0:
295            raise Exception(
296                "You don't have any %s file in the  %s folder, please double check the path"
297                % (type_file_name, folder)
298            )
299
300        return result[0]
301
302    @staticmethod
303    def parse_parameters(parameters_filename):
304        """Function to parse the parameters from apexAcquisition.method file
305
306        Open the given file and retrieve all parameters from apexAcquisition.method
307            None is written when no value for value is found
308
309            structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
310
311        Parameters
312        ----------
313        parameters_filename : str
314            the full path to the apexAcquisition.method file
315
316        Returns
317        -------
318        dict
319            a dictionary with the parameters and values
320
321        Notes
322        -----
323        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
324        Code may not handle all possible parameters, but should be sufficient for most common use cases
325        """
326
327        # TODO: change to beautiful soup xml parsing
328
329        xmldoc = minidom.parse(parameters_filename.open())
330
331        x = xmldoc.documentElement
332        parameter_dict = {}
333        children = x.childNodes
334        for child in children:
335            # print( child.node)
336            if child.nodeName == "methodmetadata":
337                sections = child.childNodes
338                for section in sections:
339                    for element in section.childNodes:
340                        if element.nodeName == "date":
341                            # if element.nodeName == "primarykey":
342
343                            date_time_str = element.childNodes[0].nodeValue
344                            # parameter_dict["acquisition_time"] = pd.to_datetime(date_time_str, infer_datetime_format=True).to_pydatetime()
345                            parameter_dict["acquisition_time"] = datetime.strptime(
346                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
347                            )
348
349            if child.nodeName == "reportinfo":
350                sections = child.childNodes
351                for section in sections:
352                    if section.nodeName == "section":
353                        if section.getAttribute("title") == "Main":
354                            for element in section.childNodes:
355                                if element.nodeName == "section":
356                                    if element.getAttribute("title") == "Polarity":
357                                        if (
358                                            str(
359                                                element.childNodes[1].getAttribute(
360                                                    "value"
361                                                )
362                                            )
363                                            == "Negative"
364                                        ):
365                                            parameter_dict["Polarity"] = -1
366                                        else:
367                                            parameter_dict["Polarity"] = 1
368
369            if child.nodeName == "paramlist":
370                params = child.childNodes
371                for param in params:
372                    # print( param.nodeName)
373                    if param.nodeName == "param":
374                        paramenter_label = str(param.getAttribute("name"))
375                        for element in param.childNodes:
376                            if element.nodeName == "value":
377                                try:
378                                    parameter_value = str(element.firstChild.toxml())
379                                    # print v
380                                except:
381                                    parameter_value = None
382
383                            parameter_dict[paramenter_label] = parameter_value
384
385        return parameter_dict
386
387    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
388        """ """
389        import sqlite3
390
391        def read_sqlite_file(file_path, table_name):
392            """
393            Read data from a SQLite database file and return it as a list of tuples
394
395            Parameters
396            ----------
397            file_path : str
398                the full path to the SQLite database file
399            table_name : str
400                the name of the table to be read
401
402            Returns
403            -------
404            list
405                a list of tuples with the data from the table
406            """
407            # Connect to the SQLite database file
408            conn = sqlite3.connect(file_path)
409            cursor = conn.cursor()
410
411            # Execute a query to select data from a table (replace 'table_name' with your table's name)
412            query = f"SELECT * FROM {table_name}"
413            cursor.execute(query)
414
415            # Fetch all rows from the result set
416            rows = cursor.fetchall()
417            stream = []
418            # Print or process the fetched rows
419            for row in rows:
420                stream.append(row)
421                # print(row)  # Print each row, you can also process it differently
422
423            # Close the cursor and the connection
424            cursor.close()
425            conn.close()
426            return stream
427
428        def parse_binary(binary, type):
429            """
430            Parse binary data from the sqlite data streams
431            """
432            if type == "double":
433                data = frombuffer(binary, dtype=float64)
434            elif type == "float":
435                data = frombuffer(binary, dtype=float32)
436            return data
437
438        sqlite_filelocation = self.locate_file(
439            self.d_directory_location, sqlite_filename
440        )
441        table_name = "TraceSources"
442        trace_sources = read_sqlite_file(sqlite_filelocation, table_name)
443        table_name = "TraceChunks"
444        trace_chunks = read_sqlite_file(sqlite_filelocation, table_name)
445        times = []
446        values = []
447        trace_type = {}
448
449        for index, source in enumerate(trace_sources):
450            trace_id = source[0]
451            trace_type[source[1]] = {"times": [], "values": []}
452            for index, chunk in enumerate(trace_chunks):
453                id = chunk[0]
454                times = parse_binary(chunk[1], "double")
455                values = parse_binary(chunk[2], "float")
456                for time, value in zip(times, values):
457                    if source[0] == id:
458                        trace_type[source[1]]["times"].append(time)
459                        trace_type[source[1]]["values"].append(value)
460
461        return trace_type
class ReadBrukerSolarix:
 17class ReadBrukerSolarix(object):
 18    """A class used to Read a single Transient from Bruker's FT-MS acquisition station (fid, or ser)
 19
 20    Parameters
 21    ----------
 22    d_directory_location : str
 23        the full path of the .d folder
 24
 25    Attributes
 26    --------
 27    d_directory_location : str
 28        the full path of the .d folder
 29    file_location : str
 30        the full path of the .d folder
 31    parameter_filename_location : str
 32        the full path of the apexAcquisition.method file
 33    transient_data_path : str
 34        the full path of the fid or ser file
 35    scan_attr : str
 36        the full path of the scan.xml file
 37
 38
 39    Methods
 40    -------
 41    * get_transient().
 42        Read the data and settings returning a Transient class
 43    * get_scan_attr().
 44        Read the scan retention times, TIC values and scan indices.
 45    * locate_file(folder, type_file_name).
 46        Find the full path of a specific file within the acquisition .d folder or subfolders
 47    * parse_parameters(parameters_filename).
 48        Open the given file and retrieve all parameters from apexAcquisition.method
 49    * fix_freq_limits(d_parameters).
 50        Read and set the correct frequency limits for the spectrum
 51    * get_excite_sweep_range(filename).
 52        Determine excitation sweep range from ExciteSweep file
 53
 54    """
 55
 56    def __enter__(self):
 57        return self.get_transient()
 58
 59    def __exit__(self, exc_type, exc_val, exc_tb):
 60        return False
 61
 62    def __init__(self, d_directory_location):
 63        if isinstance(d_directory_location, str):
 64            d_directory_location = Path(d_directory_location)
 65
 66        if not d_directory_location.exists():
 67            raise FileNotFoundError("File does not exist: " + str(d_directory_location))
 68
 69        self.d_directory_location = d_directory_location
 70
 71        self.file_location = d_directory_location
 72
 73        try:
 74            self.parameter_filename_location = self.locate_file(
 75                d_directory_location, "apexAcquisition.method"
 76            )
 77            self.transient_data_path = d_directory_location / "fid"
 78
 79            if not self.transient_data_path.exists():
 80                self.transient_data_path = d_directory_location / "ser"
 81
 82                if not self.transient_data_path.exists():
 83                    raise FileNotFoundError("Could not locate transient data")
 84
 85                else:
 86                    # get scan attributes
 87                    self.scan_attr = d_directory_location / "scan.xml"
 88                    self.imaging_info_attr = d_directory_location / "ImagingInfo.xml"
 89    
 90
 91        except:
 92            raise FileExistsError(
 93                "%s does not seem to be a valid Solarix Mass Spectrum"
 94                % (d_directory_location)
 95            )
 96
 97    def get_scan_attr(self):
 98        """Function to get the scan retention times, TIC values and scan indices.
 99
100        Gets information from scan.xml file in the bruker .d folder.
101        Note this file is only present in some .d format - e.g. for imaging mode data, it is not present.
102
103        Returns
104        -------
105        dict_scan_rt_tic : dict
106            a dictionary with scan number as key and rt and tic as values
107        """
108
109        return get_scan_attributes(self.scan_attr, self.imaging_info_attr)
110
111
112    def get_transient(self, scan_number=1):
113        """Function to get the transient data and parameters from a Bruker Solarix .d folder.
114
115        Parameters
116        ----------
117        scan_number : int
118            the scan number to be read. Default is 1.
119
120        Returns
121        -------
122        Transient
123            a transient object
124        """
125
126        file_d_params = self.parse_parameters(self.parameter_filename_location)
127
128        self.fix_freq_limits(file_d_params)
129
130        from sys import platform
131
132        if platform == "win32":
133            # Windows...
134            dt = dtype("l")
135        else:
136            dt = dtype("i")
137
138        # get rt, scan, and tic from scan.xml file, otherwise  using 0 defaults values
139
140        output_parameters = deepcopy(default_parameters(self.d_directory_location))
141
142        if self.transient_data_path.name == "ser":
143            dict_scan_rt_tic = self.get_scan_attr()
144
145            output_parameters["scan_number"] = scan_number
146
147            output_parameters["rt"] = dict_scan_rt_tic.get(scan_number)[0]
148
149            output_parameters["tic"] = dict_scan_rt_tic.get(scan_number)[1]
150
151        output_parameters["analyzer"] = "ICR"
152
153        output_parameters["label"] = "Bruker_Frequency"
154
155        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
156
157        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
158
159        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
160
161        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
162
163        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
164        try:
165            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
166        except TypeError:  # for older datasets which dont have this variable
167            output_parameters["qpd_enabled"] = 0
168
169        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
170
171        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
172
173        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
174
175        output_parameters["number_data_points"] = int(file_d_params.get("TD"))
176
177        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
178
179        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")
180
181        data_points = int(file_d_params.get("TD"))
182
183        scan = output_parameters["scan_number"]
184        from io import BytesIO
185
186        if self.transient_data_path.name == "ser":
187            if isinstance(self.transient_data_path, S3Path):
188                databin = BytesIO(self.transient_data_path.open("rb").read())
189
190            else:
191                databin = self.transient_data_path.open("rb")
192                
193            databin.seek((scan - 1) * 4 * data_points)
194            # read scan data and parse to 32int struct
195            data = frombuffer(databin.read(4 * data_points), dtype=dt)
196
197        else:
198            if isinstance(self.transient_data_path, S3Path):
199                data = frombuffer(self.transient_data_path.open("rb").read(), dtype=dt)
200            else:
201                data = fromfile(self.transient_data_path, dtype=dt)
202
203        return Transient(data, output_parameters)
204
205    #    for key, values in default_parameters.items():
206    #        print(key, values)
207    def fix_freq_limits(self, d_parameters):
208        """Function to read and set the correct frequency limits for the spectrum
209
210        Notes
211        --------
212        This is using the excitation limits from the apexAcquisition.method file,
213        which may not match the intended detection limits in edge cases.
214        In default acquisitions, excitation and detection are the same.
215        But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.
216
217        Parameters
218        ----------
219        d_parameters : dict
220            a dictionary with the parameters from the apexAcquisition.method file
221        """
222
223        highfreq = float(d_parameters.get("EXC_Freq_High"))
224
225        lowfreq = float(d_parameters.get("EXC_Freq_Low"))
226
227        # CR for compatibility with Apex format as there is no EXciteSweep file
228        if not highfreq and lowfreq:
229            excitation_sweep_filelocation = self.locate_file(
230                self.d_directory_location, "ExciteSweep"
231            )
232            lowfreq, highfreq = self.get_excite_sweep_range(
233                excitation_sweep_filelocation
234            )
235            d_parameters["EXC_Freq_High"] = highfreq
236            d_parameters["EXC_Freq_Low"] = lowfreq
237
238    @staticmethod
239    def get_excite_sweep_range(filename):
240        """Function to determine excitation sweep range from ExciteSweep file
241
242        This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range.
243        Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies.
244        This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.
245
246        Parameters
247        ----------
248        filename : str
249            the full path to the ExciteSweep file
250
251        """
252        ExciteSweep_lines = genfromtxt(filename, comments="*", delimiter="\n")
253        # CR ready if we need the full array
254        highfreq = fromstring(ExciteSweep_lines[0])
255        lowfreq = fromstring(ExciteSweep_lines[-1])
256
257        return lowfreq[0], highfreq[0]
258
259    @staticmethod
260    def locate_file(folder, type_file_name="apexAcquisition.method"):
261        """Function to locate a file in a folder
262
263        Find the full path of a specific file within the acquisition .d folder or subfolders
264
265        Parameters
266        ----------
267        folder : str
268            the full path to the folder
269        type_file_name : str
270            the name of the file to be located
271            Expected options: ExciteSweep or apexAcquisition.method
272
273        Returns
274        -------
275        str
276            the full path to the file
277
278        Notes
279        -----
280        adapted from code from SPIKE library, https://github.com/spike-project/spike
281
282        """
283
284        from pathlib import Path
285
286        # directory_location = folder.glob( '**/*apexAcquisition.method')
287        directory_location = folder.glob("**/*" + type_file_name)
288        result = list(directory_location)
289        if len(result) > 1:
290            raise Exception(
291                "You have more than 1 %s file in the %s folder, using the first one"
292                % (type_file_name, folder)
293            )
294
295        elif len(result) == 0:
296            raise Exception(
297                "You don't have any %s file in the  %s folder, please double check the path"
298                % (type_file_name, folder)
299            )
300
301        return result[0]
302
303    @staticmethod
304    def parse_parameters(parameters_filename):
305        """Function to parse the parameters from apexAcquisition.method file
306
307        Open the given file and retrieve all parameters from apexAcquisition.method
308            None is written when no value for value is found
309
310            structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
311
312        Parameters
313        ----------
314        parameters_filename : str
315            the full path to the apexAcquisition.method file
316
317        Returns
318        -------
319        dict
320            a dictionary with the parameters and values
321
322        Notes
323        -----
324        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
325        Code may not handle all possible parameters, but should be sufficient for most common use cases
326        """
327
328        # TODO: change to beautiful soup xml parsing
329
330        xmldoc = minidom.parse(parameters_filename.open())
331
332        x = xmldoc.documentElement
333        parameter_dict = {}
334        children = x.childNodes
335        for child in children:
336            # print( child.node)
337            if child.nodeName == "methodmetadata":
338                sections = child.childNodes
339                for section in sections:
340                    for element in section.childNodes:
341                        if element.nodeName == "date":
342                            # if element.nodeName == "primarykey":
343
344                            date_time_str = element.childNodes[0].nodeValue
345                            # parameter_dict["acquisition_time"] = pd.to_datetime(date_time_str, infer_datetime_format=True).to_pydatetime()
346                            parameter_dict["acquisition_time"] = datetime.strptime(
347                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
348                            )
349
350            if child.nodeName == "reportinfo":
351                sections = child.childNodes
352                for section in sections:
353                    if section.nodeName == "section":
354                        if section.getAttribute("title") == "Main":
355                            for element in section.childNodes:
356                                if element.nodeName == "section":
357                                    if element.getAttribute("title") == "Polarity":
358                                        if (
359                                            str(
360                                                element.childNodes[1].getAttribute(
361                                                    "value"
362                                                )
363                                            )
364                                            == "Negative"
365                                        ):
366                                            parameter_dict["Polarity"] = -1
367                                        else:
368                                            parameter_dict["Polarity"] = 1
369
370            if child.nodeName == "paramlist":
371                params = child.childNodes
372                for param in params:
373                    # print( param.nodeName)
374                    if param.nodeName == "param":
375                        paramenter_label = str(param.getAttribute("name"))
376                        for element in param.childNodes:
377                            if element.nodeName == "value":
378                                try:
379                                    parameter_value = str(element.firstChild.toxml())
380                                    # print v
381                                except:
382                                    parameter_value = None
383
384                            parameter_dict[paramenter_label] = parameter_value
385
386        return parameter_dict
387
388    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
389        """ """
390        import sqlite3
391
392        def read_sqlite_file(file_path, table_name):
393            """
394            Read data from a SQLite database file and return it as a list of tuples
395
396            Parameters
397            ----------
398            file_path : str
399                the full path to the SQLite database file
400            table_name : str
401                the name of the table to be read
402
403            Returns
404            -------
405            list
406                a list of tuples with the data from the table
407            """
408            # Connect to the SQLite database file
409            conn = sqlite3.connect(file_path)
410            cursor = conn.cursor()
411
412            # Execute a query to select data from a table (replace 'table_name' with your table's name)
413            query = f"SELECT * FROM {table_name}"
414            cursor.execute(query)
415
416            # Fetch all rows from the result set
417            rows = cursor.fetchall()
418            stream = []
419            # Print or process the fetched rows
420            for row in rows:
421                stream.append(row)
422                # print(row)  # Print each row, you can also process it differently
423
424            # Close the cursor and the connection
425            cursor.close()
426            conn.close()
427            return stream
428
429        def parse_binary(binary, type):
430            """
431            Parse binary data from the sqlite data streams
432            """
433            if type == "double":
434                data = frombuffer(binary, dtype=float64)
435            elif type == "float":
436                data = frombuffer(binary, dtype=float32)
437            return data
438
439        sqlite_filelocation = self.locate_file(
440            self.d_directory_location, sqlite_filename
441        )
442        table_name = "TraceSources"
443        trace_sources = read_sqlite_file(sqlite_filelocation, table_name)
444        table_name = "TraceChunks"
445        trace_chunks = read_sqlite_file(sqlite_filelocation, table_name)
446        times = []
447        values = []
448        trace_type = {}
449
450        for index, source in enumerate(trace_sources):
451            trace_id = source[0]
452            trace_type[source[1]] = {"times": [], "values": []}
453            for index, chunk in enumerate(trace_chunks):
454                id = chunk[0]
455                times = parse_binary(chunk[1], "double")
456                values = parse_binary(chunk[2], "float")
457                for time, value in zip(times, values):
458                    if source[0] == id:
459                        trace_type[source[1]]["times"].append(time)
460                        trace_type[source[1]]["values"].append(value)
461
462        return trace_type

A class used to Read a single Transient from Bruker's FT-MS acquisition station (fid, or ser)

Parameters
  • d_directory_location (str): the full path of the .d folder
Attributes
  • d_directory_location (str): the full path of the .d folder
  • file_location (str): the full path of the .d folder
  • parameter_filename_location (str): the full path of the apexAcquisition.method file
  • transient_data_path (str): the full path of the fid or ser file
  • scan_attr (str): the full path of the scan.xml file
Methods
  • get_transient(). Read the data and settings returning a Transient class
  • get_scan_attr(). Read the scan retention times, TIC values and scan indices.
  • locate_file(folder, type_file_name). Find the full path of a specific file within the acquisition .d folder or subfolders
  • parse_parameters(parameters_filename). Open the given file and retrieve all parameters from apexAcquisition.method
  • fix_freq_limits(d_parameters). Read and set the correct frequency limits for the spectrum
  • get_excite_sweep_range(filename). Determine excitation sweep range from ExciteSweep file
ReadBrukerSolarix(d_directory_location)
62    def __init__(self, d_directory_location):
63        if isinstance(d_directory_location, str):
64            d_directory_location = Path(d_directory_location)
65
66        if not d_directory_location.exists():
67            raise FileNotFoundError("File does not exist: " + str(d_directory_location))
68
69        self.d_directory_location = d_directory_location
70
71        self.file_location = d_directory_location
72
73        try:
74            self.parameter_filename_location = self.locate_file(
75                d_directory_location, "apexAcquisition.method"
76            )
77            self.transient_data_path = d_directory_location / "fid"
78
79            if not self.transient_data_path.exists():
80                self.transient_data_path = d_directory_location / "ser"
81
82                if not self.transient_data_path.exists():
83                    raise FileNotFoundError("Could not locate transient data")
84
85                else:
86                    # get scan attributes
87                    self.scan_attr = d_directory_location / "scan.xml"
88                    self.imaging_info_attr = d_directory_location / "ImagingInfo.xml"
89    
90
91        except:
92            raise FileExistsError(
93                "%s does not seem to be a valid Solarix Mass Spectrum"
94                % (d_directory_location)
95            )
d_directory_location
file_location
def get_scan_attr(self):
 97    def get_scan_attr(self):
 98        """Function to get the scan retention times, TIC values and scan indices.
 99
100        Gets information from scan.xml file in the bruker .d folder.
101        Note this file is only present in some .d format - e.g. for imaging mode data, it is not present.
102
103        Returns
104        -------
105        dict_scan_rt_tic : dict
106            a dictionary with scan number as key and rt and tic as values
107        """
108
109        return get_scan_attributes(self.scan_attr, self.imaging_info_attr)

Function to get the scan retention times, TIC values and scan indices.

Gets information from scan.xml file in the bruker .d folder. Note this file is only present in some .d format - e.g. for imaging mode data, it is not present.

Returns
  • dict_scan_rt_tic (dict): a dictionary with scan number as key and rt and tic as values
def get_transient(self, scan_number=1):
112    def get_transient(self, scan_number=1):
113        """Function to get the transient data and parameters from a Bruker Solarix .d folder.
114
115        Parameters
116        ----------
117        scan_number : int
118            the scan number to be read. Default is 1.
119
120        Returns
121        -------
122        Transient
123            a transient object
124        """
125
126        file_d_params = self.parse_parameters(self.parameter_filename_location)
127
128        self.fix_freq_limits(file_d_params)
129
130        from sys import platform
131
132        if platform == "win32":
133            # Windows...
134            dt = dtype("l")
135        else:
136            dt = dtype("i")
137
138        # get rt, scan, and tic from scan.xml file, otherwise  using 0 defaults values
139
140        output_parameters = deepcopy(default_parameters(self.d_directory_location))
141
142        if self.transient_data_path.name == "ser":
143            dict_scan_rt_tic = self.get_scan_attr()
144
145            output_parameters["scan_number"] = scan_number
146
147            output_parameters["rt"] = dict_scan_rt_tic.get(scan_number)[0]
148
149            output_parameters["tic"] = dict_scan_rt_tic.get(scan_number)[1]
150
151        output_parameters["analyzer"] = "ICR"
152
153        output_parameters["label"] = "Bruker_Frequency"
154
155        output_parameters["Aterm"] = float(file_d_params.get("ML1"))
156
157        output_parameters["Bterm"] = float(file_d_params.get("ML2"))
158
159        output_parameters["Cterm"] = float(file_d_params.get("ML3"))
160
161        output_parameters["exc_high_freq"] = float(file_d_params.get("EXC_Freq_High"))
162
163        output_parameters["exc_low_freq"] = float(file_d_params.get("EXC_Freq_Low"))
164        try:
165            output_parameters["qpd_enabled"] = float(file_d_params.get("QPD_Enabled"))
166        except TypeError:  # for older datasets which dont have this variable
167            output_parameters["qpd_enabled"] = 0
168
169        output_parameters["mw_low"] = float(file_d_params.get("MW_low"))
170
171        output_parameters["mw_high"] = float(file_d_params.get("MW_high"))
172
173        output_parameters["bandwidth"] = float(file_d_params.get("SW_h"))
174
175        output_parameters["number_data_points"] = int(file_d_params.get("TD"))
176
177        output_parameters["polarity"] = str(file_d_params.get("Polarity"))
178
179        output_parameters["acquisition_time"] = file_d_params.get("acquisition_time")
180
181        data_points = int(file_d_params.get("TD"))
182
183        scan = output_parameters["scan_number"]
184        from io import BytesIO
185
186        if self.transient_data_path.name == "ser":
187            if isinstance(self.transient_data_path, S3Path):
188                databin = BytesIO(self.transient_data_path.open("rb").read())
189
190            else:
191                databin = self.transient_data_path.open("rb")
192                
193            databin.seek((scan - 1) * 4 * data_points)
194            # read scan data and parse to 32int struct
195            data = frombuffer(databin.read(4 * data_points), dtype=dt)
196
197        else:
198            if isinstance(self.transient_data_path, S3Path):
199                data = frombuffer(self.transient_data_path.open("rb").read(), dtype=dt)
200            else:
201                data = fromfile(self.transient_data_path, dtype=dt)
202
203        return Transient(data, output_parameters)

Function to get the transient data and parameters from a Bruker Solarix .d folder.

Parameters
  • scan_number (int): the scan number to be read. Default is 1.
Returns
  • Transient: a transient object
def fix_freq_limits(self, d_parameters):
207    def fix_freq_limits(self, d_parameters):
208        """Function to read and set the correct frequency limits for the spectrum
209
210        Notes
211        --------
212        This is using the excitation limits from the apexAcquisition.method file,
213        which may not match the intended detection limits in edge cases.
214        In default acquisitions, excitation and detection are the same.
215        But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.
216
217        Parameters
218        ----------
219        d_parameters : dict
220            a dictionary with the parameters from the apexAcquisition.method file
221        """
222
223        highfreq = float(d_parameters.get("EXC_Freq_High"))
224
225        lowfreq = float(d_parameters.get("EXC_Freq_Low"))
226
227        # CR for compatibility with Apex format as there is no EXciteSweep file
228        if not highfreq and lowfreq:
229            excitation_sweep_filelocation = self.locate_file(
230                self.d_directory_location, "ExciteSweep"
231            )
232            lowfreq, highfreq = self.get_excite_sweep_range(
233                excitation_sweep_filelocation
234            )
235            d_parameters["EXC_Freq_High"] = highfreq
236            d_parameters["EXC_Freq_Low"] = lowfreq

Function to read and set the correct frequency limits for the spectrum

Notes

This is using the excitation limits from the apexAcquisition.method file, which may not match the intended detection limits in edge cases. In default acquisitions, excitation and detection are the same. But, they may not be in some cases with selective excitation, custom excite waveforms, or in 2DMS applications.

Parameters
  • d_parameters (dict): a dictionary with the parameters from the apexAcquisition.method file
@staticmethod
def get_excite_sweep_range(filename):
238    @staticmethod
239    def get_excite_sweep_range(filename):
240        """Function to determine excitation sweep range from ExciteSweep file
241
242        This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range.
243        Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies.
244        This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.
245
246        Parameters
247        ----------
248        filename : str
249            the full path to the ExciteSweep file
250
251        """
252        ExciteSweep_lines = genfromtxt(filename, comments="*", delimiter="\n")
253        # CR ready if we need the full array
254        highfreq = fromstring(ExciteSweep_lines[0])
255        lowfreq = fromstring(ExciteSweep_lines[-1])
256
257        return lowfreq[0], highfreq[0]

Function to determine excitation sweep range from ExciteSweep file

This looks at the first and last rows of the ExciteSweep file to determine the excitation frequency range. Note that this assumes the excitation sweep was linear and the first and last rows are the lowest and highest frequencies. This is presumably always true, but again may be incorrect for edge cases with custom excitation waveforms.

Parameters
  • filename (str): the full path to the ExciteSweep file
@staticmethod
def locate_file(folder, type_file_name='apexAcquisition.method'):
259    @staticmethod
260    def locate_file(folder, type_file_name="apexAcquisition.method"):
261        """Function to locate a file in a folder
262
263        Find the full path of a specific file within the acquisition .d folder or subfolders
264
265        Parameters
266        ----------
267        folder : str
268            the full path to the folder
269        type_file_name : str
270            the name of the file to be located
271            Expected options: ExciteSweep or apexAcquisition.method
272
273        Returns
274        -------
275        str
276            the full path to the file
277
278        Notes
279        -----
280        adapted from code from SPIKE library, https://github.com/spike-project/spike
281
282        """
283
284        from pathlib import Path
285
286        # directory_location = folder.glob( '**/*apexAcquisition.method')
287        directory_location = folder.glob("**/*" + type_file_name)
288        result = list(directory_location)
289        if len(result) > 1:
290            raise Exception(
291                "You have more than 1 %s file in the %s folder, using the first one"
292                % (type_file_name, folder)
293            )
294
295        elif len(result) == 0:
296            raise Exception(
297                "You don't have any %s file in the  %s folder, please double check the path"
298                % (type_file_name, folder)
299            )
300
301        return result[0]

Function to locate a file in a folder

Find the full path of a specific file within the acquisition .d folder or subfolders

Parameters
  • folder (str): the full path to the folder
  • type_file_name (str): the name of the file to be located. Expected options: ExciteSweep or apexAcquisition.method
Returns
  • str: the full path to the file
Notes

adapted from code from SPIKE library, https://github.com/spike-project/spike

@staticmethod
def parse_parameters(parameters_filename):
303    @staticmethod
304    def parse_parameters(parameters_filename):
305        """Function to parse the parameters from apexAcquisition.method file
306
307        Open the given file and retrieve all parameters from apexAcquisition.method
308            None is written when no value for value is found
309
310            structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
311
312        Parameters
313        ----------
314        parameters_filename : str
315            the full path to the apexAcquisition.method file
316
317        Returns
318        -------
319        dict
320            a dictionary with the parameters and values
321
322        Notes
323        -----
324        Adapted from code from SPIKE library, https://github.com/spike-project/spike.
325        Code may not handle all possible parameters, but should be sufficient for most common use cases
326        """
327
328        # TODO: change to beautiful soup xml parsing
329
330        xmldoc = minidom.parse(parameters_filename.open())
331
332        x = xmldoc.documentElement
333        parameter_dict = {}
334        children = x.childNodes
335        for child in children:
336            # print( child.node)
337            if child.nodeName == "methodmetadata":
338                sections = child.childNodes
339                for section in sections:
340                    for element in section.childNodes:
341                        if element.nodeName == "date":
342                            # if element.nodeName == "primarykey":
343
344                            date_time_str = element.childNodes[0].nodeValue
345                            # parameter_dict["acquisition_time"] = pd.to_datetime(date_time_str, infer_datetime_format=True).to_pydatetime()
346                            parameter_dict["acquisition_time"] = datetime.strptime(
347                                date_time_str, "%b_%d_%Y %H:%M:%S.%f"
348                            )
349
350            if child.nodeName == "reportinfo":
351                sections = child.childNodes
352                for section in sections:
353                    if section.nodeName == "section":
354                        if section.getAttribute("title") == "Main":
355                            for element in section.childNodes:
356                                if element.nodeName == "section":
357                                    if element.getAttribute("title") == "Polarity":
358                                        if (
359                                            str(
360                                                element.childNodes[1].getAttribute(
361                                                    "value"
362                                                )
363                                            )
364                                            == "Negative"
365                                        ):
366                                            parameter_dict["Polarity"] = -1
367                                        else:
368                                            parameter_dict["Polarity"] = 1
369
370            if child.nodeName == "paramlist":
371                params = child.childNodes
372                for param in params:
373                    # print( param.nodeName)
374                    if param.nodeName == "param":
375                        paramenter_label = str(param.getAttribute("name"))
376                        for element in param.childNodes:
377                            if element.nodeName == "value":
378                                try:
379                                    parameter_value = str(element.firstChild.toxml())
380                                    # print v
381                                except:
382                                    parameter_value = None
383
384                            parameter_dict[paramenter_label] = parameter_value
385
386        return parameter_dict

Function to parse the parameters from apexAcquisition.method file

Open the given file and retrieve all parameters from apexAcquisition.method. None is written when no value is found for a parameter.

structure : <param name = "AMS_ActiveExclusion"><value>0</value></param>
Parameters
  • parameters_filename (str): the full path to the apexAcquisition.method file
Returns
  • dict: a dictionary with the parameters and values
Notes

Adapted from code from SPIKE library, https://github.com/spike-project/spike. Code may not handle all possible parameters, but should be sufficient for most common use cases

def parse_sqlite(self, sqlite_filename='chromatography-data.sqlite'):
388    def parse_sqlite(self, sqlite_filename="chromatography-data.sqlite"):
389        """ """
390        import sqlite3
391
392        def read_sqlite_file(file_path, table_name):
393            """
394            Read data from a SQLite database file and return it as a list of tuples
395
396            Parameters
397            ----------
398            file_path : str
399                the full path to the SQLite database file
400            table_name : str
401                the name of the table to be read
402
403            Returns
404            -------
405            list
406                a list of tuples with the data from the table
407            """
408            # Connect to the SQLite database file
409            conn = sqlite3.connect(file_path)
410            cursor = conn.cursor()
411
412            # Execute a query to select data from a table (replace 'table_name' with your table's name)
413            query = f"SELECT * FROM {table_name}"
414            cursor.execute(query)
415
416            # Fetch all rows from the result set
417            rows = cursor.fetchall()
418            stream = []
419            # Print or process the fetched rows
420            for row in rows:
421                stream.append(row)
422                # print(row)  # Print each row, you can also process it differently
423
424            # Close the cursor and the connection
425            cursor.close()
426            conn.close()
427            return stream
428
429        def parse_binary(binary, type):
430            """
431            Parse binary data from the sqlite data streams
432            """
433            if type == "double":
434                data = frombuffer(binary, dtype=float64)
435            elif type == "float":
436                data = frombuffer(binary, dtype=float32)
437            return data
438
439        sqlite_filelocation = self.locate_file(
440            self.d_directory_location, sqlite_filename
441        )
442        table_name = "TraceSources"
443        trace_sources = read_sqlite_file(sqlite_filelocation, table_name)
444        table_name = "TraceChunks"
445        trace_chunks = read_sqlite_file(sqlite_filelocation, table_name)
446        times = []
447        values = []
448        trace_type = {}
449
450        for index, source in enumerate(trace_sources):
451            trace_id = source[0]
452            trace_type[source[1]] = {"times": [], "values": []}
453            for index, chunk in enumerate(trace_chunks):
454                id = chunk[0]
455                times = parse_binary(chunk[1], "double")
456                values = parse_binary(chunk[2], "float")
457                for time, value in zip(times, values):
458                    if source[0] == id:
459                        trace_type[source[1]]["times"].append(time)
460                        trace_type[source[1]]["values"].append(value)
461
462        return trace_type