corems.mass_spectra.calc.MZSearch
__author__ = "Yuri E. Corilo"
__date__ = "Jun 09, 2021"

from threading import Thread
from dataclasses import dataclass
from typing import List


@dataclass
class SearchResults:
    """One experimental m/z that matched a calculated m/z within tolerance."""

    calculated_mz: float  # target m/z that was searched for
    exp_mz: float  # experimental m/z that matched the target
    error: float  # signed relative error, in the unit of the search method
    tolerance: float  # tolerance that was in effect for the match


class MZSearch(Thread):
    """Match experimental m/z values against calculated (target) m/z values.

    Call ``run()`` to execute the search in the current thread, or ``start()``
    to execute it on a separate thread. Matches are collected in ``results``.
    """

    def __init__(
        self,
        exp_mzs: List[float],
        calculated_mzs: List[float],
        tolerance,
        method="ppm",
        average_target_mz=True,
    ):
        """
        Parameters
        ----------
        exp_mzs: [float]
            experimental m/z
        calculated_mzs: [float]
            calculated m/z
        tolerance: float
            largest accepted absolute error, in the unit given by `method`;
            must not be negative
        method: string,
            ppm or ppb
        average_target_mz: bool
            when True, calculated m/z values lying within `tolerance` of
            their neighbour are averaged into a single target

        Raises
        ------
        ValueError
            if `tolerance` is negative or `method` is neither ppm nor ppb

        call run to trigger the m/z search algorithm
        or start if using it as thread
        """
        Thread.__init__(self)
        # {calculated_mz: [SearchResults]}, filled in by run()
        self._matched_mz = {}
        self._calculated_mzs = calculated_mzs
        # stays empty unless colapse_calculated() produces averaged targets
        self._averaged_target_mz = []
        self._exp_mzs = exp_mzs
        # assign through the setters so both values are validated
        self.tolerance = tolerance
        self.method = method

        if average_target_mz:
            self.colapse_calculated()

    @property
    def results(self):
        """{calculated_mz: [SearchResults]}
        contains the results of the search
        """
        return self._matched_mz

    @property
    def averaged_target_mz(self):
        """[float]
        contains the average target m/z to be searched against
        """
        return self._averaged_target_mz

    @property
    def calculated_mzs(self):
        """[float]
        contains the mz target to be searched against, sorted ascending;
        the averaged targets take precedence when they exist
        """
        if self.averaged_target_mz:
            return sorted(self.averaged_target_mz)
        return sorted(self._calculated_mzs)

    @property
    def exp_mzs(self):
        """[float]
        contains the exp mz to be searched against
        """
        return self._exp_mzs

    @property
    def method(self):
        """string, unit of the error and of the tolerance: ppm or ppb"""
        return self._method

    @method.setter
    def method(self, method):
        """
        method: string,
            ppm or ppb

        Raises ValueError for any other value.
        """
        if method not in ("ppm", "ppb"):
            raise ValueError("Method should be ppm or ppb")
        self._method = method

    @property
    def tolerance(self):
        """float, largest accepted absolute error in the unit of `method`"""
        return self._tolerance

    @tolerance.setter
    def tolerance(self, tolerance):
        """
        tolerance: float,
            must not be negative (zero is accepted)

        Raises ValueError for a negative value.
        """
        if tolerance < 0:
            raise ValueError("Tolerance needs to be a positive number")
        self._tolerance = tolerance

    def colapse_calculated(self):
        """Average runs of neighbouring targets into single target m/z values.

        The sorted targets are walked pairwise; a target joins the current
        group when its error relative to the previous target is within
        `tolerance`, otherwise it opens a new group. The mean of every group
        is stored in `averaged_target_mz`. Nothing is changed when there are
        fewer than two targets.
        """
        # the property sorts on every access, so read it only once
        mzs = self.calculated_mzs
        if len(mzs) <= 1:
            return

        groups = [[mzs[0]]]
        for previous_mz, current_mz in zip(mzs, mzs[1:]):
            # same unit as the search itself, so `tolerance` means one thing
            error = self.calc_mz_error(previous_mz, current_mz, method=self.method)
            if self.check_ppm_error(self.tolerance, error):
                groups[-1].append(current_mz)
            else:
                groups.append([current_mz])

        self._averaged_target_mz = [sum(group) / len(group) for group in groups]

    def run(self):
        """Search every target m/z against the experimental m/z values.

        Experimental values are bucketed by their integer part. Each target
        is compared with its own bucket and with both neighbouring buckets,
        because a match can sit on the other side of an integer boundary.
        Matches are accumulated in `results`.
        """
        dict_nominal_exp_mz = self.get_nominal_exp(self.exp_mzs)

        for calculated_mz in self.calculated_mzs:
            nominal_selected_mz = int(calculated_mz)

            for offset in (0, -1, 1):
                if nominal_selected_mz + offset in dict_nominal_exp_mz:
                    self.search_mz(
                        self.results, dict_nominal_exp_mz, calculated_mz, offset
                    )

    @staticmethod
    def calc_mz_error(calculated_mz, exp_mz, method="ppm"):
        """Return the signed relative error of `exp_mz` against `calculated_mz`.

        Parameters
        ----------
        calculated_mz: float,
        exp_mz: float
        method: string,
            ppm or ppb

        Raises
        ------
        ValueError
            if `method` is neither ppm nor ppb
        """
        if method == "ppm":
            multi_factor = 1_000_000
        elif method == "ppb":
            multi_factor = 1_000_000_000
        else:
            raise ValueError(
                "method needs to be ppm or ppb, you have entered %s" % method
            )

        return ((exp_mz - calculated_mz) / calculated_mz) * multi_factor

    @staticmethod
    def check_ppm_error(tolerance, error):
        """Return True when `error` lies within +/- `tolerance`, bounds included."""
        return bool(-tolerance <= error <= tolerance)

    def get_nominal_exp(self, exp_mzs) -> dict:
        """Group `exp_mzs` by integer part: {int(mz): [mz, ...]}."""
        dict_nominal_exp_mz = {}

        for exp_mz in exp_mzs:
            dict_nominal_exp_mz.setdefault(int(exp_mz), []).append(exp_mz)

        return dict_nominal_exp_mz

    def search_mz(self, results, dict_nominal_exp_mz, calculated_mz, offset) -> None:
        """Record the matches of `calculated_mz` found in one nominal bucket.

        Parameters
        ----------
        results: dict
            {calculated_mz: [SearchResults]}, updated in place
        dict_nominal_exp_mz: dict
            experimental m/z grouped by integer part
        calculated_mz: float
            target m/z
        offset: int
            added to int(calculated_mz) to select the bucket to inspect
        """
        nominal_calculated_mz = int(calculated_mz) + offset
        # an absent bucket just means there is nothing to compare against
        matched_n_precursors = dict_nominal_exp_mz.get(nominal_calculated_mz, ())

        for precursor_mz in matched_n_precursors:
            error = self.calc_mz_error(calculated_mz, precursor_mz, method=self.method)

            if self.check_ppm_error(self.tolerance, error):
                new_match = SearchResults(
                    calculated_mz, precursor_mz, error, self.tolerance
                )
                results.setdefault(calculated_mz, []).append(new_match)
