# Source: _dispatch.py (forked from frequenz-floss/frequenz-dispatch-python)
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
"""Dispatch type with support for next_run calculation."""
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Iterator, cast
from dateutil import rrule
from frequenz.client.dispatch.types import Dispatch as BaseDispatch
from frequenz.client.dispatch.types import Frequency, Weekday
_logger: logging.Logger = logging.getLogger(__name__)
"""Module-level logger."""

_RRULE_FREQ_MAP: dict[Frequency, int] = {
    Frequency.MINUTELY: rrule.MINUTELY,
    Frequency.HOURLY: rrule.HOURLY,
    Frequency.DAILY: rrule.DAILY,
    Frequency.WEEKLY: rrule.WEEKLY,
    Frequency.MONTHLY: rrule.MONTHLY,
}
"""Translation table from our `Frequency` enum to the dateutil frequency constants."""

_RRULE_WEEKDAY_MAP: dict[Weekday, rrule.weekday] = {
    Weekday.MONDAY: rrule.MO,
    Weekday.TUESDAY: rrule.TU,
    Weekday.WEDNESDAY: rrule.WE,
    Weekday.THURSDAY: rrule.TH,
    Weekday.FRIDAY: rrule.FR,
    Weekday.SATURDAY: rrule.SA,
    Weekday.SUNDAY: rrule.SU,
}
"""Translation table from our `Weekday` enum to the dateutil weekday constants."""
class RunningState(Enum):
    """The running state of a dispatch.

    Returned by `Dispatch.running()` to tell callers whether a dispatch of a
    given type should currently be executing.
    """

    RUNNING = "RUNNING"
    """The dispatch is running."""

    STOPPED = "STOPPED"
    """The dispatch is stopped."""

    DIFFERENT_TYPE = "DIFFERENT_TYPE"
    """The dispatch is for a different type."""
