Skip to content

Commit 7e3419b

Browse files
committed
Add dispatch runner
Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
1 parent 46d5b17 commit 7e3419b

File tree

4 files changed

+364
-1
lines changed

4 files changed

+364
-1
lines changed

RELEASE_NOTES.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* We now provide the `DispatchRunnerActor` class, a class to manage actors based on incoming dispatches.
1414

1515
## Bug Fixes
1616

src/frequenz/dispatch/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@
77
88
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
99
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
10+
* [DispatchRunnerActor][frequenz.dispatch.DispatchRunnerActor]: An actor to
11+
manage other actors based on incoming dispatches.
1012
* [Created][frequenz.dispatch.Created],
1113
[Updated][frequenz.dispatch.Updated],
1214
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
1315
1416
"""
1517

18+
from ._actor_runner import DispatchConfigurationEvent, DispatchRunnerActor
1619
from ._dispatch import Dispatch, RunningState
1720
from ._dispatcher import Dispatcher, ReceiverFetcher
1821
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -26,4 +29,6 @@
2629
"Updated",
2730
"Dispatch",
2831
"RunningState",
32+
"DispatchRunnerActor",
33+
"DispatchConfigurationEvent",
2934
]
+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
# License: All rights reserved
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Helper class to manage actors based on dispatches."""
5+
6+
import logging
7+
from dataclasses import dataclass
8+
from typing import Any
9+
10+
from frequenz.channels import Receiver, Sender
11+
from frequenz.client.dispatch.types import ComponentSelector
12+
from frequenz.sdk.actor import Actor
13+
14+
from ._dispatch import Dispatch, RunningState
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
@dataclass(frozen=True, kw_only=True)
20+
class DispatchConfigurationEvent:
21+
"""Event emitted when the dispatch configuration changes."""
22+
23+
components: ComponentSelector
24+
"""Components to be used."""
25+
26+
dry_run: bool
27+
"""Whether this is a dry run."""
28+
29+
payload: dict[str, Any]
30+
"""Additional payload."""
31+
32+
33+
class DispatchRunnerActor(Actor):
34+
"""Helper class to manage actors based on dispatches.
35+
36+
Example usage:
37+
38+
```python
39+
import os
40+
import asyncio
41+
from frequenz.dispatch import Dispatcher, DispatchRunnerActor, DispatchConfigurationEvent
42+
from frequenz.client.dispatch.types import ComponentSelector
43+
from frequenz.client.common.microgrid.components import ComponentCategory
44+
45+
from frequenz.channels import Receiver, Broadcast
46+
from unittest.mock import MagicMock
47+
48+
class MyActor(Actor):
49+
def __init__(self, config_channel: Receiver[DispatchConfigurationEvent]):
50+
super().__init__()
51+
self._config_channel = config_channel
52+
self._dry_run: bool
53+
self._payload: dict[str, Any]
54+
55+
async def _run(self) -> None:
56+
while True:
57+
config = await self._config_channel.receive()
58+
print("Received config:", config)
59+
60+
self.set_components(config.components)
61+
self._dry_run = config.dry_run
62+
self._payload = config.payload
63+
64+
def set_components(self, components: ComponentSelector) -> None:
65+
match components:
66+
case [int(), *_] as component_ids:
67+
print("Dispatch: Setting components to %s", components)
68+
case [ComponentCategory.BATTERY, *_]:
69+
print("Dispatch: Using all battery components")
70+
case _ as unsupported:
71+
print(
72+
"Dispatch: Requested an unsupported selector %r, "
73+
"but only component IDs or category BATTERY are supported.",
74+
unsupported,
75+
)
76+
77+
async def run():
78+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
79+
key = os.getenv("DISPATCH_API_KEY", "some-key")
80+
81+
microgrid_id = 1
82+
83+
dispatcher = Dispatcher(
84+
microgrid_id=microgrid_id,
85+
server_url=url,
86+
key=key
87+
)
88+
89+
# Create config channel to receive (re-)configuration events pre-start and mid-run
90+
config_channel = Broadcast[DispatchConfigurationEvent](name="config_channel")
91+
92+
# Start actor and supporting actor, give each a config channel receiver
93+
my_actor = MyActor(config_channel.new_receiver())
94+
supporting_actor = MagicMock(config_channel.new_receiver())
95+
96+
status_receiver = dispatcher.running_status_change.new_receiver()
97+
98+
dispatch_handler = DispatchRunnerActor(
99+
actors=frozenset([my_actor, supporting_actor]),
100+
dispatch_type="EXAMPLE",
101+
running_status_receiver=status_receiver,
102+
configuration_sender=config_channel.new_sender(),
103+
)
104+
105+
await asyncio.gather(dispatcher.start(), dispatch_handler.start())
106+
```
107+
"""
108+
109+
def __init__(
110+
self,
111+
actors: frozenset[Actor],
112+
dispatch_type: str,
113+
running_status_receiver: Receiver[Dispatch],
114+
configuration_sender: Sender[DispatchConfigurationEvent] | None = None,
115+
) -> None:
116+
"""Initialize the dispatch handler.
117+
118+
Args:
119+
actors: The actors to handle.
120+
dispatch_type: The type of dispatches to handle.
121+
running_status_receiver: The receiver for dispatch running status changes.
122+
configuration_sender: The sender for dispatch configuration events
123+
"""
124+
super().__init__()
125+
self._dispatch_rx = running_status_receiver
126+
self._actors = actors
127+
self._dispatch_type = dispatch_type
128+
self._configuration_sender = configuration_sender
129+
130+
def _start_actors(self) -> None:
131+
"""Start all actors."""
132+
for actor in self._actors:
133+
if actor.is_running:
134+
_logger.warning("Actor %s is already running", actor.name)
135+
else:
136+
actor.start()
137+
138+
async def _stop_actors(self, msg: str) -> None:
139+
"""Stop all actors.
140+
141+
Args:
142+
msg: The message to be passed to the actors being stopped.
143+
"""
144+
for actor in self._actors:
145+
if actor.is_running:
146+
await actor.stop(msg)
147+
else:
148+
_logger.warning("Actor %s is not running", actor.name)
149+
150+
async def _run(self) -> None:
151+
"""Wait for dispatches and handle them."""
152+
while True:
153+
_logger.info("Waiting for dispatch...")
154+
dispatch = await self._dispatch_rx.receive()
155+
await self._handle_dispatch(dispatch=dispatch)
156+
157+
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
158+
"""Handle a dispatch.
159+
160+
Args:
161+
dispatch: The dispatch to handle.
162+
163+
Returns:
164+
The running state.
165+
"""
166+
running = dispatch.running(self._dispatch_type)
167+
match running:
168+
case RunningState.STOPPED:
169+
_logger.info("Stopping dispatch...")
170+
await self._stop_actors("Dispatch stopped")
171+
case RunningState.RUNNING:
172+
if self._configuration_sender is not None:
173+
_logger.info("Updating configuration...")
174+
await self._configuration_sender.send(
175+
DispatchConfigurationEvent(
176+
components=dispatch.selector,
177+
dry_run=dispatch.dry_run,
178+
payload=dispatch.payload,
179+
)
180+
)
181+
182+
_logger.info("Running dispatch...")
183+
self._start_actors()
184+
case RunningState.DIFFERENT_TYPE:
185+
_logger.debug(
186+
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
187+
)

