Coverage for src/km3dq_common/lw_db_fact_library.py: 0%

104 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-16 14:13 +0000

1#! /usr/bin/env python 

2############################################################################### 

3import re 

4import sys 

5import calendar 

6import time 

7import urllib.request 

8 

9from .detector_fact_classes import DetectorFact 

10 

11 

12############################################################################### 

13def read_basic_fact_file(det, fact_type): 

14 """ 

15 Read defect files stored on SFTP. Simply returns lines 

16 

17 === Arguments === 

18 - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... 

19 - fact_type 

20 

21 === Output === 

22 - Array of lines 

23 

24 """ 

25 

26 if "ORCA" in det: 

27 site = "ORCA" 

28 else: 

29 site = "ARCA" 

30 

31 lines = [] 

32 fact_url = ( 

33 "https://sftp.km3net.de/data/km3dq_lw_db/" 

34 f"{site}/{det}/Facts/" 

35 f"{fact_type}.txt" 

36 ) 

37 try: 

38 with urllib.request.urlopen(fact_url) as def_file: 

39 tmp = (def_file.read()).split(b"\n") 

40 for i_line in tmp: 

41 if i_line != "": 

42 lines.append(i_line.decode("utf-8")) 

43 except urllib.error.URLError: 

44 # No fact file found 

45 lines = None 

46 

47 return lines 

48 

49 

50############################################################################### 

51def read_fact_file(det, fact_type): 

52 """ 

53 Read defect files stored on SFTP. 

54 

55 === Arguments === 

56 - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... 

57 - fact_type 

58 

59 === Output === 

60 - Array of lines 

61 

62 """ 

63 

64 if "ORCA" in det: 

65 site = "ORCA" 

66 else: 

67 site = "ARCA" 

68 

69 re_line = re.compile( 

70 r"\s*(\S*)\s*\|" 

71 r"\s*(\S*)\s*\|" 

72 r"\s*(\S.*\d)\s\-\s(\S.*\d)\s*\|" 

73 r"(.*)\|" 

74 r"(.*)\|" 

75 r"(.*)" 

76 ) 

77 

78 re_line_no_end = re.compile( 

79 r"\s*(\S*)\s*\|" 

80 r".*\|" 

81 r"\s*(\S.*\d)\s\-\s*\|" 

82 r"(.*)\|" 

83 r"(.*)\|" 

84 r"(.*)" 

85 ) 

86 

87 fact_list = [] 

88 fact_url = ( 

89 "https://sftp.km3net.de/data/km3dq_lw_db/" 

90 f"{site}/{det}/Facts/" 

91 f"{fact_type}.txt" 

92 ) 

93 try: 

94 with urllib.request.urlopen(fact_url) as def_file: 

95 tmp = (def_file.read()).split(b"\n") 

96 for i_line in tmp: 

97 r_m = re_line.match(i_line.decode("utf-8")) 

98 if r_m: 

99 # Intermediate step to handle facts with and without second accuracy 

100 # Accuracy must be the same for both end/start by construction 

101 if r_m.group(3).count(":") == 1: 

102 t_s = int(calendar.timegm(time.strptime(r_m.group(3), "%a, %d %b %Y %H:%M"))) 

103 t_e = int(calendar.timegm(time.strptime(r_m.group(4), "%a, %d %b %Y %H:%M"))) 

104 else: 

105 t_s = int(calendar.timegm(time.strptime(r_m.group(3), "%a, %d %b %Y %H:%M:%S"))) 

106 t_e = int(calendar.timegm(time.strptime(r_m.group(4), "%a, %d %b %Y %H:%M:%S"))) 

107 

108 fact_list.append({ 

109 "run_start": r_m.group(1), 

110 "run_end": r_m.group(2), 

111 "time_start": t_s, 

112 "time_end": t_e, 

113 "comment": r_m.group(5), 

114 "documentation": r_m.group(6), 

115 "author": r_m.group(7), 

116 }) 

117 continue 

118 r_m = re_line_no_end.match(i_line.decode("utf-8")) 

119 if r_m: 

120 # Intermediate step to handle facts with and without second accuracy 

121 if r_m.group(2).count(":") == 1: 

122 t_s = int(calendar.timegm(time.strptime(r_m.group(2), "%a, %d %b %Y %H:%M"))) 

123 else: 

124 t_s = int(calendar.timegm(time.strptime(r_m.group(2), "%a, %d %b %Y %H:%M:%S"))) 

