Source code for km3dq_common.lw_db_fact_library

#! /usr/bin/env python
###############################################################################
import re
import sys
import time
import urllib.request

from km3dq_common.detector_fact_classes import DetectorFact


###############################################################################
[docs] def read_basic_fact_file(det, fact_type): """ Read defect files stored on SFTP. Simply returns lines === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - fact_type === Output === - Array of lines """ if "ORCA" in det: site = "ORCA" else: site = "ARCA" lines = [] fact_url = ( "https://sftp.km3net.de/data/km3dq_lw_db/" f"{site}/{det}/Facts/" f"{fact_type}.txt" ) try: with urllib.request.urlopen(fact_url) as def_file: tmp = (def_file.read()).split(b"\n") for i_line in tmp: if i_line != "": lines.append(i_line.decode("utf-8")) except urllib.error.URLError: # No fact file found lines = None return lines
###############################################################################
[docs] def read_fact_file(det, fact_type): """ Read defect files stored on SFTP. === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - fact_type === Output === - Array of lines """ if "ORCA" in det: site = "ORCA" else: site = "ARCA" re_line = re.compile( r"\s*(\d+)\s*\|" r"\s*(\d+)\s*\|" r"\s(.*)\s\-\s(.*)\s\|" r"(.*)\|" r"(.*)\|" r"(.*)" ) fact_list = [] fact_url = ( "https://sftp.km3net.de/data/km3dq_lw_db/" f"{site}/{det}/Facts/" f"{fact_type}.txt" ) try: with urllib.request.urlopen(fact_url) as def_file: tmp = (def_file.read()).split(b"\n") for i_line in tmp: r_m = re_line.match(i_line.decode("utf-8")) if r_m: fact_list.append({ "run_start": r_m.group(1), "run_end": r_m.group(2), "time_start": int(time.mktime(time.strptime(r_m.group(3), "%a, %d %b %Y %H:%M"))), "comment": r_m.group(5), "documentation": r_m.group(6), "author": r_m.group(7), }) try: fact_list[-1]["time_end"] = int(time.mktime(time.strptime(r_m.group(4), "%a, %d %b %Y %H:%M"))) except ValueError: # No fact end fact_list[-1]["time_end"] = 0 except urllib.error.URLError: # No fact file found lines = None return fact_list
###############################################################################
[docs] def read_det_fact_file(site): """ Read detector fact files stored on SFTP === Arguments === - det : detector name - [string] - Ex: "D0ARCA021", "D0ORCA018"... - def_tag : defect tag - [string] - type_filter : if not empty, filter on the defect type - If empty, read all defect files - [array of strings] - Ex: ["daq", "operation"] === Output === - raw detector facts - detector start/end """ raw_df = {} re_line = re.compile( r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)\|" r"(.*)" ) raw_df = [] lines = [] det_fact_url = ( "https://sftp.km3net.de/data/km3dq_lw_db/" f"{site}//DetectorFacts/" f"detector_facts.txt" ) try: with urllib.request.urlopen(det_fact_url) as def_file: tmp = (def_file.read()).split(b"\n") for i_line in tmp: if i_line != "": lines.append(i_line.decode("utf-8")) except urllib.error.URLError: # No defect file found print("Missing detector-fact file") sys.exit() for i_line_index, i_line in enumerate(lines): if i_line_index == 0: continue r_m = re_line.match(i_line) if r_m: raw_df.append(DetectorFact()) raw_df[-1].hardware = r_m.group(4).split("-")[0].replace(" ", "") raw_df[-1].site = site.replace(" ", "") raw_df[-1].time = r_m.group(1) raw_df[-1].det = r_m.group(2).replace(" ", "") raw_df[-1].following_run = int(r_m.group(3)) raw_df[-1].upi = (r_m.group(4).split(f"{raw_df[-1].hardware}-")[1]).replace( " ", "" ) raw_df[-1].location = r_m.group(5).replace(" ", "") # For some unknown reasons, the lines below are mandatory to avoid an # error: AttributeError: 'DetectorFact' object has no attribute coord_utm raw_df[-1].coord_utm = {"x": 0.0, "y": 0.0, "z": 0.0} raw_df[-1].coord_detx = {"x": 0.0, "y": 0.0, "z": 0.0} raw_df[-1].extract_coord_from_location() if r_m.group(6).replace(" ", "") != "": raw_df[-1].position = int(r_m.group(6)) else: # Detector object has no position raw_df[-1].position = "" raw_df[-1].status = r_m.group(7).replace(" ", "") raw_df[-1].comment = r_m.group(8) if "&" in r_m.group(9): # Waveform with rate definition raw_df[-1].waveform = r_m.group(9).split("&")[0].replace(" ", "") raw_df[-1].emission_rate = r_m.group(9).split("&")[1] else: raw_df[-1].waveform = r_m.group(9).replace(" ", "") raw_df[-1].emission_rate = "" raw_df[-1].documentation = r_m.group(10) raw_df[-1].author = r_m.group(11) return raw_df