Source code for km3dq_common.aux_library

#! /usr/bin/env python
###############################################################################
# Developer: Benjamin Trocme (benjamin.trocme at apc.in2p3.fr) - 2024

import os
import subprocess
import glob
import re
import km3db


###############################################################################
def complete_with_zero(integer, total_length):
    """
    Return `integer` as a string, left-padded with "0" up to `total_length`

    Input:
    - integer: value to format (converted with str())
    - total_length: minimal length of the returned string

    If the string form of `integer` is already `total_length` characters
    long or longer, it is returned unchanged (never truncated).
    The padding is purely textual: a leading "-" is not treated specially,
    e.g. complete_with_zero(-5, 4) returns "00-5".
    """
    # rjust (and not zfill) on purpose: zfill would move the sign in front
    # of the zeros, which is not what the padding historically did.
    return str(integer).rjust(total_length, "0")
###############################################################################
def get_raw_data_full_path(dataset, run_number):
    """
    Return the full iRODS path of a raw data file
    Functional only at CC-IN2P3 (relies on the `ils` command)

    Input:
    - dataset: detector name, converted to a detector id with
      km3db.tools.todetid
    - run_number: run number (integer, formatted on 8 digits)

    Output: dictionary with three keys
    - "std": path of the file below /in2p3/...
    - "root": corresponding root://ccxroot:1999//hpss/in2p3.fr/group/... URL
    - "details": for each `ils -l` line mentioning the file, the part of
      the line located before "& <file name>", followed by " / "
    All three values are empty strings when the file is not found.
    """
    raw_data_file = {"std": "", "root": "", "details": ""}

    d_id = km3db.tools.todetid(dataset)
    regular_dir = f"/in2p3/km3net/data/raw/sea/KM3NeT_{d_id:08d}"
    regular_name = f"KM3NeT_{d_id:08d}_{run_number:08d}.root"

    # Each pipe is fully read inside a `with` block so that it is always
    # closed (os.popen pipes were previously left open).
    with os.popen(f"ils {regular_dir}") as out:
        dir_listing = out.readlines()

    for i_out in dir_listing:
        # Only the lines containing "C- " are followed: what comes after
        # the marker is used as the directory for a second `ils` call.
        sub_dir = i_out.split("C- ")
        if len(sub_dir) < 2:
            continue
        sub_dir_clean = sub_dir[1].replace("\n", "")

        with os.popen(f"ils -l {sub_dir_clean}") as out2:
            file_listing = out2.readlines()

        for i_out2 in file_listing:
            if regular_name not in i_out2:
                continue
            d = sub_dir_clean.replace("/in2p3/", "/")
            raw_data_file["std"] = f"/in2p3{d}/{regular_name}"
            raw_data_file["root"] = (
                f"root://ccxroot:1999//hpss/"
                f"in2p3.fr/group{d}/"
                f"{regular_name}"
            )
            # Several lines may mention the same file: the details are
            # accumulated, not overwritten.
            irods_details = i_out2.split(f"& {regular_name}")[0]
            raw_data_file["details"] += f"{irods_details} / "

    return raw_data_file