@dataclass(frozen=True)
class Dispatch(BaseDispatch):
    """Dispatch type with extra functionality.

    Extends the client dispatch with a deletion flag, bookkeeping for
    running-state notifications, and helpers that compute when the dispatch
    runs (current state, end of the current run, next and missed runs).
    """

    deleted: bool = False
    """Whether the dispatch is deleted."""

    running_state_change_synced: datetime | None = None
    """The last time a message was sent about the running state change."""

    def __init__(
        self,
        client_dispatch: BaseDispatch,
        deleted: bool = False,
        running_state_change_synced: datetime | None = None,
    ):
        """Initialize the dispatch.

        Args:
            client_dispatch: The client dispatch.
            deleted: Whether the dispatch is deleted.
            running_state_change_synced: Timestamp of the last running state change message.
        """
        # Copy every field of the client dispatch into this instance.
        super().__init__(**client_dispatch.__dict__)
        # The dataclass is frozen, so regular attribute assignment would raise;
        # go through object.__setattr__ to set the extra fields.
        object.__setattr__(self, "deleted", deleted)
        object.__setattr__(
            self,
            "running_state_change_synced",
            running_state_change_synced,
        )

    def _set_deleted(self) -> None:
        """Mark the dispatch as deleted."""
        # Bypass the frozen dataclass protection.
        object.__setattr__(self, "deleted", True)

    @property
    def _running_status_notified(self) -> bool:
        """Check that the latest running state change notification was sent.

        Returns:
            True if the latest running state change notification was sent, False otherwise.
        """
        return self.running_state_change_synced == self.update_time

    def _set_running_status_notified(self) -> None:
        """Mark the latest running state change notification as sent."""
        # Bypass the frozen dataclass protection.
        object.__setattr__(self, "running_state_change_synced", self.update_time)

    def running(self, type_: str) -> RunningState:
        """Check if the dispatch is currently supposed to be running.

        Args:
            type_: The type of the dispatch that should be running.

        Returns:
            RUNNING if the dispatch is running,
            STOPPED if it is stopped,
            DIFFERENT_TYPE if it is for a different type.
        """
        if self.type != type_:
            return RunningState.DIFFERENT_TYPE

        if not self.active or self.deleted:
            return RunningState.STOPPED

        now = datetime.now(tz=timezone.utc)
        if now < self.start_time:
            return RunningState.STOPPED

        # A dispatch without duration is always running once it started
        if self.duration is None:
            return RunningState.RUNNING

        if until := self._until(now):
            return RunningState.RUNNING if now < until else RunningState.STOPPED

        return RunningState.STOPPED

    @property
    def until(self) -> datetime | None:
        """Time when the dispatch should end.

        Returns the time that a running dispatch should end.
        If the dispatch is not running, None is returned.

        Returns:
            The time when the dispatch should end or None if the dispatch is not running.
        """
        if not self.active or self.deleted:
            return None

        now = datetime.now(tz=timezone.utc)
        return self._until(now)

    @property
    # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return
    # value needs documenting
    def missed_runs(self) -> Iterator[datetime]:  # noqa: DOC405
        """Yield all missed runs of a dispatch.

        Yields all missed runs of a dispatch.

        If a running state change notification was not sent in time
        due to connection issues, this method will yield all missed runs
        since the last sent notification.

        Returns:
            A generator that yields all missed runs of a dispatch.
        """
        if self.update_time == self.running_state_change_synced:
            return

        from_time = self.update_time
        now = datetime.now(tz=timezone.utc)

        while (next_run := self.next_run_after(from_time)) and next_run < now:
            yield next_run
            # next_run_after() is inclusive of `after`, so searching again from
            # `next_run` itself would return the same run forever. Step just
            # past it by the smallest representable datetime difference.
            from_time = next_run + datetime.resolution

    @property
    def next_run(self) -> datetime | None:
        """Calculate the next run of a dispatch.

        Returns:
            The next run of the dispatch or None if the dispatch is finished.
        """
        return self.next_run_after(datetime.now(tz=timezone.utc))

    def next_run_after(self, after: datetime) -> datetime | None:
        """Calculate the next run of a dispatch.

        The search is inclusive: a run starting exactly at `after` is returned.

        Args:
            after: The time to calculate the next run from.

        Returns:
            The next run of the dispatch or None if the dispatch is finished.
        """
        if (
            not self.recurrence.frequency
            or self.recurrence.frequency == Frequency.UNSPECIFIED
            or self.duration is None  # Infinite duration
        ):
            # Single run: it is only "next" while `after` has not passed its start.
            if after > self.start_time:
                return None
            return self.start_time

        # Make sure no weekday is UNSPECIFIED
        if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
            _logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
            return None

        # No type information for rrule, so we need to cast
        return cast(datetime | None, self._prepare_rrule().after(after, inc=True))

    def _prepare_rrule(self) -> rrule.rrule:
        """Prepare the rrule object.

        Builds a dateutil recurrence rule anchored at `start_time` from the
        dispatch's recurrence settings.

        Returns:
            The rrule object.
        """
        count, until = (None, None)
        if end := self.recurrence.end_criteria:
            count = end.count
            until = end.until

        # NOTE(review): the by* sequences are forwarded as-is; dateutil may
        # treat an empty sequence differently from None — confirm that this is
        # the intended behavior for unset recurrence fields.
        rrule_obj = rrule.rrule(
            freq=_RRULE_FREQ_MAP[self.recurrence.frequency],
            dtstart=self.start_time,
            count=count,
            until=until,
            byminute=self.recurrence.byminutes,
            byhour=self.recurrence.byhours,
            byweekday=[
                _RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays
            ],
            bymonthday=self.recurrence.bymonthdays,
            bymonth=self.recurrence.bymonths,
            interval=self.recurrence.interval,
        )

        return rrule_obj

    def _until(self, now: datetime) -> datetime | None:
        """Calculate the time when the dispatch should end.

        If no previous run is found, None is returned.

        Args:
            now: The current time.

        Returns:
            The time when the dispatch should end or None if the dispatch is not running.

        Raises:
            ValueError: If the dispatch has no duration.
        """
        if self.duration is None:
            raise ValueError("_until: Dispatch has no duration")

        # Non-recurring dispatch: the only run starts at start_time.
        if (
            not self.recurrence.frequency
            or self.recurrence.frequency == Frequency.UNSPECIFIED
        ):
            return self.start_time + self.duration

        # Recurring dispatch: the relevant run is the latest one that started
        # at or before `now`.
        latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True)

        if not latest_past_start:
            return None

        return latest_past_start + self.duration