@dataclass
class
SearchResults:
class
MZSearch(threading.Thread):
class MZSearch(Thread):
    """Match experimental m/z values against calculated (target) m/z values.

    Call ``run()`` to execute the search in the current thread, or ``start()``
    to execute it on a separate thread. Matches are collected in ``results``.
    """

    def __init__(
        self,
        exp_mzs: List[float],
        calculated_mzs: List[float],
        tolerance,
        method="ppm",
        average_target_mz=True,
    ):
        """
        Parameters
        ----------
        exp_mzs: [float]
            experimental m/z
        calculated_mzs: [float]
            calculated m/z
        tolerance: float
            largest accepted absolute error, in the unit given by `method`;
            must not be negative
        method: string,
            ppm or ppb
        average_target_mz: bool
            when True, calculated m/z values lying within `tolerance` of
            their neighbour are averaged into a single target

        Raises
        ------
        ValueError
            if `tolerance` is negative or `method` is neither ppm nor ppb

        call run to trigger the m/z search algorithm
        or start if using it as thread
        """
        Thread.__init__(self)
        # {calculated_mz: [SearchResults]}, filled in by run()
        self._matched_mz = {}
        self._calculated_mzs = calculated_mzs
        # stays empty unless colapse_calculated() produces averaged targets
        self._averaged_target_mz = []
        self._exp_mzs = exp_mzs
        # assign through the setters so both values are validated
        self.tolerance = tolerance
        self.method = method

        if average_target_mz:
            self.colapse_calculated()

    @property
    def results(self):
        """{calculated_mz: [SearchResults]}
        contains the results of the search
        """
        return self._matched_mz

    @property
    def averaged_target_mz(self):
        """[float]
        contains the average target m/z to be searched against
        """
        return self._averaged_target_mz

    @property
    def calculated_mzs(self):
        """[float]
        contains the mz target to be searched against, sorted ascending;
        the averaged targets take precedence when they exist
        """
        if self.averaged_target_mz:
            return sorted(self.averaged_target_mz)
        return sorted(self._calculated_mzs)

    @property
    def exp_mzs(self):
        """[float]
        contains the exp mz to be searched against
        """
        return self._exp_mzs

    @property
    def method(self):
        """string, unit of the error and of the tolerance: ppm or ppb"""
        return self._method

    @method.setter
    def method(self, method):
        """
        method: string,
            ppm or ppb

        Raises ValueError for any other value.
        """
        if method not in ("ppm", "ppb"):
            raise ValueError("Method should be ppm or ppb")
        self._method = method

    @property
    def tolerance(self):
        """float, largest accepted absolute error in the unit of `method`"""
        return self._tolerance

    @tolerance.setter
    def tolerance(self, tolerance):
        """
        tolerance: float,
            must not be negative (zero is accepted)

        Raises ValueError for a negative value.
        """
        if tolerance < 0:
            raise ValueError("Tolerance needs to be a positive number")
        self._tolerance = tolerance

    def colapse_calculated(self):
        """Average runs of neighbouring targets into single target m/z values.

        The sorted targets are walked pairwise; a target joins the current
        group when its error relative to the previous target is within
        `tolerance`, otherwise it opens a new group. The mean of every group
        is stored in `averaged_target_mz`. Nothing is changed when there are
        fewer than two targets.
        """
        # the property sorts on every access, so read it only once
        mzs = self.calculated_mzs
        if len(mzs) <= 1:
            return

        groups = [[mzs[0]]]
        for previous_mz, current_mz in zip(mzs, mzs[1:]):
            # same unit as the search itself, so `tolerance` means one thing
            error = self.calc_mz_error(previous_mz, current_mz, method=self.method)
            if self.check_ppm_error(self.tolerance, error):
                groups[-1].append(current_mz)
            else:
                groups.append([current_mz])

        self._averaged_target_mz = [sum(group) / len(group) for group in groups]

    def run(self):
        """Search every target m/z against the experimental m/z values.

        Experimental values are bucketed by their integer part. Each target
        is compared with its own bucket and with both neighbouring buckets,
        because a match can sit on the other side of an integer boundary.
        Matches are accumulated in `results`.
        """
        dict_nominal_exp_mz = self.get_nominal_exp(self.exp_mzs)

        for calculated_mz in self.calculated_mzs:
            nominal_selected_mz = int(calculated_mz)

            for offset in (0, -1, 1):
                if nominal_selected_mz + offset in dict_nominal_exp_mz:
                    self.search_mz(
                        self.results, dict_nominal_exp_mz, calculated_mz, offset
                    )

    @staticmethod
    def calc_mz_error(calculated_mz, exp_mz, method="ppm"):
        """Return the signed relative error of `exp_mz` against `calculated_mz`.

        Parameters
        ----------
        calculated_mz: float,
        exp_mz: float
        method: string,
            ppm or ppb

        Raises
        ------
        ValueError
            if `method` is neither ppm nor ppb
        """
        if method == "ppm":
            multi_factor = 1_000_000
        elif method == "ppb":
            multi_factor = 1_000_000_000
        else:
            raise ValueError(
                "method needs to be ppm or ppb, you have entered %s" % method
            )

        return ((exp_mz - calculated_mz) / calculated_mz) * multi_factor

    @staticmethod
    def check_ppm_error(tolerance, error):
        """Return True when `error` lies within +/- `tolerance`, bounds included."""
        return bool(-tolerance <= error <= tolerance)

    def get_nominal_exp(self, exp_mzs) -> dict:
        """Group `exp_mzs` by integer part: {int(mz): [mz, ...]}."""
        dict_nominal_exp_mz = {}

        for exp_mz in exp_mzs:
            dict_nominal_exp_mz.setdefault(int(exp_mz), []).append(exp_mz)

        return dict_nominal_exp_mz

    def search_mz(self, results, dict_nominal_exp_mz, calculated_mz, offset) -> None:
        """Record the matches of `calculated_mz` found in one nominal bucket.

        Parameters
        ----------
        results: dict
            {calculated_mz: [SearchResults]}, updated in place
        dict_nominal_exp_mz: dict
            experimental m/z grouped by integer part
        calculated_mz: float
            target m/z
        offset: int
            added to int(calculated_mz) to select the bucket to inspect
        """
        nominal_calculated_mz = int(calculated_mz) + offset
        # an absent bucket just means there is nothing to compare against
        matched_n_precursors = dict_nominal_exp_mz.get(nominal_calculated_mz, ())

        for precursor_mz in matched_n_precursors:
            error = self.calc_mz_error(calculated_mz, precursor_mz, method=self.method)

            if self.check_ppm_error(self.tolerance, error):
                new_match = SearchResults(
                    calculated_mz, precursor_mz, error, self.tolerance
                )
                results.setdefault(calculated_mz, []).append(new_match)
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
MZSearch( exp_mzs: List[float], calculated_mzs: List[float], tolerance, method='ppm', average_target_mz=True)
19 def __init__( 20 self, 21 exp_mzs: List[float], 22 calculated_mzs: List[float], 23 tolerance, 24 method="ppm", 25 average_target_mz=True, 26 ): 27 """ 28 Parameters 29 ---------- 30 calculated_mzs: [float] calculated m/z 31 exp_mzs: [float] experimental m/z 32 method: string, 33 ppm or ppb 34 call run to trigger the m/z search algorithm 35 or start if using it as thread 36 """ 37 Thread.__init__(self) 38 # placeholder for the results 39 self._matched_mz = {} 40 41 self._calculated_mzs = calculated_mzs 42 43 self._matched_mz = {} 44 45 self._averaged_target_mz = [] 46 47 self._exp_mzs = exp_mzs 48 49 self._tolerance = tolerance 50 self.method = method 51 52 if average_target_mz: 53 self.colapse_calculated()
Parameters
calculated_mzs ([float]): calculated m/z
exp_mzs ([float]): experimental m/z
method (string): ppm or ppb
- call run to trigger the m/z search algorithm
- or start if using it as thread
def
colapse_calculated(self):
def colapse_calculated(self):
    """Average runs of neighbouring targets into single target m/z values.

    The sorted targets are walked pairwise; a target joins the current group
    when its error relative to the previous target is within `tolerance`,
    otherwise it opens a new group. The mean of every group is stored in
    `_averaged_target_mz`. Nothing is changed when there are fewer than two
    targets.
    """
    # read the property once instead of on every index access
    mzs = self.calculated_mzs
    if len(mzs) <= 1:
        return

    groups = [[mzs[0]]]
    for previous_mz, current_mz in zip(mzs, mzs[1:]):
        # same unit as the search itself, so `tolerance` means one thing
        error = self.calc_mz_error(previous_mz, current_mz, method=self.method)
        if self.check_ppm_error(self.tolerance, error):
            groups[-1].append(current_mz)
        else:
            groups.append([current_mz])

    self._averaged_target_mz = [sum(group) / len(group) for group in groups]
