corems.mass_spectrum.input.baseClass

  1__author__ = "Yuri E. Corilo"
  2__date__ = "Nov 11, 2019"
  3
  4from copy import deepcopy
  5from io import BytesIO
  6from pathlib import Path
  7
  8import chardet
  9from bs4 import BeautifulSoup
 10from pandas import read_csv, read_excel, read_pickle
 11from pandas.core.frame import DataFrame
 12from s3path import S3Path
 13
 14from corems.encapsulation.constant import Labels
 15from corems.encapsulation.factory.parameters import default_parameters
 16from corems.encapsulation.factory.processingSetting import DataInputSetting
 17from corems.encapsulation.input.parameter_from_json import (
 18    load_and_set_parameters_class,
 19    load_and_set_parameters_ms,
 20    load_and_set_toml_parameters_class,
 21)
 22
 23
 24class MassListBaseClass:
 25    """The MassListBaseClass object reads mass list data types and returns the mass spectrum obj
 26
 27    Parameters
 28    ----------
 29    file_location : Path or S3Path
 30        Full data path.
 31    isCentroid : bool, optional
 32        Determines the mass spectrum data structure. If set to True, it assumes centroid mode. If set to False, it assumes profile mode and attempts to peak pick. Default is True.
 33    analyzer : str, optional
 34        The analyzer used for the mass spectrum. Default is 'Unknown'.
 35    instrument_label : str, optional
 36        The label of the instrument used for the mass spectrum. Default is 'Unknown'.
 37    sample_name : str, optional
 38        The name of the sample. Default is None.
 39    header_lines : int, optional
 40        The number of lines to skip in the file, including the column labels line. Default is 0.
 41    isThermoProfile : bool, optional
 42        Determines the number of expected columns in the file. If set to True, only m/z and intensity columns are expected. Signal-to-noise ratio (S/N) and resolving power (RP) will be calculated based on the data. Default is False.
 43    headerless : bool, optional
 44        If True, assumes that there are no headers present in the file (e.g., a .xy file from Bruker) and assumes two columns: m/z and intensity. Default is False.
 45
 46    Attributes
 47    ----------
 48    parameters : DataInputSetting
 49        The data input settings for the mass spectrum.
 50    data_type : str
 51        The type of data in the file.
 52    delimiter : str
 53        The delimiter used to read text-based files.
 54
 55    Methods
 56    -------
 57    * set_parameter_from_toml(parameters_path). Sets the data input settings from a TOML file.
 58    * set_parameter_from_json(parameters_path). Sets the data input settings from a JSON file.
 59    * get_dataframe(). Reads the file and returns the data as a pandas DataFrame.
 60    * load_settings(mass_spec_obj, output_parameters). Loads the settings for the mass spectrum.
 61    * get_output_parameters(polarity, scan_index=0). Returns the output parameters for the mass spectrum.
 62    * clean_data_frame(dataframe). Cleans the data frame by removing columns that are not in the expected columns set.
 63
 64    """
 65
 66    def __init__(
 67        self,
 68        file_location: Path | S3Path,
 69        isCentroid: bool = True,
 70        analyzer: str = "Unknown",
 71        instrument_label: str = "Unknown",
 72        sample_name: str = None,
 73        header_lines: int = 0,
 74        isThermoProfile: bool = False,
 75        headerless: bool = False,
 76    ):
 77        self.file_location = (
 78            Path(file_location) if isinstance(file_location, str) else file_location
 79        )
 80
 81        if not self.file_location.exists():
 82            raise FileExistsError("File does not exist: %s" % file_location)
 83
 84        # (newline="\n")
 85
 86        self.header_lines = header_lines
 87
 88        if isThermoProfile:
 89            self._expected_columns = {Labels.mz, Labels.abundance}
 90
 91        else:
 92            self._expected_columns = {
 93                Labels.mz,
 94                Labels.abundance,
 95                Labels.s2n,
 96                Labels.rp,
 97            }
 98
 99        self._delimiter = None
