#! /usr/bin/env python
###############################################################################
# Various functions used in km3dq_lw_dq and km3dq_core scripts
#
# Developer: Benjamin Trocme (benjamin.trocme at apc.in2p3.fr) - 2023
import os
import sys
import time
import array
import re
import urllib.request
import km3db
import pandas as pd
import numpy as np
from km3dq_common.config_library import configure_var_bit
from km3dq_common.config_library import configure_var_thresholds
from km3dq_common.config_library import get_detx_caract
from km3dq_common.config_library import configure_dataquality_tag
from km3dq_common.config_library import configure_defect
from km3dq_common.lw_db_defect_library import read_defect_file
from km3dq_common.lw_db_defect_library import decode_defect_diag_byte
from km3dq_common.lw_db_defect_library import check_defect_diag_byte
###############################################################################
[docs]
def get_det_id(dataset): # TEMPORARY
"""Return the detector id"""
# sds = km3db.tools.StreamDS(container="pd")
# d = sds.detectors(oid=dataset).iloc[0]
# return int(d.SERIALNUMBER)
return km3db.tools.todetid(dataset)
###############################################################################
[docs]
def get_current_detector(site):
"""
Extract the current detector names from the database
"""
sds = km3db.tools.StreamDS()
if site == "ARCA":
res = sds.detectors(city="Italy").split("\n")
else:
res = sds.detectors(city="France").split("\n")
min_run = 0
for i_det in res[1:]:
try:
if int(i_det.split("\t")[4]) > min_run:
min_run = int(i_det.split("\t")[4])
o_det = i_det.split("\t")[0]
except:
print("Improper db reply in get_current_detectors")
return o_det
###############################################################################
[docs]
def get_file_paths(dataset, qaqc_name):
"""
For a given dataset / detector, returns the path for QAQC files
"""
filename = {}
det_id_str = f"{km3db.tools.todetid(dataset):08d}"
# QAQC file stored on sps
qaqc_sps = "/sps/km3net/repo/data/raw/quality/"
filename["JQAQC_sps"] = qaqc_sps + f"KM3NeT_{det_id_str}_QAQC_Jpp_{qaqc_name}.txt"
# QAQC file stored on sftp
qaqc_htpps = "https://sftp.km3net.de/data/quality/"
filename["JQAQC_sftp"] = (
qaqc_htpps + f"KM3NeT_{det_id_str}_QAQC_Jpp_{qaqc_name}.txt"
)
return filename
###############################################################################
[docs]
def get_run_properties_from_db(det, filt="PHYS"):
"""
Retrieve run properties from the database with the km3db package
Inputs:
- det: string like (e.g: D0ARCA030, D1ORCA024...)
- filt: string containing the accepted run type (PHYS, COMM...) separated by
a space
"""
sds = km3db.tools.StreamDS()
runs = sds.runs(detid=km3db.tools.todetid(det))
lines = runs.split("\n")
# Extract the available data
variable_list = lines[0].split("\t")
nb_variables = len(variable_list)
regex = r"([0-9]+)"
for i_var in range(nb_variables - 1):
regex += r"\t(.*)"
regex += "$"
reg_line = re.compile(regex)
results = {}
for i_line in lines:
r_p = reg_line.search(i_line)
if r_p:
if r_p.group(5).split(".")[0] not in filt:
continue
run_nb = int(r_p.group(1))
results[run_nb] = {}
for i_var in range(2, nb_variables + 1):
results[run_nb][variable_list[i_var - 1]] = r_p.group(i_var)
return results
###############################################################################
[docs]
def get_run_properties_from_qaqc(
dataset, dq_tag, origin="qaqc_sftp", startrun=0, endrun=1e9
):
"""
For a given dataset/detector, returns the run numbers, their lengths,
time counter...
A dataset may be restricted to a user-defined run range
Source: QAQC file
"""
runprop = {
"runNumber": [],
"lvt": [],
"lvt_counter": [],
"kty": [],
"kty_counter": [],
"nbRuns": 0,
"minRun": 1e10,
"maxRun": 0,
}
dataset_time_counter = 0.0
kty_counter = 0
err_log = ""
# Source: QAQC file on sftp or sps
(n1, err_log) = create_ttree_from_qaqc(
dataset, ["run", "livetime_s", "kton_year"], origin, dq_tag
)
if len(n1) == 0:
return (runprop, "No data in qaqc")
if all((startrun != 0, endrun != 1e9)):
n1 = n1[(n1["run"] >= startrun) & (n1["run"] <= endrun)]
runprop = {
"runNumber": n1.run.to_list(),
"lvt": (n1.livetime_s / 3600 / 24).to_list(),
"kty": n1.kton_year.to_list(),
"nbRuns": len(n1),
"minRun": int(n1.min()["run"]),
"maxRun": int(n1.max()["run"]),
}
runprop["lvt_counter"] = [0] + (n1.cumsum()["livetime_s"] / 3600 / 24).to_list()
runprop["kty_counter"] = [0] + (n1.cumsum()["kton_year"]).to_list()
return (runprop, err_log)
###############################################################################
[docs]
def get_last_n_weeks_runs_qaqc(det, nweeks=2):
"""Extract the all runs acquired in the last nweeks of running"""
# QAQC default tag used. As priori OK for this purpose
dq_tag = configure_dataquality_tag("default")
min_run = 1e9
max_run = 0
try:
lines = read_qaqc_file(det, "qaqc_sftp", dq_tag["qaqc_name"][det], "")
ten_days = (nweeks * 7 + 1) * 3600.0 * 24.0
for i_run in range(len(lines["run"])):
if (time.time() - float(lines["UTCMin_s"][i_run])) < ten_days:
run_number = int(lines["run"][i_run])
if run_number < min_run:
min_run = run_number
if run_number > max_run:
max_run = run_number
except KeyError:
print(f"common_library.py: Missing DQ-tag information for the detector {det}")
return (min_run, max_run)
###############################################################################
[docs]
def get_site(det):
"""Returns the site (ORCA or ARCA)"""
# A/O detectors corresponds to the current ARCA/ORCA detectors
if "ORCA" in det or det == "O":
out = "ORCA"
else:
out = "ARCA"
return out
###############################################################################
[docs]
def get_active_dus_range(det):
"""
Retrieve the range of DU active
To be improved with database information
"""
lower_du = 0
upper_du = 48
return (lower_du, upper_du)
###############################################################################
[docs]
def get_nb_qaqc_variables(qaqc_vers):
"""
Retrieve the number of variables in the QAQC file
NB: in a near future, the det argument should be replaced by a Jpp version
=== Arguments ===
- det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"...
=== Output ===
- Number of QAQC variables
"""
if qaqc_vers == "v16.0.3":
return 39
if qaqc_vers == "v17.3.2":
return 42
return 46 # Jpp >= 19.2.0
###############################################################################
[docs]
def create_ttree_from_qaqc(det, var_to_fill, source, tag, append_veto_qsco=False):
"""
Create a ttree from qaqc sftp file and defect variables stored on git
It includes some advanced check about the QAQC file integrity.
=== Arguments ===
- det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"...
- var_to_fill : QAQC variables or defect to fill - [array of string] -
Ex: ['run', 'timestampdiff', 'def_operation', 'def_oos']
- source : QAQC source, a priori "qaqc_sftp" - [string]
- tag : data-quality tag (not its name) as created by
configure_dataquality_tag
- append_veto_qsco: append the veto and Qscore - [boolean]
=== Output ===
- TTree
- Error log
"""
error_log = ""
# Correspondance between the name in the QAQC file (first line) and the
# variable name in the script - Mostly obsolete
variable_name_corresp = {}
# All variables in all version
variable_float = [
"livetime_s",
"DAQ",
"WR",
"FIFO",
"HRV",
"PMTs",
"pmts_norm",
"timestampdiff",
"timestampdiff_r",
"triggerrate",
"Acoustics",
"Acoustics_per_mn",
"MEAN_Rate_Hz",
"RMS_Rate_Hz",
"AHRS",
"ahrs_per_mn",
"mean_AHRS",
"JDAQJTProc",
"kton_year",
"UTCMin_s",
"UTCMax_s",
]
variable_int = [
"detector",
"run",
"nb_of_meta",
"trigger3DMuon",
"triggerMXShower",
"trigger3DShower",
"triggerNB",
"JTriggerReprocessor",
"JTrigger3dmuon",
"JTriggermxshower",
"JTrigger3dshower",
"JTriggernb",
"writeL0",
"writeL1",
"writeL2",
"writeSN",
"JDAQTimeslice",
"JDAQTimesliceL0",
"JDAQTimesliceL1",
"JDAQTimesliceL2",
"JDAQTimesliceSH",
"JDAQSummaryslice",
"JDAQEvent",
"hrv_fifo_failures",
"duplic_timeslices",
"in_sync",
"zero_AHRS",
"out_sync",
"in_usync",
"out_usync",
]
# Removing/renaming variables depending on Jpp versions
try:
# Processing version 16: name change and missing variables
if tag["qaqc_vers"][det] == "v16.0.3":
# Renaming
variable_name_corresp["out_sync"] = "out-sync"
variable_name_corresp["in_sync"] = "in-sync"
# Missing
for i_missing in ["in_usync", "out_usync", "event_duration"]:
variable_name_corresp[i_missing] = "MISSING"
if i_missing in var_to_fill:
var_to_fill.remove(i_missing)
# Processing version 16/17: missing variables (added in 19.2.0)
if tag["qaqc_vers"][det] in ("v16.0.3", "v17.3.2"):
for i_missing in [
"nb_of_meta",
"hrv_fifo_failures",
"duplic_timeslices",
"zero_AHRS",
"mean_AHRS",
]:
variable_name_corresp[i_missing] = "MISSING"
if i_missing in var_to_fill:
var_to_fill.remove(i_missing)
# Processing version > 19: missing variables (removed in 19.2.0)
if all(
(tag["qaqc_vers"][det] != "v16.0.3", tag["qaqc_vers"][det] != "v17.3.2")
):
# Processing version 16/17: 3 other missing variables
for i_missing in ["AHRS"]:
variable_name_corresp[i_missing] = "MISSING"
if i_missing in var_to_fill:
var_to_fill.remove(i_missing)
except KeyError:
print(f"common_library.py: Missing DQ-tag information for the detector {det}")
return (
None,
f"common_library.py: Missing DQ-tag information for the detector {det}",
)
content = read_qaqc_file(det, source, tag["qaqc_name"][det], tag["run_range"][det])
df = pd.DataFrame(content)
df = df.rename(columns={v: k for k, v in variable_name_corresp.items()})
df = df.astype({key: float for key in variable_float if key in df.columns})
df = df.astype({key: int for key in variable_int if key in df.columns})
# To Check
# Run duplicated in QAQC file -> Line skipped
# Run with 0 livetime
df["timestampdiff"] = df.UTCMax_s - df.UTCMin_s - df.livetime_s
df["timestampdiff_r"] = 1.0 - df.livetime_s / (df.UTCMax_s - df.UTCMin_s)
df["pmts_norm"] = df.PMTs / get_detx_caract(det)["pmts"]
df["triggerrate"] = df.JDAQEvent / df.livetime_s
# In releases > 19, the AHRS variable has been replaced by two variables.
if tag["qaqc_vers"][det] in ("v16.0.3", "v17.3.2"):
df["ahrs_per_mn"] = df.AHRS / df.livetime_s * 60
else: # AHRS renamed in 19.2.0
df["ahrs_per_mn"] = df.mean_AHRS / df.livetime_s * 60
df["Acoustics_per_mn"] = df.Acoustics / df.livetime_s * 60
df["JDAQJTProc"] = (df.JDAQEvent - df.JTriggerReprocessor) / (df.JDAQEvent + 1e-10)
df["kton_year"] = (
6980
/ (31 * 18 * 115)
* df.PMTs
* (1.0 - df.HRV)
* df.livetime_s
/ (365.2425 * 24 * 3600)
)
# Adding defects
defect_results = read_defect_file(det, tag["def_tag"])
df = df.sort_values("run")
df.reset_index(drop=True)
for key, runs in defect_results.items():
value = np.zeros(len(df))
if len(runs) != 0:
# A defect can be assigned to a run missing in the QAQC file (protection added)
# The defects_results is not chronologically ordered (sorting added)
mask = np.isin(
df.run, [x for x in sorted(runs.keys()) if x in df["run"].values]
)
value[mask] = [
runs[x] for x in sorted(runs.keys()) if x in df["run"].values
]
df[key] = value
# Adding Veto and Q score:
if append_veto_qsco:
var_properties = {}
configure_var_thresholds(var_properties, det, tag)
configure_var_bit(var_properties)
df["veto"], df["veto_source"], df["qsco"], df["qsco_source"] = (
compute_veto_qscore(var_properties, df)
)
if error_log != "":
print(error_log)
return (df, error_log)
###############################################################################
[docs]
def compute_veto_qscore(var_prop, var_val):
"""
Computes the veto/Q-score for a detector/tag using a QAQC source.
"""
def_config = configure_defect()
veto = np.zeros(len(var_val), dtype=bool)
veto_source = np.zeros(len(var_val), dtype=int)
qsco = np.ones(len(var_val), dtype=float) * len(var_prop["qsco_thresholds"])
qsco_source = np.zeros(len(var_val), dtype=int)
pd.set_option("display.max_rows", None)
for i_var in var_prop["veto_thresholds"]:
values = var_val[i_var]
# Defect variables
# The defect variable is defined as def_[defect type]
# with [defect type] = daq, operation, analysis...
if i_var.startswith("def_"):
# first extract the diagnosis
defect_type = i_var.replace("def_", "")
diag_array = {"present": [], "absent": []}
for i_excluded_diag in var_prop["veto_thresholds"][i_var]:
if i_excluded_diag.startswith("NOT."):
diag_array["absent"].append(
def_config["bit"][defect_type][
i_excluded_diag.replace("NOT.", "")
]
)
else:
diag_array["present"].append(
def_config["bit"][defect_type][i_excluded_diag]
)
if len(diag_array["absent"]) == 0: # Only present diagnosis
def_mask = values != 0
elif len(diag_array["present"]) == 0: # Only NOT. diagnosis
def_mask = values == 0
else: # Mixture of present/absent -> No mask
def_mask = values != (-1)
if np.sum(def_mask) == 0:
continue
defect_test_output = np.vectorize(check_defect_diag_byte, excluded=[1])(
values[def_mask], diag_array
)
veto[def_mask] = veto[def_mask] | defect_test_output
veto_source[def_mask] += (0b1 & defect_test_output) << var_prop["bit"][
i_var
]
else: # QAQC variable: range treatement
range_mask = (var_val[i_var] < var_prop["veto_thresholds"][i_var][0]) | (
var_val[i_var] > var_prop["veto_thresholds"][i_var][1]
)
veto = veto | range_mask
veto_source[range_mask] += 0b1 << var_prop["bit"][i_var]
for i_var in var_prop["qsco_thresholds"]:
values = var_val[i_var]
# Defect variables
# Defect variables
# The defect variable is defined as def_[defect type]
# with [defect type] = daq, operation, analysis...
if i_var.startswith("def_"):
# first extract the set defects
def_mask = values != 0
if np.sum(def_mask) == 0:
continue
defect_type = i_var.replace("def_", "")
diag_array = {"present": [], "absent": []}
for i_excluded_diag in var_prop["qsco_thresholds"][i_var]:
if i_excluded_diag.startswith("NOT."):
diag_array["absent"].append(
def_config["bit"][defect_type][
i_excluded_diag.replace("NOT.", "")
]
)
else:
diag_array["present"].append(
def_config["bit"][defect_type][i_excluded_diag]
)
defect_test_output = np.vectorize(check_defect_diag_byte, excluded=[1])(
values[def_mask], diag_array
)
qsco[def_mask] = qsco[def_mask] | defect_test_output
qsco_source[def_mask] += (0b1 & defect_test_output) << var_prop["bit"][
i_var
]
else: # QAQC variable: range treatement
range_mask = (var_val[i_var] < var_prop["qsco_thresholds"][i_var][0]) | (
var_val[i_var] > var_prop["qsco_thresholds"][i_var][1]
)
qsco[range_mask] -= 1
qsco_source[range_mask] += 0b1 << var_prop["bit"][i_var]
qsco /= len(var_prop["qsco_thresholds"])
return (veto, veto_source, qsco, qsco_source)
###############################################################################
[docs]
def log_run_range(det, run_0, run_1, log_file):
"""Write the run range and run list on disk"""
os.environ["TZ"] = "Europe/Paris"
time.tzset()
run_prop = get_run_properties_from_db(det, "PHYS COMM")
try:
tim_0_unix = int(run_prop[run_0]["UNIXJOBSTART"])
tim_0 = time.strftime("%a, %d %b %Y %H:%M", time.localtime(tim_0_unix / 1000))
except KeyError:
tim_0 = "Unknown"
try:
tim_1_unix = int(run_prop[run_1]["UNIXJOBEND"])
tim_1 = time.strftime("%a, %d %b %Y %H:%M", time.localtime(tim_1_unix / 1000))
except KeyError:
tim_1 = "Unknown"
run_range = f"{int(run_0)} ({tim_0} CET/CEST) - " f"{int(run_1)} ({tim_1} CET/CEST)"
log_file.write(f"Run range: {run_range}\n")
###############################################################################
[docs]
def read_qaqc_file(det, source, qaqc_name, run_range):
"""
sftp QAQC file reading
=== Arguments ===
- det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"...
- source : QAQC source, a priori "qaqc_sftp" - [string]
- qaqc_name :
- run_range : string as defined in DQ tag "[min_run]-[max_run]" (inclusive)
=== Output ===
- TTree
- Error log
"""
lines = []
if source == "qaqc_sftp":
with urllib.request.urlopen(
get_file_paths(det, qaqc_name)["JQAQC_sftp"]
) as qaqc:
tmp = (qaqc.read()).split(b"\n")
for i_line in tmp:
if i_line != "":
lines.append(i_line.decode("utf-8"))
else:
if source == "qaqc_sps":
source_file = get_file_paths(det, qaqc_name)["JQAQC_sps"]
else:
source_file = source
with open(source_file, "r", encoding="utf-8") as s_f:
lines = s_f.readlines()
labels = lines[0].split()
run_index = labels.index("run")
if run_range != "":
min_run = int(run_range.split("-")[0])
max_run = int(run_range.split("-")[1])
else:
min_run = 0
max_run = 1e9
# Number of variables tested again (cf issue in D1ORCA015) but no error message
# -> To be implemented or checked separately (run status?)
content = {
key: [
l.split()[i]
for l in lines[1:]
if (
len(l.split()) == len(labels)
and int(l.split()[run_index]) >= min_run
and int(l.split()[run_index]) <= max_run
)
]
for i, key in enumerate(labels)
}
return content
###############################################################################
[docs]
def decode_source_byte(value, v_prop):
"""
Decode the source of degradation stored in the TTree
NB: the source is either a QAQC variable either a defect type (("daq",
"operation"...)
"""
source_list = []
source_str = ""
for i_var, i_bit in v_prop["bit"].items():
if (int(value) >> i_bit) & 1:
source_list.append(i_var)
source_str += f"{i_var} "
return (source_list, source_str)