def
run(self):
def run(self):
    """Search every target m/z against the experimental m/z values.

    Experimental values are bucketed by their integer part. Each target is
    compared with its own bucket and with both neighbouring buckets, because
    a match can sit on the other side of an integer boundary. Matches are
    accumulated in `results` by `search_mz`.
    """
    dict_nominal_exp_mz = self.get_nominal_exp(self.exp_mzs)

    for calculated_mz in self.calculated_mzs:
        nominal_selected_mz = int(calculated_mz)

        # own bucket first, then the lower and the upper neighbour
        for offset in (0, -1, 1):
            if nominal_selected_mz + offset in dict_nominal_exp_mz:
                self.search_mz(
                    self.results, dict_nominal_exp_mz, calculated_mz, offset
                )
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
@staticmethod
def
calc_mz_error(calculated_mz, exp_mz, method='ppm'):
192 @staticmethod 193 def calc_mz_error(calculated_mz, exp_mz, method="ppm"): 194 """ 195 Parameters 196 ---------- 197 calculated_mz: float, 198 exp_mz:float 199 method: string, 200 ppm or ppb 201 """ 202 if method == "ppm": 203 multi_factor = 1000000 204 205 elif method == "ppb": 206 multi_factor = 1000000 207 208 else: 209 raise Exception( 210 "method needs to be ppm or ppb, \ 211 you have entered %s" 212 % method 213 ) 214 215 return ((exp_mz - calculated_mz) / calculated_mz) * multi_factor
Parameters
calculated_mz (float): calculated m/z
exp_mz (float): experimental m/z
method (string): ppm or ppb
def
get_nominal_exp(self, exp_mzs) -> dict:
def get_nominal_exp(self, exp_mzs) -> dict:
    """Group experimental m/z values by their integer part.

    Returns a dict {int(mz): [mz, ...]}; the values of each bucket keep the
    order in which they appear in `exp_mzs`.
    """
    buckets = {}

    for mz in exp_mzs:
        # create the bucket on first sight, then append to it
        buckets.setdefault(int(mz), []).append(mz)

    return buckets
def
search_mz(self, results, dict_nominal_exp_mz, calculated_mz, offset) -> None:
def search_mz(self, results, dict_nominal_exp_mz, calculated_mz, offset) -> None:
    """Record the matches of `calculated_mz` found in one nominal bucket.

    Parameters
    ----------
    results: dict
        {calculated_mz: [SearchResults]}, updated in place
    dict_nominal_exp_mz: dict
        experimental m/z grouped by integer part
    calculated_mz: float
        target m/z
    offset: int
        added to int(calculated_mz) to select the bucket to inspect
    """
    nominal_calculated_mz = int(calculated_mz) + offset
    # an absent bucket just means there is nothing to compare against
    matched_n_precursors = dict_nominal_exp_mz.get(nominal_calculated_mz, ())

    for precursor_mz in matched_n_precursors:
        error = self.calc_mz_error(calculated_mz, precursor_mz, method=self.method)

        if self.check_ppm_error(self.tolerance, error):
            new_match = SearchResults(
                calculated_mz, precursor_mz, error, self.tolerance
            )
            results.setdefault(calculated_mz, []).append(new_match)
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id