Skip to content

Commit 4c88807

Browse files
committed
Add dispatch runner
Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
1 parent 2b3f7e8 commit 4c88807

File tree

3 files changed

+170
-1
lines changed

3 files changed

+170
-1
lines changed

RELEASE_NOTES.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* We now provide the `DispatchRunnerActor` class, a class to manage actors based on incoming dispatches.
1414

1515
## Bug Fixes
1616

src/frequenz/dispatch/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@
77
88
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
99
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
10+
* [DispatchRunnerActor][frequenz.dispatch.DispatchRunnerActor]: An actor to
11+
manage other actors based on incoming dispatches.
1012
* [Created][frequenz.dispatch.Created],
1113
[Updated][frequenz.dispatch.Updated],
1214
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
1315
1416
"""
1517

18+
from ._actor_runner import DispatchConfigurationEvent, DispatchRunnerActor
1619
from ._dispatch import Dispatch, RunningState
1720
from ._dispatcher import Dispatcher, ReceiverFetcher
1821
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -26,4 +29,6 @@
2629
"Updated",
2730
"Dispatch",
2831
"RunningState",
32+
"DispatchRunnerActor",
33+
"DispatchConfigurationEvent",
2934
]
+164
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
# License: All rights reserved
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Helper class to manage actors based on dispatches."""
5+
6+
import logging
7+
from dataclasses import dataclass
8+
from typing import Any
9+
10+
from frequenz.channels import Receiver, Sender
11+
from frequenz.client.dispatch.types import ComponentSelector
12+
from frequenz.sdk.actor import Actor
13+
14+
from ._dispatch import Dispatch, RunningState
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
@dataclass(frozen=True, kw_only=True)
20+
class DispatchConfigurationEvent:
21+
"""Event emitted when the dispatch configuration changes."""
22+
23+
components: ComponentSelector
24+
"""Components to be used."""
25+
26+
dry_run: bool
27+
"""Whether this is a dry run."""
28+
29+
payload: dict[str, Any]
30+
"""Additional payload."""
31+
32+
33+
class DispatchRunnerActor(Actor):
34+
"""Helper class to manage actors based on dispatches.
35+
36+
Example usage:
37+
```python
38+
import os
39+
import asyncio
40+
from frequenz.dispatch import Dispatcher, DispatchRunnerActor, DispatchConfigurationEvent
41+
from frequenz.channels import Receiver, Broadcast
42+
from unittest.mock import MagicMock
43+
44+
class MyActor(Actor):
45+
def __init__(self, config_channel: Receiver[DispatchConfigurationEvent]):
46+
super().__init__()
47+
self._config_channel = config_channel
48+
49+
async def _run(self) -> None:
50+
while True:
51+
config = await self._config_channel.receive()
52+
print("Received config:", config)
53+
54+
async def run():
55+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
56+
key = os.getenv("DISPATCH_API_KEY", "some-key")
57+
58+
microgrid_id = 1
59+
60+
dispatcher = Dispatcher(
61+
microgrid_id=microgrid_id,
62+
server_url=url,
63+
key=key
64+
)
65+
66+
# Create config channel to receive (re-)configuration events pre-start and mid-run
67+
config_channel = Broadcast[DispatchConfigurationEvent](name="config_channel")
68+
69+
# Start actor and supporting actor, give each a config channel receiver
70+
my_actor = MyActor(config_channel.new_receiver())
71+
supporting_actor = MagicMock(config_channel.new_receiver())
72+
73+
status_receiver = dispatcher.running_status_change.new_receiver()
74+
75+
dispatch_handler = DispatchRunnerActor(
76+
actors=[my_actor, supporting_actor],
77+
dispatch_type="EXAMPLE",
78+
running_status_receiver=status_receiver,
79+
configuration_sender=config_channel.new_sender(),
80+
)
81+
82+
await asyncio.gather(dispatcher.start(), dispatch_handler.start())
83+
```
84+
"""
85+
86+
def __init__(
87+
self,
88+
actors: frozenset[Actor],
89+
dispatch_type: str,
90+
running_status_receiver: Receiver[Dispatch],
91+
configuration_sender: Sender[DispatchConfigurationEvent] | None = None,
92+
) -> None:
93+
"""Initialize the dispatch handler.
94+
95+
Args:
96+
actors: The actors to handle.
97+
dispatch_type: The type of dispatches to handle.
98+
running_status_receiver: The receiver for dispatch running status changes.
99+
configuration_sender: The sender for dispatch configuration events
100+
"""
101+
super().__init__()
102+
self._dispatch_rx = running_status_receiver
103+
self._actors = actors
104+
self._dispatch_type = dispatch_type
105+
self._configuration_sender = configuration_sender
106+
107+
def _start_actors(self) -> None:
108+
"""Start all actors."""
109+
for actor in self._actors:
110+
if actor.is_running:
111+
_logger.warning("Actor %s is already running", actor.name)
112+
else:
113+
actor.start()
114+
115+
async def _stop_actors(self, msg: str) -> None:
116+
"""Stop all actors.
117+
118+
Args:
119+
msg: The message to be passed to the actors being stopped.
120+
"""
121+
for actor in self._actors:
122+
if actor.is_running:
123+
await actor.stop(msg)
124+
else:
125+
_logger.warning("Actor %s is not running", actor.name)
126+
127+
async def _run(self) -> None:
128+
"""Wait for dispatches and handle them."""
129+
while True:
130+
_logger.info("Waiting for dispatch...")
131+
dispatch = await self._dispatch_rx.receive()
132+
await self._handle_dispatch(dispatch=dispatch)
133+
134+
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
135+
"""Handle a dispatch.
136+
137+
Args:
138+
dispatch: The dispatch to handle.
139+
140+
Returns:
141+
The running state.
142+
"""
143+
running = dispatch.running(self._dispatch_type)
144+
match running:
145+
case RunningState.STOPPED:
146+
_logger.info("Stopping dispatch...")
147+
await self._stop_actors("Dispatch stopped")
148+
case RunningState.RUNNING:
149+
if self._configuration_sender is not None:
150+
_logger.info("Updating configuration...")
151+
await self._configuration_sender.send(
152+
DispatchConfigurationEvent(
153+
components=dispatch.selector,
154+
dry_run=dispatch.dry_run,
155+
payload=dispatch.payload,
156+
)
157+
)
158+
159+
_logger.info("Running dispatch...")
160+
self._start_actors()
161+
case RunningState.DIFFERENT_TYPE:
162+
_logger.debug(
163+
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
164+
)

0 commit comments

Comments
 (0)