Coverage for src/km3dq_common/lw_db_det_fact_library.py: 0% — 82 statements
(coverage.py v7.10.6, report created at 2025-09-14 11:06 +0000)

#! /usr/bin/env python

###############################################################################

import calendar
import re
import sys
import time
import urllib.error
import urllib.request

import tomli

from .detector_fact_classes import DetectorFact

11 

12 

13############################################################################### 

def read_det_fact_file_2025(site):
    """
    Read the 2025-format detector-fact files stored on SFTP.

    === Arguments ===
    - site : site name inserted into the SFTP URLs - [string]

    === Output ===
    - raw detector facts - [list of DetectorFact]

    2025 version with primary_det_fact: every detector-fact line references
    a "primary" entry (run / comment / author / documentation) stored in a
    separate TOML file.

    Exits the process (sys.exit) when a file cannot be fetched, a primary
    name cannot be decoded, or a primary is never referenced by any line.
    """

    # Define all needed regex.
    # A detector-fact line: primary | object id | location | position |
    # status | waveform
    re_line = re.compile(
        r"\s*(\S*)\s*\|"   # Primary
        r"\s*(\S*)\s*\|"   # Object id
        r"\s*(\S*)\s*\|"   # Location
        r"\s*(\S*)\s*\|"   # Position
        r"\s*(\S*)\s*\|"   # Status
        r"(.*)$"           # Waveform
    )
    # Primary name: detector_date_HHMM_anything
    re_primary = re.compile(r"(D.*)_(.*)_(\d\d)(\d\d)_.*")
    # Emission-rate annotation, e.g. "(10 pings every 60s)"
    re_rate = re.compile(r"^\s*\((\d+) ping.+every (\d+)s\)\s*$")

    # Sftp links
    url = {
        "primary_det_fact": (
            "https://sftp.km3net.de/data/km3dq_lw_db/"
            f"{site}//DetectorFacts/"
            "primary_detector_facts.toml"
        ),
        "det_fact": (
            "https://sftp.km3net.de/data/km3dq_lw_db/"
            f"{site}//DetectorFacts/"
            "detector_facts_2025.txt"
        ),
    }

    # Retrieve all the primaries
    try:
        with urllib.request.urlopen(url["primary_det_fact"]) as response:
            toml_primary = tomli.loads(response.read().decode("utf-8"))
    except urllib.error.URLError:
        print("Missing primary detector-fact file")
        sys.exit()

    # Initiate the counting of used primaries (except for the default)
    primary_usage = {}
    for i_primary in toml_primary:
        if i_primary != "default":
            primary_usage[i_primary] = 0

    # Now loop on all detector facts
    raw_df = []
    lines = []
    try:
        with urllib.request.urlopen(url["det_fact"]) as def_file:
            for i_line in def_file.read().split(b"\n"):
                # Compare against b"" — the payload is bytes.  The previous
                # `i_line != ""` was always true and let empty lines through.
                if i_line != b"":
                    lines.append(i_line.decode("utf-8"))
    except urllib.error.URLError:
        print("Missing detector-fact file")
        sys.exit()

    for i_line_index, i_line in enumerate(lines):
        # First line is a column-header line
        if i_line_index == 0:
            continue

        regex_line = re_line.match(i_line)
        if regex_line is None:
            continue

        # Try to decode the primary name to get datetime, detector
        primary_name = regex_line.group(1)
        regex_primary = re_primary.match(primary_name)
        if regex_primary is None:
            print(f"Bad primary name {primary_name}")
            sys.exit()

        primary_usage[primary_name] += 1

        det_fact = DetectorFact()

        # Object id is "<hardware>-<upi>"
        det_fact.hardware = regex_line.group(2).split("-")[0]
        det_fact.site = site.replace(" ", "")
        det_fact.time = (
            f"{regex_primary.group(2)} "
            f"{regex_primary.group(3)}:{regex_primary.group(4)}"
        )
        det_fact.det = regex_primary.group(1)
        det_fact.following_run = int(toml_primary[primary_name]["run"])
        det_fact.upi = regex_line.group(2).split(f"{det_fact.hardware}-")[1]

        det_fact.location = regex_line.group(3)
        # For some unknown reasons, the lines below are mandatory to avoid an
        # error: AttributeError: 'DetectorFact' object has no attribute
        # coord_utm
        det_fact.coord_utm = {"x": 0.0, "y": 0.0, "z": 0.0}
        det_fact.coord_detx = {"x": 0.0, "y": 0.0, "z": 0.0}
        det_fact.extract_coord_from_location()

        det_fact.position = int(regex_line.group(4))
        det_fact.status = regex_line.group(5)

        det_fact.comment = toml_primary[primary_name]["comment"]

        waveform_field = regex_line.group(6)
        if "&" in waveform_field:  # Waveform with rate definition
            det_fact.waveform = waveform_field.split("&")[0].replace(" ", "")
            det_fact.pattern = waveform_field.split("&")[1]
            tmp_rate = re_rate.search(det_fact.pattern)
            try:
                det_fact.emission_rate = (
                    int(tmp_rate.group(1)) / int(tmp_rate.group(2))
                )
            except AttributeError:
                # re_rate did not match -> tmp_rate is None
                print("Bad pattern -> Unable to retrieve the rate")
                print(det_fact.pattern)
                det_fact.emission_rate = 0.
        else:
            det_fact.waveform = waveform_field.replace(" ", "")
            det_fact.pattern = ""
            det_fact.emission_rate = 0.

        det_fact.documentation = toml_primary[primary_name]["documentation"]
        try:
            # If author not defined, look for the default one (a priori only
            # the author has a default value)
            det_fact.author = toml_primary[primary_name]["author"]
        except KeyError:
            det_fact.author = toml_primary["default"]["author"]

        raw_df.append(det_fact)

    # Check that all primaries have been used at least once
    for i_primary, n_used in primary_usage.items():
        if n_used == 0:
            print(f"{i_primary} never used! -> exiting")
            sys.exit()

    return raw_df