#! /usr/bin/env python
###############################################################################
# Developer: Benjamin Trocme (benjamin.trocme at apc.in2p3.fr) - 2024
import os
import subprocess
import glob
import re
import km3db
###############################################################################
[docs]
def complete_with_zero(integer, total_length):
    """
    Return the value as a string, left-padded with "0" up to total_length
    - integer: value to format (converted with str())
    - total_length: minimal length of the returned string
    The string is returned unchanged when it is already at least
    total_length characters long.
    The padding is purely textual: -5 with a total length of 4 gives "00-5".
    """
    # str.rjust prepends the fill character, exactly as the former
    # character-by-character loop did (str.zfill would move the sign).
    return str(integer).rjust(total_length, "0")
###############################################################################
[docs]
def get_raw_data_full_path(dataset, run_number):
    """
    Return the full iRODS path of the raw data file of a run
    Functional only at CC-IN2P3 (relies on the "ils" command)
    - dataset: detector name, converted with km3db.tools.todetid
    - run_number: run number (integer)
    Return a dict with three keys, all "" when the file is not found:
    - "std": iRODS path of the file
    - "root": root:// URL of the file
    - "details": for each "ils -l" line mentioning the file, the text
      found before "& <file name>", each followed by " / "
    """
    raw_data_file = {"std": "", "root": "", "details": ""}
    d_id = km3db.tools.todetid(dataset)
    regular_dir = f"/in2p3/km3net/data/raw/sea/KM3NeT_{d_id:08d}"
    regular_name = f"KM3NeT_{d_id:08d}_{run_number:08d}.root"

    # The pipes are opened in "with" blocks so that they are always closed
    with os.popen(f"ils {regular_dir}") as out:
        for i_out in out:
            # Only the lines containing "C- " are treated as sub-directories
            sub_dir = i_out.split("C- ")
            if len(sub_dir) < 2:
                continue
            sub_dir_clean = sub_dir[1].replace("\n", "")
            with os.popen(f"ils -l {sub_dir_clean}") as out2:
                for i_out2 in out2:
                    if regular_name not in i_out2:
                        continue
                    d = sub_dir_clean.replace("/in2p3/", "/")
                    raw_data_file["std"] = f"/in2p3{d}/{regular_name}"
                    raw_data_file["root"] = (
                        f"root://ccxroot:1999//hpss/"
                        f"in2p3.fr/group{d}/"
                        f"{regular_name}"
                    )
                    # Several lines may mention the same file: the details
                    # of all of them are accumulated
                    irods_details = i_out2.split(f"& {regular_name}")[0]
                    raw_data_file["details"] += f"{irods_details} / "
    return raw_data_file