100
101        self.isCentroid = isCentroid
102
103        self.isThermoProfile = isThermoProfile
104
105        self.headerless = headerless
106
107        self._data_type = None
108
109        self.analyzer = analyzer
110
111        self.instrument_label = instrument_label
112
113        self.sample_name = sample_name
114
115        self._parameters = deepcopy(DataInputSetting())
116
117    @property
118    def parameters(self):
119        return self._parameters
120
121    @parameters.setter
122    def parameters(self, instance_DataInputSetting):
123        self._parameters = instance_DataInputSetting
124
125    def set_parameter_from_toml(self, parameters_path):
126        self._parameters = load_and_set_toml_parameters_class(
127            "DataInput", self.parameters, parameters_path=parameters_path
128        )
129
130    def set_parameter_from_json(self, parameters_path):
131        self._parameters = load_and_set_parameters_class(
132            "DataInput", self.parameters, parameters_path=parameters_path
133        )
134
135    @property
136    def data_type(self):
137        return self._data_type
138
139    @data_type.setter
140    def data_type(self, data_type):
141        self._data_type = data_type
142
143    @property
144    def delimiter(self):
145        return self._delimiter
146
147    @delimiter.setter
148    def delimiter(self, delimiter):
149        self._delimiter = delimiter
150
151    def encoding_detector(self, file_location) -> str:
152        """
153        Detects the encoding of a file.
154
155        Parameters
156        --------
157        file_location : str
158            The location of the file to be analyzed.
159
160        Returns
161        --------
162        str
163            The detected encoding of the file.
164        """
165
166        with file_location.open("rb") as rawdata:
167            result = chardet.detect(rawdata.read(10000))
168        return result["encoding"]
169
170    def set_data_type(self):
171        """
172        Set the data type and delimiter based on the file extension.
173
174        Raises
175        ------
176        TypeError
177            If the data type could not be automatically recognized.
178        """
179        if self.file_location.suffix == ".csv":
180            self.data_type = "txt"
181            self.delimiter = ","
182        elif self.file_location.suffix == ".txt":
183            self.data_type = "txt"
184            self.delimiter = "\t"
185        elif self.file_location.suffix == ".tsv":
186            self.data_type = "txt"
187            self.delimiter = "\t"
188        elif self.file_location.suffix == ".xlsx":
189            self.data_type = "excel"
190        elif self.file_location.suffix == ".ascii":
191            self.data_type = "txt"
192            self.delimiter = "  "
193        elif self.file_location.suffix == ".pkl":
194            self.data_type = "dataframe"
195        elif self.file_location.suffix == ".pks":
196            self.data_type = "pks"
197            self.delimiter = "          "
198            self.header_lines = 9
199        elif self.file_location.suffix == ".xml":
200            self.data_type = "xml"
201            # self.delimiter = None
202            # self.header_lines = None
203        elif self.file_location.suffix == ".xy":
204            self.data_type = "txt"
205            self.delimiter = " "
206            self.header_lines = None
207        else:
208            raise TypeError(
209                "Data type could not be automatically recognized for %s; please set data type and delimiter manually."
210                % self.file_location.name
211            )
212
213    def get_dataframe(self) -> DataFrame:
214        """
215        Get the data as a pandas DataFrame.
216
217        Returns
218        -------
219        pandas.DataFrame
220            The data as a pandas DataFrame.
221
222        Raises
223        ------
224        TypeError
225            If the data type is not supported.
226        """
227
228        if not self.data_type or not self.delimiter:
229            self.set_data_type()
230
231        if isinstance(self.file_location, S3Path):
232            data = BytesIO(self.file_location.open("rb").read())
233        else:
234            data = self.file_location
235
236        if self.data_type == "txt":
237            if self.headerless:
238                dataframe = read_csv(
239                    data,
240                    skiprows=self.header_lines,
241                    delimiter=self.delimiter,
242                    header=None,
243                    names=["m/z", "I"],
244                    encoding=self.encoding_detector(self.file_location),
245                    engine="python",
246                )
247            else:
248                dataframe = read_csv(
249                    data,
250                    skiprows=self.header_lines,
251                    delimiter=self.delimiter,
252                    encoding=self.encoding_detector(self.file_location),
253                    engine="python",
254                )
255
256        elif self.data_type == "pks":
257            names = [
258                "m/z",
259                "I",
260                "Scaled Peak Height",
261                "Resolving Power",
262                "Frequency",
263                "S/N",
264            ]
265            clean_data = []
266            with self.file_location.open() as maglabfile:
267                for i in maglabfile.readlines()[8:-1]:
268                    clean_data.append(i.split())
269            dataframe = DataFrame(clean_data, columns=names)
270
271        elif self.data_type == "dataframe":
272            dataframe = read_pickle(data)
273
274        elif self.data_type == "excel":
275            dataframe = read_excel(data)
276
277        elif self.data_type == "xml":
278            dataframe = self.read_xml_peaks(data)
279
280        else:
281            raise TypeError("Data type %s is not supported" % self.data_type)
282
283        return dataframe
284
285    def load_settings(self, mass_spec_obj, output_parameters):
286        """
287        #TODO loading output parameters from json file is not functional
288        Load settings from a JSON file and apply them to the given mass_spec_obj.
289
290        Parameters
291        ----------
292        mass_spec_obj : MassSpec
293            The mass spectrum object to apply the settings to.
294
295        """
296        import json
297        import warnings
298
299        settings_file_path = self.file_location.with_suffix(".json")
300
301        if settings_file_path.exists():
302            self._parameters = load_and_set_parameters_class(
303                "DataInput", self._parameters, parameters_path=settings_file_path
304            )
305
306            load_and_set_parameters_ms(
307                mass_spec_obj, parameters_path=settings_file_path
308            )
309
310        else:
311            warnings.warn(
312                "auto settings loading is enabled but could not locate the file:  %s. Please load the settings manually"
313                % settings_file_path
314            )
315
316        # TODO this will load the setting from SettingCoreMS.json
317        # coreMSHFD5 overrides this function to import the attrs stored in the h5 file
318        # loaded_settings = {}
319        # loaded_settings['MoleculaSearch'] = self.get_scan_group_attr_data(scan_index,  time_index, 'MoleculaSearchSetting')
320        # loaded_settings['MassSpecPeak'] = self.get_scan_group_attr_data(scan_index,  time_index, 'MassSpecPeakSetting')
321
322        # loaded_settings['MassSpectrum'] = self.get_scan_group_attr_data(scan_index, time_index, 'MassSpectrumSetting')
323        # loaded_settings['Transient'] = self.get_scan_group_attr_data(scan_index, time_index, 'TransientSetting')
324
325    def get_output_parameters(self, polarity: int, scan_index: int = 0) -> dict:
326        """
327        Get the output parameters for the mass spectrum.
328
329        Parameters
330        ----------
331        polarity : int
332            The polarity of the mass spectrum +1 or -1.
333        scan_index : int, optional
334            The index of the scan. Default is 0.
335
336        Returns
337        -------
338        dict
339            A dictionary containing the output parameters.
340
341        """
342        from copy import deepcopy
343
344        output_parameters = default_parameters(self.file_location)
345
346        if self.isCentroid:
347            output_parameters["label"] = Labels.corems_centroid
348        else:
349            output_parameters["label"] = Labels.bruker_profile
350
351        output_parameters["analyzer"] = self.analyzer
352
353        output_parameters["instrument_label"] = self.instrument_label
354
355        output_parameters["sample_name"] = self.sample_name
356
357        output_parameters["Aterm"] = None
358
359        output_parameters["Bterm"] = None
360
361        output_parameters["Cterm"] = None
362
363        output_parameters["polarity"] = polarity
364
365        # scan_number and rt will be need to lc ms====
366
367        output_parameters["mobility_scan"] = 0
368
369        output_parameters["mobility_rt"] = 0
370
371        output_parameters["scan_number"] = scan_index
372
373        output_parameters["rt"] = 0
374
375        return output_parameters
376
377    def clean_data_frame(self, dataframe):
378        """
379        Clean the input dataframe by removing columns that are not expected.
380
381        Parameters
382        ----------
383        pandas.DataFrame
384            The input dataframe to be cleaned.
385
386        """
387
388        for column_name in dataframe.columns:
389            expected_column_name = self.parameters.header_translate.get(column_name)
390            if expected_column_name not in self._expected_columns:
391                del dataframe[column_name]
392
393    def check_columns(self, header_labels: list[str]):
394        """
395        Check if the given header labels match the expected columns.
396
397        Parameters
398        ----------
399        header_labels : list
400            The header labels to be checked.
401
402        Raises
403        ------
404        Exception
405            If any expected column is not found in the header labels.
406        """
407        found_label = set()
408
409        for label in header_labels:
410            if not label in self._expected_columns:
411                user_column_name = self.parameters.header_translate.get(label)
412                if user_column_name in self._expected_columns:
413                    found_label.add(user_column_name)
414            else:
415                found_label.add(label)
416
417        not_found = self._expected_columns - found_label
418
419        if len(not_found) > 0:
420            raise Exception(
421                "Please make sure to include the columns %s" % ", ".join(not_found)
422            )
423
424    def read_xml_peaks(self, data: str) -> DataFrame:
425        """
426        Read peaks from a Bruker .xml file and return a pandas DataFrame.
427
428        Parameters
429        ----------
430        data : str
431            The path to the .xml file.
432
433        Returns
434        -------
435        pandas.DataFrame
436            A DataFrame containing the peak data with columns: 'm/z', 'I', 'Resolving Power', 'Area', 'S/N', 'fwhm'.
437        """
438        from numpy import nan
439
440        with open(data, "r") as file:
441            content = file.readlines()
442            content = "".join(content)
443            bs_content = BeautifulSoup(content, features="xml")
444        peaks_xml = bs_content.find_all("pk")
445
446        # initialise lists of the peak variables
447        areas = []
448        fwhms = []
449        intensities = []
450        mzs = []
451        res = []
452        sn = []
453        # iterate through the peaks appending to each list
454        for peak in peaks_xml:
455            areas.append(
456                float(peak.get("a", nan))
457            )  # Use a default value if key 'a' is missing
458            fwhms.append(
459                float(peak.get("fwhm", nan))
460            )  # Use a default value if key 'fwhm' is missing
461            intensities.append(
462                float(peak.get("i", nan))
463            )  # Use a default value if key 'i' is missing
464            mzs.append(
465                float(peak.get("mz", nan))
466            )  # Use a default value if key 'mz' is missing
467            res.append(
468                float(peak.get("res", nan))
469            )  # Use a default value if key 'res' is missing
470            sn.append(
471                float(peak.get("sn", nan))
472            )  # Use a default value if key 'sn' is missing
473
474        # Compile pandas dataframe of these values
475        names = ["m/z", "I", "Resolving Power", "Area", "S/N", "fwhm"]
476        df = DataFrame(columns=names, dtype=float)
477        df["m/z"] = mzs
478        df["I"] = intensities
479        df["Resolving Power"] = res
480        df["Area"] = areas
481        df["S/N"] = sn
482        df["fwhm"] = fwhms
483        return df
484
485    def get_xml_polarity(self):
486        """
487        Get the polarity from an XML peaklist.
488
489        Returns
490        -------
491        int
492            The polarity of the XML peaklist. Returns -1 for negative polarity, +1 for positive polarity.
493
494        Raises
495        ------
496        Exception
497            If the data type is not XML peaklist in Bruker format or if the polarity is unhandled.
498        """
499
500        # Check its an actual xml
501        if not self.data_type or not self.delimiter:
502            self.set_data_type()
503
504        if isinstance(self.file_location, S3Path):
505            # data = self.file_location.open('rb').read()
506            data = BytesIO(self.file_location.open("rb").read())
507
508        else:
509            data = self.file_location
510
511        if self.data_type != "xml":
512            raise Exception("This function is only for XML peaklists (Bruker format)")
513
514        with open(data, "r") as file:
515            content = file.readlines()
516            content = "".join(content)
517            bs_content = BeautifulSoup(content, features="xml")
518        polarity = bs_content.find_all("ms_spectrum")[0]["polarity"]
519        if polarity == "-":
520            return -1
521        elif polarity == "+":
522            return +1
523        else:
524            raise Exception("Polarity %s unhandled" % polarity)
class MassListBaseClass:
 25class MassListBaseClass:
 26    """The MassListBaseClass object reads mass list data types and returns the mass spectrum obj
 27
 28    Parameters
 29    ----------
 30    file_location : Path or S3Path
 31        Full data path.
 32    isCentroid : bool, optional
 33        Determines the mass spectrum data structure. If set to True, it assumes centroid mode. If set to False, it assumes profile mode and attempts to peak pick. Default is True.
 34    analyzer : str, optional
 35        The analyzer used for the mass spectrum. Default is 'Unknown'.
 36    instrument_label : str, optional
 37        The label of the instrument used for the mass spectrum. Default is 'Unknown'.
 38    sample_name : str, optional
 39        The name of the sample. Default is None.
 40    header_lines : int, optional
 41        The number of lines to skip in the file, including the column labels line. Default is 0.
 42    isThermoProfile : bool, optional
 43        Determines the number of expected columns in the file. If set to True, only m/z and intensity columns are expected. Signal-to-noise ratio (S/N) and resolving power (RP) will be calculated based on the data. Default is False.
 44    headerless : bool, optional
 45        If True, assumes that there are no headers present in the file (e.g., a .xy file from Bruker) and assumes two columns: m/z and intensity. Default is False.
 46
 47    Attributes
 48    ----------
 49    parameters : DataInputSetting
 50        The data input settings for the mass spectrum.
 51    data_type : str
 52        The type of data in the file.
 53    delimiter : str
 54        The delimiter used to read text-based files.
 55
 56    Methods
 57    -------
 58    * set_parameter_from_toml(parameters_path). Sets the data input settings from a TOML file.
 59    * set_parameter_from_json(parameters_path). Sets the data input settings from a JSON file.
 60    * get_dataframe(). Reads the file and returns the data as a pandas DataFrame.
 61    * load_settings(mass_spec_obj, output_parameters). Loads the settings for the mass spectrum.
 62    * get_output_parameters(polarity, scan_index=0). Returns the output parameters for the mass spectrum.
 63    * clean_data_frame(dataframe). Cleans the data frame by removing columns that are not in the expected columns set.
 64
 65    """
 66
 67    def __init__(
 68        self,
 69        file_location: Path | S3Path,
 70        isCentroid: bool = True,
 71        analyzer: str = "Unknown",
 72        instrument_label: str = "Unknown",
 73        sample_name: str = None,
 74        header_lines: int = 0,
 75        isThermoProfile: bool = False,
 76        headerless: bool = False,
 77    ):
 78        self.file_location = (
 79            Path(file_location) if isinstance(file_location, str) else file_location
 80        )
 81
 82        if not self.file_location.exists():
 83            raise FileExistsError("File does not exist: %s" % file_location)
 84
 85        # (newline="\n")
 86
 87        self.header_lines = header_lines
 88
 89        if isThermoProfile:
 90            self._expected_columns = {Labels.mz, Labels.abundance}
 91
 92        else:
 93            self._expected_columns = {
 94                Labels.mz,
 95                Labels.abundance,
 96                Labels.s2n,
 97                Labels.rp,
 98            }
 99
