Coverage for src/km3dq_common/shift_schedule_class.py: 0%

36 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-16 14:13 +0000

1#! /usr/bin/env python 

2############################################################################### 

3import datetime 

4import tomli 

5import re 

6import urllib.request 

7 

8 

9################################################################################### 

10# Class inspired from https://gitlab.in2p3.fr/laurent-aphecetche/km3net-shifter-tools 

11# package 

12 

13class ShiftSchedule: 

14 def __init__(self, source, n_weeks): 

15 self.n_weeks = n_weeks 

16 self.current_week = "" 

17 self.current_sc = "" 

18 self.current_rc = "" 

19 self.upload_schedule_from_sftp() 

20 self.get_current_crew() 

21 

22 ############################## 

23 def upload_schedule_from_sftp(self): 

24 """ 

25 Retrieve from the KM3NeT database the name/mails of shifters and 

26 run coordinator 

27 """ 

28 

29 self.shift_crew = {} 

30 self.run_coordinator = {} 

31 

32 with urllib.request.urlopen("https://sftp.km3net.de/data/km3dq_lw_db/Common/shift_schedule.toml") as s_f: 

33 tmp = s_f.read() 

34 

35 t = tomli.loads(tmp.decode("utf-8")) 

36 

37 self.update = t["last_update"] 

38 for (_, i_value) in t["shift_crew"].items(): 

39 self.shift_crew[i_value["week"]] = {"shift_leader": i_value["shift_leader"], 

40 "shifter": i_value["shifter"], 

41 "name": i_value["name"], 

42 "mail": i_value["mail"]} 

43 for (_, i_value) in t["run_coordinator"].items(): 

44 self.run_coordinator[i_value["week"]] = {"run_coordinator": i_value["run_coordinator"], 

45 "name": i_value["name"], 

46 "mail": i_value["mail"]} 

47 

48 ############################## 

49 def get_current_crew(self): 

50 """ 

51 Return the names of the two shifters as a single string 

52 """ 

53 tod = datetime.datetime.today() 

54 today = datetime.date(tod.year, tod.month, tod.day) 

55 

56 current_crew = "Unknown" 

57 for i_week in self.shift_crew.keys(): 

58 re_week = re.compile(r"(\d+)/(\d+)/(\d+) - (\d+)/(\d+)/(\d+)") 

59 res = re_week.search(i_week) 

60 

61 delta = (datetime.date(int(res.group(3)), 

62 int(res.group(2)), 

63 int(res.group(1)))-today).days 

64 if all((delta >= -7, delta < 0)): 

65 self.current_week = i_week 

66 self.current_week_short = re.sub(r'/20\d\d', "", i_week) 

67 self.current_sc = self.shift_crew[i_week]["name"] 

68 self.current_rc = self.run_coordinator[i_week]["name"] 

69