###############################################################################
[docs]
def get_reco_full_path(dataset, run_number, data_type, version, where, what):
    """
    Return the full SPS path of the reco/dst files
    Functional only at CC-IN2P3
    - dataset: detector name
    - run_number: if 0, returns only the directory
    - data_type: "jppmuon_aashower_dynamic" or "jppmuon_jppshower-upgoing_dynamic"
    - version: dict of version
    Example: {"data":"v9.0","mc_noise":"v9.0", "mc_mupage":"v9.0", "mc_neutr":"v9.0"}
    - where: string containing "sps" and/or "irods"
      Only "sps" is implemented so far: without it, an empty dict is returned
    - what: iterable of the file categories to look for ("reco" and/or "dst")
    Return a dict:
    - run_number == 0: one "dir_<sample>" key per sample (data, mc_noise,
      mc_mupage, mc_neutr) giving the results directory
    - otherwise: one "<what>_<sample>" key per combination, giving the path
      of the file, or "" when a work directory does not contain exactly one
      matching file
    """
    path = {}
    d_id_str = f"{km3db.tools.todetid(dataset):08d}"
    run_number_str = f"{run_number:08d}"
    sub_directory_list = ["", "_priority", "_secondary"]
    file_start = f"KM3NeT_{d_id_str}_{run_number_str}"
    # File name (glob pattern for the MC) of each sample and category
    filename = {
        "data": {
            "reco": f"{file_start}.data.{data_type}.offline.{version['data']}.root",
            "dst": f"{file_start}.data.{data_type}.offline.dst.{version['data']}.root",
        },
        "mc_noise": {
            "reco": f"{file_start}.mc.pure_noise*.root",
            "dst": f"{file_start}.mc.pure_noise*.root",
        },
        "mc_mupage": {
            "reco": f"{file_start}.mc.mupage*.root",
            "dst": f"{file_start}.mc.mupage*.root",
        },
        "mc_neutr": {
            "reco": f"{file_start}.mc.gsg*.root",
            "dst": f"{file_start}.mc.gsg*.root",
        },
    }
    # The file availability is checked on sps only
    # (the irods lookup is not implemented)
    if "sps" not in where:
        return path

    for i_type in what:
        for i_file in filename:
            # Results directory of each possible work directory
            result_dirs = [
                "/sps/km3net/repo/data_processing/tag/"
                f"{version[i_file]}/workdirs/"
                f"KM3NeT_{d_id_str}{i_dir_data_type}/"
                f"results/{d_id_str}"
                for i_dir_data_type in sub_directory_list
            ]
            if run_number == 0:
                # The list of suffixes used to be formatted as a whole in the
                # path. A single directory is now returned: the last existing
                # one (same precedence as for the files), or the unsuffixed
                # one when none exists.
                existing_dirs = [i_dir for i_dir in result_dirs if os.path.isdir(i_dir)]
                path[f"dir_{i_file}"] = (
                    existing_dirs[-1] if existing_dirs else result_dirs[0]
                )
                continue
            path[f"{i_type}_{i_file}"] = ""
            for i_dir in result_dirs:
                # Keep the file only when the pattern matches exactly one
                matches = glob.glob(f"{i_dir}/{i_type}/{filename[i_file][i_type]}")
                if len(matches) == 1:
                    path[f"{i_type}_{i_file}"] = matches[0]
    return path
###############################################################################
[docs]
def check_raw_data_availability(dataset, run_number_list):
    """
    Check that the raw data are available on iRODS
    Function optimised for long list (typically JQAQC one)
    Input:
    - dataset: dataset name (ex: D0ORCA015)
    - list of runs to be checked
    Output:
    - dict with the number of raw data files found for each run having
      either no file (0) or several files (> 1). The runs with exactly
      one file are not included.
    """
    d_id = km3db.tools.todetid(dataset)
    regular_dir = f"/in2p3/km3net/data/raw/sea/KM3NeT_{d_id:08d}"
    # Compiled only once. At least one leading zero is expected in front of
    # both the detector id and the run number; the run number is captured
    # without its leading zeros.
    re_file = re.compile(rf"\s*KM3NeT_0+{d_id}_0+(\d+)\.root\s*")

    # Number of files per run number (run numbers kept as strings)
    nb_of_files = {}
    # The pipes are opened in "with" blocks so that they are always closed
    with os.popen(f"ils {regular_dir}") as out:
        for i_out in out:
            # Only the lines containing "C- " are treated as sub-directories
            sub_dir = i_out.split("C- ")
            if len(sub_dir) < 2:
                continue
            sub_dir_clean = sub_dir[1].replace("\n", "")
            with os.popen(f"ils {sub_dir_clean}") as out2:
                for i_out2 in out2:
                    re_file_match = re_file.match(i_out2)
                    if re_file_match:
                        run_number = re_file_match.group(1)
                        nb_of_files[run_number] = nb_of_files.get(run_number, 0) + 1

    results = {}
    for v_run in run_number_list:
        nb_found = nb_of_files.get(f"{v_run}", 0)
        # Only the missing or duplicated runs are reported
        if nb_found != 1:
            results[v_run] = nb_found
    return results