100        self._delimiter = None
101
102        self.isCentroid = isCentroid
103
104        self.isThermoProfile = isThermoProfile
105
106        self.headerless = headerless
107
108        self._data_type = None
109
110        self.analyzer = analyzer
111
112        self.instrument_label = instrument_label
113
114        self.sample_name = sample_name
115
116        self._parameters = deepcopy(DataInputSetting())
117
118    @property
119    def parameters(self):
120        return self._parameters
121
122    @parameters.setter
123    def parameters(self, instance_DataInputSetting):
124        self._parameters = instance_DataInputSetting
125
126    def set_parameter_from_toml(self, parameters_path):
127        self._parameters = load_and_set_toml_parameters_class(
128            "DataInput", self.parameters, parameters_path=parameters_path
129        )
130
131    def set_parameter_from_json(self, parameters_path):
132        self._parameters = load_and_set_parameters_class(
133            "DataInput", self.parameters, parameters_path=parameters_path
134        )
135
136    @property
137    def data_type(self):
138        return self._data_type
139
140    @data_type.setter
141    def data_type(self, data_type):
142        self._data_type = data_type
143
144    @property
145    def delimiter(self):
146        return self._delimiter
147
148    @delimiter.setter
149    def delimiter(self, delimiter):
150        self._delimiter = delimiter
151
152    def encoding_detector(self, file_location) -> str:
153        """
154        Detects the encoding of a file.
155
156        Parameters
157        --------
158        file_location : str
159            The location of the file to be analyzed.
160
161        Returns
162        --------
163        str
164            The detected encoding of the file.
165        """
166
167        with file_location.open("rb") as rawdata:
168            result = chardet.detect(rawdata.read(10000))
169        return result["encoding"]
170
171    def set_data_type(self):
172        """
173        Set the data type and delimiter based on the file extension.
174
175        Raises
176        ------
177        TypeError
178            If the data type could not be automatically recognized.
179        """
180        if self.file_location.suffix == ".csv":
181            self.data_type = "txt"
182            self.delimiter = ","
183        elif self.file_location.suffix == ".txt":
184            self.data_type = "txt"
185            self.delimiter = "\t"
186        elif self.file_location.suffix == ".tsv":
187            self.data_type = "txt"
188            self.delimiter = "\t"
189        elif self.file_location.suffix == ".xlsx":
190            self.data_type = "excel"
191        elif self.file_location.suffix == ".ascii":
192            self.data_type = "txt"
193            self.delimiter = "  "
194        elif self.file_location.suffix == ".pkl":
195            self.data_type = "dataframe"
196        elif self.file_location.suffix == ".pks":
197            self.data_type = "pks"
198            self.delimiter = "          "
199            self.header_lines = 9
200        elif self.file_location.suffix == ".xml":
201            self.data_type = "xml"
202            # self.delimiter = None
203            # self.header_lines = None
204        elif self.file_location.suffix == ".xy":
205            self.data_type = "txt"
206            self.delimiter = " "
207            self.header_lines = None
208        else:
209            raise TypeError(
210                "Data type could not be automatically recognized for %s; please set data type and delimiter manually."
211                % self.file_location.name
212            )
213
214    def get_dataframe(self) -> DataFrame:
215        """
216        Get the data as a pandas DataFrame.
217
218        Returns
219        -------
220        pandas.DataFrame
221            The data as a pandas DataFrame.
222
223        Raises
224        ------
225        TypeError
226            If the data type is not supported.
227        """
228
229        if not self.data_type or not self.delimiter:
230            self.set_data_type()
231
232        if isinstance(self.file_location, S3Path):
233            data = BytesIO(self.file_location.open("rb").read())
234        else:
235            data = self.file_location
236
237        if self.data_type == "txt":
238            if self.headerless:
239                dataframe = read_csv(
240                    data,
241                    skiprows=self.header_lines,
242                    delimiter=self.delimiter,
243                    header=None,
244                    names=["m/z", "I"],
245                    encoding=self.encoding_detector(self.file_location),
246                    engine="python",
247                )
248            else:
249                dataframe = read_csv(
250                    data,
251                    skiprows=self.header_lines,
252                    delimiter=self.delimiter,
253                    encoding=self.encoding_detector(self.file_location),
254                    engine="python",
255                )
256
257        elif self.data_type == "pks":
258            names = [
259                "m/z",
260                "I",
261                "Scaled Peak Height",
262                "Resolving Power",
263                "Frequency",
264                "S/N",
265            ]
266            clean_data = []
267            with self.file_location.open() as maglabfile:
268                for i in maglabfile.readlines()[8:-1]:
269                    clean_data.append(i.split())
270            dataframe = DataFrame(clean_data, columns=names)
271
272        elif self.data_type == "dataframe":
273            dataframe = read_pickle(data)
274
275        elif self.data_type == "excel":
276            dataframe = read_excel(data)
277
278        elif self.data_type == "xml":
279            dataframe = self.read_xml_peaks(data)
280
281        else:
282            raise TypeError("Data type %s is not supported" % self.data_type)
283
284        return dataframe
285
286    def load_settings(self, mass_spec_obj, output_parameters):
287        """
288        #TODO loading output parameters from json file is not functional
289        Load settings from a JSON file and apply them to the given mass_spec_obj.
290
291        Parameters
292        ----------
293        mass_spec_obj : MassSpec
294            The mass spectrum object to apply the settings to.
295
296        """
297        import json
298        import warnings
299
300        settings_file_path = self.file_location.with_suffix(".json")
301
302        if settings_file_path.exists():
303            self._parameters = load_and_set_parameters_class(
304                "DataInput", self._parameters, parameters_path=settings_file_path
305            )
306
307            load_and_set_parameters_ms(
308                mass_spec_obj, parameters_path=settings_file_path
309            )
310
311        else:
312            warnings.warn(
313                "auto settings loading is enabled but could not locate the file:  %s. Please load the settings manually"
314                % settings_file_path
315            )
316
317        # TODO this will load the setting from SettingCoreMS.json
318        # coreMSHFD5 overrides this function to import the attrs stored in the h5 file
319        # loaded_settings = {}
320        # loaded_settings['MoleculaSearch'] = self.get_scan_group_attr_data(scan_index,  time_index, 'MoleculaSearchSetting')
321        # loaded_settings['MassSpecPeak'] = self.get_scan_group_attr_data(scan_index,  time_index, 'MassSpecPeakSetting')
322
323        # loaded_settings['MassSpectrum'] = self.get_scan_group_attr_data(scan_index, time_index, 'MassSpectrumSetting')
324        # loaded_settings['Transient'] = self.get_scan_group_attr_data(scan_index, time_index, 'TransientSetting')
325
326    def get_output_parameters(self, polarity: int, scan_index: int = 0) -> dict:
327        """
328        Get the output parameters for the mass spectrum.
329
330        Parameters
331        ----------
332        polarity : int
333            The polarity of the mass spectrum +1 or -1.
334        scan_index : int, optional
335            The index of the scan. Default is 0.
336
337        Returns
338        -------
339        dict
340            A dictionary containing the output parameters.
341
342        """
343        from copy import deepcopy
344
345        output_parameters = default_parameters(self.file_location)
346
347        if self.isCentroid:
348            output_parameters["label"] = Labels.corems_centroid
349        else:
350            output_parameters["label"] = Labels.bruker_profile
351
352        output_parameters["analyzer"] = self.analyzer
353
354        output_parameters["instrument_label"] = self.instrument_label
355
356        output_parameters["sample_name"] = self.sample_name
357
358        output_parameters["Aterm"] = None
359
360        output_parameters["Bterm"] = None
361
362        output_parameters["Cterm"] = None
363
364        output_parameters["polarity"] = polarity
365
366        # scan_number and rt will be need to lc ms====
367
368        output_parameters["mobility_scan"] = 0
369
370        output_parameters["mobility_rt"] = 0
371
372        output_parameters["scan_number"] = scan_index
373
374        output_parameters["rt"] = 0
375
376        return output_parameters
377
378    def clean_data_frame(self, dataframe):
379        """
380        Clean the input dataframe by removing columns that are not expected.
381
382        Parameters
383        ----------
384        pandas.DataFrame
385            The input dataframe to be cleaned.
386
387        """
388
389        for column_name in dataframe.columns:
390            expected_column_name = self.parameters.header_translate.get(column_name)
391            if expected_column_name not in self._expected_columns:
392                del dataframe[column_name]
393
394    def check_columns(self, header_labels: list[str]):
395        """
396        Check if the given header labels match the expected columns.
397
398        Parameters
399        ----------
400        header_labels : list
401            The header labels to be checked.
402
403        Raises
404        ------
405        Exception
406            If any expected column is not found in the header labels.
407        """
408        found_label = set()
409
410        for label in header_labels:
411            if not label in self._expected_columns:
412                user_column_name = self.parameters.header_translate.get(label)
413                if user_column_name in self._expected_columns:
414                    found_label.add(user_column_name)
415            else:
416                found_label.add(label)
417
418        not_found = self._expected_columns - found_label
419
420        if len(not_found) > 0:
421            raise Exception(
422                "Please make sure to include the columns %s" % ", ".join(not_found)
423            )
424
425    def read_xml_peaks(self, data: str) -> DataFrame:
426        """
427        Read peaks from a Bruker .xml file and return a pandas DataFrame.
428
429        Parameters
430        ----------
431        data : str
432            The path to the .xml file.
433
434        Returns
435        -------
436        pandas.DataFrame
437            A DataFrame containing the peak data with columns: 'm/z', 'I', 'Resolving Power', 'Area', 'S/N', 'fwhm'.
438        """
439        from numpy import nan
440
441        with open(data, "r") as file:
442            content = file.readlines()
443            content = "".join(content)
444            bs_content = BeautifulSoup(content, features="xml")
445        peaks_xml = bs_content.find_all("pk")
446
447        # initialise lists of the peak variables
448        areas = []
449        fwhms = []
450        intensities = []
451        mzs = []
452        res = []
453        sn = []
454        # iterate through the peaks appending to each list
455        for peak in peaks_xml:
456            areas.append(
457                float(peak.get("a", nan))
458            )  # Use a default value if key 'a' is missing
459            fwhms.append(
460                float(peak.get("fwhm", nan))
461            )  # Use a default value if key 'fwhm' is missing
462            intensities.append(
463                float(peak.get("i", nan))
464            )  # Use a default value if key 'i' is missing
465            mzs.append(
466                float(peak.get("mz", nan))
467            )  # Use a default value if key 'mz' is missing
468            res.append(
469                float(peak.get("res", nan))
470            )  # Use a default value if key 'res' is missing
471            sn.append(
472                float(peak.get("sn", nan))
473            )  # Use a default value if key 'sn' is missing
474
475        # Compile pandas dataframe of these values
476        names = ["m/z", "I", "Resolving Power", "Area", "S/N", "fwhm"]
477        df = DataFrame(columns=names, dtype=float)
478        df["m/z"] = mzs
479        df["I"] = intensities
480        df["Resolving Power"] = res
481        df["Area"] = areas
482        df["S/N"] = sn
483        df["fwhm"] = fwhms
484        return df
485
486    def get_xml_polarity(self):
487        """
488        Get the polarity from an XML peaklist.
489
490        Returns
491        -------
492        int
493            The polarity of the XML peaklist. Returns -1 for negative polarity, +1 for positive polarity.
494
495        Raises
496        ------
497        Exception
498            If the data type is not XML peaklist in Bruker format or if the polarity is unhandled.
499        """
500
501        # Check its an actual xml
502        if not self.data_type or not self.delimiter:
503            self.set_data_type()
504
505        if isinstance(self.file_location, S3Path):
506            # data = self.file_location.open('rb').read()
507            data = BytesIO(self.file_location.open("rb").read())
508
509        else:
510            data = self.file_location
511
512        if self.data_type != "xml":
513            raise Exception("This function is only for XML peaklists (Bruker format)")
514
515        with open(data, "r") as file:
516            content = file.readlines()
517            content = "".join(content)
518            bs_content = BeautifulSoup(content, features="xml")
519        polarity = bs_content.find_all("ms_spectrum")[0]["polarity"]
520        if polarity == "-":
521            return -1
522        elif polarity == "+":
523            return +1
524        else:
525            raise Exception("Polarity %s unhandled" % polarity)

