Source code for km3dq_common.shift_schedule_class

#! /usr/bin/env python
###############################################################################
import datetime
import tomli
import re
import urllib.request


###################################################################################
# Class inspired by the https://gitlab.in2p3.fr/laurent-aphecetche/km3net-shifter-tools
# package

class ShiftSchedule:
    """Shift-crew and run-coordinator schedule downloaded as a TOML file.

    Constructing an instance downloads the schedule (network access) and
    then looks up the entry whose week matches today's date.

    Attributes:
        n_weeks: Value given to the constructor, stored unchanged.
        shift_crew: Dict keyed by week string; each value holds the
            "shift_leader", "shifter", "name" and "mail" entries.
        run_coordinator: Dict keyed by week string; each value holds the
            "run_coordinator", "name" and "mail" entries.
        update: The "last_update" entry of the schedule file.
        current_week: Week string of the matching entry, "" if none.
        current_week_short: ``current_week`` with every "/20YY" removed,
            "" if no week matches.
        current_sc: "name" entry of the matching shift crew, "" if none.
        current_rc: "name" entry of the matching run coordinator, "" if none.
    """

    # Location of the schedule file read by upload_schedule_from_sftp().
    _SCHEDULE_URL = "https://sftp.km3net.de/data/km3dq_lw_db/Common/shift_schedule.toml"

    # Week keys look like "dd/mm/yyyy - dd/mm/yyyy"; groups 1-3 are the
    # day, month and year of the first date.
    _WEEK_RE = re.compile(r"(\d+)/(\d+)/(\d+) - (\d+)/(\d+)/(\d+)")

    def __init__(self, source, n_weeks):
        """Download the schedule and resolve the current crew.

        Args:
            source: Not used by this class; kept so existing callers keep
                working.
            n_weeks: Stored as ``self.n_weeks``.
        """
        self.n_weeks = n_weeks
        self.current_week = ""
        # Initialised here so the attribute exists even when no week matches.
        self.current_week_short = ""
        self.current_sc = ""
        self.current_rc = ""

        self.upload_schedule_from_sftp()
        self.get_current_crew()

    ##############################
    def upload_schedule_from_sftp(self):
        """Download the schedule file and fill the lookup dictionaries.

        Sets ``self.shift_crew``, ``self.run_coordinator`` (both keyed by
        the "week" entry of each record) and ``self.update``.

        Raises:
            urllib.error.URLError: If the file cannot be downloaded.
            KeyError: If an expected entry is missing from the file.
        """
        self.shift_crew = {}
        self.run_coordinator = {}

        with urllib.request.urlopen(self._SCHEDULE_URL) as s_f:
            raw = s_f.read()
        schedule = tomli.loads(raw.decode("utf-8"))

        self.update = schedule["last_update"]

        # The table keys are not needed: records are re-indexed by week.
        for entry in schedule["shift_crew"].values():
            self.shift_crew[entry["week"]] = {
                "shift_leader": entry["shift_leader"],
                "shifter": entry["shifter"],
                "name": entry["name"],
                "mail": entry["mail"],
            }

        for entry in schedule["run_coordinator"].values():
            self.run_coordinator[entry["week"]] = {
                "run_coordinator": entry["run_coordinator"],
                "name": entry["name"],
                "mail": entry["mail"],
            }

    ##############################
    def get_current_crew(self):
        """Find the schedule entry for today and store it on the instance.

        A week matches when its first date lies between 1 and 7 days before
        today (inclusive). On a match ``current_week``,
        ``current_week_short``, ``current_sc`` and ``current_rc`` are
        updated; otherwise they are left untouched. Week keys that do not
        follow the "dd/mm/yyyy - dd/mm/yyyy" pattern are skipped. Nothing
        is returned.

        Raises:
            KeyError: If the matching week has no run-coordinator entry.
        """
        today = datetime.date.today()

        for i_week in self.shift_crew:
            res = self._WEEK_RE.search(i_week)
            if res is None:
                # Malformed key: ignore it rather than fail on res.group().
                continue

            day, month, year = (int(res.group(i)) for i in (1, 2, 3))
            delta = (datetime.date(year, month, day) - today).days

            # NOTE(review): the start day itself (delta == 0) is excluded
            # while the 7th day after it is included - confirm this matches
            # the shift handover convention.
            if -7 <= delta < 0:
                self.current_week = i_week
                self.current_week_short = re.sub(r'/20\d\d', "", i_week)
                self.current_sc = self.shift_crew[i_week]["name"]
                self.current_rc = self.run_coordinator[i_week]["name"]