corems.mass_spectrum.input.coremsHDF5

  1import json
  2
  3from pandas import DataFrame
  4import h5py
  5from io import BytesIO
  6from s3path import S3Path
  7
  8from corems.encapsulation.input.parameter_from_json import _set_dict_data_ms
  9from corems.mass_spectrum.input.massList import ReadCoremsMasslist
 10from corems.mass_spectrum.factory.MassSpectrumClasses import MassSpecCentroid
 11from corems.encapsulation.factory.parameters import default_parameters
 12
 13
 14class ReadCoreMSHDF_MassSpectrum(ReadCoremsMasslist):
 15    """Class for reading mass spectrum data from a CoreMS HDF5 file.
 16
 17    Attributes
 18    ----------
 19    h5pydata : h5py.File
 20        The HDF5 file object.
 21    scans : list
 22        List of scan labels in the HDF5 file.
 23
 24    Parameters
 25    ----------
 26    file_location : str or S3Path
 27        The path to the CoreMS HDF5 file.
 28
 29    Methods
 30    -------
 31    * load_raw_data(mass_spectrum, scan_index=0) Load raw data into the mass spectrum object.
 32    * get_mass_spectrum(scan_number=0, time_index=-1, auto_process=True, load_settings=True, load_raw=True).Get a mass spectrum object.
 33    * load_settings(mass_spectrum, scan_index=0, time_index=-1). Load settings into the mass spectrum object.
 34    * get_dataframe(scan_index=0, time_index=-1). Get a pandas DataFrame representing the mass spectrum.
 35    * get_time_index_to_pull(scan_label, time_index). Get the time index to pull from the HDF5 file.
 36    * get_high_level_attr_data(attr_str). Get high-level attribute data from the HDF5 file.
 37    * get_scan_group_attr_data(scan_index, time_index, attr_group, attr_srt=None). Get scan group attribute data from the HDF5 file.
 38    * get_raw_data_attr_data(scan_index, attr_group, attr_str). Get raw data attribute data from the HDF5 file.
 39    * get_output_parameters(polarity, scan_index=0). Get the output parameters for the mass spectrum.
 40    """
 41
 42    def __init__(self, file_location):
 43        super().__init__(file_location)
 44
 45        if isinstance(self.file_location, S3Path):
 46            data = BytesIO(self.file_location.open("rb").read())
 47        else:
 48            data = self.file_location
 49
 50        self.h5pydata = h5py.File(data, "r")
 51
 52        self.scans = list(self.h5pydata.keys())
 53
 54    def load_raw_data(self, mass_spectrum, scan_index=0):
 55        """
 56        Load raw data into the mass spectrum object.
 57
 58        Parameters
 59        ----------
 60        mass_spectrum : MassSpecCentroid
 61            The mass spectrum object to load the raw data into.
 62        scan_index : int, optional
 63            The index of the scan to load the raw data from. Default is 0.
 64        """
 65
 66        scan_label = self.scans[scan_index]
 67
 68        # Check if the "raw_ms" group in the scan is empty
 69        if self.h5pydata[scan_label]["raw_ms"].shape is not None:
 70            mz_profile = self.h5pydata[scan_label]["raw_ms"][0]
 71
 72            abundance_profile = self.h5pydata[scan_label]["raw_ms"][1]
 73
 74            mass_spectrum.mz_exp_profile = mz_profile
 75
 76            mass_spectrum.abundance_profile = abundance_profile
 77
 78    def get_mass_spectrum(
 79        self,
 80        scan_number=0,
 81        time_index=-1,
 82        auto_process=True,
 83        load_settings=True,
 84        load_raw=True,
 85        load_molecular_formula=True,
 86    ):
 87        """
 88        Instantiate a mass spectrum object from the CoreMS HDF5 file.
 89        Note that this always returns a centroid mass spectrum object; functionality for profile and
 90        frequency mass spectra is not yet implemented.
 91
 92        Parameters
 93        ----------
 94        scan_number : int, optional
 95            The index of the scan to retrieve the mass spectrum from. Default is 0.
 96        time_index : int, optional
 97            The index of the time point to retrieve the mass spectrum from. Default is -1.
 98        auto_process : bool, optional
 99            Whether to automatically process the mass spectrum. Default is True.
100        load_settings : bool, optional
101            Whether to load the settings into the mass spectrum object. Default is True.
102        load_raw : bool, optional
103            Whether to load the raw data into the mass spectrum object. Default is True.
104        load_molecular_formula : bool, optional
105            Whether to load the molecular formula into the mass spectrum object.
106            Default is True.
107
108        Returns
109        -------
110        MassSpecCentroid
111            The mass spectrum object.
112
113        Raises
114        ------
115        ValueError
116            If the CoreMS file is not valid.
117            If the mass spectrum has not been processed and load_molecular_formula is True.
118        """
119        if "mass_spectra" in self.scans[0]:
120            scan_index = self.scans.index("mass_spectra/" + str(scan_number))
121        else:
122            scan_index = self.scans.index(str(scan_number))
123        dataframe = self.get_dataframe(scan_index, time_index=time_index)
124        if dataframe["Molecular Formula"].any() and not dataframe["C"].any():
125            cols = dataframe.columns.tolist()
126            cols = cols[cols.index("Molecular Formula") + 1 :]
127            for index, row in dataframe.iterrows():
128                if row["Molecular Formula"] is not None:
129                    og_formula = row["Molecular Formula"]
130                    for col in cols:
131                        if "col" in og_formula:
132                            # get the digit after the element ("col") in the molecular formula and set it to the dataframe
133                            row[col] = int(og_formula.split(col)[1].split(" ")[0])
134
135        if not set(
136            ["H/C", "O/C", "Heteroatom Class", "Ion Type", "Is Isotopologue"]
137        ).issubset(dataframe.columns):
138            raise ValueError(
139                "%s it is not a valid CoreMS file" % str(self.file_location)
140            )
141
142        dataframe.rename(columns=self.parameters.header_translate, inplace=True)
143
144        # Cast m/z, and 'Peak Height' to float
145        dataframe["m/z"] = dataframe["m/z"].astype(float)
146        dataframe["Peak Height"] = dataframe["Peak Height"].astype(float)
147
148        polarity = dataframe["Ion Charge"].values[0]
149
150        output_parameters = self.get_output_parameters(polarity, scan_index=scan_index)
151
152        mass_spec_obj = MassSpecCentroid(
153            dataframe.to_dict(orient="list"), output_parameters, auto_process=False
154        )
155
156        if auto_process:
157            # Set the settings on the mass spectrum object to relative abuncance of 0 so all peaks get added
158            mass_spec_obj.settings.noise_threshold_method = "absolute_abundance"
159            mass_spec_obj.settings.noise_threshold_absolute_abundance = 0
160            mass_spec_obj.process_mass_spec()
161
162        if load_settings:
163            # Load settings into the mass spectrum object
164            self.load_settings(
165                mass_spec_obj, scan_index=scan_index, time_index=time_index
166            )
167
168        if load_raw:
169            self.load_raw_data(mass_spec_obj, scan_index=scan_index)
170
171        if load_molecular_formula:
172            if not auto_process:
173                raise ValueError(
174                    "Can only add molecular formula if the mass spectrum has been processed"
175                )
176            else:
177                self.add_molecular_formula(mass_spec_obj, dataframe)
178
179        return mass_spec_obj
180
181    def load_settings(self, mass_spectrum, scan_index=0, time_index=-1):
182        """
183        Load settings into the mass spectrum object.
184
185        Parameters
186        ----------
187        mass_spectrum : MassSpecCentroid
188            The mass spectrum object to load the settings into.
189        scan_index : int, optional
190            The index of the scan to load the settings from. Default is 0.
191        time_index : int, optional
192            The index of the time point to load the settings from. Default is -1.
193        """
194
195        loaded_settings = {}
196        loaded_settings["MoleculaSearch"] = self.get_scan_group_attr_data(
197            scan_index, time_index, "MoleculaSearchSetting"
198        )
199        loaded_settings["MassSpecPeak"] = self.get_scan_group_attr_data(
200            scan_index, time_index, "MassSpecPeakSetting"
201        )
202        loaded_settings["MassSpectrum"] = self.get_scan_group_attr_data(
203            scan_index, time_index, "MassSpectrumSetting"
204        )
205        loaded_settings["Transient"] = self.get_scan_group_attr_data(
206            scan_index, time_index, "TransientSetting"
207        )
208
209        _set_dict_data_ms(loaded_settings, mass_spectrum)
210
211    def get_dataframe(self, scan_index=0, time_index=-1):
212        """
213        Get a pandas DataFrame representing the mass spectrum.
214
215        Parameters
216        ----------
217        scan_index : int, optional
218            The index of the scan to retrieve the DataFrame from. Default is 0.
219        time_index : int, optional
220            The index of the time point to retrieve the DataFrame from. Default is -1.
221
222        Returns
223        -------
224        DataFrame
225            The pandas DataFrame representing the mass spectrum.
226        """
227
228        columnsLabels = self.get_scan_group_attr_data(
229            scan_index, time_index, "ColumnsLabels"
230        )
231
232        scan_label = self.scans[scan_index]
233
234        index_to_pull = self.get_time_index_to_pull(scan_label, time_index)
235
236        corems_table_data = self.h5pydata[scan_label][index_to_pull]
237
238        list_dict = []
239        for row in corems_table_data:
240            data_dict = {}
241            for data_index, data in enumerate(row):
242                label = columnsLabels[data_index]
243                # if data starts with a b' it is a byte string, so decode it
244                if isinstance(data, bytes):
245                    data = data.decode("utf-8")
246                if data == "nan":
247                    data = None
248                data_dict[label] = data
249
250            list_dict.append(data_dict)
251
252        # Reorder the columns from low to high "Index" to match the order of the dataframe
253        df = DataFrame(list_dict)
254        # set the "Index" column to int so it sorts correctly
255        df["Index"] = df["Index"].astype(int)
256        df = df.sort_values(by="Index")
257        # Reset index to match the "Index" column
258        df = df.set_index("Index", drop=False)
259
260        return df
261
262    def get_time_index_to_pull(self, scan_label, time_index):
263        """
264        Get the time index to pull from the HDF5 file.
265
266        Parameters
267        ----------
268        scan_label : str
269            The label of the scan.
270        time_index : int
271            The index of the time point.
272
273        Returns
274        -------
275        str
276            The time index to pull.
277        """
278
279        time_data = sorted(
280            [(i, int(i)) for i in self.h5pydata[scan_label].keys() if i != "raw_ms"],
281            key=lambda m: m[1],
282        )
283
284        index_to_pull = time_data[time_index][0]
285
286        return index_to_pull
287
288    def get_high_level_attr_data(self, attr_str):
289        """
290        Get high-level attribute data from the HDF5 file.
291
292        Parameters
293        ----------
294        attr_str : str
295            The attribute string.
296
297        Returns
298        -------
299        dict
300            The attribute data.
301
302        Raises
303        ------
304        KeyError
305            If the attribute string is not found in the HDF5 file.
306        """
307
308        return self.h5pydata.attrs[attr_str]
309
310    def get_scan_group_attr_data(
311        self, scan_index, time_index, attr_group, attr_srt=None
312    ):
313        """
314        Get scan group attribute data from the HDF5 file.
315
316        Parameters
317        ----------
318        scan_index : int
319            The index of the scan.
320        time_index : int
321            The index of the time point.
322        attr_group : str
323            The attribute group.
324        attr_srt : str, optional
325            The attribute string. Default is None.
326
327        Returns
328        -------
329        dict
330            The attribute data.
331
332        Notes
333        -----
334        This method retrieves attribute data from the HDF5 file for a specific scan and time point.
335        The attribute data is stored in the specified attribute group.
336        If an attribute string is provided, only the corresponding attribute value is returned.
337        If no attribute string is provided, all attribute data in the group is returned as a dictionary.
338        """
339        # Get index of self.scans where scan_index_str is found
340        scan_label = self.scans[scan_index]
341
342        index_to_pull = self.get_time_index_to_pull(scan_label, time_index)
343
344        if attr_srt:
345            return json.loads(
346                self.h5pydata[scan_label][index_to_pull].attrs[attr_group]
347            )[attr_srt]
348
349        else:
350            data = self.h5pydata[scan_label][index_to_pull].attrs.get(attr_group)
351            if data:
352                return json.loads(data)
353            else:
354                return {}
355
356    def get_raw_data_attr_data(self, scan_index, attr_group, attr_str):
357        """
358        Get raw data attribute data from the HDF5 file.
359
360        Parameters
361        ----------
362        scan_index : int
363            The index of the scan.
364        attr_group : str
365            The attribute group.
366        attr_str : str
367            The attribute string.
368
369        Returns
370        -------
371        dict
372            The attribute data.
373
374        Raises
375        ------
376        KeyError
377            If the attribute string is not found in the attribute group.
378
379        Notes
380        -----
381        This method retrieves the attribute data associated with a specific scan, attribute group, and attribute string
382        from the HDF5 file. It returns the attribute data as a dictionary.
383
384        Example usage:
385        >>> data = get_raw_data_attr_data(0, "group1", "attribute1")
386        >>> print(data)
387        {'key1': 'value1', 'key2': 'value2'}
388        """
389        scan_label = self.scans[scan_index]
390        try:
391            json.loads(self.h5pydata[scan_label]["raw_ms"].attrs[attr_group])[attr_str]
392        except KeyError:
393            attr_str = attr_str.replace("baseline", "baselise")
394        return json.loads(self.h5pydata[scan_label]["raw_ms"].attrs[attr_group])[
395            attr_str
396        ]
397
398    def get_output_parameters(self, polarity, scan_index=0):
399        """
400        Get the output parameters for the mass spectrum.
401
402        Parameters
403        ----------
404        polarity : str
405            The polarity of the mass spectrum.
406        scan_index : int, optional
407            The index of the scan. Default is 0.
408
409        Returns
410        -------
411        dict
412            The output parameters.
413        """
414
415        d_params = default_parameters(self.file_location)
416        d_params["filename_path"] = self.file_location
417        d_params["polarity"] = self.get_raw_data_attr_data(
418            scan_index, "MassSpecAttrs", "polarity"
419        )
420        d_params["rt"] = self.get_raw_data_attr_data(scan_index, "MassSpecAttrs", "rt")
421
422        d_params["tic"] = self.get_raw_data_attr_data(
423            scan_index, "MassSpecAttrs", "tic"
424        )
425
426        d_params["mobility_scan"] = self.get_raw_data_attr_data(
427            scan_index, "MassSpecAttrs", "mobility_scan"
428        )
429        d_params["mobility_rt"] = self.get_raw_data_attr_data(
430            scan_index, "MassSpecAttrs", "mobility_rt"
431        )
432        d_params["Aterm"] = self.get_raw_data_attr_data(
433            scan_index, "MassSpecAttrs", "Aterm"
434        )
435        d_params["Bterm"] = self.get_raw_data_attr_data(
436            scan_index, "MassSpecAttrs", "Bterm"
437        )
438        d_params["Cterm"] = self.get_raw_data_attr_data(
439            scan_index, "MassSpecAttrs", "Cterm"
440        )
441        d_params["baseline_noise"] = self.get_raw_data_attr_data(
442            scan_index, "MassSpecAttrs", "baseline_noise"
443        )
444        d_params["baseline_noise_std"] = self.get_raw_data_attr_data(
445            scan_index, "MassSpecAttrs", "baseline_noise_std"
446        )
447
448        d_params["analyzer"] = self.get_high_level_attr_data("analyzer")
449        d_params["instrument_label"] = self.get_high_level_attr_data("instrument_label")
450        d_params["sample_name"] = self.get_high_level_attr_data("sample_name")
451
452        return d_params
class ReadCoreMSHDF_MassSpectrum(corems.mass_spectrum.input.massList.ReadCoremsMasslist):
 15class ReadCoreMSHDF_MassSpectrum(ReadCoremsMasslist):
 16    """Class for reading mass spectrum data from a CoreMS HDF5 file.
 17
 18    Attributes
 19    ----------
 20    h5pydata : h5py.File
 21        The HDF5 file object.
 22    scans : list
 23        List of scan labels in the HDF5 file.
 24
 25    Parameters
 26    ----------
 27    file_location : str or S3Path
 28        The path to the CoreMS HDF5 file.
 29
 30    Methods
 31    -------
 32    * load_raw_data(mass_spectrum, scan_index=0) Load raw data into the mass spectrum object.
 33    * get_mass_spectrum(scan_number=0, time_index=-1, auto_process=True, load_settings=True, load_raw=True).Get a mass spectrum object.
 34    * load_settings(mass_spectrum, scan_index=0, time_index=-1). Load settings into the mass spectrum object.
 35    * get_dataframe(scan_index=0, time_index=-1). Get a pandas DataFrame representing the mass spectrum.
 36    * get_time_index_to_pull(scan_label, time_index). Get the time index to pull from the HDF5 file.
 37    * get_high_level_attr_data(attr_str). Get high-level attribute data from the HDF5 file.
 38    * get_scan_group_attr_data(scan_index, time_index, attr_group, attr_srt=None). Get scan group attribute data from the HDF5 file.
 39    * get_raw_data_attr_data(scan_index, attr_group, attr_str). Get raw data attribute data from the HDF5 file.
 40    * get_output_parameters(polarity, scan_index=0). Get the output parameters for the mass spectrum.
 41    """
 42
 43    def __init__(self, file_location):
 44        super().__init__(file_location)
 45
 46        if isinstance(self.file_location, S3Path):
 47            data = BytesIO(self.file_location.open("rb").read())
 48        else:
 49            data = self.file_location
 50
 51        self.h5pydata = h5py.File(data, "r")
 52
 53        self.scans = list(self.h5pydata.keys())
 54
 55    def load_raw_data(self, mass_spectrum, scan_index=0):
 56        """
 57        Load raw data into the mass spectrum object.
 58
 59        Parameters
 60        ----------
 61        mass_spectrum : MassSpecCentroid
 62            The mass spectrum object to load the raw data into.
 63        scan_index : int, optional
 64            The index of the scan to load the raw data from. Default is 0.
 65        """
 66
 67        scan_label = self.scans[scan_index]
 68
 69        # Check if the "raw_ms" group in the scan is empty
 70        if self.h5pydata[scan_label]["raw_ms"].shape is not None:
 71            mz_profile = self.h5pydata[scan_label]["raw_ms"][0]
 72
 73            abundance_profile = self.h5pydata[scan_label]["raw_ms"][1]
 74
 75            mass_spectrum.mz_exp_profile = mz_profile
 76
 77            mass_spectrum.abundance_profile = abundance_profile
 78
 79    def get_mass_spectrum(
 80        self,
 81        scan_number=0,
 82        time_index=-1,
 83        auto_process=True,
 84        load_settings=True,
 85        load_raw=True,
 86        load_molecular_formula=True,
 87    ):
 88        """
 89        Instantiate a mass spectrum object from the CoreMS HDF5 file.
 90        Note that this always returns a centroid mass spectrum object; functionality for profile and
 91        frequency mass spectra is not yet implemented.
 92
 93        Parameters
 94        ----------
 95        scan_number : int, optional
 96            The index of the scan to retrieve the mass spectrum from. Default is 0.
 97        time_index : int, optional
 98            The index of the time point to retrieve the mass spectrum from. Default is -1.
 99        auto_process : bool, optional
100            Whether to automatically process the mass spectrum. Default is True.
101        load_settings : bool, optional
102            Whether to load the settings into the mass spectrum object. Default is True.
103        load_raw : bool, optional
104            Whether to load the raw data into the mass spectrum object. Default is True.
105        load_molecular_formula : bool, optional
106            Whether to load the molecular formula into the mass spectrum object.
107            Default is True.
108
109        Returns
110        -------
111        MassSpecCentroid
112            The mass spectrum object.
113
114        Raises
115        ------
116        ValueError
117            If the CoreMS file is not valid.
118            If the mass spectrum has not been processed and load_molecular_formula is True.
119        """
120        if "mass_spectra" in self.scans[0]:
121            scan_index = self.scans.index("mass_spectra/" + str(scan_number))
122        else:
123            scan_index = self.scans.index(str(scan_number))
124        dataframe = self.get_dataframe(scan_index, time_index=time_index)
125        if dataframe["Molecular Formula"].any() and not dataframe["C"].any():
126            cols = dataframe.columns.tolist()
127            cols = cols[cols.index("Molecular Formula") + 1 :]
128            for index, row in dataframe.iterrows():
129                if row["Molecular Formula"] is not None:
130                    og_formula = row["Molecular Formula"]
131                    for col in cols:
132                        if "col" in og_formula:
133                            # get the digit after the element ("col") in the molecular formula and set it to the dataframe
134                            row[col] = int(og_formula.split(col)[1].split(" ")[0])
135
136        if not set(
137            ["H/C", "O/C", "Heteroatom Class", "Ion Type", "Is Isotopologue"]
138        ).issubset(dataframe.columns):
139            raise ValueError(
140                "%s it is not a valid CoreMS file" % str(self.file_location)
141            )
142
143        dataframe.rename(columns=self.parameters.header_translate, inplace=True)
144
145        # Cast m/z, and 'Peak Height' to float
146        dataframe["m/z"] = dataframe["m/z"].astype(float)
147        dataframe["Peak Height"] = dataframe["Peak Height"].astype(float)
148
149        polarity = dataframe["Ion Charge"].values[0]
150
151        output_parameters = self.get_output_parameters(polarity, scan_index=scan_index)
152
153        mass_spec_obj = MassSpecCentroid(
154            dataframe.to_dict(orient="list"), output_parameters, auto_process=False
155        )
156
157        if auto_process:
158            # Set the settings on the mass spectrum object to relative abuncance of 0 so all peaks get added
159            mass_spec_obj.settings.noise_threshold_method = "absolute_abundance"
160            mass_spec_obj.settings.noise_threshold_absolute_abundance = 0
161            mass_spec_obj.process_mass_spec()
162
163        if load_settings:
164            # Load settings into the mass spectrum object
165            self.load_settings(
166                mass_spec_obj, scan_index=scan_index, time_index=time_index
167            )
168
169        if load_raw:
170            self.load_raw_data(mass_spec_obj, scan_index=scan_index)
171
172        if load_molecular_formula:
173            if not auto_process:
174                raise ValueError(
175                    "Can only add molecular formula if the mass spectrum has been processed"
176                )
177            else:
178                self.add_molecular_formula(mass_spec_obj, dataframe)
179
180        return mass_spec_obj
181
182    def load_settings(self, mass_spectrum, scan_index=0, time_index=-1):
183        """
184        Load settings into the mass spectrum object.
185
186        Parameters
187        ----------
188        mass_spectrum : MassSpecCentroid
189            The mass spectrum object to load the settings into.
190        scan_index : int, optional
191            The index of the scan to load the settings from. Default is 0.
192        time_index : int, optional
193            The index of the time point to load the settings from. Default is -1.
194        """
195
196        loaded_settings = {}
197        loaded_settings["MoleculaSearch"] = self.get_scan_group_attr_data(
198            scan_index, time_index, "MoleculaSearchSetting"
199        )
200        loaded_settings["MassSpecPeak"] = self.get_scan_group_attr_data(
201            scan_index, time_index, "MassSpecPeakSetting"
202        )
203        loaded_settings["MassSpectrum"] = self.get_scan_group_attr_data(
204            scan_index, time_index, "MassSpectrumSetting"
205        )
206        loaded_settings["Transient"] = self.get_scan_group_attr_data(
207            scan_index, time_index, "TransientSetting"
208        )
209
210        _set_dict_data_ms(loaded_settings, mass_spectrum)
211
212    def get_dataframe(self, scan_index=0, time_index=-1):
213        """
214        Get a pandas DataFrame representing the mass spectrum.
215
216        Parameters
217        ----------
218        scan_index : int, optional
219            The index of the scan to retrieve the DataFrame from. Default is 0.
220        time_index : int, optional
221            The index of the time point to retrieve the DataFrame from. Default is -1.
222
223        Returns
224        -------
225        DataFrame
226            The pandas DataFrame representing the mass spectrum.
227        """
228
229        columnsLabels = self.get_scan_group_attr_data(
230            scan_index, time_index, "ColumnsLabels"
231        )
232
233        scan_label = self.scans[scan_index]
234
235        index_to_pull = self.get_time_index_to_pull(scan_label, time_index)
236
237        corems_table_data = self.h5pydata[scan_label][index_to_pull]
238
239        list_dict = []
240        for row in corems_table_data:
241            data_dict = {}
242            for data_index, data in enumerate(row):
243                label = columnsLabels[data_index]
244                # if data starts with a b' it is a byte string, so decode it
245                if isinstance(data, bytes):
246                    data = data.decode("utf-8")
247                if data == "nan":
248                    data = None
249                data_dict[label] = data
250
251            list_dict.append(data_dict)
252
253        # Reorder the columns from low to high "Index" to match the order of the dataframe
254        df = DataFrame(list_dict)
255        # set the "Index" column to int so it sorts correctly
256        df["Index"] = df["Index"].astype(int)
257        df = df.sort_values(by="Index")
258        # Reset index to match the "Index" column
259        df = df.set_index("Index", drop=False)
260
261        return df
262
263    def get_time_index_to_pull(self, scan_label, time_index):
264        """
265        Get the time index to pull from the HDF5 file.
266
267        Parameters
268        ----------
269        scan_label : str
270            The label of the scan.
271        time_index : int
272            The index of the time point.
273
274        Returns
275        -------
276        str
277            The time index to pull.
278        """
279
280        time_data = sorted(
281            [(i, int(i)) for i in self.h5pydata[scan_label].keys() if i != "raw_ms"],
282            key=lambda m: m[1],
283        )
284
285        index_to_pull = time_data[time_index][0]
286
287        return index_to_pull
288
289    def get_high_level_attr_data(self, attr_str):
290        """
291        Get high-level attribute data from the HDF5 file.
292
293        Parameters
294        ----------
295        attr_str : str
296            The attribute string.
297
298        Returns
299        -------
300        dict
301            The attribute data.
302
303        Raises
304        ------
305        KeyError
306            If the attribute string is not found in the HDF5 file.
307        """
308
309        return self.h5pydata.attrs[attr_str]
310
311    def get_scan_group_attr_data(
312        self, scan_index, time_index, attr_group, attr_srt=None
313    ):
314        """
315        Get scan group attribute data from the HDF5 file.
316
317        Parameters
318        ----------
319        scan_index : int
320            The index of the scan.
321        time_index : int
322            The index of the time point.
323        attr_group : str
324            The attribute group.
325        attr_srt : str, optional
326            The attribute string. Default is None.
327
328        Returns
329        -------
330        dict
331            The attribute data.
332
333        Notes
334        -----
335        This method retrieves attribute data from the HDF5 file for a specific scan and time point.
336        The attribute data is stored in the specified attribute group.
337        If an attribute string is provided, only the corresponding attribute value is returned.
338        If no attribute string is provided, all attribute data in the group is returned as a dictionary.
339        """
340        # Get index of self.scans where scan_index_str is found
341        scan_label = self.scans[scan_index]
342
343        index_to_pull = self.get_time_index_to_pull(scan_label, time_index)
344
345        if attr_srt:
346            return json.loads(
347                self.h5pydata[scan_label][index_to_pull].attrs[attr_group]
348            )[attr_srt]
349
350        else:
351            data = self.h5pydata[scan_label][index_to_pull].attrs.get(attr_group)
352            if data:
353                return json.loads(data)
354            else:
355                return {}
356
357    def get_raw_data_attr_data(self, scan_index, attr_group, attr_str):
358        """
359        Get raw data attribute data from the HDF5 file.
360
361        Parameters
362        ----------
363        scan_index : int
364            The index of the scan.
365        attr_group : str
366            The attribute group.
367        attr_str : str
368            The attribute string.
369
370        Returns
371        -------
372        dict
373            The attribute data.
374
375        Raises
376        ------
377        KeyError
378            If the attribute string is not found in the attribute group.
379
380        Notes
381        -----
382        This method retrieves the attribute data associated with a specific scan, attribute group, and attribute string
383        from the HDF5 file. It returns the attribute data as a dictionary.
384
385        Example usage:
386        >>> data = get_raw_data_attr_data(0, "group1", "attribute1")
387        >>> print(data)
388        {'key1': 'value1', 'key2': 'value2'}
389        """
390        scan_label = self.scans[scan_index]
391        try:
392            json.loads(self.h5pydata[scan_label]["raw_ms"].attrs[attr_group])[attr_str]
393        except KeyError:
394            attr_str = attr_str.replace("baseline", "baselise")
395        return json.loads(self.h5pydata[scan_label]["raw_ms"].attrs[attr_group])[
396            attr_str
397        ]
398
399    def get_output_parameters(self, polarity, scan_index=0):
400        """
401        Get the output parameters for the mass spectrum.
402
403        Parameters
404        ----------
405        polarity : str
406            The polarity of the mass spectrum.
407        scan_index : int, optional
408            The index of the scan. Default is 0.
409
410        Returns
411        -------
412        dict
413            The output parameters.
414        """
415
416        d_params = default_parameters(self.file_location)
417        d_params["filename_path"] = self.file_location
418        d_params["polarity"] = self.get_raw_data_attr_data(
419            scan_index, "MassSpecAttrs", "polarity"
420        )
421        d_params["rt"] = self.get_raw_data_attr_data(scan_index, "MassSpecAttrs", "rt")
422
423        d_params["tic"] = self.get_raw_data_attr_data(
424            scan_index, "MassSpecAttrs", "tic"
425        )
426
427        d_params["mobility_scan"] = self.get_raw_data_attr_data(
428            scan_index, "MassSpecAttrs", "mobility_scan"
429        )
430        d_params["mobility_rt"] = self.get_raw_data_attr_data(
431            scan_index, "MassSpecAttrs", "mobility_rt"
432        )
433        d_params["Aterm"] = self.get_raw_data_attr_data(
434            scan_index, "MassSpecAttrs", "Aterm"
435        )
436        d_params["Bterm"] = self.get_raw_data_attr_data(
437            scan_index, "MassSpecAttrs", "Bterm"
438        )
439        d_params["Cterm"] = self.get_raw_data_attr_data(
440            scan_index, "MassSpecAttrs", "Cterm"
441        )
442        d_params["baseline_noise"] = self.get_raw_data_attr_data(
443            scan_index, "MassSpecAttrs", "baseline_noise"
444        )
445        d_params["baseline_noise_std"] = self.get_raw_data_attr_data(
446            scan_index, "MassSpecAttrs", "baseline_noise_std"
447        )
448
449        d_params["analyzer"] = self.get_high_level_attr_data("analyzer")
450        d_params["instrument_label"] = self.get_high_level_attr_data("instrument_label")
451        d_params["sample_name"] = self.get_high_level_attr_data("sample_name")
452
453        return d_params