The MassListBaseClass object reads mass list data types and returns the mass spectrum obj

Parameters
  • file_location (Path or S3Path): Full data path.
  • isCentroid (bool, optional): Determines the mass spectrum data structure. If set to True, it assumes centroid mode. If set to False, it assumes profile mode and attempts to peak pick. Default is True.
  • analyzer (str, optional): The analyzer used for the mass spectrum. Default is 'Unknown'.
  • instrument_label (str, optional): The label of the instrument used for the mass spectrum. Default is 'Unknown'.
  • sample_name (str, optional): The name of the sample. Default is None.
  • header_lines (int, optional): The number of lines to skip in the file, including the column labels line. Default is 0.
  • isThermoProfile (bool, optional): Determines the number of expected columns in the file. If set to True, only m/z and intensity columns are expected. Signal-to-noise ratio (S/N) and resolving power (RP) will be calculated based on the data. Default is False.
  • headerless (bool, optional): If True, assumes that there are no headers present in the file (e.g., a .xy file from Bruker) and assumes two columns: m/z and intensity. Default is False.
Attributes
  • parameters (DataInputSetting): The data input settings for the mass spectrum.
  • data_type (str): The type of data in the file.
  • delimiter (str): The delimiter used to read text-based files.
Methods
  • set_parameter_from_toml(parameters_path). Sets the data input settings from a TOML file.
  • set_parameter_from_json(parameters_path). Sets the data input settings from a JSON file.
  • get_dataframe(). Reads the file and returns the data as a pandas DataFrame.
  • load_settings(mass_spec_obj, output_parameters). Loads the settings for the mass spectrum.
  • get_output_parameters(polarity, scan_index=0). Returns the output parameters for the mass spectrum.
  • clean_data_frame(dataframe). Cleans the data frame by removing columns that are not in the expected columns set.
