Source code for km3dq_common.detector_status_classes

#!/usr/bin/env python3
import datetime
import re

from .lw_db_det_fact_library import read_det_fact_file_2025


# Pattern for time strings such as "2025-03-05 00:20"; the five groups capture
# year, month, day, hour and minute. It is applied with ``match``, so only the
# start of the string is anchored and any trailing text is ignored.
RE_TIME = re.compile(r"(\d\d\d\d)-(\d\d)-(\d\d)\s(\d\d):(\d\d)")
###############################################################################
def strtime_to_datetime(strtime):
    """
    Convert a time string (like 2025-03-05 00:20) into a POSIX timestamp.

    Despite the function name, the value returned is not a datetime object
    but the float produced by ``datetime.datetime.timestamp()``. The
    intermediate datetime is naive (no tzinfo is attached).

    :param strtime: string starting with "yyyy-mm-dd hh:mm"
    :return: timestamp (float) of the parsed date and time
    :raises ValueError: if strtime does not start with the expected pattern,
        or if a field is out of range (e.g. month 13)
    """
    spl_time = RE_TIME.match(strtime)
    if spl_time is None:
        # Without this guard the failure would surface as an opaque
        # AttributeError on None; report the offending input instead.
        raise ValueError(
            "Time string {!r} does not match 'yyyy-mm-dd hh:mm'".format(strtime)
        )

    year, month, day, hour, minute = (int(field) for field in spl_time.groups())
    return datetime.datetime(year, month, day, hour, minute).timestamp()
###############################################################################
class DetectorStatus(dict):
    """
    Detector status of a site at a given time, built from detector facts.

    The class inherits from dict, following
    https://pynative.com/make-python-class-json-serializable/
    https://realpython.com/inherit-python-dict/

    Attributes set by the constructor:
        site: site identifier passed by the caller
        time: requested time as a "yyyy-mm-dd hh:mm" string, or "" for today
        datetime: ``time`` converted by strtime_to_datetime, or 0 for today
        hardware: dict indexed by UPI, filled by set_installed_hardware
    """

    #######################################
    def __init__(self, site, date_time=""):
        """
        Create an empty detector status.

        :param site: site identifier, stored as-is in ``self.site``
        :param date_time: "yyyy-mm-dd hh:mm" string; leave empty for the
            current status (i.e. no cut on the detector fact time)
        """
        # NOTE(review): the dict payload is initialised with placeholder
        # values (and a "datetimetime" key) and is not modified by the
        # attribute assignments below, which only set instance attributes.
        # Confirm whether the serialised content is meant to stay empty.
        dict.__init__(self, site="", time="", datetimetime=0, hardware={})

        self.site = site
        self.time = date_time  # String "yyyy-mm-dd hh:mm"
        if self.time != "":
            # Past detector status (i.e. not today)
            self.datetime = strtime_to_datetime(self.time)
        else:
            # Today
            self.datetime = 0
        self.hardware = {}

    #################################
    def set_installed_hardware(self, all_df):
        """
        Loop on all detector facts of a site and define the installed hardware.

        Each fact must expose the attributes read below (time, upi, status,
        hardware, position, ...). The result is stored in ``self.hardware``;
        nothing is returned.

        :param all_df: iterable of detector facts of the site, presumably
            the output of read_det_fact_file_2025(self.site) - TODO confirm
        """
        for i_df in all_df:
            # Keep only the detector facts prior to the requested time.
            # If self.datetime == 0 (i.e. today), keep all detector facts.
            i_df_time = strtime_to_datetime(i_df.time)
            if i_df_time > self.datetime and self.datetime != 0:
                # Detector fact after the detector status time
                continue

            # The upi is now used as index
            # NB: the base autonomous beacons have a _b suffix appended
            # to make their upi different from the DU ones.
            i_hw_upi = i_df.upi
            already_known = i_hw_upi in self.hardware

            # Hardware already known: ignore facts older than the stored one
            if already_known and i_df_time < self.hardware[i_hw_upi]["time"]:
                continue

            if already_known and i_df.status == "removed":
                del self.hardware[i_hw_upi]
                continue

            # NOTE(review): a "removed" fact for a upi that is not currently
            # known also ends up here and is stored as an entry - confirm
            # that this is the intended behaviour.
            self.hardware[i_hw_upi] = {
                "type": i_df.hardware,
                "position": i_df.position,
                "time": i_df_time,
                "run": i_df.following_run,
                "location": i_df.location,
                "coord_detx": i_df.coord_detx,
                "coord_utm": i_df.coord_utm,
                "waveform": i_df.waveform,
                "pattern": i_df.pattern,
                "emission_rate": i_df.emission_rate,
                # Raw strings: "\s" in a plain literal is an invalid escape
                "upi": re.sub(r"\s*$", "", i_df.upi),
                "status": i_df.status,
                "documentation": re.sub(r"\s*$", "", i_df.documentation),
            }