###############################################################################
def get_reco_full_path(dataset, run_number, data_type, version, where, what):
    """
    Return the full SPS path of the reco/dst files
    Functional only at CC-IN2P3

    Input:
    - dataset: detector name
    - run_number: if 0, returns only the directory
    - data_type: "jppmuon_aashower_dynamic" or
      "jppmuon_jppshower-upgoing_dynamic"
    - version: dict of version
      Example: {"data":"v9.0","mc_noise":"v9.0",
                "mc_mupage":"v9.0", "mc_neutr":"v9.0"}
    - where: string containing "sps" and/or "irods"
      (only "sps" is implemented, see the note at the end of the function)
    - what: iterable of file kinds, "reco" and/or "dst"

    Output: dictionary
    - run_number == 0: keys "dir_<type>" with <type> in data, mc_noise,
      mc_mupage, mc_neutr; values are the results directory of the
      regular (neither _priority nor _secondary) work directory
    - otherwise: keys "<what>_<type>"; the value is the file path, or ""
      when no work directory contains exactly one matching file
    The dictionary is empty when "sps" is not in `where` or `what` is
    empty.
    """
    path = {}

    d_id_str = f"{km3db.tools.todetid(dataset):08d}"
    run_number_str = f"{run_number:08d}"
    # Suffixes of the work directories; the first one is the regular one
    sub_directory_list = ["", "_priority", "_secondary"]

    file_stem = f"KM3NeT_{d_id_str}_{run_number_str}"
    # The MC names are glob patterns and are identical for reco and dst
    filename = {
        "data": {
            "reco": f"{file_stem}.data.{data_type}.offline.{version['data']}.root",
            "dst": f"{file_stem}.data.{data_type}.offline.dst.{version['data']}.root",
        },
        "mc_noise": {
            "reco": f"{file_stem}.mc.pure_noise*.root",
            "dst": f"{file_stem}.mc.pure_noise*.root",
        },
        "mc_mupage": {
            "reco": f"{file_stem}.mc.mupage*.root",
            "dst": f"{file_stem}.mc.mupage*.root",
        },
        "mc_neutr": {
            "reco": f"{file_stem}.mc.gsg*.root",
            "dst": f"{file_stem}.mc.gsg*.root",
        },
    }

    # Check the file availability on sps
    if "sps" in where:
        for i_type in what:
            for i_file in filename:
                workdirs = (
                    "/sps/km3net/repo/data_processing/tag/"
                    f"{version[i_file]}/workdirs/"
                )
                if run_number == 0:
                    # Directory only. The regular work directory is used:
                    # the whole list of suffixes was previously formatted
                    # into the path, which could never be a valid one.
                    path[f"dir_{i_file}"] = (
                        f"{workdirs}"
                        f"KM3NeT_{d_id_str}{sub_directory_list[0]}/"
                        f"results/{d_id_str}"
                    )
                    continue

                key = f"{i_type}_{i_file}"
                path[key] = ""
                for i_dir_data_type in sub_directory_list:
                    full_path = (
                        f"{workdirs}"
                        f"KM3NeT_{d_id_str}{i_dir_data_type}/"
                        f"results/{d_id_str}/{i_type}/{filename[i_file][i_type]}"
                    )
                    # Keep the file only if the pattern is unambiguous.
                    # A later work directory overrides an earlier one.
                    matches = glob.glob(full_path)
                    if len(matches) == 1:
                        path[key] = matches[0]

    # NOTE: the file availability on irods is not implemented yet:
    # "irods" in `where` is currently ignored.

    return path
###############################################################################
def check_raw_data_availability(dataset, run_number_list):
    """
    Check that the raw data are available on iRODS
    Function optimised for long list (typically JQAQC one)

    Input:
    - dataset: dataset name (ex: D0ORCA015)
    - run_number_list: list of runs to be checked

    Output: dictionary restricted to the problematic runs
    - run: 0 when no raw data file was found for this run
    - run: number of files when more than one file was found
    Runs with exactly one raw data file do not appear in the output.
    """
    d_id = km3db.tools.todetid(dataset)
    regular_dir = f"/in2p3/km3net/data/raw/sea/KM3NeT_{d_id:08d}"

    # Compiled once: it was previously rebuilt for every line of every
    # sub-directory listing. The captured run number has no leading zero.
    re_file = re.compile(rf"\s*KM3NeT_0+{d_id}_0+(\d+)\.root\s*")

    # Pipes are read inside `with` blocks so that they are always closed
    with os.popen(f"ils {regular_dir}") as out:
        dir_listing = out.readlines()

    # Number of files found per run; keys are run numbers as strings
    nb_of_files = {}
    for i_out in dir_listing:
        # Only the lines containing "C- " are followed: what comes after
        # the marker is used as the directory for a second `ils` call.
        sub_dir = i_out.split("C- ")
        if len(sub_dir) < 2:
            continue
        sub_dir_clean = sub_dir[1].replace("\n", "")

        with os.popen(f"ils {sub_dir_clean}") as out2:
            for i_out2 in out2:
                re_file_match = re_file.match(i_out2)
                if re_file_match:
                    run_number = re_file_match.group(1)
                    nb_of_files[run_number] = nb_of_files.get(run_number, 0) + 1

    results = {}
    for v_run in run_number_list:
        nb_found = nb_of_files.get(f"{v_run}", 0)
        # Exactly one file is the nominal case: nothing to report
        if nb_found != 1:
            results[v_run] = nb_found

    return results