Class for reading mass spectrum data from a CoreMS HDF5 file.

Attributes
  • h5pydata (h5py.File): The HDF5 file object.
  • scans (list): List of scan labels in the HDF5 file.
Parameters
  • file_location (str or S3Path): The path to the CoreMS HDF5 file.
Methods
  • load_raw_data(mass_spectrum, scan_index=0) Load raw data into the mass spectrum object.
  • get_mass_spectrum(scan_number=0, time_index=-1, auto_process=True, load_settings=True, load_raw=True).Get a mass spectrum object.
  • load_settings(mass_spectrum, scan_index=0, time_index=-1). Load settings into the mass spectrum object.
  • get_dataframe(scan_index=0, time_index=-1). Get a pandas DataFrame representing the mass spectrum.
  • get_time_index_to_pull(scan_label, time_index). Get the time index to pull from the HDF5 file.
  • get_high_level_attr_data(attr_str). Get high-level attribute data from the HDF5 file.
  • get_scan_group_attr_data(scan_index, time_index, attr_group, attr_srt=None). Get scan group attribute data from the HDF5 file.
  • get_raw_data_attr_data(scan_index, attr_group, attr_str). Get raw data attribute data from the HDF5 file.
  • get_output_parameters(polarity, scan_index=0). Get the output parameters for the mass spectrum.
