Source code for km3dq_common.detector_status_classes

#!/usr/bin/env python3
import datetime
import re

from km3dq_common.lw_db_fact_library import read_det_fact_file


###############################################################################
[docs] class DetectorFact(dict): """ Detector fact classes ingeriting from a dict https://realpython.com/inherit-python-dict/ """ def __init__(self): dict.__init__(self, hardware="", location="", position=0, status="", upi=0, site="", det="", time="", following_run=0, comment="", waveform="", documentation="", author="")
###############################################################################
[docs] class DetectorStatus(dict): """ Detector fact classes ingeriting from a dict https://pynative.com/make-python-class-json-serializable/ https://realpython.com/inherit-python-dict/ """ def __init__(self, site, date_time=""): dict.__init__(self, site="", time="", hardware={} )
[docs] self.site = site
[docs] self.time = date_time # String "yyyy-mm-dd hh:mm"
[docs] self.hardware = {}
[docs] def set_installed_hardware(self): re_time = re.compile(r"(\d\d\d\d)-(\d\d)-(\d\d)\s(\d\d):(\d\d)") if self.time != "": # Past detector status (i.e not today) spl_time = re_time.match(self.time) ds_time = datetime.datetime(int(spl_time.group(1)), int(spl_time.group(2)), int(spl_time.group(3)), int(spl_time.group(4)), int(spl_time.group(5))).timestamp() else: # Today ds_time = 0 (all_df, _) = read_det_fact_file(self.site) # Loop on all detector facts for i_df in all_df: if i_df.hardware == "det": continue # Extract the time of the detector fact spl_time = re_time.match(i_df.time) if spl_time: i_time = datetime.datetime(int(spl_time.group(1)), int(spl_time.group(2)), int(spl_time.group(3)), int(spl_time.group(4)), int(spl_time.group(5))).timestamp() else: print("Wrong time format - Skip") continue # Keep only the detector fact priors to the requested time # Id ds_time == 0 (i.e today), keep all detector facts if i_time > ds_time and ds_time != 0: # Detector fact after the detector status time continue # The upi is now used as index # NB: the base autonomeous beacons have a _b suffix appended # to makek their upi different from the DU ones. i_hw_upi = i_df.upi if i_hw_upi in self.hardware.keys(): # Position already known. Check if more recent if (i_time < self.hardware[i_hw_upi]["time"]): continue if i_hw_upi in self.hardware.keys() and i_df.status == "removed": self.hardware.pop(i_hw_upi) else: self.hardware[i_hw_upi] = {"type": i_df.hardware, "position": i_df.position, "time": i_time, "run": i_df.following_run, "location": i_df.location, "waveform": i_df.waveform, "upi": i_df.upi, "status": i_df.status}