Source code for km3dq_common.lw_db_fact_library

#! /usr/bin/env python
###############################################################################
import re
import sys
import urllib.request

from km3dq_common.detector_fact_classes import DetectorFact


###############################################################################
[docs] def read_basic_fact_file(det, fact_type): """ Read defect files stored on SFTP. Simply returns lines === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - fact_type === Output === - Array of lines """ if "ORCA" in det: site = "ORCA" else: site = "ARCA" lines = [] fact_url = ("https://sftp.km3net.de/data/km3dq_lw_db/" f"{site}/{det}/Facts/" f"{fact_type}.txt") try: with urllib.request.urlopen(fact_url) as def_file: tmp = ((def_file.read()).split(b"\n")) for i_line in tmp: if i_line != "": lines.append(i_line.decode("utf-8")) except urllib.error.URLError: # No fact file found lines = None return lines
###############################################################################
[docs] def read_det_fact_file(site): """ Read detector fact files stored on SFTP === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - def_tag : defect tag - [string] - type_filter : if not empty, filter on the defect type - If empty, read all defect files - [array of strings] - Ex: ["daq", "operation"] === Output === - raw detector facts - detector start/end """ # det_facts0 = configure_det_fact() raw_df = {} det_prop = {} re_line = re.compile(r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)") raw_df = [] lines = [] det_fact_url = ("https://sftp.km3net.de/data/km3dq_lw_db/" f"{site}//DetectorFacts/" f"detector_facts.txt") try: with urllib.request.urlopen(det_fact_url) as def_file: tmp = ((def_file.read()).split(b"\n")) for i_line in tmp: if i_line != "": lines.append(i_line.decode("utf-8")) except urllib.error.URLError: # No defect file found print("Missing detector-fact file") sys.exit() for (i_line_index, i_line) in enumerate(lines): if i_line_index == 0: continue r_m = re_line.match(i_line) if r_m: raw_df.append(DetectorFact()) raw_df[-1].hardware = r_m.group(4).split("-")[0].replace(" ", "") raw_df[-1].site = site.replace(" ", "") raw_df[-1].time = r_m.group(1) raw_df[-1].det = r_m.group(2).replace(" ", "") raw_df[-1].following_run = int(r_m.group(3)) raw_df[-1].upi = (r_m.group(4).split(f"{raw_df[-1].hardware}-")[1])\ .replace(" ", "") raw_df[-1].location = r_m.group(5).replace(" ", "") if r_m.group(6).replace(" ", "") != "": raw_df[-1].position = int(r_m.group(6)) else: # Detector object has no position raw_df[-1].position = "" raw_df[-1].status = r_m.group(7).replace(" ", "") raw_df[-1].comment = r_m.group(8) raw_df[-1].waveform = r_m.group(9).replace(" ", "") raw_df[-1].documentation = r_m.group(10) raw_df[-1].author = r_m.group(11) return (raw_df, det_prop)