ReadCoreMSHDF_MassSpectrum(file_location)
43    def __init__(self, file_location):
44        super().__init__(file_location)
45
46        if isinstance(self.file_location, S3Path):
47            data = BytesIO(self.file_location.open("rb").read())
48        else:
49            data = self.file_location
50
51        self.h5pydata = h5py.File(data, "r")
52
53        self.scans = list(self.h5pydata.keys())
h5pydata
scans
def load_raw_data(self, mass_spectrum, scan_index=0):
55    def load_raw_data(self, mass_spectrum, scan_index=0):
56        """
57        Load raw data into the mass spectrum object.
58
59        Parameters
60        ----------
61        mass_spectrum : MassSpecCentroid
62            The mass spectrum object to load the raw data into.
63        scan_index : int, optional
64            The index of the scan to load the raw data from. Default is 0.
65        """
66
67        scan_label = self.scans[scan_index]
68
69        # Check if the "raw_ms" group in the scan is empty
70        if self.h5pydata[scan_label]["raw_ms"].shape is not None:
71            mz_profile = self.h5pydata[scan_label]["raw_ms"][0]
72
73            abundance_profile = self.h5pydata[scan_label]["raw_ms"][1]
74
75            mass_spectrum.mz_exp_profile = mz_profile
76
77            mass_spectrum.abundance_profile = abundance_profile

