Coverage for src/km3dq_common/sardine_classes.py: 0%

47 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-16 14:13 +0000

1#! /usr/bin/env python 

2############################################################################### 

3import datetime 

4import tomli 

5import re 

6import urllib.request 

7 

8 

9################################################################################### 

10# Class inspired from https://gitlab.in2p3.fr/laurent-aphecetche/km3net-shifter-tools 

11# package 

12 

13class ShiftSummary: 

14 def __init__(self, source): 

15 self.current_week = "" 

16 self.current_sc = "" 

17 self.current_rc = "" 

18 if source == "sftp": 

19 self.retrieve_schedule_from_sftp() 

20 self.get_current_crew() 

21 self.retrieve_links_from_sardine() 

22 

23 ############################## 

24 def retrieve_schedule_from_sftp(self): 

25 """ 

26 Retrieve from the KM3NeT database the name/mails of shifters and 

27 run coordinator 

28 """ 

29 

30 self.shift_crew = {} 

31 self.run_coordinator = {} 

32 

33 with urllib.request.urlopen("https://sftp.km3net.de/data/km3dq_lw_db/Common/shift_schedule.toml") as s_f: 

34 tmp = s_f.read() 

35 

36 t = tomli.loads(tmp.decode("utf-8")) 

37 

38 self.update = t["last_update"] 

39 re_week = re.compile(r"(\d+)/(\d+)/(\d+) - (\d+)/(\d+)/(\d+)") 

40 

41 for (_, i_value) in t["shift_crew"].items(): 

42 res = re_week.search(i_value["week"]) 

43 self.shift_crew[i_value["week"]] = { 

44 "day_start": datetime.date(int(res.group(3)), 

45 int(res.group(2)), 

46 int(res.group(1))), 

47 "shift_leader": i_value["shift_leader"], 

48 "shifter": i_value["shifter"], 

49 "name": i_value["name"], 

50 "mail": i_value["mail"] 

51 } 

52 

53 for (_, i_value) in t["run_coordinator"].items(): 

54 res = re_week.search(i_value["week"]) 

55 self.run_coordinator[i_value["week"]] = { 

56 "day_start": datetime.date(int(res.group(3)), 

57 int(res.group(2)), 

58 int(res.group(1))), 

59 "run_coordinator": i_value["run_coordinator"], 

60 "name": i_value["name"], 

61 "mail": i_value["mail"] 

62 } 

63 

64 ############################## 

65 def retrieve_links_from_sardine(self): 

66 """ 

67 """ 

68 

69 with urllib.request.urlopen("https://shifter-tools-km3net-operation-tools-prod.apps.wok2.in2p3.fr/sardine/shift_prop_crew.toml") as s_f: 

70 tmp = s_f.read() 

71 

72 t = tomli.loads(tmp.decode("utf-8")) 

73 

74 self.weekly_report = t["weekly_report_archive"] 

75 self.weekly_plots_arca = t["weekly_plots_arca_archive"] 

76 self.weekly_plots_orca = t["weekly_plots_orca_archive"] 

77 self.weekly_highlights_arca = t["weekly_highlights_arca_archive"] 

78 self.weekly_highlights_orca = t["weekly_highlights_orca_archive"] 

79 

80 ############################## 

81 def get_current_crew(self): 

82 """ 

83 Return the names of the two shifters as a single string 

84 """ 

85 tod = datetime.datetime.today() 

86 today = datetime.date(tod.year, tod.month, tod.day) 

87 

88 current_crew = "Unknown" 

89 for i_week in self.shift_crew.keys(): 

90 delta = (self.shift_crew[i_week]["day_start"]-today).days 

91 if all((delta >= -7, delta < 0)): 

92 self.current_week = i_week 

93 self.current_week_short = re.sub(r'/20\d\d', "", i_week) 

94 self.current_sc = self.shift_crew[i_week]["name"] 

95 self.current_rc = self.run_coordinator[i_week]["name"] 

96