MassListBaseClass( file_location: pathlib.Path | s3path.S3Path, isCentroid: bool = True, analyzer: str = 'Unknown', instrument_label: str = 'Unknown', sample_name: str = None, header_lines: int = 0, isThermoProfile: bool = False, headerless: bool = False)
 67    def __init__(
 68        self,
 69        file_location: Path | S3Path,
 70        isCentroid: bool = True,
 71        analyzer: str = "Unknown",
 72        instrument_label: str = "Unknown",
 73        sample_name: str = None,
 74        header_lines: int = 0,
 75        isThermoProfile: bool = False,
 76        headerless: bool = False,
 77    ):
 78        self.file_location = (
 79            Path(file_location) if isinstance(file_location, str) else file_location
 80        )
 81
 82        if not self.file_location.exists():
 83            raise FileExistsError("File does not exist: %s" % file_location)
 84
 85        # (newline="\n")
 86
 87        self.header_lines = header_lines
 88
 89        if isThermoProfile:
 90            self._expected_columns = {Labels.mz, Labels.abundance}
 91
 92        else:
 93            self._expected_columns = {
 94                Labels.mz,
 95                Labels.abundance,
 96                Labels.s2n,
 97                Labels.rp,
 98            }
 99
100        self._delimiter = None
101
102        self.isCentroid = isCentroid
103
104        self.isThermoProfile = isThermoProfile
105
106        self.headerless = headerless
107
108        self._data_type = None
109
110        self.analyzer = analyzer
111
112        self.instrument_label = instrument_label
113
114        self.sample_name = sample_name
115
116        self._parameters = deepcopy(DataInputSetting())
file_location
header_lines
isCentroid
isThermoProfile
headerless
analyzer
instrument_label
sample_name
parameters
def set_parameter_from_toml(self, parameters_path):
126    def set_parameter_from_toml(self, parameters_path):
127        self._parameters = load_and_set_toml_parameters_class(
128            "DataInput", self.parameters, parameters_path=parameters_path
129        )
def set_parameter_from_json(self, parameters_path):
131    def set_parameter_from_json(self, parameters_path):
132        self._parameters = load_and_set_parameters_class(
133            "DataInput", self.parameters, parameters_path=parameters_path
134        )
data_type
delimiter
def encoding_detector(self, file_location) -> str:
152    def encoding_detector(self, file_location) -> str:
153        """
154        Detects the encoding of a file.
155
156        Parameters
157        --------
158        file_location : str
159            The location of the file to be analyzed.
160
161        Returns
162        --------
163        str
164            The detected encoding of the file.
165        """
166
167        with file_location.open("rb") as rawdata:
168            result = chardet.detect(rawdata.read(10000))
169        return result["encoding"]