Load raw data into the mass spectrum object.

Parameters
  • mass_spectrum (MassSpecCentroid): The mass spectrum object to load the raw data into.
  • scan_index (int, optional): The index of the scan to load the raw data from. Default is 0.
def get_mass_spectrum( self, scan_number=0, time_index=-1, auto_process=True, load_settings=True, load_raw=True, load_molecular_formula=True):
 79    def get_mass_spectrum(
 80        self,
 81        scan_number=0,
 82        time_index=-1,
 83        auto_process=True,
 84        load_settings=True,
 85        load_raw=True,
 86        load_molecular_formula=True,
 87    ):
 88        """
 89        Instantiate a mass spectrum object from the CoreMS HDF5 file.
 90        Note that this always returns a centroid mass spectrum object; functionality for profile and
 91        frequency mass spectra is not yet implemented.
 92
 93        Parameters
 94        ----------
 95        scan_number : int, optional
 96            The index of the scan to retrieve the mass spectrum from. Default is 0.
 97        time_index : int, optional
 98            The index of the time point to retrieve the mass spectrum from. Default is -1.
 99        auto_process : bool, optional
100            Whether to automatically process the mass spectrum. Default is True.
101        load_settings : bool, optional
102            Whether to load the settings into the mass spectrum object. Default is True.
103        load_raw : bool, optional
104            Whether to load the raw data into the mass spectrum object. Default is True.
105        load_molecular_formula : bool, optional
106            Whether to load the molecular formula into the mass spectrum object.
107            Default is True.
108
109        Returns
110        -------
111        MassSpecCentroid
112            The mass spectrum object.
113
114        Raises
115        ------
116        ValueError
117            If the CoreMS file is not valid.
118            If the mass spectrum has not been processed and load_molecular_formula is True.
119        """
120        if "mass_spectra" in self.scans[0]:
121            scan_index = self.scans.index("mass_spectra/" + str(scan_number))
122        else:
123            scan_index = self.scans.index(str(scan_number))
124        dataframe = self.get_dataframe(scan_index, time_index=time_index)
125        if dataframe["Molecular Formula"].any() and not dataframe["C"].any():
126            cols = dataframe.columns.tolist()
127            cols = cols[cols.index("Molecular Formula") + 1 :]
128            for index, row in dataframe.iterrows():
129                if row["Molecular Formula"] is not None:
130                    og_formula = row["Molecular Formula"]
131                    for col in cols:
132                        if "col" in og_formula:
133                            # get the digit after the element ("col") in the molecular formula and set it to the dataframe
134                            row[col] = int(og_formula.split(col)[1].split(" ")[0])
135
136        if not set(
137            ["H/C", "O/C", "Heteroatom Class", "Ion Type", "Is Isotopologue"]
138        ).issubset(dataframe.columns):
139            raise ValueError(
140                "%s it is not a valid CoreMS file" % str(self.file_location)
141            )
142
143        dataframe.rename(columns=self.parameters.header_translate, inplace=True)
144
145        # Cast m/z, and 'Peak Height' to float
146        dataframe["m/z"] = dataframe["m/z"].astype(float)
147        dataframe["Peak Height"] = dataframe["Peak Height"].astype(float)
148
149        polarity = dataframe["Ion Charge"].values[0]
150
151        output_parameters = self.get_output_parameters(polarity, scan_index=scan_index)
152
153        mass_spec_obj = MassSpecCentroid(
154            dataframe.to_dict(orient="list"), output_parameters, auto_process=False
155        )
156
157        if auto_process:
158            # Set the settings on the mass spectrum object to relative abuncance of 0 so all peaks get added
159            mass_spec_obj.settings.noise_threshold_method = "absolute_abundance"
160            mass_spec_obj.settings.noise_threshold_absolute_abundance = 0
161            mass_spec_obj.process_mass_spec()
162
163        if load_settings:
164            # Load settings into the mass spectrum object
165            self.load_settings(
166                mass_spec_obj, scan_index=scan_index, time_index=time_index
167            )
168
169        if load_raw:
170            self.load_raw_data(mass_spec_obj, scan_index=scan_index)
171
172        if load_molecular_formula:
173            if not auto_process:
174                raise ValueError(
175                    "Can only add molecular formula if the mass spectrum has been processed"
176                )
177            else:
178                self.add_molecular_formula(mass_spec_obj, dataframe)
179
180        return mass_spec_obj

