Source code for km3dq_common.lw_db_defect_library

#! /usr/bin/env python
###############################################################################
import re

import urllib.request

from km3dq_common.config_library import configure_defect


###############################################################################
def check_defect_diag_byte(value, bit_list):
    """
    Tell whether a diagnosis value fulfils at least one bit requirement.

    === Arguments ===
    - value    : diagnosis value, converted with int() before testing
    - bit_list : bit positions to test - [dictionary] with two keys:
                 "present": positions that match when the bit is set
                 "absent" : positions that match when the bit is cleared

    === Output ===
    - True if any "present" bit is set or any "absent" bit is cleared,
      False otherwise (in particular when both lists are empty)
    """
    # Lists rather than generators: every position is evaluated, whatever the
    # outcome of the previous ones.
    set_flags = [
        ((int(value) >> position) & 1) != 0 for position in bit_list["present"]
    ]
    cleared_flags = [
        ((int(value) >> position) & 1) == 0 for position in bit_list["absent"]
    ]
    return any(set_flags) or any(cleared_flags)
###############################################################################
def decode_defect_diag_byte(value, def_type0):
    """
    Translate the diagnosis value of one defect type into a readable string.

    === Arguments ===
    - value     : diagnosis value for the given type, converted with int()
    - def_type0 : defect type, with or without the "def_" marker
                  ("daq", "def_operation"...) - [string]

    === Output ===
    - Single string made of the name of every diagnosis whose bit is set in
      value, each name being followed by " / ". Empty string if no bit is set.
      Ex: 'missing_data_dst / jmergefitfailure / '
    """
    defect_cfg = configure_defect()
    # The bit table is indexed by the bare type name, without "def_"
    type_key = def_type0.replace("def_", "")
    bit_table = defect_cfg["bit"][type_key]
    return "".join(
        f"{diag_name} / "
        for diag_name in bit_table
        if (int(value) >> bit_table[diag_name]) & 1
    )
###############################################################################
def decode_defect_file(defect_file_dict, run_nb):
    """
    Extract and decode the defects assigned to a single run.

    === Arguments ===
    - defect_file_dict: defect dictionary as produced by the read_defect_file
      function (defect type -> {run number: diagnosis value}) - [dictionary]
    - run_nb          : run number - [integer]

    === Output ===
    - Dictionary with two entries describing the defects of this run:
      "str" : html-formatted concatenation of all decoded defect types
      "dict": decoded defect string for each defect type affecting the run
      Defect types that do not contain the run are skipped.
      Ex: {'str': '<b> def_data_processing </b>: missing_data_dst / ',
           'dict': {'def_data_processing': 'missing_data_dst / '}}
    """
    html_parts = []
    decoded_by_type = {}
    for def_type, flagged_runs in defect_file_dict.items():
        if run_nb in flagged_runs:
            decoded = decode_defect_diag_byte(flagged_runs[run_nb], def_type)
            html_parts.append(f"<b> {def_type} </b>: {decoded}")
            decoded_by_type[def_type] = decoded
    return {"str": "".join(html_parts), "dict": decoded_by_type}
###############################################################################
def read_defect_file(det, def_tag="def-HEAD", type_filter=None):
    """
    Read the defect files of a detector stored on SFTP and merge them into
    one bit mask per run and per defect type.

    === Arguments ===
    - det         : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"...
                    The site is "ORCA" if the name contains "ORCA", "ARCA"
                    otherwise.
    - def_tag     : defect tag - [string]
    - type_filter : if None, all the defect types returned by
                    configure_defect() are read; otherwise only the types
                    listed are read (an empty list therefore reads nothing)
                    - [array of strings] - Ex: ["daq", "operation"]

    === Output ===
    - Dictionary with def_[defect type] as keys and, for each affected run,
      an integer in which the bit of every diagnosis listing this run is set.
      This value can be decoded by the decode_defect_diag_byte function.
      Diagnosis files that can not be retrieved are silently skipped.
      Ex: {'def_daq': {}, 'def_operation': {16501: 1, 16503: 2}...}
    """
    defects0 = configure_defect()
    site = "ORCA" if "ORCA" in det else "ARCA"
    # A valid line starts with the run number followed by a "|" separator
    re_line = re.compile(r"\s*([0-9]+)\s*\|.*")

    res = {}
    for i_type in defects0["type"]:
        if type_filter is not None and i_type not in type_filter:
            continue
        runs = res[f"def_{i_type}"] = {}
        for i_diag, i_bit in defects0["bit"][i_type].items():
            defect_url = (
                "https://sftp.km3net.de/data/km3dq_lw_db/"
                f"{site}/{det}/Defects/{def_tag}/"
                f"{i_type}_{i_diag}.txt"
            )
            try:
                with urllib.request.urlopen(defect_url) as def_file:
                    raw_lines = def_file.read().split(b"\n")
            except urllib.error.URLError:
                # No defect file found
                continue

            for raw_line in raw_lines:
                # The payload is bytes: emptiness must be tested on bytes (a
                # comparison with the str "" is always unequal in Python 3).
                if not raw_line:
                    continue
                r_m = re_line.match(raw_line.decode("utf-8"))
                if r_m is None:
                    continue
                run_nb = int(r_m.group(1))
                # Accumulate the bits of all the diagnoses affecting the run
                runs[run_nb] = runs.get(run_nb, 0) | (1 << i_bit)
    return res
###############################################################################
def read_basic_defect_file(det, def_type, def_diag, def_tag="def-HEAD"):
    """
    Read one defect file stored on SFTP and simply return its lines.

    === Arguments ===
    - det      : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"...
                 The site is "ORCA" if the name contains "ORCA", "ARCA"
                 otherwise.
    - def_type : defect type, first part of the file name - [string]
    - def_diag : defect diagnosis, second part of the file name - [string]
    - def_tag  : defect tag - [string]

    === Output ===
    - Array of the non-empty lines of the file, decoded as UTF-8 and without
      the newline character, or None if the file could not be retrieved.
    """
    site = "ORCA" if "ORCA" in det else "ARCA"
    defect_url = (
        "https://sftp.km3net.de/data/km3dq_lw_db/"
        f"{site}/{det}/Defects/{def_tag}/"
        f"{def_type}_{def_diag}.txt"
    )
    try:
        with urllib.request.urlopen(defect_url) as def_file:
            raw_lines = def_file.read().split(b"\n")
    except urllib.error.URLError:
        # No defect file found
        return None

    # The payload is bytes: emptiness must be tested on bytes. Comparing with
    # the str "" is always unequal in Python 3 and let empty lines (e.g. the
    # one following the final newline) through.
    return [raw_line.decode("utf-8") for raw_line in raw_lines if raw_line]