Detects the encoding of a file.

Parameters
  • file_location (Path or S3Path): The location of the file to be analyzed; must support `.open("rb")`.
Returns
  • str: The detected encoding of the file.
def set_data_type(self):
171    def set_data_type(self):
172        """
173        Set the data type and delimiter based on the file extension.
174
175        Raises
176        ------
177        TypeError
178            If the data type could not be automatically recognized.
179        """
180        if self.file_location.suffix == ".csv":
181            self.data_type = "txt"
182            self.delimiter = ","
183        elif self.file_location.suffix == ".txt":
184            self.data_type = "txt"
185            self.delimiter = "\t"
186        elif self.file_location.suffix == ".tsv":
187            self.data_type = "txt"
188            self.delimiter = "\t"
189        elif self.file_location.suffix == ".xlsx":
190            self.data_type = "excel"
191        elif self.file_location.suffix == ".ascii":
192            self.data_type = "txt"
193            self.delimiter = "  "
194        elif self.file_location.suffix == ".pkl":
195            self.data_type = "dataframe"
196        elif self.file_location.suffix == ".pks":
197            self.data_type = "pks"
198            self.delimiter = "          "
199            self.header_lines = 9
200        elif self.file_location.suffix == ".xml":
201            self.data_type = "xml"
202            # self.delimiter = None
203            # self.header_lines = None
204        elif self.file_location.suffix == ".xy":
205            self.data_type = "txt"
206            self.delimiter = " "
207            self.header_lines = None
208        else:
209            raise TypeError(
210                "Data type could not be automatically recognized for %s; please set data type and delimiter manually."
211                % self.file_location.name
212            )