Instantiate a mass spectrum object from the CoreMS HDF5 file. Note that this always returns a centroid mass spectrum object; functionality for profile and frequency mass spectra is not yet implemented.

Parameters
  • scan_number (int, optional): The index of the scan to retrieve the mass spectrum from. Default is 0.
  • time_index (int, optional): The index of the time point to retrieve the mass spectrum from. Default is -1.
  • auto_process (bool, optional): Whether to automatically process the mass spectrum. Default is True.
  • load_settings (bool, optional): Whether to load the settings into the mass spectrum object. Default is True.
  • load_raw (bool, optional): Whether to load the raw data into the mass spectrum object. Default is True.
  • load_molecular_formula (bool, optional): Whether to load the molecular formula into the mass spectrum object. Default is True.
Returns
  • MassSpecCentroid: The mass spectrum object.
Raises
  • ValueError: If the CoreMS file is not valid. If the mass spectrum has not been processed and load_molecular_formula is True.
def load_settings(self, mass_spectrum, scan_index=0, time_index=-1):
182    def load_settings(self, mass_spectrum, scan_index=0, time_index=-1):
183        """
184        Load settings into the mass spectrum object.
185
186        Parameters
187        ----------
188        mass_spectrum : MassSpecCentroid
189            The mass spectrum object to load the settings into.
190        scan_index : int, optional
191            The index of the scan to load the settings from. Default is 0.
192        time_index : int, optional
193            The index of the time point to load the settings from. Default is -1.
194        """
195
196        loaded_settings = {}
197        loaded_settings["MoleculaSearch"] = self.get_scan_group_attr_data(
198            scan_index, time_index, "MoleculaSearchSetting"
199        )
200        loaded_settings["MassSpecPeak"] = self.get_scan_group_attr_data(
201            scan_index, time_index, "MassSpecPeakSetting"
202        )
203        loaded_settings["MassSpectrum"] = self.get_scan_group_attr_data(
204            scan_index, time_index, "MassSpectrumSetting"
205        )
206        loaded_settings["Transient"] = self.get_scan_group_attr_data(
207            scan_index, time_index, "TransientSetting"
208        )
209
210        _set_dict_data_ms(loaded_settings, mass_spectrum)

