Source code for km3dq_common.common_library

#! /usr/bin/env python
########################################################################################################
# Various functions used in km3dq_lw_dq and km3dq_core scripts
#
# Developer: Benjamin Trocme (benjamin.trocme at apc.in2p3.fr) - 2023

import os
import sys
import time
import array
import re
import urllib.request
import km3db
import pandas as pd
import numpy as np
import tomli

from .config_library import configure_var_bit
from .config_library import configure_var_thresholds
from .config_library import get_detx_caract
from .config_library import configure_dataquality_tag
from .config_library import configure_defect
from .lw_db_defect_library import read_defect_file
from .lw_db_defect_library import decode_defect_diag_byte
from .lw_db_defect_library import check_defect_diag_byte

# Types of all variables of all versions except the oos variables of 16.0.3 that
# are renamed on the fly
# Variable evolution with versions:
# - 17.3.2: out-sync/in-sync renamed out_sync/in_sync
#           out_usync/in_usync/event_duration added
# - 19.2.0: nb_of_meta/hrv_fifo_failures/duplic_timeslices/zero_AHRS/mean_AHRS added
#           AHRS removed

[docs] VARIABLE_TYPE = { "livetime_s": float, "DAQ": float, "WR": float, "FIFO": float, "HRV": float, "PMTs": float, "pmts_norm": float, "timestampdiff": float, "timestampdiff_r": float, "triggerrate": float, "Acoustics": float, "Acoustics_per_mn": float, "MEAN_Rate_Hz": float, "RMS_Rate_Hz": float, "AHRS": float, "ahrs_per_mn": float, "mean_AHRS": float, "JDAQJTProc": float, "kton_year": float, "UTCMin_s": float, "UTCMax_s": float, "detector": int, "run": int, "nb_of_meta": int, "trigger3DMuon": int, "triggerMXShower": int, "trigger3DShower": int, "triggerNB": int, "JTriggerReprocessor": int, "JTrigger3dmuon": int, "JTriggermxshower": int, "JTrigger3dshower": int, "JTriggernb": int, "writeL0": int, "writeL1": int, "writeL2": int, "writeSN": int, "JDAQTimeslice": int, "JDAQTimesliceL0": int, "JDAQTimesliceL1": int, "JDAQTimesliceL2": int, "JDAQTimesliceSH": int, "JDAQSummaryslice": int, "JDAQEvent": int, "hrv_fifo_failures": int, "duplic_timeslices": int, "in_sync": int, "zero_AHRS": int, "out_sync": int, "in_usync": int, "out_usync": int, "hv_inconsist":int }
# Dummy value used when the variable is missing in a run/QAQC file
[docs] MISSING_VALUE = -999999
########################################################################################################
[docs] def get_det_id(det): """ Returns the det id === Input: - det [string]: D1ORCA024, D0ARCA030... === Outputs: - detector id [integer] """ return km3db.tools.todetid(det)
########################################################################################################
[docs] def get_current_detector(site): """ Extract the current detector names from the database by checking the highest run number By default, it is retrieved from the KM3NeT DB but in some rare cases (typically at a detector change) it can be forced to a different one by modifying the km3dq_lw_db/Common/current_detector.toml file that is available on sftp. === Input: - site [string]: ARCA / ORCA === Outputs: - detector [string]: D1ORCA024, D0ARCA030... """ # First check if the current detector is hardcoded in the current_detector.toml file # Facility especially useful at the detector change to force keeping a detector in Coral with urllib.request.urlopen( "https://sftp.km3net.de/data/km3dq_lw_db/Common/current_detector.toml" ) as s_f: toml = tomli.loads(s_f.read().decode("utf-8")) if toml[site] != "": return toml[site] # If not -> Retrieve it from the database sds = km3db.tools.StreamDS() if site == "ARCA": res = sds.detectors(city="Italy").split("\n") else: res = sds.detectors(city="France").split("\n") max_run = 0 for i_det in res[1:]: try: if int(i_det.split("\t")[5]) > max_run: o_det = i_det.split("\t")[0] if o_det.startswith("D") is False: continue max_run = int(i_det.split("\t")[5]) except: print("Improper db reply in get_current_detectors") return o_det
########################################################################################################
[docs] def get_file_paths(detector, qaqc_name): """ For a given detector / detector, returns the path for QAQC files both on sps and sftp === Input: - det [string]: D1ORCA024, D0ARCA030... === Outputs: - detector id [integer] """ return { "JQAQC_sps": ( "/sps/km3net/repo/data/raw/quality/" f"KM3NeT_{km3db.tools.todetid(detector):08d}_QAQC_Jpp_{qaqc_name}.txt" ), "JQAQC_sftp":( "https://sftp.km3net.de/data/quality/" f"KM3NeT_{km3db.tools.todetid(detector):08d}_QAQC_Jpp_{qaqc_name}.txt" ) }
########################################################################################################
[docs] def get_run_properties_from_db(det, filt="PHYS", filt_target="Run"): """ Retrieve run properties from the database with the km3db package === Inputs: - det [string]: D1ORCA024, D0ARCA030... - filt [string]: accepted run types (PHYS, COMM...) separated by a space - filt_target [string]: accepted target (run, on...) separated by a space === Outputs: - properties [dict of dict]: all DB content in a single dictionnary with the run numbers as key """ runs = km3db.StreamDS(container="nt").get("runs", detid=det) results = {} for i_run in runs: if all((i_run.runsetupname[0:4] not in filt.split(" "), i_run.runsetupname[0:5] not in filt.split(" "))): continue if i_run.jobtarget not in filt_target.split(" "): continue results[i_run.run] ={ 'UNIXSTARTTIME': i_run.unixstarttime, 'STARTTIME_DEFINED': i_run.starttime_defined, 'RUNSETUPID': i_run.runsetupid, 'RUNSETUPNAME': i_run.runsetupname, 'T0_CALIBSETID': i_run.t0_calibsetid, 'POS_CALIBSETID': i_run.pos_calibsetid, 'ROT_CALIBSETID': i_run.rot_calibsetid, 'RUNJOBID': i_run.runjobid, 'JOBTARGET': i_run.jobtarget, 'JOBPRIORITY': i_run.jobpriority, 'UNIXJOBSTART': i_run.unixjobstart, 'UNIXJOBEND': i_run.unixjobend, } return results
########################################################################################################
[docs] def get_run_properties_from_qaqc( det, dq_tag, origin="qaqc_sftp", startrun=0, endrun=1e9 ): """ For a given detector, returns the run numbers, their lengths, time counter... as retrieved from the QAQC file === Inputs: - det [string]: D1ORCA024, D0ARCA030... - dq_tag [DQ tag]: as configured in config_library - origin [string]: "qaqc_sftp" or "qaqc_sps" - startrun [integer]: first run - endrun [integer]: last run === Outputs: - run properties: TOBEDETAILED - error: should be empty """ runprop = { "runNumber": [], "lvt": [], "lvt_counter": [], "kty": [], "kty_counter": [], "nbRuns": 0, "minRun": 1e10, "maxRun": 0, } kty_counter = 0 err_log = "" (n1, err_log) = create_ttree_from_qaqc( det, ["run", "livetime_s", "kton_year"], origin, dq_tag ) if len(n1) == 0: return (runprop, "No data in qaqc") if all((startrun != 0, endrun != 1e9)): n1 = n1[(n1["run"] >= startrun) & (n1["run"] <= endrun)] runprop = { "runNumber": n1.run.to_list(), "lvt": (n1.livetime_s / 3600 / 24).to_list(), "kty": n1.kton_year.to_list(), "nbRuns": len(n1), "minRun": int(n1.min()["run"]), "maxRun": int(n1.max()["run"]), } runprop["lvt_counter"] = [0] + (n1.cumsum()["livetime_s"] / 3600 / 24).to_list() runprop["kty_counter"] = [0] + (n1.cumsum()["kton_year"]).to_list() return (runprop, err_log)
########################################################################################################
[docs] def get_run_timerange(det, filt="PHYS"): """ Retrieve the run start/stop by combining db and QAQC quantities to get the most accurate timing. If QAQC data exist (normally for all PHYS runs), use them, otherwise use the DB ones So far used only in km3net_lw_db_interface === Inputs: - det: string like (e.g: D0ARCA030, D1ORCA024...) - filt: string containing the accepted run type (PHYS, COMM...) separated by a space === Outputs: - dictionary with run as key and a list of [run_start, run_stop, source, runtype] with source = 0 (QAQC) or 1 (DB only) """ # First retrieve the QAQC variables that will be used to get the exact # run end df, _ = create_ttree_from_qaqc( det, ["run", "livetime_s"], "qaqc_sftp", configure_dataquality_tag("operation") ) results = dict([(r, [s, e, 0]) for r, s, e in zip(df["run"], df["UTCMin_s"], df["UTCMax_s"])]) # Then retrieve all runs from the database to get the remaining runs runs = km3db.StreamDS(container="nt").get("runs", detid=det) for i_run in runs: i_run_runtype = i_run.runsetupname.split(".")[0] if i_run_runtype not in filt: continue if i_run.run in results: # Timerange already retrieved from the QAQC data - Append run type results[i_run.run].append(i_run_runtype) continue try: results[i_run.run] = [i_run.unixjobstart/1000., i_run.unixjobend/1000., 1, i_run_runtype] except TypeError: pass return results
########################################################################################################
[docs] def get_last_n_weeks_runs_qaqc(det, nweeks=2): """Extract the all runs acquired in the last nweeks of running""" # QAQC default tag used. As priori OK for this purpose dq_tag = configure_dataquality_tag("operation") min_run = 1e9 max_run = 0 try: lines = read_qaqc_file(det, "qaqc_sftp", dq_tag["qaqc_name"][det], "") ten_days = (nweeks * 7 + 1) * 3600.0 * 24.0 for i_run in range(len(lines["run"])): if (time.time() - float(lines["UTCMin_s"][i_run])) < ten_days: run_number = int(lines["run"][i_run]) if run_number < min_run: min_run = run_number if run_number > max_run: max_run = run_number except KeyError: print(f"common_library.py: Missing DQ-tag information for the detector {det}") return (min_run, max_run)
########################################################################################################
[docs] def get_site(det): """ Returns the site (ORCA or ARCA) === Inputs: - det [string]: D1ORCA024, D0ARCA030... === Ouputs: - site [string]: ARCA/ORCA """ # A/O detectors corresponds to the current ARCA/ORCA detectors if "ORC" in det or det == "O": out = "ORCA" else: out = "ARCA" return out
########################################################################################################
[docs] def get_active_dus_range(det): """ Retrieve the range of active DUs To be improved with database information === Inputs: - det [string]: D1ORCA024, D0ARCA030... === Ouputs: - site [string]: ARCA/ORCA """ lower_du = 0 upper_du = 48 return (lower_du, upper_du)
########################################################################################################
[docs] def get_nb_qaqc_variables(qaqc_vers): """ Retrieve the number of variables in the QAQC file === Inputs - qaqc_vers [string]: "v16.0.3", "v17.3.2", "V19.2.0"... === Output === - Number of QAQC variables [integer] """ if qaqc_vers in ("v16.0.3", "16.0.3"): return 39 if qaqc_vers in ("v17.3.2", "17.3.2", "18.0.0-rc.4-34-g8618072aa-D"): return 42 if qaqc_vers in ("v19.2.0", "19.1.0", "19.2.0"): return 46 if qaqc_vers in ("19.3.0-rc.2", "19.3.0-rc.5"): return 46 if qaqc_vers == "19.3.0-rc.x": return 47 print(f"Unknwon qaqc version {qaqc_vers} -> Exiting") sys.exit()
########################################################################################################
[docs] def create_ttree_from_qaqc(det, var_to_fill, source, tag, append_veto_qsco=False): """ Create a ttree from qaqc sftp file and defect variables stored on git It includes some advanced check about the QAQC file integrity. === Inputs - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - var_to_fill : QAQC variables or defect to fill - [array of string] - Ex: ['run', 'timestampdiff', 'def_operation', 'def_oos'] Obsolete - To be removed at some point - source : QAQC source, a priori "qaqc_sftp" - [string] - tag : data-quality tag (not its name) as created by configure_dataquality_tag - append_veto_qsco: append the veto and Qscore - [boolean] === Outputs - TTree - Error log """ error_log = "" content = read_qaqc_file(det, source, tag["qaqc_name"][det], tag["run_range"][det]) df = pd.DataFrame(content) # Renaming required for 16.0.3 df = df.rename(columns={"out-sync": "out_sync"}) df = df.rename(columns={"in-sync": "in_sync"}) # Type definition for each column # df = df.astype({key: float for key in variable_float if key in df.columns}) # df = df.astype({key: int for key in variable_int if key in df.columns}) for i_var, i_type in VARIABLE_TYPE.items(): try: df = df.astype({i_var: i_type}) except KeyError: # Variable missing in this version -> Dummy column added df[i_var] = len(df) * [MISSING_VALUE] df["timestampdiff"] = df.UTCMax_s - df.UTCMin_s - df.livetime_s df["timestampdiff_r"] = 1.0 - df.livetime_s / (df.UTCMax_s - df.UTCMin_s) df["pmts_norm"] = df.PMTs / get_detx_caract(det)["pmts"] df["triggerrate"] = df.JDAQEvent / df.livetime_s try: # Version > 19.2.0 df["ahrs_per_mn"] = df.mean_AHRS / df.livetime_s * 60 except AttributeError: # Version 16.0.3 / 17.3.2 df["ahrs_per_mn"] = df.AHRS / df.livetime_s * 60 df["Acoustics_per_mn"] = df.Acoustics / df.livetime_s * 60 df["JDAQJTProc"] = (df.JDAQEvent - df.JTriggerReprocessor) / (df.JDAQEvent + 1e-10) if "ARC" in det: df["kton_year"] = ( 1e6 / (31 * 18 * 2 * 115) * df.PMTs * (1.0 - df.HRV) * df.livetime_s / (365.2425 * 24 * 3600) ) else: df["kton_year"] = ( 6980 / (31 * 18 * 115) * df.PMTs * (1.0 - df.HRV) * df.livetime_s / (365.2425 * 24 * 3600) ) # Adding defects defect_results = read_defect_file(det, tag["def_tag"]) df = df.sort_values("run") df.reset_index(drop=True) for key, runs in defect_results.items(): value = np.zeros(len(df)) if len(runs) != 0: # A defect can be assigned to a run missing in the QAQC file (protection added) # The defects_results is not chronologically ordered (sorting added) mask = np.isin( df.run, [x for x in sorted(runs.keys()) if x in df["run"].values] ) value[mask] = [ runs[x] for x in sorted(runs.keys()) if x in df["run"].values ] df[key] = value # Adding Veto and Q score: if append_veto_qsco: var_properties = {} configure_var_thresholds(var_properties, det, tag) configure_var_bit(var_properties) df["veto"], df["veto_source"], df["qsco"], df["qsco_source"] = ( compute_veto_qscore(var_properties, df) ) if error_log != "": print(error_log) return (df, error_log)
########################################################################################################
[docs] def compute_veto_qscore(var_prop, var_val): """ Computes the veto/Q-score for a detector/tag using a QAQC source. """ def_config = configure_defect() veto = np.zeros(len(var_val), dtype=bool) veto_source = np.zeros(len(var_val), dtype=int) qsco = np.ones(len(var_val), dtype=float) * len(var_prop["qsco_thresholds"]) qsco_source = np.zeros(len(var_val), dtype=int) pd.set_option("display.max_rows", None) for i_var in var_prop["veto_thresholds"]: values = var_val[i_var] # Defect variables # The defect variable is defined as def_[defect type] # with [defect type] = daq, operation, analysis... if i_var.startswith("def_"): # first extract the diagnosis defect_type = i_var.replace("def_", "") diag_array = {"present": [], "absent": []} for i_excluded_diag in var_prop["veto_thresholds"][i_var]: if i_excluded_diag.startswith("NOT."): diag_array["absent"].append( def_config["bit"][defect_type][ i_excluded_diag.replace("NOT.", "") ] ) else: diag_array["present"].append( def_config["bit"][defect_type][i_excluded_diag] ) if len(diag_array["absent"]) == 0: # Only present diagnosis def_mask = values != 0 elif len(diag_array["present"]) == 0: # Only NOT. diagnosis def_mask = values == 0 else: # Mixture of present/absent -> No mask def_mask = values != (-1) if np.sum(def_mask) == 0: continue defect_test_output = np.vectorize(check_defect_diag_byte, excluded=[1])( values[def_mask], diag_array ) # The FutureWarning about the incompatible dtype comes from the line below # When checking explicitly the dtype, both are returned as bool hence # the FutureWarning can not be understood. # The trial to fix with explicit casting and other numpy functions # (e.g veto[def_mask] = np.logical_or(veto[def_mask].astype(bool), defect_test_output.astype(bool))) # did not help at all. veto[def_mask] = veto[def_mask] | defect_test_output veto_source[def_mask] += (0b1 & defect_test_output) << var_prop["bit"][ i_var ] else: # QAQC variable: range treatement range_mask = (var_val[i_var] != MISSING_VALUE) & ( (var_val[i_var] < var_prop["veto_thresholds"][i_var][0]) | ( var_val[i_var] > var_prop["veto_thresholds"][i_var][1] ) ) veto = veto | range_mask veto_source[range_mask] += 0b1 << var_prop["bit"][i_var] for i_var in var_prop["qsco_thresholds"]: values = var_val[i_var] # Defect variables # Defect variables # The defect variable is defined as def_[defect type] # with [defect type] = daq, operation, analysis... if i_var.startswith("def_"): # first extract the set defects def_mask = values != 0 if np.sum(def_mask) == 0: continue defect_type = i_var.replace("def_", "") diag_array = {"present": [], "absent": []} for i_excluded_diag in var_prop["qsco_thresholds"][i_var]: if i_excluded_diag.startswith("NOT."): diag_array["absent"].append( def_config["bit"][defect_type][ i_excluded_diag.replace("NOT.", "") ] ) else: diag_array["present"].append( def_config["bit"][defect_type][i_excluded_diag] ) defect_test_output = np.vectorize(check_defect_diag_byte, excluded=[1])( values[def_mask], diag_array ) qsco[def_mask] = qsco[def_mask] | defect_test_output qsco_source[def_mask] += (0b1 & defect_test_output) << var_prop["bit"][ i_var ] else: # QAQC variable: range treatement range_mask = (var_val[i_var] != MISSING_VALUE) & ( (var_val[i_var] < var_prop["qsco_thresholds"][i_var][0]) | ( var_val[i_var] > var_prop["qsco_thresholds"][i_var][1] ) ) qsco[range_mask] -= 1 qsco_source[range_mask] += 0b1 << var_prop["bit"][i_var] qsco /= len(var_prop["qsco_thresholds"]) return (veto, veto_source, qsco, qsco_source)
########################################################################################################
[docs] def log_run_range(det, run_0, run_1, log_file): """Write the run range and run list on disk""" os.environ["TZ"] = "Europe/Paris" time.tzset() run_prop = get_run_properties_from_db(det, "PHYS COMM") try: tim_0_unix = int(run_prop[run_0]["UNIXJOBSTART"]) tim_0 = time.strftime("%a, %d %b %Y %H:%M", time.localtime(tim_0_unix / 1000)) except KeyError: tim_0 = "Unknown" try: tim_1_unix = int(run_prop[run_1]["UNIXJOBEND"]) tim_1 = time.strftime("%a, %d %b %Y %H:%M", time.localtime(tim_1_unix / 1000)) except KeyError: tim_1 = "Unknown" run_range = f"{int(run_0)} ({tim_0} CET/CEST) - " f"{int(run_1)} ({tim_1} CET/CEST)" log_file.write(f"Run range: {run_range}\n")
########################################################################################################
[docs] def read_qaqc_file(det, source, qaqc_name, run_range): """ sftp QAQC file reading === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - source : QAQC source, a priori "qaqc_sftp" - [string] - qaqc_name : - run_range : string as defined in DQ tag "[min_run]-[max_run]" (inclusive) === Output === - TTree - Error log """ lines = [] if source == "qaqc_sftp": with urllib.request.urlopen( get_file_paths(det, qaqc_name)["JQAQC_sftp"] ) as qaqc: tmp = (qaqc.read()).split(b"\n") for i_line in tmp: if i_line != "": lines.append(i_line.decode("utf-8")) else: if source == "qaqc_sps": source_file = get_file_paths(det, qaqc_name)["JQAQC_sps"] else: source_file = source with open(source_file, "r", encoding="utf-8") as s_f: lines = s_f.readlines() labels = lines[0].split() run_index = labels.index("run") if run_range != "": min_run = int(run_range.split("-")[0]) max_run = int(run_range.split("-")[1]) else: min_run = 0 max_run = 1e9 content = {} for l in lines[1:]: try: if len(l.split()) != get_nb_qaqc_variables(l.split()[0]): print(f"Bad number of variables in {l}") continue # sys.exit() except IndexError: # Empty line pass try: if all((int(l.split()[run_index]) >= min_run, int(l.split()[run_index]) <= max_run )): for i, key in enumerate(labels): try: try: content[key].append(l.split()[i]) except KeyError: content[key] = [l.split()[i]] except IndexError: # Missing variable in QAQC file try: content[key].append(MISSING_VALUE) except KeyError: content[key] = [MISSING_VALUE] except IndexError: # Empty line pass return content
########################################################################################################
[docs] def decode_source_byte(value, v_prop): """ Decode the source of degradation stored in the TTree NB: the source is either a QAQC variable either a defect type (("daq", "operation"...) """ source_list = [] source_str = "" for i_var, i_bit in v_prop["bit"].items(): if (int(value) >> i_bit) & 1: source_list.append(i_var) source_str += f"{i_var} " return (source_list, source_str)