Set the data type and delimiter based on the file extension.

Raises
  • TypeError: If the data type could not be automatically recognized.
def get_dataframe(self) -> pandas.core.frame.DataFrame:
    def get_dataframe(self) -> DataFrame:
        """
        Get the data as a pandas DataFrame.

        Returns
        -------
        pandas.DataFrame
            The data as a pandas DataFrame.

        Raises
        ------
        TypeError
            If the data type is not supported.
        """

        # Infer data_type/delimiter from the file suffix if not yet set.
        # NOTE(review): for delimiter-less types (excel, xml, pkl) delimiter
        # stays falsy, so this re-runs on every call -- harmless but wasteful.
        if not self.data_type or not self.delimiter:
            self.set_data_type()

        # S3 objects are read fully into memory; pandas readers accept BytesIO.
        if isinstance(self.file_location, S3Path):
            data = BytesIO(self.file_location.open("rb").read())
        else:
            data = self.file_location

        if self.data_type == "txt":
            if self.headerless:
                # No header row present: assume two columns, m/z and intensity.
                dataframe = read_csv(
                    data,
                    skiprows=self.header_lines,
                    delimiter=self.delimiter,
                    header=None,
                    names=["m/z", "I"],
                    encoding=self.encoding_detector(self.file_location),
                    engine="python",
                )
            else:
                dataframe = read_csv(
                    data,
                    skiprows=self.header_lines,
                    delimiter=self.delimiter,
                    encoding=self.encoding_detector(self.file_location),
                    engine="python",
                )

        elif self.data_type == "pks":
            # MagLab .pks format: fixed column layout; the first 8 lines and
            # the trailing line are discarded below.
            names = [
                "m/z",
                "I",
                "Scaled Peak Height",
                "Resolving Power",
                "Frequency",
                "S/N",
            ]
            clean_data = []
            with self.file_location.open() as maglabfile:
                # NOTE(review): values are kept as strings here, not converted
                # to float -- confirm downstream consumers handle the dtypes.
                for i in maglabfile.readlines()[8:-1]:
                    clean_data.append(i.split())
            dataframe = DataFrame(clean_data, columns=names)

        elif self.data_type == "dataframe":
            dataframe = read_pickle(data)

        elif self.data_type == "excel":
            dataframe = read_excel(data)

        elif self.data_type == "xml":
            # Bruker XML peaklist, parsed with BeautifulSoup.
            dataframe = self.read_xml_peaks(data)

        else:
            raise TypeError("Data type %s is not supported" % self.data_type)

        return dataframe

Get the data as a pandas DataFrame.

Returns
  • pandas.DataFrame: The data as a pandas DataFrame.
Raises
  • TypeError: If the data type is not supported.
def load_settings(self, mass_spec_obj, output_parameters):
286    def load_settings(self, mass_spec_obj, output_parameters):
287        """
288        #TODO loading output parameters from json file is not functional
289        Load settings from a JSON file and apply them to the given mass_spec_obj.
290
291        Parameters
292        ----------
293        mass_spec_obj : MassSpec
294            The mass spectrum object to apply the settings to.
295
296        """
297        import json
298        import warnings
299
300        settings_file_path = self.file_location.with_suffix(".json")
301
302        if settings_file_path.exists():
303            self._parameters = load_and_set_parameters_class(
304                "DataInput", self._parameters, parameters_path=settings_file_path
305            )
306
307            load_and_set_parameters_ms(
308                mass_spec_obj, parameters_path=settings_file_path
309            )
310
311        else:
312            warnings.warn(
313                "auto settings loading is enabled but could not locate the file:  %s. Please load the settings manually"
314                % settings_file_path
315            )
316
317        # TODO this will load the setting from SettingCoreMS.json
318        # coreMSHFD5 overrides this function to import the attrs stored in the h5 file
319        # loaded_settings = {}
320        # loaded_settings['MoleculaSearch'] = self.get_scan_group_attr_data(scan_index,  time_index, 'MoleculaSearchSetting')
321        # loaded_settings['MassSpecPeak'] = self.get_scan_group_attr_data(scan_index,  time_index, 'MassSpecPeakSetting')
322
323        # loaded_settings['MassSpectrum'] = self.get_scan_group_attr_data(scan_index, time_index, 'MassSpectrumSetting')
324        # loaded_settings['Transient'] = self.get_scan_group_attr_data(scan_index, time_index, 'TransientSetting')

TODO loading output parameters from json file is not functional

Load settings from a JSON file and apply them to the given mass_spec_obj.

Parameters
  • mass_spec_obj (MassSpec): The mass spectrum object to apply the settings to.
def get_output_parameters(self, polarity: int, scan_index: int = 0) -> dict:
326    def get_output_parameters(self, polarity: int, scan_index: int = 0) -> dict:
327        """
328        Get the output parameters for the mass spectrum.
329
330        Parameters
331        ----------
332        polarity : int
333            The polarity of the mass spectrum +1 or -1.
334        scan_index : int, optional
335            The index of the scan. Default is 0.
336
337        Returns
338        -------
339        dict
340            A dictionary containing the output parameters.
341
342        """
343        from copy import deepcopy
344
345        output_parameters = default_parameters(self.file_location)
346
347        if self.isCentroid:
348            output_parameters["label"] = Labels.corems_centroid
349        else:
350            output_parameters["label"] = Labels.bruker_profile
351
352        output_parameters["analyzer"] = self.analyzer
353
354        output_parameters["instrument_label"] = self.instrument_label
355
356        output_parameters["sample_name"] = self.sample_name
357
358        output_parameters["Aterm"] = None
359
360        output_parameters["Bterm"] = None
361
362        output_parameters["Cterm"] = None
363
364        output_parameters["polarity"] = polarity
365
366        # scan_number and rt will be need to lc ms====
367
368        output_parameters["mobility_scan"] = 0
369
370        output_parameters["mobility_rt"] = 0
371
372        output_parameters["scan_number"] = scan_index
373
374        output_parameters["rt"] = 0
375
376        return output_parameters