############################################################################################
[docs]
def check_reco_availability(dataset, run_list, data_type, version, where, what):
    """
    Check that the reco/dst/light availability in sps/irods
    Function optimised for long list (typically JQAQC one)
    Input:
    - dataset: dataset name (ex: D0ORCA015)
    - run_list: list of runs to be checked
    - data_type: reconstruction type, part of the data file names
    - version: dict of version, with the keys "data", "mc_noise",
      "mc_mupage" and "mc_neutr"
    - where: iterable of storage names ("sps" and/or "irods"). Each element
      is compared to these names, so a plain string does not work here.
    - what: iterable of file categories ("reco", "dst", "light").
      No location nor file name is defined for "light": its counts stay at 0.
    Output:
    - results[run][sample][what][where]: number of matching files found,
      sample being "data", "mc_noise", "mc_mupage" or "mc_neutr"
    data_type and version["data"] are inserted as-is in the regular
    expressions: regex special characters in them are interpreted as such.
    """
    d_id_str = f"{km3db.tools.todetid(dataset):08d}"
    sample_list = ("data", "mc_noise", "mc_mupage", "mc_neutr")

    # Initialize results
    results = {
        i_run: {
            i_type: {
                i_what: {i_where: 0 for i_where in where} for i_what in what
            }
            for i_type in sample_list
        }
        for i_run in run_list
    }

    # Raw strings are used for the regular expressions and the dots of the
    # file name template are escaped; the run number is captured.
    file_start = rf"KM3NeT_{d_id_str}_(\d+)"
    filename_re = {
        "data": {
            "reco": re.compile(
                rf"{file_start}\.data\.{data_type}\.offline\.{version['data']}\.root"
            ),
            "dst": re.compile(
                rf"{file_start}\.data\.{data_type}\.offline\.dst\.{version['data']}\.root"
            ),
        },
    }
    # The MC samples use the same pattern for reco and dst
    for i_type, i_tag in (
        ("mc_noise", "pure_noise"),
        ("mc_mupage", "mupage"),
        ("mc_neutr", "gsg"),
    ):
        mc_re = re.compile(rf"{file_start}\.mc\.{i_tag}.*\.root")
        filename_re[i_type] = {"reco": mc_re, "dst": mc_re}

    # Base paths; they end with "/" and are joined with another "/" below:
    # the resulting strings are deliberately kept unchanged
    path_0 = {
        "sps": {
            "data": "/sps/km3net/repo/data_processing/tag/",
            "mc_noise": "/sps/km3net/repo/data_processing/tag/",
            "mc_mupage": "/sps/km3net/repo/data_processing/tag/",
            "mc_neutr": "/sps/km3net/repo/data_processing/tag/",
        },
        "irods": {
            "data": "/in2p3/km3net/data/",
            "mc_noise": "/in2p3/km3net/mc/pure_noise/",
            "mc_mupage": "/in2p3/km3net/mc/atm_muon/",
            "mc_neutr": "/in2p3/km3net/mc/atm_neutrino/",
        },
    }

    for i_type in sample_list:
        sps_workdir = (
            f"{path_0['sps'][i_type]}/{version[i_type]}/workdirs/KM3NeT_{d_id_str}"
        )
        irods_dir = f"{path_0['irods'][i_type]}/KM3NeT_{d_id_str}/{version[i_type]}"
        path = {
            "reco": {
                "sps": [f"{sps_workdir}/results/{d_id_str}/reco"],
                "irods": f"{irods_dir}/reco",
            },
            "dst": {
                "sps": [
                    f"{sps_workdir}{i_sub_dir}/results/{d_id_str}/dst"
                    for i_sub_dir in ("", "_priority", "_secondary")
                ],
                "irods": f"{irods_dir}/dst",
            },
            "light": {"sps": ""},
        }
        # Loop on what (reco, dst, light)
        for i_what in what:
            # loop on where (sps, irods)
            for i_where in where:
                # Extract the directory content only once as it can be
                # time consuming
                content_list = []
                if i_where == "sps":
                    for i_sps_dir in path[i_what][i_where]:
                        if os.path.isdir(i_sps_dir):
                            content_list.extend(os.listdir(i_sps_dir))
                elif i_where == "irods":
                    # No irods location for "light": skipped instead of
                    # raising a KeyError
                    irods_path = path[i_what].get(i_where)
                    if irods_path:
                        # "with" makes sure that the pipe is closed
                        with os.popen(f"ils {irods_path}") as out:
                            content_list.extend(
                                i_out.replace(" ", "").replace("\n", "")
                                for i_out in out
                                if "KM3NeT" in i_out
                            )
                # Loop on all content and regex
                for i_content in content_list:
                    re_file_match = filename_re[i_type][i_what].match(i_content)
                    if not re_file_match:
                        continue
                    run_number = int(re_file_match.group(1))
                    # results has one key per requested run: dict lookup
                    # instead of a scan of the run list
                    if run_number in results:
                        results[run_number][i_type][i_what][i_where] += 1
    return results