Load settings into the mass spectrum object.

Parameters
  • mass_spectrum (MassSpecCentroid): The mass spectrum object to load the settings into.
  • scan_index (int, optional): The index of the scan to load the settings from. Default is 0.
  • time_index (int, optional): The index of the time point to load the settings from. Default is -1.
def get_dataframe(self, scan_index=0, time_index=-1):
212    def get_dataframe(self, scan_index=0, time_index=-1):
213        """
214        Get a pandas DataFrame representing the mass spectrum.
215
216        Parameters
217        ----------
218        scan_index : int, optional
219            The index of the scan to retrieve the DataFrame from. Default is 0.
220        time_index : int, optional
221            The index of the time point to retrieve the DataFrame from. Default is -1.
222
223        Returns
224        -------
225        DataFrame
226            The pandas DataFrame representing the mass spectrum.
227        """
228
229        columnsLabels = self.get_scan_group_attr_data(
230            scan_index, time_index, "ColumnsLabels"
231        )
232
233        scan_label = self.scans[scan_index]
234
235        index_to_pull = self.get_time_index_to_pull(scan_label, time_index)
236
237        corems_table_data = self.h5pydata[scan_label][index_to_pull]
238
239        list_dict = []
240        for row in corems_table_data:
241            data_dict = {}
242            for data_index, data in enumerate(row):
243                label = columnsLabels[data_index]
244                # if data starts with a b' it is a byte string, so decode it
245                if isinstance(data, bytes):
246                    data = data.decode("utf-8")
247                if data == "nan":
248                    data = None
249                data_dict[label] = data
250
251            list_dict.append(data_dict)
252
253        # Reorder the columns from low to high "Index" to match the order of the dataframe
254        df = DataFrame(list_dict)
255        # set the "Index" column to int so it sorts correctly
256        df["Index"] = df["Index"].astype(int)
257        df = df.sort_values(by="Index")
258        # Reset index to match the "Index" column
259        df = df.set_index("Index", drop=False)
260
261        return df