tests/test_runner.py

+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
# LICENSE: ALL RIGHTS RESERVED
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Test the dispatch runner."""
5+
6+
import asyncio
7+
from dataclasses import dataclass, replace
8+
from datetime import datetime, timedelta, timezone
9+
from typing import AsyncIterator, Iterator
10+
11+
import async_solipsism
12+
import pytest
13+
import time_machine
14+
from frequenz.channels import Broadcast, Receiver, Sender
15+
from frequenz.client.dispatch.test.generator import DispatchGenerator
16+
from frequenz.client.dispatch.types import Frequency
17+
from frequenz.sdk.actor import Actor
18+
from pytest import fixture
19+
20+
from frequenz.dispatch import Dispatch, DispatchConfigurationEvent, DispatchRunnerActor
21+
22+
# pylint: disable=protected-access
23+
24+
25+
# This method replaces the event loop for all tests in the file.
26+
@pytest.fixture
27+
def event_loop_policy() -> async_solipsism.EventLoopPolicy:
28+
"""Return an event loop policy that uses the async solipsism event loop."""
29+
return async_solipsism.EventLoopPolicy()
30+
31+
32+
@fixture
33+
def fake_time() -> Iterator[time_machine.Coordinates]:
34+
"""Replace real time with a time machine that doesn't automatically tick."""
35+
# destination can be a datetime or a timestamp (int), so are moving to the
36+
# epoch (in UTC!)
37+
with time_machine.travel(destination=0, tick=False) as traveller:
38+
yield traveller
39+
40+
41+
def _now() -> datetime:
42+
"""Return the current time in UTC."""
43+
return datetime.now(tz=timezone.utc)
44+
45+
46+
class MockActor(Actor):
47+
"""Mock actor for testing."""
48+
49+
async def _run(self) -> None:
50+
while True:
51+
await asyncio.sleep(1)
52+
53+
54+
@dataclass
55+
class TestEnv:
56+
"""Test environment."""
57+
58+
actor: Actor
59+
runner_actor: DispatchRunnerActor
60+
running_status_sender: Sender[Dispatch]
61+
configuration_receiver: Receiver[DispatchConfigurationEvent]
62+
generator: DispatchGenerator = DispatchGenerator()
63+
64+
65+
@fixture
66+
async def test_env() -> AsyncIterator[TestEnv]:
67+
"""Create a test environment."""
68+
channel = Broadcast[Dispatch](name="dispatch ready test channel")
69+
config_channel = Broadcast[DispatchConfigurationEvent](
70+
name="dispatch config test channel"
71+
)
72+
73+
actor = MockActor()
74+
75+
runner_actor = DispatchRunnerActor(
76+
actors=frozenset([actor]),
77+
dispatch_type="UNIT_TEST",
78+
running_status_receiver=channel.new_receiver(),
79+
configuration_sender=config_channel.new_sender(),
80+
)
81+
82+
runner_actor.start()
83+
84+
yield TestEnv(
85+
actor=actor,
86+
runner_actor=runner_actor,
87+
running_status_sender=channel.new_sender(),
88+
configuration_receiver=config_channel.new_receiver(),
89+
)
90+
91+
await runner_actor.stop()
92+
93+
94+
# Disable for _handle_dispatch access
95+
# pylint: disable=protected-access
96+
97+
98+
async def test_simple_start_stop(
99+
test_env: TestEnv,
100+
fake_time: time_machine.Coordinates,
101+
) -> None:
102+
"""Test behavior when receiving start/stop messages."""
103+
now = _now()
104+
duration = timedelta(minutes=10)
105+
dispatch = test_env.generator.generate_dispatch()
106+
dispatch = replace(
107+
dispatch,
108+
active=True,
109+
dry_run=False,
110+
duration=duration,
111+
start_time=now,
112+
payload={"test": True},
113+
type="UNIT_TEST",
114+
recurrence=replace(
115+
dispatch.recurrence,
116+
frequency=Frequency.UNSPECIFIED,
117+
),
118+
)
119+
120+
await test_env.running_status_sender.send(Dispatch(dispatch))
121+
fake_time.shift(timedelta(seconds=1))
122+
123+
event = await test_env.configuration_receiver.receive()
124+
assert event.payload == {"test": True}
125+
assert event.components == dispatch.selector
126+
assert event.dry_run is False
127+
128+
assert test_env.actor.is_running is True
129+
130+
fake_time.shift(duration)
131+
await test_env.running_status_sender.send(Dispatch(dispatch))
132+
133+
# Give await actor.stop a chance to run in DispatchRunnerActor
134+
await asyncio.sleep(0.1)
135+
136+
assert test_env.actor.is_running is False
137+
138+
139+
async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
140+
"""Test the dry run mode."""
141+
dispatch = test_env.generator.generate_dispatch()
142+
dispatch = replace(
143+
dispatch,
144+
dry_run=True,
145+
active=True,
146+
start_time=_now(),
147+
duration=timedelta(minutes=10),
148+
type="UNIT_TEST",
149+
recurrence=replace(
150+
dispatch.recurrence,
151+
frequency=Frequency.UNSPECIFIED,
152+
),
153+
)
154+
155+
await test_env.running_status_sender.send(Dispatch(dispatch))
156+
fake_time.shift(timedelta(seconds=1))
157+
158+
event = await test_env.configuration_receiver.receive()
159+
160+
assert event.dry_run is dispatch.dry_run
161+
assert event.components == dispatch.selector
162+
assert event.payload == dispatch.payload
163+
assert test_env.actor.is_running is True
164+
165+
fake_time.shift(dispatch.duration)
166+
await test_env.running_status_sender.send(Dispatch(dispatch))
167+
168+
# Give await actor.stop a chance to run in DispatchRunnerActor
169+
await asyncio.sleep(0.1)
170+
171+
assert test_env.actor.is_running is False

0 commit comments

Comments
 (0)