Source code for km3dq_common.common_library

#! /usr/bin/env python
###############################################################################
# Various functions used in km3dq_lw_dq and km3dq_core scripts
#
# Developer: Benjamin Trocme (benjamin.trocme at apc.in2p3.fr) - 2023

import os
import sys
import time
import array
import re
import urllib.request
import km3db
import pandas as pd
import numpy as np


from km3dq_common.config_library import configure_var_bit
from km3dq_common.config_library import configure_var_thresholds
from km3dq_common.config_library import get_detx_caract
from km3dq_common.config_library import configure_dataquality_tag
from km3dq_common.config_library import configure_defect
from km3dq_common.lw_db_defect_library import read_defect_file
from km3dq_common.lw_db_defect_library import decode_defect_diag_byte
from km3dq_common.lw_db_defect_library import check_defect_diag_byte


###############################################################################
[docs] def get_det_id(dataset): # TEMPORARY """Return the detector id""" # sds = km3db.tools.StreamDS(container="pd") # d = sds.detectors(oid=dataset).iloc[0] # return int(d.SERIALNUMBER) return km3db.tools.todetid(dataset)
###############################################################################
[docs] def get_current_detector(site): """ Extract the current detector names from the database """ sds = km3db.tools.StreamDS() if site == "ARCA": res = sds.detectors(city="Italy").split("\n") else: res = sds.detectors(city="France").split("\n") min_run = 0 for i_det in res[1:]: try: if int(i_det.split("\t")[4]) > min_run: o_det = i_det.split("\t")[0] if o_det.startswith("D") is False: continue min_run = int(i_det.split("\t")[4]) except: print("Improper db reply in get_current_detectors") return o_det
###############################################################################
[docs] def get_file_paths(dataset, qaqc_name): """ For a given dataset / detector, returns the path for QAQC files """ filename = {} det_id_str = f"{km3db.tools.todetid(dataset):08d}" # QAQC file stored on sps qaqc_sps = "/sps/km3net/repo/data/raw/quality/" filename["JQAQC_sps"] = qaqc_sps + f"KM3NeT_{det_id_str}_QAQC_Jpp_{qaqc_name}.txt" # QAQC file stored on sftp qaqc_htpps = "https://sftp.km3net.de/data/quality/" filename["JQAQC_sftp"] = ( qaqc_htpps + f"KM3NeT_{det_id_str}_QAQC_Jpp_{qaqc_name}.txt" ) return filename
###############################################################################
[docs] def get_run_properties_from_db(det, filt="PHYS"): """ Retrieve run properties from the database with the km3db package Inputs: - det: string like (e.g: D0ARCA030, D1ORCA024...) - filt: string containing the accepted run type (PHYS, COMM...) separated by a space """ runs = km3db.StreamDS(container="nt").get("runs", detid=det) results = {} for i_run in runs: if all((i_run.runsetupname[0:4] not in filt, i_run.runsetupname[0:5] not in filt,)): continue results[i_run.run] ={ 'UNIXSTARTTIME': i_run.unixstarttime, 'STARTTIME_DEFINED': i_run.starttime_defined, 'RUNSETUPID': i_run.runsetupid, 'RUNSETUPNAME': i_run.runsetupname, 'T0_CALIBSETID': i_run.t0_calibsetid, 'POS_CALIBSETID': i_run.pos_calibsetid, 'ROT_CALIBSETID': i_run.rot_calibsetid, 'RUNJOBID': i_run.runjobid, 'JOBTARGET': i_run.jobtarget, 'JOBPRIORITY': i_run.jobpriority, 'UNIXJOBSTART': i_run.unixjobstart, 'UNIXJOBEND': i_run.unixjobend, } return results
###############################################################################
[docs] def get_run_properties_from_qaqc( dataset, dq_tag, origin="qaqc_sftp", startrun=0, endrun=1e9 ): """ For a given dataset/detector, returns the run numbers, their lengths, time counter... A dataset may be restricted to a user-defined run range Source: QAQC file """ runprop = { "runNumber": [], "lvt": [], "lvt_counter": [], "kty": [], "kty_counter": [], "nbRuns": 0, "minRun": 1e10, "maxRun": 0, } dataset_time_counter = 0.0 kty_counter = 0 err_log = "" # Source: QAQC file on sftp or sps (n1, err_log) = create_ttree_from_qaqc( dataset, ["run", "livetime_s", "kton_year"], origin, dq_tag ) if len(n1) == 0: return (runprop, "No data in qaqc") if all((startrun != 0, endrun != 1e9)): n1 = n1[(n1["run"] >= startrun) & (n1["run"] <= endrun)] runprop = { "runNumber": n1.run.to_list(), "lvt": (n1.livetime_s / 3600 / 24).to_list(), "kty": n1.kton_year.to_list(), "nbRuns": len(n1), "minRun": int(n1.min()["run"]), "maxRun": int(n1.max()["run"]), } runprop["lvt_counter"] = [0] + (n1.cumsum()["livetime_s"] / 3600 / 24).to_list() runprop["kty_counter"] = [0] + (n1.cumsum()["kton_year"]).to_list() return (runprop, err_log)
###############################################################################
[docs] def get_run_timerange(det, filt="PHYS"): """ Retrieve the run start/stop by combining db and QAQC quantities to get the most accurate timing. If QAQC data exist (normally for all PHYS runs), use them, otherwise use the DB ones So far used only in km3net_lw_db_interface Inputs: - det: string like (e.g: D0ARCA030, D1ORCA024...) - filt: string containing the accepted run type (PHYS, COMM...) separated by a space Outputs: - dictionary with run as key and a list of [run_start, run_stop, source, runtype] with source = 0 (QAQC) or 1 (DB only) """ # First retrieve the QAQC variables that will be used to get the exact # run end df, _ = create_ttree_from_qaqc( det, ["run", "livetime_s"], "qaqc_sftp", configure_dataquality_tag("default") ) results = dict([(r, [s, e, 0]) for r, s, e in zip(df["run"], df["UTCMin_s"], df["UTCMax_s"])]) # Then retrieve all runs from the database to get the remaining runs runs = km3db.StreamDS(container="nt").get("runs", detid=det) for i_run in runs: i_run_runtype = i_run.runsetupname.split(".")[0] if i_run_runtype not in filt: continue if i_run.run in results: # Timerange already retrieved from the QAQC data - Append run type results[i_run.run].append(i_run_runtype) continue try: results[i_run.run] = [i_run.unixjobstart/1000., i_run.unixjobend/1000., 1, i_run_runtype] except TypeError: pass return results
###############################################################################
[docs] def get_last_n_weeks_runs_qaqc(det, nweeks=2): """Extract the all runs acquired in the last nweeks of running""" # QAQC default tag used. As priori OK for this purpose dq_tag = configure_dataquality_tag("default") min_run = 1e9 max_run = 0 try: lines = read_qaqc_file(det, "qaqc_sftp", dq_tag["qaqc_name"][det], "") ten_days = (nweeks * 7 + 1) * 3600.0 * 24.0 for i_run in range(len(lines["run"])): if (time.time() - float(lines["UTCMin_s"][i_run])) < ten_days: run_number = int(lines["run"][i_run]) if run_number < min_run: min_run = run_number if run_number > max_run: max_run = run_number except KeyError: print(f"common_library.py: Missing DQ-tag information for the detector {det}") return (min_run, max_run)
###############################################################################
[docs] def get_site(det): """Returns the site (ORCA or ARCA)""" # A/O detectors corresponds to the current ARCA/ORCA detectors if "ORCA" in det or det == "O": out = "ORCA" else: out = "ARCA" return out
###############################################################################
[docs] def get_active_dus_range(det): """ Retrieve the range of DU active To be improved with database information """ lower_du = 0 upper_du = 48 return (lower_du, upper_du)
###############################################################################
[docs] def get_nb_qaqc_variables(qaqc_vers): """ Retrieve the number of variables in the QAQC file NB: in a near future, the det argument should be replaced by a Jpp version === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... === Output === - Number of QAQC variables """ if qaqc_vers == "v16.0.3": return 39 if qaqc_vers == "v17.3.2": return 42 return 46 # Jpp >= 19.2.0
###############################################################################
[docs] def create_ttree_from_qaqc(det, var_to_fill, source, tag, append_veto_qsco=False): """ Create a ttree from qaqc sftp file and defect variables stored on git It includes some advanced check about the QAQC file integrity. === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - var_to_fill : QAQC variables or defect to fill - [array of string] - Ex: ['run', 'timestampdiff', 'def_operation', 'def_oos'] - source : QAQC source, a priori "qaqc_sftp" - [string] - tag : data-quality tag (not its name) as created by configure_dataquality_tag - append_veto_qsco: append the veto and Qscore - [boolean] === Output === - TTree - Error log """ error_log = "" # Correspondance between the name in the QAQC file (first line) and the # variable name in the script - Mostly obsolete variable_name_corresp = {} # All variables in all version variable_float = [ "livetime_s", "DAQ", "WR", "FIFO", "HRV", "PMTs", "pmts_norm", "timestampdiff", "timestampdiff_r", "triggerrate", "Acoustics", "Acoustics_per_mn", "MEAN_Rate_Hz", "RMS_Rate_Hz", "AHRS", "ahrs_per_mn", "mean_AHRS", "JDAQJTProc", "kton_year", "UTCMin_s", "UTCMax_s", ] variable_int = [ "detector", "run", "nb_of_meta", "trigger3DMuon", "triggerMXShower", "trigger3DShower", "triggerNB", "JTriggerReprocessor", "JTrigger3dmuon", "JTriggermxshower", "JTrigger3dshower", "JTriggernb", "writeL0", "writeL1", "writeL2", "writeSN", "JDAQTimeslice", "JDAQTimesliceL0", "JDAQTimesliceL1", "JDAQTimesliceL2", "JDAQTimesliceSH", "JDAQSummaryslice", "JDAQEvent", "hrv_fifo_failures", "duplic_timeslices", "in_sync", "zero_AHRS", "out_sync", "in_usync", "out_usync", ] # Removing/renaming variables depending on Jpp versions try: # Processing version 16: name change and missing variables if tag["qaqc_vers"][det] == "v16.0.3": # Renaming variable_name_corresp["out_sync"] = "out-sync" variable_name_corresp["in_sync"] = "in-sync" # Missing for i_missing in ["in_usync", "out_usync", "event_duration"]: variable_name_corresp[i_missing] = "MISSING" if i_missing in var_to_fill: var_to_fill.remove(i_missing) # Processing version 16/17: missing variables (added in 19.2.0) if tag["qaqc_vers"][det] in ("v16.0.3", "v17.3.2"): for i_missing in [ "nb_of_meta", "hrv_fifo_failures", "duplic_timeslices", "zero_AHRS", "mean_AHRS", ]: variable_name_corresp[i_missing] = "MISSING" if i_missing in var_to_fill: var_to_fill.remove(i_missing) # Processing version > 19: missing variables (removed in 19.2.0) if all( (tag["qaqc_vers"][det] != "v16.0.3", tag["qaqc_vers"][det] != "v17.3.2") ): # Processing version 16/17: 3 other missing variables for i_missing in ["AHRS"]: variable_name_corresp[i_missing] = "MISSING" if i_missing in var_to_fill: var_to_fill.remove(i_missing) except KeyError: print(f"common_library.py: Missing DQ-tag information for the detector {det}") return ( None, f"common_library.py: Missing DQ-tag information for the detector {det}", ) content = read_qaqc_file(det, source, tag["qaqc_name"][det], tag["run_range"][det]) df = pd.DataFrame(content) df = df.rename(columns={v: k for k, v in variable_name_corresp.items()}) df = df.astype({key: float for key in variable_float if key in df.columns}) df = df.astype({key: int for key in variable_int if key in df.columns}) # To Check # Run duplicated in QAQC file -> Line skipped # Run with 0 livetime df["timestampdiff"] = df.UTCMax_s - df.UTCMin_s - df.livetime_s df["timestampdiff_r"] = 1.0 - df.livetime_s / (df.UTCMax_s - df.UTCMin_s) df["pmts_norm"] = df.PMTs / get_detx_caract(det)["pmts"] df["triggerrate"] = df.JDAQEvent / df.livetime_s # In releases > 19, the AHRS variable has been replaced by two variables. if tag["qaqc_vers"][det] in ("v16.0.3", "v17.3.2"): df["ahrs_per_mn"] = df.AHRS / df.livetime_s * 60 else: # AHRS renamed in 19.2.0 df["ahrs_per_mn"] = df.mean_AHRS / df.livetime_s * 60 df["Acoustics_per_mn"] = df.Acoustics / df.livetime_s * 60 df["JDAQJTProc"] = (df.JDAQEvent - df.JTriggerReprocessor) / (df.JDAQEvent + 1e-10) df["kton_year"] = ( 6980 / (31 * 18 * 115) * df.PMTs * (1.0 - df.HRV) * df.livetime_s / (365.2425 * 24 * 3600) ) # Adding defects defect_results = read_defect_file(det, tag["def_tag"]) df = df.sort_values("run") df.reset_index(drop=True) for key, runs in defect_results.items(): value = np.zeros(len(df)) if len(runs) != 0: # A defect can be assigned to a run missing in the QAQC file (protection added) # The defects_results is not chronologically ordered (sorting added) mask = np.isin( df.run, [x for x in sorted(runs.keys()) if x in df["run"].values] ) value[mask] = [ runs[x] for x in sorted(runs.keys()) if x in df["run"].values ] df[key] = value # Adding Veto and Q score: if append_veto_qsco: var_properties = {} configure_var_thresholds(var_properties, det, tag) configure_var_bit(var_properties) df["veto"], df["veto_source"], df["qsco"], df["qsco_source"] = ( compute_veto_qscore(var_properties, df) ) if error_log != "": print(error_log) return (df, error_log)
###############################################################################
[docs] def compute_veto_qscore(var_prop, var_val): """ Computes the veto/Q-score for a detector/tag using a QAQC source. """ def_config = configure_defect() veto = np.zeros(len(var_val), dtype=bool) veto_source = np.zeros(len(var_val), dtype=int) qsco = np.ones(len(var_val), dtype=float) * len(var_prop["qsco_thresholds"]) qsco_source = np.zeros(len(var_val), dtype=int) pd.set_option("display.max_rows", None) for i_var in var_prop["veto_thresholds"]: values = var_val[i_var] # Defect variables # The defect variable is defined as def_[defect type] # with [defect type] = daq, operation, analysis... if i_var.startswith("def_"): # first extract the diagnosis defect_type = i_var.replace("def_", "") diag_array = {"present": [], "absent": []} for i_excluded_diag in var_prop["veto_thresholds"][i_var]: if i_excluded_diag.startswith("NOT."): diag_array["absent"].append( def_config["bit"][defect_type][ i_excluded_diag.replace("NOT.", "") ] ) else: diag_array["present"].append( def_config["bit"][defect_type][i_excluded_diag] ) if len(diag_array["absent"]) == 0: # Only present diagnosis def_mask = values != 0 elif len(diag_array["present"]) == 0: # Only NOT. diagnosis def_mask = values == 0 else: # Mixture of present/absent -> No mask def_mask = values != (-1) if np.sum(def_mask) == 0: continue defect_test_output = np.vectorize(check_defect_diag_byte, excluded=[1])( values[def_mask], diag_array ) # The FutureWarning about the incompatible dtype comes from the line below # When checking explicitly the dtype, both are returned as bool hence # the FutureWarning can not be understood. # The trial to fix with explicit casting and other numpy functions # (e.g veto[def_mask] = np.logical_or(veto[def_mask].astype(bool), defect_test_output.astype(bool))) # did not help at all. veto[def_mask] = veto[def_mask] | defect_test_output veto_source[def_mask] += (0b1 & defect_test_output) << var_prop["bit"][ i_var ] else: # QAQC variable: range treatement range_mask = (var_val[i_var] < var_prop["veto_thresholds"][i_var][0]) | ( var_val[i_var] > var_prop["veto_thresholds"][i_var][1] ) veto = veto | range_mask veto_source[range_mask] += 0b1 << var_prop["bit"][i_var] for i_var in var_prop["qsco_thresholds"]: values = var_val[i_var] # Defect variables # Defect variables # The defect variable is defined as def_[defect type] # with [defect type] = daq, operation, analysis... if i_var.startswith("def_"): # first extract the set defects def_mask = values != 0 if np.sum(def_mask) == 0: continue defect_type = i_var.replace("def_", "") diag_array = {"present": [], "absent": []} for i_excluded_diag in var_prop["qsco_thresholds"][i_var]: if i_excluded_diag.startswith("NOT."): diag_array["absent"].append( def_config["bit"][defect_type][ i_excluded_diag.replace("NOT.", "") ] ) else: diag_array["present"].append( def_config["bit"][defect_type][i_excluded_diag] ) defect_test_output = np.vectorize(check_defect_diag_byte, excluded=[1])( values[def_mask], diag_array ) qsco[def_mask] = qsco[def_mask] | defect_test_output qsco_source[def_mask] += (0b1 & defect_test_output) << var_prop["bit"][ i_var ] else: # QAQC variable: range treatement range_mask = (var_val[i_var] < var_prop["qsco_thresholds"][i_var][0]) | ( var_val[i_var] > var_prop["qsco_thresholds"][i_var][1] ) qsco[range_mask] -= 1 qsco_source[range_mask] += 0b1 << var_prop["bit"][i_var] qsco /= len(var_prop["qsco_thresholds"]) return (veto, veto_source, qsco, qsco_source)
###############################################################################
[docs] def log_run_range(det, run_0, run_1, log_file): """Write the run range and run list on disk""" os.environ["TZ"] = "Europe/Paris" time.tzset() run_prop = get_run_properties_from_db(det, "PHYS COMM") try: tim_0_unix = int(run_prop[run_0]["UNIXJOBSTART"]) tim_0 = time.strftime("%a, %d %b %Y %H:%M", time.localtime(tim_0_unix / 1000)) except KeyError: tim_0 = "Unknown" try: tim_1_unix = int(run_prop[run_1]["UNIXJOBEND"]) tim_1 = time.strftime("%a, %d %b %Y %H:%M", time.localtime(tim_1_unix / 1000)) except KeyError: tim_1 = "Unknown" run_range = f"{int(run_0)} ({tim_0} CET/CEST) - " f"{int(run_1)} ({tim_1} CET/CEST)" log_file.write(f"Run range: {run_range}\n")
###############################################################################
[docs] def read_qaqc_file(det, source, qaqc_name, run_range): """ sftp QAQC file reading === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - source : QAQC source, a priori "qaqc_sftp" - [string] - qaqc_name : - run_range : string as defined in DQ tag "[min_run]-[max_run]" (inclusive) === Output === - TTree - Error log """ lines = [] if source == "qaqc_sftp": with urllib.request.urlopen( get_file_paths(det, qaqc_name)["JQAQC_sftp"] ) as qaqc: tmp = (qaqc.read()).split(b"\n") for i_line in tmp: if i_line != "": lines.append(i_line.decode("utf-8")) else: if source == "qaqc_sps": source_file = get_file_paths(det, qaqc_name)["JQAQC_sps"] else: source_file = source with open(source_file, "r", encoding="utf-8") as s_f: lines = s_f.readlines() labels = lines[0].split() run_index = labels.index("run") if run_range != "": min_run = int(run_range.split("-")[0]) max_run = int(run_range.split("-")[1]) else: min_run = 0 max_run = 1e9 # Number of variables tested again (cf issue in D1ORCA015) but no error message # -> To be implemented or checked separately (run status?) content = { key: [ l.split()[i] for l in lines[1:] if ( len(l.split()) == len(labels) and int(l.split()[run_index]) >= min_run and int(l.split()[run_index]) <= max_run ) ] for i, key in enumerate(labels) } return content
###############################################################################
[docs] def decode_source_byte(value, v_prop): """ Decode the source of degradation stored in the TTree NB: the source is either a QAQC variable either a defect type (("daq", "operation"...) """ source_list = [] source_str = "" for i_var, i_bit in v_prop["bit"].items(): if (int(value) >> i_bit) & 1: source_list.append(i_var) source_str += f"{i_var} " return (source_list, source_str)