Coverage for src/km3dq_common/lw_db_defect_library.py: 57%
74 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-14 11:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-14 11:06 +0000
1#! /usr/bin/env python
2###############################################################################
3import re
5import urllib.request
7from .config_library import configure_defect
10###############################################################################
def check_defect_diag_byte(value, bit_list):
    """
    Test a diagnosis value against bits expected to be set or cleared.

    === Arguments ===
    - value    : diagnosis value, converted with int() before testing
    - bit_list : bit positions stored under the "present" and "absent"
                 keys - [dictionary of iterables of integers]

    === Output ===
    - True if at least one "present" bit is set in value, or at least one
      "absent" bit is cleared in value. False otherwise - [boolean]
    """
    # Both lists are always evaluated in full, "present" first.
    set_flags = [(int(value) >> pos) & 1 for pos in bit_list["present"]]
    cleared_flags = [
        ((int(value) >> pos) & 1) == 0 for pos in bit_list["absent"]
    ]

    return any(set_flags) or any(cleared_flags)
28###############################################################################
def decode_defect_diag_byte(value, def_type0):
    """
    Translate the defect value of a given type into a readable string.

    === Arguments ===
    - value     : diagnosis value for a given type - [integer]
    - def_type0 : defect type, with or without the "def_" marker
                  ("def_daq", "operation"...) - [string]

    === Output ===
    - Single string listing every defect whose bit is set in value, each
      name being followed by " / "
      Ex: 'missing_data_dst / jmergefitfailure / '
    """
    defect_config = configure_defect()

    # The bit table is keyed by the type name stripped of "def_"
    type_name = def_type0.replace("def_", "")
    bit_table = defect_config["bit"][type_name]

    return "".join(
        f"{diag_name} / "
        for diag_name, bit_pos in bit_table.items()
        if (int(value) >> bit_pos) & 1
    )
58###############################################################################
def decode_defect_file(defect_file_dict, run_nb):
    """
    Collect the decoded defects assigned to a single run.

    === Arguments ===
    - defect_file_dict: defect dictionary as produced by the
                        read_defect_file function - [dictionary]
    - run_nb          : run number - [integer]

    === Output ===
    - Dictionary with two entries: "str", a string with html tags (see
      qaqcprocessing/dst-process.py), and "dict", which maps each defect
      type to its decoded string. Defect types in which the run does not
      appear are left out of both.
      Ex: {'str': '<b> def_data_processing </b>: missing_data_dst / ',
           'dict': {'def_data_processing': 'missing_data_dst / '}}
    """
    decoded = {}
    html_chunks = []

    for def_type, runs in defect_file_dict.items():
        if run_nb in runs:
            diag = decode_defect_diag_byte(runs[run_nb], def_type)
            html_chunks.append(f"<b> {def_type} </b>: {diag}")
            decoded[def_type] = diag

    return {"str": "".join(html_chunks), "dict": decoded}
88###############################################################################
def read_defect_file(det, def_tag="def-HEAD", type_filter=None):
    """
    Read the defect files of a detector, fetched over HTTPS from
    sftp.km3net.de, and merge them into one bit mask per run and per type.

    === Arguments ===
    - det         : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"...
                    The site is "ORCA" if the name contains "ORCA", "ARCA"
                    otherwise.
    - def_tag     : defect tag - [string]
    - type_filter : if not empty, only these defect types are read - If None
                    or empty, every type listed by configure_defect() is read
                    [array of strings] - Ex: ["daq", "operation"]
                    NOTE(review): the sign_off type is said to be handled by
                    a dedicated function; whether it is excluded here depends
                    on configure_defect() - to be confirmed.

    === Output ===
    - Dictionary with def_[defect type] as keys and, for each affected run,
      an integer in which the bit of every diagnosis listing this run is set.
      This value can be decoded by the decode_defect_diag_byte function.
      A diagnosis whose file can not be opened (URLError) is skipped.
      Ex: {'def_daq': {}, 'def_operation': {16501: 1, 16503: 2}...}
    """
    defects0 = configure_defect()

    site = "ORCA" if "ORCA" in det else "ARCA"

    # A defect line starts with the run number followed by a "|"
    re_line = re.compile(r"\s*([0-9]+)\s*\|.*")

    res = {}
    for i_type in defects0["type"]:
        # None and an empty filter both mean "no filtering", as documented
        if type_filter and i_type not in type_filter:
            continue

        runs = {}
        res[f"def_{i_type}"] = runs
        for i_diag, i_bit in defects0["bit"][i_type].items():
            defect_url = (
                "https://sftp.km3net.de/data/km3dq_lw_db/"
                f"{site}/{det}/Defects/{def_tag}/"
                f"{i_type}_{i_diag}.txt"
            )
            try:
                with urllib.request.urlopen(defect_url) as def_file:
                    content = def_file.read()
            except urllib.error.URLError:
                # No defect file found
                continue

            mask = 1 << i_bit
            for raw_line in content.split(b"\n"):
                # raw_line is bytes: test emptiness on the bytes object, a
                # comparison with the str "" would always be unequal
                if not raw_line:
                    continue
                r_m = re_line.match(raw_line.decode("utf-8"))
                if r_m is not None:
                    run_nb = int(r_m.group(1))
                    runs[run_nb] = runs.get(run_nb, 0) | mask

    return res
154###############################################################################
def read_basic_defect_file(det, def_type, def_diag, def_tag="def-HEAD"):
    """
    Read one defect file, fetched over HTTPS from sftp.km3net.de, and simply
    return its non-empty lines.

    === Arguments ===
    - det      : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"...
                 The site is "ORCA" if the name contains "ORCA", "ARCA"
                 otherwise.
    - def_type : defect type, first part of the file name - [string]
    - def_diag : defect diagnosis, second part of the file name - [string]
    - def_tag  : defect tag - [string]

    === Output ===
    - Array of the non-empty lines of the file, decoded as utf-8, or None if
      the file could not be opened (URLError)
    """
    site = "ORCA" if "ORCA" in det else "ARCA"

    defect_url = (
        "https://sftp.km3net.de/data/km3dq_lw_db/"
        f"{site}/{det}/Defects/{def_tag}/"
        f"{def_type}_{def_diag}.txt"
    )
    try:
        with urllib.request.urlopen(defect_url) as def_file:
            content = def_file.read()
    except urllib.error.URLError:
        # No defect file found
        return None

    # The chunks are bytes: emptiness is tested on the bytes object itself,
    # as a comparison with the str "" is always unequal and would let the
    # empty lines (e.g. after the final newline) through.
    return [raw_line.decode("utf-8") for raw_line in content.split(b"\n") if raw_line]