Get a pandas DataFrame representing the mass spectrum.

Parameters
  • scan_index (int, optional): The index of the scan to retrieve the DataFrame from. Default is 0.
  • time_index (int, optional): The index of the time point to retrieve the DataFrame from. Default is -1.
Returns
  • DataFrame: The pandas DataFrame representing the mass spectrum.
def get_time_index_to_pull(self, scan_label, time_index):
263    def get_time_index_to_pull(self, scan_label, time_index):
264        """
265        Get the time index to pull from the HDF5 file.
266
267        Parameters
268        ----------
269        scan_label : str
270            The label of the scan.
271        time_index : int
272            The index of the time point.
273
274        Returns
275        -------
276        str
277            The time index to pull.
278        """
279
280        time_data = sorted(
281            [(i, int(i)) for i in self.h5pydata[scan_label].keys() if i != "raw_ms"],
282            key=lambda m: m[1],
283        )
284
285        index_to_pull = time_data[time_index][0]
286
287        return index_to_pull

Get the time index to pull from the HDF5 file.

Parameters
  • scan_label (str): The label of the scan.
  • time_index (int): The index of the time point.
Returns
  • str: The time index to pull.
def get_high_level_attr_data(self, attr_str):
289    def get_high_level_attr_data(self, attr_str):
290        """
291        Get high-level attribute data from the HDF5 file.
292
293        Parameters
294        ----------
295        attr_str : str
296            The attribute string.
297
298        Returns
299        -------
300        dict
301            The attribute data.
302
303        Raises
304        ------
305        KeyError
306            If the attribute string is not found in the HDF5 file.
307        """
308
309        return self.h5pydata.attrs[attr_str]

Get high-level attribute data from the HDF5 file.

Parameters
  • attr_str (str): The attribute string.
Returns
  • dict: The attribute data.
