Source code for km3dq_common.common_library

#! /usr/bin/env python
###############################################################################
# Various functions used in km3dq_lw_dq and km3dq_core scripts
#
# Developer: Benjamin Trocme (benjamin.trocme at apc.in2p3.fr) - 2023

import os
import sys
import time
import array
import re
import urllib.request
import km3db
import pandas as pd
import numpy as np


from km3dq_common.config_library import configure_var_bit
from km3dq_common.config_library import configure_var_thresholds
from km3dq_common.config_library import get_detx_caract
from km3dq_common.config_library import configure_dataquality_tag
from km3dq_common.config_library import configure_defect
from km3dq_common.lw_db_defect_library import read_defect_file
from km3dq_common.lw_db_defect_library import decode_defect_diag_byte
from km3dq_common.lw_db_defect_library import check_defect_diag_byte


###############################################################################
[docs] def get_det_id(dataset): # TEMPORARY """Return the detector id""" # sds = km3db.tools.StreamDS(container="pd") # d = sds.detectors(oid=dataset).iloc[0] # return int(d.SERIALNUMBER) return km3db.tools.todetid(dataset)
###############################################################################
[docs] def get_current_detector(site): """ Extract the current detector names from the database """ sds = km3db.tools.StreamDS() if site == "ARCA": res = sds.detectors(city="Italy").split("\n") else: res = sds.detectors(city="France").split("\n") min_run = 0 for i_det in res[1:]: try: if int(i_det.split("\t")[4]) > min_run: min_run = int(i_det.split("\t")[4]) o_det = i_det.split("\t")[0] except: print("Improper db reply in get_current_detectors") return o_det
###############################################################################
[docs] def get_file_paths(dataset, qaqc_name): """ For a given dataset / detector, returns the path for QAQC files """ filename = {} det_id_str = f"{km3db.tools.todetid(dataset):08d}" # QAQC file stored on sps qaqc_sps = "/sps/km3net/repo/data/raw/quality/" filename["JQAQC_sps"] = qaqc_sps + f"KM3NeT_{det_id_str}_QAQC_Jpp_{qaqc_name}.txt" # QAQC file stored on sftp qaqc_htpps = "https://sftp.km3net.de/data/quality/" filename["JQAQC_sftp"] = ( qaqc_htpps + f"KM3NeT_{det_id_str}_QAQC_Jpp_{qaqc_name}.txt" ) return filename
###############################################################################
[docs] def get_run_properties_from_db(det, filt="PHYS"): """ Retrieve run properties from the database with the km3db package Inputs: - det: string like (e.g: D0ARCA030, D1ORCA024...) - filt: string containing the accepted run type (PHYS, COMM...) separated by a space """ sds = km3db.tools.StreamDS() runs = sds.runs(detid=km3db.tools.todetid(det)) lines = runs.split("\n") # Extract the available data variable_list = lines[0].split("\t") nb_variables = len(variable_list) regex = r"([0-9]+)" for i_var in range(nb_variables - 1): regex += r"\t(.*)" regex += "$" reg_line = re.compile(regex) results = {} for i_line in lines: r_p = reg_line.search(i_line) if r_p: if r_p.group(5).split(".")[0] not in filt: continue run_nb = int(r_p.group(1)) results[run_nb] = {} for i_var in range(2, nb_variables + 1): results[run_nb][variable_list[i_var - 1]] = r_p.group(i_var) return results
###############################################################################
[docs] def get_run_properties_from_qaqc( dataset, dq_tag, origin="qaqc_sftp", startrun=0, endrun=1e9 ): """ For a given dataset/detector, returns the run numbers, their lengths, time counter... A dataset may be restricted to a user-defined run range Source: QAQC file """ runprop = { "runNumber": [], "lvt": [], "lvt_counter": [], "kty": [], "kty_counter": [], "nbRuns": 0, "minRun": 1e10, "maxRun": 0, } dataset_time_counter = 0.0 kty_counter = 0 err_log = "" # Source: QAQC file on sftp or sps (n1, err_log) = create_ttree_from_qaqc( dataset, ["run", "livetime_s", "kton_year"], origin, dq_tag ) if len(n1) == 0: return (runprop, "No data in qaqc") if all((startrun != 0, endrun != 1e9)): n1 = n1[(n1["run"] >= startrun) & (n1["run"] <= endrun)] runprop = { "runNumber": n1.run.to_list(), "lvt": (n1.livetime_s / 3600 / 24).to_list(), "kty": n1.kton_year.to_list(), "nbRuns": len(n1), "minRun": int(n1.min()["run"]), "maxRun": int(n1.max()["run"]), } runprop["lvt_counter"] = [0] + (n1.cumsum()["livetime_s"] / 3600 / 24).to_list() runprop["kty_counter"] = [0] + (n1.cumsum()["kton_year"]).to_list() return (runprop, err_log)
###############################################################################
[docs] def get_last_n_weeks_runs_qaqc(det, nweeks=2): """Extract the all runs acquired in the last nweeks of running""" # QAQC default tag used. As priori OK for this purpose dq_tag = configure_dataquality_tag("default") min_run = 1e9 max_run = 0 try: lines = read_qaqc_file(det, "qaqc_sftp", dq_tag["qaqc_name"][det], "") ten_days = (nweeks * 7 + 1) * 3600.0 * 24.0 for i_run in range(len(lines["run"])): if (time.time() - float(lines["UTCMin_s"][i_run])) < ten_days: run_number = int(lines["run"][i_run]) if run_number < min_run: min_run = run_number if run_number > max_run: max_run = run_number except KeyError: print(f"common_library.py: Missing DQ-tag information for the detector {det}") return (min_run, max_run)
###############################################################################
[docs] def get_site(det): """Returns the site (ORCA or ARCA)""" # A/O detectors corresponds to the current ARCA/ORCA detectors if "ORCA" in det or det == "O": out = "ORCA" else: out = "ARCA" return out
###############################################################################
[docs] def get_active_dus_range(det): """ Retrieve the range of DU active To be improved with database information """ lower_du = 0 upper_du = 48 return (lower_du, upper_du)
###############################################################################
[docs] def get_nb_qaqc_variables(qaqc_vers): """ Retrieve the number of variables in the QAQC file NB: in a near future, the det argument should be replaced by a Jpp version === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... === Output === - Number of QAQC variables """ if qaqc_vers == "v16.0.3": return 39 if qaqc_vers == "v17.3.2": return 42 return 46 # Jpp >= 19.2.0
###############################################################################
[docs] def create_ttree_from_qaqc(det, var_to_fill, source, tag, append_veto_qsco=False): """ Create a ttree from qaqc sftp file and defect variables stored on git It includes some advanced check about the QAQC file integrity. === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - var_to_fill : QAQC variables or defect to fill - [array of string] - Ex: ['run', 'timestampdiff', 'def_operation', 'def_oos'] - source : QAQC source, a priori "qaqc_sftp" - [string] - tag : data-quality tag (not its name) as created by configure_dataquality_tag - append_veto_qsco: append the veto and Qscore - [boolean] === Output === - TTree - Error log """ error_log = "" # Correspondance between the name in the QAQC file (first line) and the # variable name in the script - Mostly obsolete variable_name_corresp = {} # All variables in all version variable_float = [ "livetime_s", "DAQ", "WR", "FIFO", "HRV", "PMTs", "pmts_norm", "timestampdiff", "timestampdiff_r", "triggerrate", "Acoustics", "Acoustics_per_mn", "MEAN_Rate_Hz", "RMS_Rate_Hz", "AHRS", "ahrs_per_mn", "mean_AHRS", "JDAQJTProc", "kton_year", "UTCMin_s", "UTCMax_s", ] variable_int = [ "detector", "run", "nb_of_meta", "trigger3DMuon", "triggerMXShower", "trigger3DShower", "triggerNB", "JTriggerReprocessor", "JTrigger3dmuon", "JTriggermxshower", "JTrigger3dshower", "JTriggernb", "writeL0", "writeL1", "writeL2", "writeSN", "JDAQTimeslice", "JDAQTimesliceL0", "JDAQTimesliceL1", "JDAQTimesliceL2", "JDAQTimesliceSH", "JDAQSummaryslice", "JDAQEvent", "hrv_fifo_failures", "duplic_timeslices", "in_sync", "zero_AHRS", "out_sync", "in_usync", "out_usync", ] # Removing/renaming variables depending on Jpp versions try: # Processing version 16: name change and missing variables if tag["qaqc_vers"][det] == "v16.0.3": # Renaming variable_name_corresp["out_sync"] = "out-sync" variable_name_corresp["in_sync"] = "in-sync" # Missing for i_missing in ["in_usync", "out_usync", "event_duration"]: variable_name_corresp[i_missing] = "MISSING" if i_missing in var_to_fill: var_to_fill.remove(i_missing) # Processing version 16/17: missing variables (added in 19.2.0) if tag["qaqc_vers"][det] in ("v16.0.3", "v17.3.2"): for i_missing in [ "nb_of_meta", "hrv_fifo_failures", "duplic_timeslices", "zero_AHRS", "mean_AHRS", ]: variable_name_corresp[i_missing] = "MISSING" if i_missing in var_to_fill: var_to_fill.remove(i_missing) # Processing version > 19: missing variables (removed in 19.2.0) if all( (tag["qaqc_vers"][det] != "v16.0.3", tag["qaqc_vers"][det] != "v17.3.2") ): # Processing version 16/17: 3 other missing variables for i_missing in ["AHRS"]: variable_name_corresp[i_missing] = "MISSING" if i_missing in var_to_fill: var_to_fill.remove(i_missing) except KeyError: print(f"common_library.py: Missing DQ-tag information for the detector {det}") return ( None, f"common_library.py: Missing DQ-tag information for the detector {det}", ) content = read_qaqc_file(det, source, tag["qaqc_name"][det], tag["run_range"][det]) df = pd.DataFrame(content) df = df.rename(columns={v: k for k, v in variable_name_corresp.items()}) df = df.astype({key: float for key in variable_float if key in df.columns}) df = df.astype({key: int for key in variable_int if key in df.columns}) # To Check # Run duplicated in QAQC file -> Line skipped # Run with 0 livetime df["timestampdiff"] = df.UTCMax_s - df.UTCMin_s - df.livetime_s df["timestampdiff_r"] = 1.0 - df.livetime_s / (df.UTCMax_s - df.UTCMin_s) df["pmts_norm"] = df.PMTs / get_detx_caract(det)["pmts"] df["triggerrate"] = df.JDAQEvent / df.livetime_s # In releases > 19, the AHRS variable has been replaced by two variables. if tag["qaqc_vers"][det] in ("v16.0.3", "v17.3.2"): df["ahrs_per_mn"] = df.AHRS / df.livetime_s * 60 else: # AHRS renamed in 19.2.0 df["ahrs_per_mn"] = df.mean_AHRS / df.livetime_s * 60 df["Acoustics_per_mn"] = df.Acoustics / df.livetime_s * 60 df["JDAQJTProc"] = (df.JDAQEvent - df.JTriggerReprocessor) / (df.JDAQEvent + 1e-10) df["kton_year"] = ( 6980 / (31 * 18 * 115) * df.PMTs * (1.0 - df.HRV) * df.livetime_s / (365.2425 * 24 * 3600) ) # Adding defects defect_results = read_defect_file(det, tag["def_tag"]) df = df.sort_values("run") df.reset_index(drop=True) for key, runs in defect_results.items(): value = np.zeros(len(df)) if len(runs) != 0: # A defect can be assigned to a run missing in the QAQC file (protection added) # The defects_results is not chronologically ordered (sorting added) mask = np.isin( df.run, [x for x in sorted(runs.keys()) if x in df["run"].values] ) value[mask] = [ runs[x] for x in sorted(runs.keys()) if x in df["run"].values ] df[key] = value # Adding Veto and Q score: if append_veto_qsco: var_properties = {} configure_var_thresholds(var_properties, det, tag) configure_var_bit(var_properties) df["veto"], df["veto_source"], df["qsco"], df["qsco_source"] = ( compute_veto_qscore(var_properties, df) ) if error_log != "": print(error_log) return (df, error_log)
###############################################################################
[docs] def compute_veto_qscore(var_prop, var_val): """ Computes the veto/Q-score for a detector/tag using a QAQC source. """ def_config = configure_defect() veto = np.zeros(len(var_val), dtype=bool) veto_source = np.zeros(len(var_val), dtype=int) qsco = np.ones(len(var_val), dtype=float) * len(var_prop["qsco_thresholds"]) qsco_source = np.zeros(len(var_val), dtype=int) pd.set_option("display.max_rows", None) for i_var in var_prop["veto_thresholds"]: values = var_val[i_var] # Defect variables # The defect variable is defined as def_[defect type] # with [defect type] = daq, operation, analysis... if i_var.startswith("def_"): # first extract the diagnosis defect_type = i_var.replace("def_", "") diag_array = {"present": [], "absent": []} for i_excluded_diag in var_prop["veto_thresholds"][i_var]: if i_excluded_diag.startswith("NOT."): diag_array["absent"].append( def_config["bit"][defect_type][ i_excluded_diag.replace("NOT.", "") ] ) else: diag_array["present"].append( def_config["bit"][defect_type][i_excluded_diag] ) if len(diag_array["absent"]) == 0: # Only present diagnosis def_mask = values != 0 elif len(diag_array["present"]) == 0: # Only NOT. diagnosis def_mask = values == 0 else: # Mixture of present/absent -> No mask def_mask = values != (-1) if np.sum(def_mask) == 0: continue defect_test_output = np.vectorize(check_defect_diag_byte, excluded=[1])( values[def_mask], diag_array ) veto[def_mask] = veto[def_mask] | defect_test_output veto_source[def_mask] += (0b1 & defect_test_output) << var_prop["bit"][ i_var ] else: # QAQC variable: range treatement range_mask = (var_val[i_var] < var_prop["veto_thresholds"][i_var][0]) | ( var_val[i_var] > var_prop["veto_thresholds"][i_var][1] ) veto = veto | range_mask veto_source[range_mask] += 0b1 << var_prop["bit"][i_var] for i_var in var_prop["qsco_thresholds"]: values = var_val[i_var] # Defect variables # Defect variables # The defect variable is defined as def_[defect type] # with [defect type] = daq, operation, analysis... if i_var.startswith("def_"): # first extract the set defects def_mask = values != 0 if np.sum(def_mask) == 0: continue defect_type = i_var.replace("def_", "") diag_array = {"present": [], "absent": []} for i_excluded_diag in var_prop["qsco_thresholds"][i_var]: if i_excluded_diag.startswith("NOT."): diag_array["absent"].append( def_config["bit"][defect_type][ i_excluded_diag.replace("NOT.", "") ] ) else: diag_array["present"].append( def_config["bit"][defect_type][i_excluded_diag] ) defect_test_output = np.vectorize(check_defect_diag_byte, excluded=[1])( values[def_mask], diag_array ) qsco[def_mask] = qsco[def_mask] | defect_test_output qsco_source[def_mask] += (0b1 & defect_test_output) << var_prop["bit"][ i_var ] else: # QAQC variable: range treatement range_mask = (var_val[i_var] < var_prop["qsco_thresholds"][i_var][0]) | ( var_val[i_var] > var_prop["qsco_thresholds"][i_var][1] ) qsco[range_mask] -= 1 qsco_source[range_mask] += 0b1 << var_prop["bit"][i_var] qsco /= len(var_prop["qsco_thresholds"]) return (veto, veto_source, qsco, qsco_source)
###############################################################################
[docs] def log_run_range(det, run_0, run_1, log_file): """Write the run range and run list on disk""" os.environ["TZ"] = "Europe/Paris" time.tzset() run_prop = get_run_properties_from_db(det, "PHYS COMM") try: tim_0_unix = int(run_prop[run_0]["UNIXJOBSTART"]) tim_0 = time.strftime("%a, %d %b %Y %H:%M", time.localtime(tim_0_unix / 1000)) except KeyError: tim_0 = "Unknown" try: tim_1_unix = int(run_prop[run_1]["UNIXJOBEND"]) tim_1 = time.strftime("%a, %d %b %Y %H:%M", time.localtime(tim_1_unix / 1000)) except KeyError: tim_1 = "Unknown" run_range = f"{int(run_0)} ({tim_0} CET/CEST) - " f"{int(run_1)} ({tim_1} CET/CEST)" log_file.write(f"Run range: {run_range}\n")
###############################################################################
[docs] def read_qaqc_file(det, source, qaqc_name, run_range): """ sftp QAQC file reading === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - source : QAQC source, a priori "qaqc_sftp" - [string] - qaqc_name : - run_range : string as defined in DQ tag "[min_run]-[max_run]" (inclusive) === Output === - TTree - Error log """ lines = [] if source == "qaqc_sftp": with urllib.request.urlopen( get_file_paths(det, qaqc_name)["JQAQC_sftp"] ) as qaqc: tmp = (qaqc.read()).split(b"\n") for i_line in tmp: if i_line != "": lines.append(i_line.decode("utf-8")) else: if source == "qaqc_sps": source_file = get_file_paths(det, qaqc_name)["JQAQC_sps"] else: source_file = source with open(source_file, "r", encoding="utf-8") as s_f: lines = s_f.readlines() labels = lines[0].split() run_index = labels.index("run") if run_range != "": min_run = int(run_range.split("-")[0]) max_run = int(run_range.split("-")[1]) else: min_run = 0 max_run = 1e9 # Number of variables tested again (cf issue in D1ORCA015) but no error message # -> To be implemented or checked separately (run status?) content = { key: [ l.split()[i] for l in lines[1:] if ( len(l.split()) == len(labels) and int(l.split()[run_index]) >= min_run and int(l.split()[run_index]) <= max_run ) ] for i, key in enumerate(labels) } return content
###############################################################################
[docs] def decode_source_byte(value, v_prop): """ Decode the source of degradation stored in the TTree NB: the source is either a QAQC variable either a defect type (("daq", "operation"...) """ source_list = [] source_str = "" for i_var, i_bit in v_prop["bit"].items(): if (int(value) >> i_bit) & 1: source_list.append(i_var) source_str += f"{i_var} " return (source_list, source_str)