#! /usr/bin/env python
###############################################################################
import re
import sys
import calendar
import time
import urllib.request
import tomli
from .detector_fact_classes import DetectorFact
###############################################################################
# [docs] (Sphinx source-link marker left over from the HTML extraction)
def read_det_fact_file_2025(site):
    """
    Read the detector facts (2025 format) of a site from the SFTP server.

    Two files are downloaded from
    https://sftp.km3net.de/data/km3dq_lw_db/<site>//DetectorFacts/ :
    - primary_detector_facts.toml : one table per primary ("run", "comment",
      "documentation" and optionally "author"), plus a "default" table used
      as fallback when a primary has no "author"
    - detector_facts_2025.txt : one "|"-separated line per detector fact
      (primary | object id | location | position | status | waveform).
      The first non-empty line is skipped as a header and lines that do not
      match this layout are ignored.

    === Arguments ===
    - site : site name, inserted as is in the SFTP URLs - [string]
      (spaces are removed only in the value stored in each detector fact)

    === Output ===
    - raw detector facts, in file order - [list of DetectorFact]

    === Exit ===
    A message is printed and the program exits (SystemExit, status 1) when:
    - one of the two files cannot be retrieved
    - a primary name does not follow <detector>_<date>_<hhmm>_<anything>
    - a primary name is not defined in the primary detector-fact file
    - an object id does not follow <hardware>-<upi>
    - a primary of the toml file is never referenced by a detector fact
    """
    # Define all needed regex
    re_line = re.compile(
        r"\s*(\S*)\s*\|"  # Primary
        r"\s*(\S*)\s*\|"  # Object id
        r"\s*(\S*)\s*\|"  # Location
        r"\s*(\S*)\s*\|"  # Position
        r"\s*(\S*)\s*\|"  # Status
        r"(.*)$"  # Waveform
    )
    re_primary = re.compile(r"(D.*)\_(.*)\_(\d\d)(\d\d)\_.*")  # detector_date_time_anything
    re_rate = re.compile(r"^\s*\((\d+) ping.+every (\d+)s\)\s*$")

    # Sftp links
    url = {
        "primary_det_fact": (
            "https://sftp.km3net.de/data/km3dq_lw_db/"
            f"{site}//DetectorFacts/"
            "primary_detector_facts.toml"
        ),
        "det_fact": (
            "https://sftp.km3net.de/data/km3dq_lw_db/"
            f"{site}//DetectorFacts/"
            "detector_facts_2025.txt"
        ),
    }

    # Retrieve all the primaries
    try:
        with urllib.request.urlopen(url["primary_det_fact"]) as primary_file:
            toml_primary = tomli.loads(primary_file.read().decode("utf-8"))
    except urllib.error.URLError:
        print("Missing primary detector-fact file")
        sys.exit(1)

    # Initiate the counting of used primaries (except for the default)
    primary_usage = {name: 0 for name in toml_primary if name != "default"}

    # Retrieve the detector-fact file. The content is decoded once so that
    # the empty-line filter compares str with str (comparing the raw bytes
    # with "" never filtered anything).
    try:
        with urllib.request.urlopen(url["det_fact"]) as def_file:
            content = def_file.read().decode("utf-8")
    except urllib.error.URLError:
        print("Missing detector-fact file")
        sys.exit(1)
    lines = [line for line in content.split("\n") if line]

    # Now loop on all detector facts - The first non-empty line is the header
    raw_df = []
    for i_line in lines[1:]:
        regex_line = re_line.match(i_line)
        if regex_line is None:
            continue  # Not a detector-fact line

        (primary_name, object_id, location,
         position, status, waveform_field) = regex_line.groups()

        # Try to decode the primary name to get datetime, detector
        regex_primary = re_primary.match(primary_name)
        if regex_primary is None:
            print(f"Bad primary name {primary_name}")
            sys.exit(1)
        if primary_name not in primary_usage:
            print(f"Primary {primary_name} not defined in the primary detector-fact file")
            sys.exit(1)
        primary_usage[primary_name] += 1
        primary = toml_primary[primary_name]

        # Object id = <hardware>-<upi>: split on the first "-" only, so that
        # a UPI containing the hardware prefix again is not truncated
        hardware, separator, upi = object_id.partition("-")
        if not separator:
            print(f"Bad object id {object_id}")
            sys.exit(1)

        fact = DetectorFact()
        fact.hardware = hardware
        fact.site = site.replace(" ", "")
        fact.time = f"{regex_primary.group(2)} {regex_primary.group(3)}:{regex_primary.group(4)}"
        fact.det = regex_primary.group(1)
        fact.following_run = int(primary["run"])
        fact.upi = upi
        fact.location = location
        # The two attributes below must exist before calling
        # extract_coord_from_location(), otherwise: AttributeError:
        # 'DetectorFact' object has no attribute coord_utm
        # (presumably the method fills them in place - TODO confirm)
        fact.coord_utm = {"x": 0.0, "y": 0.0, "z": 0.0}
        fact.coord_detx = {"x": 0.0, "y": 0.0, "z": 0.0}
        fact.extract_coord_from_location()
        fact.position = int(position)
        fact.status = status
        fact.comment = primary["comment"]

        if "&" in waveform_field:  # Waveform with rate definition
            waveform_parts = waveform_field.split("&")
            fact.waveform = waveform_parts[0].replace(" ", "")
            fact.pattern = waveform_parts[1]
            regex_rate = re_rate.search(fact.pattern)
            # A null period is treated as a bad pattern instead of raising
            # a ZeroDivisionError
            if regex_rate is not None and int(regex_rate.group(2)) != 0:
                fact.emission_rate = int(regex_rate.group(1)) / int(regex_rate.group(2))
            else:
                print("Bad pattern -> Unable to retrieve the rate")
                print(fact.pattern)
                fact.emission_rate = 0.
        else:
            fact.waveform = waveform_field.replace(" ", "")
            fact.pattern = ""
            fact.emission_rate = 0.

        fact.documentation = primary["documentation"]
        # If author not defined, looks for the default one (a priori only
        # the author has a default value). The default table is read only
        # when needed, so it may be absent if every primary has an author.
        if "author" in primary:
            fact.author = primary["author"]
        else:
            fact.author = toml_primary["default"]["author"]

        raw_df.append(fact)

    # Check that all primaries have been used at least once
    for i_primary, n_usage in primary_usage.items():
        if n_usage == 0:
            print(f"{i_primary} never used! -> exiting")
            sys.exit(1)

    return raw_df