125 fact_list.append({ 

126 "run_start": r_m.group(1), 

127 "run_end": 1e6, 

128 "time_start": t_s, 

129 "time_end": 0, 

130 "comment": r_m.group(3), 

131 "documentation": r_m.group(4), 

132 "author": r_m.group(5), 

133 }) 

134 

135 

136 

137 except urllib.error.URLError: 

138 # No fact file found 

139 lines = None 

140 

141 

142 return fact_list 

143 

144 

145############################################################################### 

146def read_det_fact_file(site): 

147 """ 

148 Read detector fact files stored on SFTP 

149 

150 === Arguments === 

151 - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... 

152 - def_tag : defect tag - [string] 

153 - type_filter : if not empty, filter on the defect type - If empty, read 

154 all defect files - [array of strings] - 

155 Ex: ["daq", "operation"] 

156 

157 === Output === 

158 - raw detector facts 

159 - detector start/end 

160 """ 

161 

162 raw_df = {} 

163 

164 re_line = re.compile( 

165 r"(.*)\|" 

166 r"(.*)\|" 

167 r"(.*)\|" 

168 r"(.*)\|" 

169 r"(.*)\|" 

170 r"(.*)\|" 

171 r"(.*)\|" 

172 r"(.*)\|" 

173 r"(.*)\|" 

174 r"(.*)\|" 

175 r"(.*)$" 

176 ) 

177 re_rate = re.compile("^\s*\((\d+) ping.+every (\d+)s\)\s*$") 

178 

179 raw_df = [] 

180 

181 lines = [] 

182 det_fact_url = ( 

183 "https://sftp.km3net.de/data/km3dq_lw_db/" 

184 f"{site}//DetectorFacts/" 

185 f"detector_facts.txt" 

186 ) 

187 try: 

188 with urllib.request.urlopen(det_fact_url) as def_file: 

189 tmp = (def_file.read()).split(b"\n") 

190 for i_line in tmp: 

191 if i_line != "": 

192 lines.append(i_line.decode("utf-8")) 

193 except urllib.error.URLError: 

194 # No defect file found 

195 print("Missing detector-fact file") 

196 sys.exit() 

197 

198 for i_line_index, i_line in enumerate(lines): 

199 if i_line_index == 0: 

200 continue 

201 

202 r_m = re_line.match(i_line) 

203 if r_m: 

204 raw_df.append(DetectorFact()) 

205 raw_df[-1].hardware = r_m.group(4).split("-")[0].replace(" ", "") 

206 raw_df[-1].site = site.replace(" ", "") 

207 raw_df[-1].time = r_m.group(1) 

208 raw_df[-1].det = r_m.group(2).replace(" ", "") 

209 raw_df[-1].following_run = int(r_m.group(3)) 

210 raw_df[-1].upi = (r_m.group(4).split(f"{raw_df[-1].hardware}-")[1]).replace( 

211 " ", "" 

212 ) 

213 raw_df[-1].location = r_m.group(5).replace(" ", "") 

214 # For some unknown reasons, the lines below are mandatory to avoid an 

215 # error: AttributeError: 'DetectorFact' object has no attribute coord_utm 

216 raw_df[-1].coord_utm = {"x": 0.0, "y": 0.0, "z": 0.0} 

217 raw_df[-1].coord_detx = {"x": 0.0, "y": 0.0, "z": 0.0} 

218 raw_df[-1].extract_coord_from_location() 

219 if r_m.group(6).replace(" ", "") != "": 

220 raw_df[-1].position = int(r_m.group(6)) 

221 else: # Detector object has no position 

222 raw_df[-1].position = "" 

223 raw_df[-1].status = r_m.group(7).replace(" ", "") 

224 raw_df[-1].comment = r_m.group(8) 

225 if "&" in r_m.group(9): # Waveform with rate definition 

226 raw_df[-1].waveform = r_m.group(9).split("&")[0].replace(" ", "") 

227 raw_df[-1].pattern = r_m.group(9).split("&")[1] 

228 tmp_rate = re_rate.search(raw_df[-1].pattern) 

229 try: 

230 raw_df[-1].emission_rate = int(tmp_rate.group(1)) / int(tmp_rate.group(2)) 

231 except AttributeError: 

232 print("Bad pattern -> Unable to retrieve the rate") 

233 print(raw_df[-1].pattern) 

234 raw_df[-1].emission_rate = 0. 

235 else: 

236 raw_df[-1].waveform = r_m.group(9).replace(" ", "") 

237 raw_df[-1].pattern = "" 

238 raw_df[-1].emission_rate = 0. 

239 raw_df[-1].documentation = r_m.group(10) 

240 raw_df[-1].author = r_m.group(11) 

241 

242 return raw_df