############################################################################################
def check_reco_availability(dataset, run_list, data_type, version, where, what):
    """
    Check that the reco/dst/light availability in sps/irods
    Function optimised for long list (typically JQAQC one)

    Input:
    - dataset: dataset name (ex: D0ORCA015)
    - run_list: list of runs to be checked
    - data_type: reconstruction type appearing in the data file names
    - version: dict of version, with the keys "data", "mc_noise",
      "mc_mupage" and "mc_neutr"
    - where: list containing "sps" and/or "irods"
    - what: list containing "reco", "dst" and/or "light"

    Output: results[run][type][what][where] = number of matching files,
    with type in data, mc_noise, mc_mupage, mc_neutr.
    Combinations for which no directory or no file name pattern is
    defined (currently everything related to "light") are left at 0.
    """
    d_id_str = f"{km3db.tools.todetid(dataset):08d}"
    type_list = ["data", "mc_noise", "mc_mupage", "mc_neutr"]

    # Initialize results
    results = {
        i_run: {
            i_type: {
                i_what: {i_where: 0 for i_where in where}
                for i_what in what
            }
            for i_type in type_list
        }
        for i_run in run_list
    }

    # File name patterns. Raw strings are required for \d and \. ;
    # data_type and version are escaped as they are literal text
    # (the "." of "v9.0" must not match any character).
    stem = rf"KM3NeT_{d_id_str}_(\d+)"
    data_type_re = re.escape(data_type)
    data_version_re = re.escape(version["data"])
    re_noise = re.compile(rf"{stem}\.mc\.pure_noise.*\.root")
    re_mupage = re.compile(rf"{stem}\.mc\.mupage.*\.root")
    re_neutr = re.compile(rf"{stem}\.mc\.gsg.*\.root")
    filename_re = {
        "data": {
            "reco": re.compile(
                rf"{stem}\.data\.{data_type_re}\.offline\.{data_version_re}\.root"
            ),
            "dst": re.compile(
                rf"{stem}\.data\.{data_type_re}\.offline\.dst\.{data_version_re}\.root"
            ),
        },
        "mc_noise": {"reco": re_noise, "dst": re_noise},
        "mc_mupage": {"reco": re_mupage, "dst": re_mupage},
        "mc_neutr": {"reco": re_neutr, "dst": re_neutr},
    }

    path_0 = {
        "sps": {
            "data": "/sps/km3net/repo/data_processing/tag/",
            "mc_noise": "/sps/km3net/repo/data_processing/tag/",
            "mc_mupage": "/sps/km3net/repo/data_processing/tag/",
            "mc_neutr": "/sps/km3net/repo/data_processing/tag/",
        },
        "irods": {
            "data": "/in2p3/km3net/data/",
            "mc_noise": "/in2p3/km3net/mc/pure_noise/",
            "mc_mupage": "/in2p3/km3net/mc/atm_muon/",
            "mc_neutr": "/in2p3/km3net/mc/atm_neutrino/",
        },
    }

    # Extract the directory content only once as it can be time consuming:
    # the same sps directory is shared by several types when their
    # versions are identical.
    sps_listing = {}

    for i_type in type_list:
        sps_base = (
            f"{path_0['sps'][i_type]}/{version[i_type]}/workdirs/KM3NeT_{d_id_str}"
        )
        sps_tail = f"/results/{d_id_str}"
        irods_base = f"{path_0['irods'][i_type]}/KM3NeT_{d_id_str}/{version[i_type]}"
        path = {
            "reco": {
                "sps": [f"{sps_base}{sps_tail}/reco"],
                "irods": f"{irods_base}/reco",
            },
            "dst": {
                "sps": [
                    f"{sps_base}{suffix}{sps_tail}/dst"
                    for suffix in ("", "_priority", "_secondary")
                ],
                "irods": f"{irods_base}/dst",
            },
            "light": {"sps": []},
        }

        # Loop on what (reco, dst, light)
        for i_what in what:
            file_re = filename_re[i_type].get(i_what)
            locations = path.get(i_what, {})
            if file_re is None:
                # No file name pattern defined: counters stay at 0
                continue

            # loop on where (sps, irods)
            for i_where in where:
                # Extract the content
                content_list = []
                if i_where == "sps":
                    for i_sps_dir in locations.get("sps", []):
                        if i_sps_dir not in sps_listing:
                            sps_listing[i_sps_dir] = (
                                os.listdir(i_sps_dir)
                                if os.path.exists(i_sps_dir)
                                else []
                            )
                        content_list.extend(sps_listing[i_sps_dir])
                elif i_where == "irods" and "irods" in locations:
                    # `with` guarantees that the pipe is closed
                    with os.popen(f"ils {locations['irods']}") as out:
                        content_list = [
                            i_out.replace(" ", "").replace("\n", "")
                            for i_out in out
                            if "KM3NeT" in i_out
                        ]

                # Loop on all content and regex
                for i_content in content_list:
                    re_file_match = file_re.match(i_content)
                    if re_file_match is None:
                        continue
                    run_number = int(re_file_match.group(1))
                    # results has one key per requested run: constant
                    # time lookup instead of scanning run_list
                    if run_number in results:
                        results[run_number][i_type][i_what][i_where] += 1

    return results