corems.mass_spectra.calc.MZSearch

  1__author__ = "Yuri E. Corilo"
  2__date__ = "Jun 09, 2021"
  3
  4from threading import Thread
  5from dataclasses import dataclass
  6from typing import List
  7
  8
  9@dataclass
 10class SearchResults:
 11    calculated_mz: float
 12    exp_mz: float
 13    error: float
 14    tolerance: float
 15
 16
 17class MZSearch(Thread):
 18    def __init__(
 19        self,
 20        exp_mzs: List[float],
 21        calculated_mzs: List[float],
 22        tolerance,
 23        method="ppm",
 24        average_target_mz=True,
 25    ):
 26        """
 27        Parameters
 28        ----------
 29        calculated_mzs: [float] calculated m/z
 30        exp_mzs: [float] experimental m/z
 31        method: string,
 32            ppm or ppb
 33        call run to trigger the m/z search algorithm
 34        or start if using it as thread
 35        """
 36        Thread.__init__(self)
 37        # placeholder for the results
 38        self._matched_mz = {}
 39
 40        self._calculated_mzs = calculated_mzs
 41
 42        self._matched_mz = {}
 43
 44        self._averaged_target_mz = []
 45
 46        self._exp_mzs = exp_mzs
 47
 48        self._tolerance = tolerance
 49        self.method = method
 50
 51        if average_target_mz:
 52            self.colapse_calculated()
 53
 54    @property
 55    def results(self):
 56        """{calculated_mz: [SearchResults]}
 57        contains the results of the search
 58        """
 59        return self._matched_mz
 60
 61    @property
 62    def averaged_target_mz(self):
 63        """[float]
 64        contains the average target m/z to be searched against
 65        """
 66        return self._averaged_target_mz
 67
 68    @property
 69    def calculated_mzs(self):
 70        """[float]
 71        contains the mz target to be searched against
 72        """
 73        if self.averaged_target_mz:
 74            return sorted(self.averaged_target_mz)
 75        else:
 76            return sorted(list(self._calculated_mzs))
 77
 78    @property
 79    def exp_mzs(self):
 80        """[float]
 81        contains the exp mz to be searched against
 82        """
 83        return self._exp_mzs
 84
 85    @property
 86    def method(self):
 87        return self._method
 88
 89    @method.setter
 90    def method(self, method):
 91        """
 92        method: string,
 93           ppm or ppb
 94        """
 95        if method not in ["ppm" or "ppb"]:
 96            raise ValueError("Method should be ppm or ppb")
 97        self._method = method
 98
 99    @property
100    def tolerance(self):
101        return self._tolerance
102
103    @tolerance.setter
104    def tolerance(self, tolerance):
105        """
106        method: string,
107           ppm or ppb
108        """
109        if tolerance < 0:
110            raise ValueError("Tolerance needs to be a positive number")
111        self._tolerance = tolerance
112
113    def colapse_calculated(self):
114        if len(self.calculated_mzs) > 1:
115            all_mz = []
116            subset = set()
117
118            i = -1
119            while True:
120                i = i + 1
121
122                if i == len(self.calculated_mzs) - 1:
123                    all_mz.append({i})
124                    # print(i, 'break1')
125                    break
126
127                if i >= len(self.calculated_mzs) - 1:
128                    # print(i, 'break2')
129                    break
130
131                error = self.calc_mz_error(
132                    self.calculated_mzs[i], self.calculated_mzs[i + 1]
133                )
134
135                # print(self.tolerance)
136
137                check_error = self.check_ppm_error(self.tolerance, error)
138
139                if not check_error:
140                    start_list = {i}
141
142                else:
143                    start_list = set()
144
145                while check_error:
146                    start_list.add(i)
147                    start_list.add(i + 1)
148
149                    i = i + 1
150
151                    if i == len(self.calculated_mzs) - 1:
152                        start_list.add(i)
153                        # print(i, 'break3')
154                        break
155
156                    error = self.calc_mz_error(
157                        self.calculated_mzs[i], self.calculated_mzs[i + 1]
158                    )
159                    check_error = self.check_ppm_error(self.tolerance, error)
160
161                if start_list:
162                    all_mz.append(start_list)
163
164            results = []
165            for each in all_mz:
166                # print(each)
167                mzs = [self.calculated_mzs[i] for i in each]
168                results.append(sum(mzs) / len(mzs))
169
170            # print(results)
171            self._averaged_target_mz = results
172
173    def run(self):
174        dict_nominal_exp_mz = self.get_nominal_exp(self.exp_mzs)
175
176        for calculated_mz in self.calculated_mzs:
177            nominal_selected_mz = int(calculated_mz)
178
179            if nominal_selected_mz in dict_nominal_exp_mz.keys():
180                self.search_mz(self.results, dict_nominal_exp_mz, calculated_mz, 0)
181
182            elif nominal_selected_mz - 1 in dict_nominal_exp_mz.keys():
183                self.search_mz(self.results, dict_nominal_exp_mz, calculated_mz, -1)
184
185            elif nominal_selected_mz + 1 in dict_nominal_exp_mz.keys():
186                self.search_mz(self.results, dict_nominal_exp_mz, calculated_mz, +1)
187
188            else:
189                continue
190
191    @staticmethod
192    def calc_mz_error(calculated_mz, exp_mz, method="ppm"):
193        """
194        Parameters
195        ----------
196        calculated_mz: float,
197        exp_mz:float
198        method: string,
199            ppm or ppb
200        """
201        if method == "ppm":
202            multi_factor = 1000000
203
204        elif method == "ppb":
205            multi_factor = 1000000
206
207        else:
208            raise Exception(
209                "method needs to be ppm or ppb, \
210                             you have entered %s"
211                % method
212            )
213
214        return ((exp_mz - calculated_mz) / calculated_mz) * multi_factor
215
216    @staticmethod
217    def check_ppm_error(tolerance, error):
218        return True if -tolerance <= error <= tolerance else False
219
220    def get_nominal_exp(self, exp_mzs) -> dict:
221        dict_nominal_exp_mz = {}
222
223        for exp_mz in exp_mzs:
224            nominal_mz = int(exp_mz)
225
226            if nominal_mz not in dict_nominal_exp_mz.keys():
227                dict_nominal_exp_mz[int(exp_mz)] = [exp_mz]
228            else:
229                dict_nominal_exp_mz[int(exp_mz)].append(exp_mz)
230
231        return dict_nominal_exp_mz
232
233    def search_mz(self, results, dict_nominal_exp_mz, calculated_mz, offset) -> None:
234        nominal_calculated_mz = int(calculated_mz) + offset
235        matched_n_precursors = dict_nominal_exp_mz.get(nominal_calculated_mz)
236
237        for precursor_mz in matched_n_precursors:
238            error = self.calc_mz_error(calculated_mz, precursor_mz, method=self.method)
239
240            if self.check_ppm_error(self.tolerance, error):
241                new_match = SearchResults(
242                    calculated_mz, precursor_mz, error, self.tolerance
243                )
244
245                if calculated_mz not in results.keys():
246                    results[calculated_mz] = [new_match]
247
248                else:
249                    results[calculated_mz].append(new_match)
@dataclass
class SearchResults:
@dataclass
class SearchResults:
    """One experimental m/z matched to a calculated (target) m/z.

    Attributes
    ----------
    calculated_mz: float
        target m/z the experimental value was compared against
    exp_mz: float
        experimental m/z that fell within the tolerance
    error: float
        relative error of exp_mz against calculated_mz
    tolerance: float
        tolerance in effect when the match was recorded
    """

    calculated_mz: float
    exp_mz: float
    error: float
    tolerance: float
SearchResults(calculated_mz: float, exp_mz: float, error: float, tolerance: float)
calculated_mz: float
exp_mz: float
error: float
tolerance: float
class MZSearch(threading.Thread):
 18class MZSearch(Thread):
 19    def __init__(
 20        self,
 21        exp_mzs: List[float],
 22        calculated_mzs: List[float],
 23        tolerance,
 24        method="ppm",
 25        average_target_mz=True,
 26    ):
 27        """
 28        Parameters
 29        ----------
 30        calculated_mzs: [float] calculated m/z
 31        exp_mzs: [float] experimental m/z
 32        method: string,
 33            ppm or ppb
 34        call run to trigger the m/z search algorithm
 35        or start if using it as thread
 36        """
 37        Thread.__init__(self)
 38        # placeholder for the results
 39        self._matched_mz = {}
 40
 41        self._calculated_mzs = calculated_mzs
 42
 43        self._matched_mz = {}
 44
 45        self._averaged_target_mz = []
 46
 47        self._exp_mzs = exp_mzs
 48
 49        self._tolerance = tolerance
 50        self.method = method
 51
 52        if average_target_mz:
 53            self.colapse_calculated()
 54
 55    @property
 56    def results(self):
 57        """{calculated_mz: [SearchResults]}
 58        contains the results of the search
 59        """
 60        return self._matched_mz
 61
 62    @property
 63    def averaged_target_mz(self):
 64        """[float]
 65        contains the average target m/z to be searched against
 66        """
 67        return self._averaged_target_mz
 68
 69    @property
 70    def calculated_mzs(self):
 71        """[float]
 72        contains the mz target to be searched against
 73        """
 74        if self.averaged_target_mz:
 75            return sorted(self.averaged_target_mz)
 76        else:
 77            return sorted(list(self._calculated_mzs))
 78
 79    @property
 80    def exp_mzs(self):
 81        """[float]
 82        contains the exp mz to be searched against
 83        """
 84        return self._exp_mzs
 85
 86    @property
 87    def method(self):
 88        return self._method
 89
 90    @method.setter
 91    def method(self, method):
 92        """
 93        method: string,
 94           ppm or ppb
 95        """
 96        if method not in ["ppm" or "ppb"]:
 97            raise ValueError("Method should be ppm or ppb")
 98        self._method = method
 99
100    @property
101    def tolerance(self):
102        return self._tolerance
103
104    @tolerance.setter
105    def tolerance(self, tolerance):
106        """
107        method: string,
108           ppm or ppb
109        """
110        if tolerance < 0:
111            raise ValueError("Tolerance needs to be a positive number")
112        self._tolerance = tolerance
113
114    def colapse_calculated(self):
115        if len(self.calculated_mzs) > 1:
116            all_mz = []
117            subset = set()
118
119            i = -1
120            while True:
121                i = i + 1
122
123                if i == len(self.calculated_mzs) - 1:
124                    all_mz.append({i})
125                    # print(i, 'break1')
126                    break
127
128                if i >= len(self.calculated_mzs) - 1:
129                    # print(i, 'break2')
130                    break
131
132                error = self.calc_mz_error(
133                    self.calculated_mzs[i], self.calculated_mzs[i + 1]
134                )
135
136                # print(self.tolerance)
137
138                check_error = self.check_ppm_error(self.tolerance, error)
139
140                if not check_error:
141                    start_list = {i}
142
143                else:
144                    start_list = set()
145
146                while check_error:
147                    start_list.add(i)
148                    start_list.add(i + 1)
149
150                    i = i + 1
151
152                    if i == len(self.calculated_mzs) - 1:
153                        start_list.add(i)
154                        # print(i, 'break3')
155                        break
156
157                    error = self.calc_mz_error(
158                        self.calculated_mzs[i], self.calculated_mzs[i + 1]
159                    )
160                    check_error = self.check_ppm_error(self.tolerance, error)
161
162                if start_list:
163                    all_mz.append(start_list)
164
165            results = []
166            for each in all_mz:
167                # print(each)
168                mzs = [self.calculated_mzs[i] for i in each]
169                results.append(sum(mzs) / len(mzs))
170
171            # print(results)
172            self._averaged_target_mz = results
173
174    def run(self):
175        dict_nominal_exp_mz = self.get_nominal_exp(self.exp_mzs)
176
177        for calculated_mz in self.calculated_mzs:
178            nominal_selected_mz = int(calculated_mz)
179
180            if nominal_selected_mz in dict_nominal_exp_mz.keys():
181                self.search_mz(self.results, dict_nominal_exp_mz, calculated_mz, 0)
182
183            elif nominal_selected_mz - 1 in dict_nominal_exp_mz.keys():
184                self.search_mz(self.results, dict_nominal_exp_mz, calculated_mz, -1)
185
186            elif nominal_selected_mz + 1 in dict_nominal_exp_mz.keys():
187                self.search_mz(self.results, dict_nominal_exp_mz, calculated_mz, +1)
188
189            else:
190                continue
191
192    @staticmethod
193    def calc_mz_error(calculated_mz, exp_mz, method="ppm"):
194        """
195        Parameters
196        ----------
197        calculated_mz: float,
198        exp_mz:float
199        method: string,
200            ppm or ppb
201        """
202        if method == "ppm":
203            multi_factor = 1000000
204
205        elif method == "ppb":
206            multi_factor = 1000000
207
208        else:
209            raise Exception(
210                "method needs to be ppm or ppb, \
211                             you have entered %s"
212                % method
213            )
214
215        return ((exp_mz - calculated_mz) / calculated_mz) * multi_factor
216
217    @staticmethod
218    def check_ppm_error(tolerance, error):
219        return True if -tolerance <= error <= tolerance else False
220
221    def get_nominal_exp(self, exp_mzs) -> dict:
222        dict_nominal_exp_mz = {}
223
224        for exp_mz in exp_mzs:
225            nominal_mz = int(exp_mz)
226
227            if nominal_mz not in dict_nominal_exp_mz.keys():
228                dict_nominal_exp_mz[int(exp_mz)] = [exp_mz]
229            else:
230                dict_nominal_exp_mz[int(exp_mz)].append(exp_mz)
231
232        return dict_nominal_exp_mz
233
234    def search_mz(self, results, dict_nominal_exp_mz, calculated_mz, offset) -> None:
235        nominal_calculated_mz = int(calculated_mz) + offset
236        matched_n_precursors = dict_nominal_exp_mz.get(nominal_calculated_mz)
237
238        for precursor_mz in matched_n_precursors:
239            error = self.calc_mz_error(calculated_mz, precursor_mz, method=self.method)
240
241            if self.check_ppm_error(self.tolerance, error):
242                new_match = SearchResults(
243                    calculated_mz, precursor_mz, error, self.tolerance
244                )
245
246                if calculated_mz not in results.keys():
247                    results[calculated_mz] = [new_match]
248
249                else:
250                    results[calculated_mz].append(new_match)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

MZSearch( exp_mzs: List[float], calculated_mzs: List[float], tolerance, method='ppm', average_target_mz=True)
19    def __init__(
20        self,
21        exp_mzs: List[float],
22        calculated_mzs: List[float],
23        tolerance,
24        method="ppm",
25        average_target_mz=True,
26    ):
27        """
28        Parameters
29        ----------
30        calculated_mzs: [float] calculated m/z
31        exp_mzs: [float] experimental m/z
32        method: string,
33            ppm or ppb
34        call run to trigger the m/z search algorithm
35        or start if using it as thread
36        """
37        Thread.__init__(self)
38        # placeholder for the results
39        self._matched_mz = {}
40
41        self._calculated_mzs = calculated_mzs
42
43        self._matched_mz = {}
44
45        self._averaged_target_mz = []
46
47        self._exp_mzs = exp_mzs
48
49        self._tolerance = tolerance
50        self.method = method
51
52        if average_target_mz:
53            self.colapse_calculated()
Parameters
  • calculated_mzs ([float] calculated m/z):

  • exp_mzs ([float] experimental m/z):

  • method (string,): ppm or ppb

Usage: call run() to trigger the m/z search algorithm directly, or start() to run it as a thread.
method

method: string, ppm or ppb

results

{calculated_mz: [SearchResults]} contains the results of the search

averaged_target_mz

[float] contains the average target m/z to be searched against

calculated_mzs

[float] contains the mz target to be searched against

exp_mzs

[float] contains the exp mz to be searched against

tolerance

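tolerance: number, maximum absolute error accepted for a match, in the unit selected by method (ppm or ppb); setting a negative value raises ValueError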

def colapse_calculated(self):
114    def colapse_calculated(self):
115        if len(self.calculated_mzs) > 1:
116            all_mz = []
117            subset = set()
118
119            i = -1
120            while True:
121                i = i + 1
122
123                if i == len(self.calculated_mzs) - 1:
124                    all_mz.append({i})
125                    # print(i, 'break1')
126                    break
127
128                if i >= len(self.calculated_mzs) - 1:
129                    # print(i, 'break2')
130                    break
131
132                error = self.calc_mz_error(
133                    self.calculated_mzs[i], self.calculated_mzs[i + 1]
134                )
135
136                # print(self.tolerance)
137
138                check_error = self.check_ppm_error(self.tolerance, error)
139
140                if not check_error:
141                    start_list = {i}
142
143                else:
144                    start_list = set()
145
146                while check_error:
147                    start_list.add(i)
148                    start_list.add(i + 1)
149
150                    i = i + 1
151
152                    if i == len(self.calculated_mzs) - 1:
153                        start_list.add(i)
154                        # print(i, 'break3')
155                        break
156
157                    error = self.calc_mz_error(
158                        self.calculated_mzs[i], self.calculated_mzs[i + 1]
159                    )
160                    check_error = self.check_ppm_error(self.tolerance, error)
161
162                if start_list:
163                    all_mz.append(start_list)
164
165            results = []
166            for each in all_mz:
167                # print(each)
168                mzs = [self.calculated_mzs[i] for i in each]
169                results.append(sum(mzs) / len(mzs))
170
171            # print(results)
172            self._averaged_target_mz = results
def run(self):
174    def run(self):
175        dict_nominal_exp_mz = self.get_nominal_exp(self.exp_mzs)
176
177        for calculated_mz in self.calculated_mzs:
178            nominal_selected_mz = int(calculated_mz)
179
180            if nominal_selected_mz in dict_nominal_exp_mz.keys():
181                self.search_mz(self.results, dict_nominal_exp_mz, calculated_mz, 0)
182
183            elif nominal_selected_mz - 1 in dict_nominal_exp_mz.keys():
184                self.search_mz(self.results, dict_nominal_exp_mz, calculated_mz, -1)
185
186            elif nominal_selected_mz + 1 in dict_nominal_exp_mz.keys():
187                self.search_mz(self.results, dict_nominal_exp_mz, calculated_mz, +1)
188
189            else:
190                continue

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

@staticmethod
def calc_mz_error(calculated_mz, exp_mz, method='ppm'):
192    @staticmethod
193    def calc_mz_error(calculated_mz, exp_mz, method="ppm"):
194        """
195        Parameters
196        ----------
197        calculated_mz: float,
198        exp_mz:float
199        method: string,
200            ppm or ppb
201        """
202        if method == "ppm":
203            multi_factor = 1000000
204
205        elif method == "ppb":
206            multi_factor = 1000000
207
208        else:
209            raise Exception(
210                "method needs to be ppm or ppb, \
211                             you have entered %s"
212                % method
213            )
214
215        return ((exp_mz - calculated_mz) / calculated_mz) * multi_factor
Parameters
  • calculated_mz (float,):

  • exp_mz (float):

  • method (string,): ppm or ppb

@staticmethod
def check_ppm_error(tolerance, error):
217    @staticmethod
218    def check_ppm_error(tolerance, error):
219        return True if -tolerance <= error <= tolerance else False
def get_nominal_exp(self, exp_mzs) -> dict:
221    def get_nominal_exp(self, exp_mzs) -> dict:
222        dict_nominal_exp_mz = {}
223
224        for exp_mz in exp_mzs:
225            nominal_mz = int(exp_mz)
226
227            if nominal_mz not in dict_nominal_exp_mz.keys():
228                dict_nominal_exp_mz[int(exp_mz)] = [exp_mz]
229            else:
230                dict_nominal_exp_mz[int(exp_mz)].append(exp_mz)
231
232        return dict_nominal_exp_mz
def search_mz(self, results, dict_nominal_exp_mz, calculated_mz, offset) -> None:
234    def search_mz(self, results, dict_nominal_exp_mz, calculated_mz, offset) -> None:
235        nominal_calculated_mz = int(calculated_mz) + offset
236        matched_n_precursors = dict_nominal_exp_mz.get(nominal_calculated_mz)
237
238        for precursor_mz in matched_n_precursors:
239            error = self.calc_mz_error(calculated_mz, precursor_mz, method=self.method)
240
241            if self.check_ppm_error(self.tolerance, error):
242                new_match = SearchResults(
243                    calculated_mz, precursor_mz, error, self.tolerance
244                )
245
246                if calculated_mz not in results.keys():
247                    results[calculated_mz] = [new_match]
248
249                else:
250                    results[calculated_mz].append(new_match)
Inherited Members
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id