Coverage for src/km3dq_common/shift_schedule_class.py: 0%
36 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-16 14:13 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-16 14:13 +0000
1#! /usr/bin/env python
2###############################################################################
3import datetime
4import tomli
5import re
6import urllib.request
9###################################################################################
10# Class inspired from https://gitlab.in2p3.fr/laurent-aphecetche/km3net-shifter-tools
11# package
13class ShiftSchedule:
14 def __init__(self, source, n_weeks):
15 self.n_weeks = n_weeks
16 self.current_week = ""
17 self.current_sc = ""
18 self.current_rc = ""
19 self.upload_schedule_from_sftp()
20 self.get_current_crew()
22 ##############################
23 def upload_schedule_from_sftp(self):
24 """
25 Retrieve from the KM3NeT database the name/mails of shifters and
26 run coordinator
27 """
29 self.shift_crew = {}
30 self.run_coordinator = {}
32 with urllib.request.urlopen("https://sftp.km3net.de/data/km3dq_lw_db/Common/shift_schedule.toml") as s_f:
33 tmp = s_f.read()
35 t = tomli.loads(tmp.decode("utf-8"))
37 self.update = t["last_update"]
38 for (_, i_value) in t["shift_crew"].items():
39 self.shift_crew[i_value["week"]] = {"shift_leader": i_value["shift_leader"],
40 "shifter": i_value["shifter"],
41 "name": i_value["name"],
42 "mail": i_value["mail"]}
43 for (_, i_value) in t["run_coordinator"].items():
44 self.run_coordinator[i_value["week"]] = {"run_coordinator": i_value["run_coordinator"],
45 "name": i_value["name"],
46 "mail": i_value["mail"]}
48 ##############################
49 def get_current_crew(self):
50 """
51 Return the names of the two shifters as a single string
52 """
53 tod = datetime.datetime.today()
54 today = datetime.date(tod.year, tod.month, tod.day)
56 current_crew = "Unknown"
57 for i_week in self.shift_crew.keys():
58 re_week = re.compile(r"(\d+)/(\d+)/(\d+) - (\d+)/(\d+)/(\d+)")
59 res = re_week.search(i_week)
61 delta = (datetime.date(int(res.group(3)),
62 int(res.group(2)),
63 int(res.group(1)))-today).days
64 if all((delta >= -7, delta < 0)):
65 self.current_week = i_week
66 self.current_week_short = re.sub(r'/20\d\d', "", i_week)
67 self.current_sc = self.shift_crew[i_week]["name"]
68 self.current_rc = self.run_coordinator[i_week]["name"]