Get the output parameters for the mass spectrum.

Parameters
  • polarity (int): The polarity of the mass spectrum +1 or -1.
  • scan_index (int, optional): The index of the scan. Default is 0.
Returns
  • dict: A dictionary containing the output parameters.
def clean_data_frame(self, dataframe):
378    def clean_data_frame(self, dataframe):
379        """
380        Clean the input dataframe by removing columns that are not expected.
381
382        Parameters
383        ----------
384        pandas.DataFrame
385            The input dataframe to be cleaned.
386
387        """
388
389        for column_name in dataframe.columns:
390            expected_column_name = self.parameters.header_translate.get(column_name)
391            if expected_column_name not in self._expected_columns:
392                del dataframe[column_name]

Clean the input dataframe by removing columns that are not expected.

Parameters
  • pandas.DataFrame: The input dataframe to be cleaned.
def check_columns(self, header_labels: list[str]):
394    def check_columns(self, header_labels: list[str]):
395        """
396        Check if the given header labels match the expected columns.
397
398        Parameters
399        ----------
400        header_labels : list
401            The header labels to be checked.
402
403        Raises
404        ------
405        Exception
406            If any expected column is not found in the header labels.
407        """
408        found_label = set()
409
410        for label in header_labels:
411            if not label in self._expected_columns:
412                user_column_name = self.parameters.header_translate.get(label)
413                if user_column_name in self._expected_columns:
414                    found_label.add(user_column_name)
415            else:
416                found_label.add(label)
417
418        not_found = self._expected_columns - found_label
419
420        if len(not_found) > 0:
421            raise Exception(
422                "Please make sure to include the columns %s" % ", ".join(not_found)
423            )

Check if the given header labels match the expected columns.

Parameters
  • header_labels (list): The header labels to be checked.
Raises
  • Exception: If any expected column is not found in the header labels.
def read_xml_peaks(self, data: str) -> pandas.core.frame.DataFrame:
425    def read_xml_peaks(self, data: str) -> DataFrame:
426        """
427        Read peaks from a Bruker .xml file and return a pandas DataFrame.
428
429        Parameters
430        ----------
431        data : str
432            The path to the .xml file.
433
434        Returns
435        -------
436        pandas.DataFrame
437            A DataFrame containing the peak data with columns: 'm/z', 'I', 'Resolving Power', 'Area', 'S/N', 'fwhm'.
438        """
439        from numpy import nan
440
441        with open(data, "r") as file:
442            content = file.readlines()
443            content = "".join(content)
444            bs_content = BeautifulSoup(content, features="xml")
445        peaks_xml = bs_content.find_all("pk")
446
447        # initialise lists of the peak variables
448        areas = []
449        fwhms = []
450        intensities = []
451        mzs = []
452        res = []
453        sn = []
454        # iterate through the peaks appending to each list
455        for peak in peaks_xml:
456            areas.append(
457                float(peak.get("a", nan))
458            )  # Use a default value if key 'a' is missing
459            fwhms.append(
460                float(peak.get("fwhm", nan))
461            )  # Use a default value if key 'fwhm' is missing
462            intensities.append(
463                float(peak.get("i", nan))
464            )  # Use a default value if key 'i' is missing
465            mzs.append(
466                float(peak.get("mz", nan))
467            )  # Use a default value if key 'mz' is missing
468            res.append(
469                float(peak.get("res", nan))
470            )  # Use a default value if key 'res' is missing
471            sn.append(
472                float(peak.get("sn", nan))
473            )  # Use a default value if key 'sn' is missing
474
475        # Compile pandas dataframe of these values
476        names = ["m/z", "I", "Resolving Power", "Area", "S/N", "fwhm"]
477        df = DataFrame(columns=names, dtype=float)
478        df["m/z"] = mzs
479        df["I"] = intensities
480        df["Resolving Power"] = res
481        df["Area"] = areas
482        df["S/N"] = sn
483        df["fwhm"] = fwhms
484        return df

Read peaks from a Bruker .xml file and return a pandas DataFrame.

Parameters
  • data (str): The path to the .xml file.
Returns
  • pandas.DataFrame: A DataFrame containing the peak data with columns: 'm/z', 'I', 'Resolving Power', 'Area', 'S/N', 'fwhm'.
def get_xml_polarity(self):
486    def get_xml_polarity(self):
487        """
488        Get the polarity from an XML peaklist.
489
490        Returns
491        -------
492        int
493            The polarity of the XML peaklist. Returns -1 for negative polarity, +1 for positive polarity.
494
495        Raises
496        ------
497        Exception
498            If the data type is not XML peaklist in Bruker format or if the polarity is unhandled.
499        """
500
501        # Check its an actual xml
502        if not self.data_type or not self.delimiter:
503            self.set_data_type()
504
505        if isinstance(self.file_location, S3Path):
506            # data = self.file_location.open('rb').read()
507            data = BytesIO(self.file_location.open("rb").read())
508
509        else:
510            data = self.file_location
511
512        if self.data_type != "xml":
513            raise Exception("This function is only for XML peaklists (Bruker format)")
514
515        with open(data, "r") as file:
516            content = file.readlines()
517            content = "".join(content)
518            bs_content = BeautifulSoup(content, features="xml")
519        polarity = bs_content.find_all("ms_spectrum")[0]["polarity"]
520        if polarity == "-":
521            return -1
522        elif polarity == "+":
523            return +1
524        else:
525            raise Exception("Polarity %s unhandled" % polarity)

Get the polarity from an XML peaklist.

Returns
  • int: The polarity of the XML peaklist. Returns -1 for negative polarity, +1 for positive polarity.
Raises
  • Exception: If the data type is not XML peaklist in Bruker format or if the polarity is unhandled.