Raises
  • KeyError: If the attribute string is not found in the HDF5 file.
def get_scan_group_attr_data(self, scan_index, time_index, attr_group, attr_srt=None):
311    def get_scan_group_attr_data(
312        self, scan_index, time_index, attr_group, attr_srt=None
313    ):
314        """
315        Get scan group attribute data from the HDF5 file.
316
317        Parameters
318        ----------
319        scan_index : int
320            The index of the scan.
321        time_index : int
322            The index of the time point.
323        attr_group : str
324            The attribute group.
325        attr_srt : str, optional
326            The attribute string. Default is None.
327
328        Returns
329        -------
330        dict
331            The attribute data.
332
333        Notes
334        -----
335        This method retrieves attribute data from the HDF5 file for a specific scan and time point.
336        The attribute data is stored in the specified attribute group.
337        If an attribute string is provided, only the corresponding attribute value is returned.
338        If no attribute string is provided, all attribute data in the group is returned as a dictionary.
339        """
340        # Get index of self.scans where scan_index_str is found
341        scan_label = self.scans[scan_index]
342
343        index_to_pull = self.get_time_index_to_pull(scan_label, time_index)
344
345        if attr_srt:
346            return json.loads(
347                self.h5pydata[scan_label][index_to_pull].attrs[attr_group]
348            )[attr_srt]
349
350        else:
351            data = self.h5pydata[scan_label][index_to_pull].attrs.get(attr_group)
352            if data:
353                return json.loads(data)
354            else:
355                return {}

Get scan group attribute data from the HDF5 file.

Parameters
  • scan_index (int): The index of the scan.
  • time_index (int): The index of the time point.
  • attr_group (str): The attribute group.
  • attr_srt (str, optional): The attribute string. Default is None.
Returns
  • dict: The attribute data.
Notes

This method retrieves attribute data from the HDF5 file for a specific scan and time point. The attribute data is stored in the specified attribute group. If an attribute string is provided, only the corresponding attribute value is returned. If no attribute string is provided, all attribute data in the group is returned as a dictionary.

def get_raw_data_attr_data(self, scan_index, attr_group, attr_str):
357    def get_raw_data_attr_data(self, scan_index, attr_group, attr_str):
358        """
359        Get raw data attribute data from the HDF5 file.
360
361        Parameters
362        ----------
363        scan_index : int
364            The index of the scan.
365        attr_group : str
366            The attribute group.
367        attr_str : str
368            The attribute string.
369
370        Returns
371        -------
372        dict
373            The attribute data.
374
375        Raises
376        ------
377        KeyError
378            If the attribute string is not found in the attribute group.
379
380        Notes
381        -----
382        This method retrieves the attribute data associated with a specific scan, attribute group, and attribute string
383        from the HDF5 file. It returns the attribute data as a dictionary.
384
385        Example usage:
386        >>> data = get_raw_data_attr_data(0, "group1", "attribute1")
387        >>> print(data)
388        {'key1': 'value1', 'key2': 'value2'}
389        """
390        scan_label = self.scans[scan_index]
391        try:
392            json.loads(self.h5pydata[scan_label]["raw_ms"].attrs[attr_group])[attr_str]
393        except KeyError:
394            attr_str = attr_str.replace("baseline", "baselise")
395        return json.loads(self.h5pydata[scan_label]["raw_ms"].attrs[attr_group])[
396            attr_str
397        ]

Get raw data attribute data from the HDF5 file.

Parameters
  • scan_index (int): The index of the scan.
  • attr_group (str): The attribute group.
  • attr_str (str): The attribute string.
Returns
  • dict: The attribute data.
Raises
  • KeyError: If the attribute string is not found in the attribute group.
Notes

This method retrieves the attribute data associated with a specific scan, attribute group, and attribute string from the HDF5 file. It returns the attribute data as a dictionary.

Example usage:

>>> data = get_raw_data_attr_data(0, "group1", "attribute1")
>>> print(data)
{'key1': 'value1', 'key2': 'value2'}
def get_output_parameters(self, polarity, scan_index=0):
399    def get_output_parameters(self, polarity, scan_index=0):
400        """
401        Get the output parameters for the mass spectrum.
402
403        Parameters
404        ----------
405        polarity : str
406            The polarity of the mass spectrum.
407        scan_index : int, optional
408            The index of the scan. Default is 0.
409
410        Returns
411        -------
412        dict
413            The output parameters.
414        """
415
416        d_params = default_parameters(self.file_location)
417        d_params["filename_path"] = self.file_location
418        d_params["polarity"] = self.get_raw_data_attr_data(
419            scan_index, "MassSpecAttrs", "polarity"
420        )
421        d_params["rt"] = self.get_raw_data_attr_data(scan_index, "MassSpecAttrs", "rt")
422
423        d_params["tic"] = self.get_raw_data_attr_data(
424            scan_index, "MassSpecAttrs", "tic"
425        )
426
427        d_params["mobility_scan"] = self.get_raw_data_attr_data(
428            scan_index, "MassSpecAttrs", "mobility_scan"
429        )
430        d_params["mobility_rt"] = self.get_raw_data_attr_data(
431            scan_index, "MassSpecAttrs", "mobility_rt"
432        )
433        d_params["Aterm"] = self.get_raw_data_attr_data(
434            scan_index, "MassSpecAttrs", "Aterm"
435        )
436        d_params["Bterm"] = self.get_raw_data_attr_data(
437            scan_index, "MassSpecAttrs", "Bterm"
438        )
439        d_params["Cterm"] = self.get_raw_data_attr_data(
440            scan_index, "MassSpecAttrs", "Cterm"
441        )
442        d_params["baseline_noise"] = self.get_raw_data_attr_data(
443            scan_index, "MassSpecAttrs", "baseline_noise"
444        )
445        d_params["baseline_noise_std"] = self.get_raw_data_attr_data(
446            scan_index, "MassSpecAttrs", "baseline_noise_std"
447        )
448
449        d_params["analyzer"] = self.get_high_level_attr_data("analyzer")
450        d_params["instrument_label"] = self.get_high_level_attr_data("instrument_label")
451        d_params["sample_name"] = self.get_high_level_attr_data("sample_name")
452
453        return d_params

Get the output parameters for the mass spectrum.

Parameters
  • polarity (str): The polarity of the mass spectrum.
  • scan_index (int, optional): The index of the scan. Default is 0.
Returns
  • dict: The output parameters.