Skip to content

Commit 443e83e

Browse files
authored
Improve resiliency of long-running async tasks (#1081)
Closes #1078
2 parents 31605c6 + cd0c532 commit 443e83e

File tree

11 files changed

+55
-73
lines changed

11 files changed

+55
-73
lines changed

RELEASE_NOTES.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616

1717
## Bug Fixes
1818

19-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
19+
- Many long-running async tasks, including metric streamers in the BatteryPool, now have automatic recovery in case of exceptions.

src/frequenz/sdk/_internal/_asyncio.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@
55

66

77
import asyncio
8+
import logging
89
from abc import ABC
9-
from typing import Any
10+
from datetime import timedelta
11+
from typing import Any, Callable, Coroutine
12+
13+
_logger = logging.getLogger(__name__)
1014

1115

1216
async def cancel_and_await(task: asyncio.Task[Any]) -> None:
@@ -28,6 +32,25 @@ async def cancel_and_await(task: asyncio.Task[Any]) -> None:
2832
pass
2933

3034

35+
async def run_forever(
    async_callable: Callable[[], Coroutine[Any, Any, None]],
    interval: timedelta = timedelta(seconds=1),
) -> None:
    """Run a given async callable forever, restarting it whenever it stops.

    The callable is awaited in an endless loop.  If it raises an `Exception`,
    the error is logged together with the callable's name and the callable is
    started again.  If it returns normally, it is started again as well.  In
    both cases the loop waits for `interval` before the next attempt.

    `asyncio.CancelledError` is not a subclass of `Exception`, so it is not
    caught here: cancelling the task that runs this coroutine stops the loop.

    Args:
        async_callable: The async callable to run.  It is called without
            arguments on every (re)start and must return a new coroutine
            each time.
        interval: The interval between restarts.
    """
    interval_s = interval.total_seconds()
    # Callers pass bound methods and lambdas; the qualified name tells them
    # apart in the logs.  Objects without one (e.g. `functools.partial`) fall
    # back to their repr.
    task_name = getattr(async_callable, "__qualname__", repr(async_callable))
    while True:
        try:
            await async_callable()
        except Exception:  # pylint: disable=broad-except
            _logger.exception("Restarting after exception in %s", task_name)
        # Sleep on every iteration, not only after a failure, so a callable
        # that returns immediately cannot turn this into a busy loop.
        await asyncio.sleep(interval_s)
52+
53+
3154
class NotSyncConstructible(AssertionError):
3255
"""Raised when object with async constructor is created in sync way."""
3356

src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
MeterData,
1919
)
2020

21+
from ..._internal._asyncio import run_forever
2122
from ..._internal._channels import ChannelRegistry
2223
from ...microgrid import connection_manager
2324
from ...timeseries import Sample
@@ -460,7 +461,7 @@ async def _update_streams(
460461
self.comp_data_tasks[comp_id].cancel()
461462

462463
self.comp_data_tasks[comp_id] = asyncio.create_task(
463-
self._handle_data_stream(comp_id, category)
464+
run_forever(lambda: self._handle_data_stream(comp_id, category))
464465
)
465466

466467
async def add_metric(self, request: ComponentMetricRequest) -> None:

src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py

+2-10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
)
2525
from typing_extensions import override
2626

27+
from ....._internal._asyncio import run_forever
2728
from ....._internal._math import is_close_to_zero
2829
from .....timeseries import Power, Sample3Phase, Voltage
2930
from .... import _data_pipeline, connection_manager
@@ -89,7 +90,7 @@ async def start(self) -> None:
8990
"""Start the ev charger data manager."""
9091
# Need to start a task only if there are EV chargers in the component graph.
9192
if self._ev_charger_ids:
92-
self._task = asyncio.create_task(self._run_forever())
93+
self._task = asyncio.create_task(run_forever(self._run))
9394

9495
@override
9596
async def distribute_power(self, request: Request) -> None:
@@ -217,15 +218,6 @@ def _act_on_new_data(self, ev_data: EVChargerData) -> dict[int, Power]:
217218
)
218219
return {component_id: target_power}
219220

220-
async def _run_forever(self) -> None:
221-
"""Run the EV charger manager forever."""
222-
while True:
223-
try:
224-
await self._run()
225-
except Exception: # pylint: disable=broad-except
226-
_logger.exception("Recovering from an error in EV charger manager.")
227-
await asyncio.sleep(1.0)
228-
229221
async def _run(self) -> None: # pylint: disable=too-many-locals
230222
"""Run the main event loop of the EV charger manager."""
231223
api = connection_manager.get().api_client

src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py

+2-12
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818
from typing_extensions import override
1919

20+
from ...._internal._asyncio import run_forever
2021
from ....actor._background_service import BackgroundService
2122
from ... import connection_manager
2223
from ._blocking_status import BlockingStatus
@@ -80,7 +81,7 @@ def __init__( # pylint: disable=too-many-arguments
8081
@override
8182
def start(self) -> None:
8283
"""Start the status tracker."""
83-
self._tasks.add(asyncio.create_task(self._run_forever()))
84+
self._tasks.add(asyncio.create_task(run_forever(self._run)))
8485

8586
def _is_working(self, ev_data: EVChargerData) -> bool:
8687
"""Return whether the given data indicates that the component is working."""
@@ -99,17 +100,6 @@ def _is_stale(self, ev_data: EVChargerData) -> bool:
99100
stale = now - ev_data.timestamp > self._max_data_age
100101
return stale
101102

102-
async def _run_forever(self) -> None:
103-
"""Run the status tracker forever."""
104-
while True:
105-
try:
106-
await self._run()
107-
except Exception: # pylint: disable=broad-except
108-
_logger.exception(
109-
"Restarting after exception in EVChargerStatusTracker.run()"
110-
)
111-
await asyncio.sleep(1.0)
112-
113103
def _handle_ev_data(self, ev_data: EVChargerData) -> ComponentStatusEnum:
114104
"""Handle new EV charger data."""
115105
if self._is_stale(ev_data):

src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py

+2-11
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from frequenz.client.microgrid import InverterComponentState, InverterData
1313
from typing_extensions import override
1414

15+
from ...._internal._asyncio import run_forever
1516
from ....actor._background_service import BackgroundService
1617
from ... import connection_manager
1718
from ._blocking_status import BlockingStatus
@@ -76,7 +77,7 @@ def __init__( # pylint: disable=too-many-arguments
7677
@override
7778
def start(self) -> None:
7879
"""Start the status tracker."""
79-
self._tasks.add(asyncio.create_task(self._run_forever()))
80+
self._tasks.add(asyncio.create_task(run_forever(self._run)))
8081

8182
def _is_working(self, pv_data: InverterData) -> bool:
8283
"""Return whether the given data indicates that the PV inverter is working."""
@@ -87,16 +88,6 @@ def _is_working(self, pv_data: InverterData) -> bool:
8788
InverterComponentState.STANDBY,
8889
)
8990

90-
async def _run_forever(self) -> None:
91-
while True:
92-
try:
93-
await self._run()
94-
except Exception: # pylint: disable=broad-except
95-
_logger.exception(
96-
"Restarting after exception in PVInverterStatusTracker.run()"
97-
)
98-
await asyncio.sleep(1.0)
99-
10091
def _is_stale(self, pv_data: InverterData) -> bool:
10192
"""Return whether the given data is stale."""
10293
now = datetime.now(tz=timezone.utc)

src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
1616
from typing_extensions import override
1717

18+
from ..._internal._asyncio import run_forever
1819
from ..._internal._channels import ChannelRegistry
1920
from ...actor import Actor
2021
from ...timeseries import Power
@@ -194,7 +195,7 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[int]) -> None:
194195

195196
# Start the bounds tracker, for ongoing updates.
196197
self._bound_tracker_tasks[component_ids] = asyncio.create_task(
197-
self._bounds_tracker(component_ids, bounds_receiver)
198+
run_forever(lambda: self._bounds_tracker(component_ids, bounds_receiver))
198199
)
199200

200201
def _calculate_shifted_bounds(

src/frequenz/sdk/timeseries/battery_pool/_methods.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from frequenz.channels import Broadcast, Receiver
1414

15-
from ..._internal._asyncio import cancel_and_await
15+
from ..._internal._asyncio import cancel_and_await, run_forever
1616
from ..._internal._constants import RECEIVER_MAX_SIZE, WAIT_FOR_COMPONENT_DATA_SEC
1717
from ...microgrid._power_distributing._component_managers._battery_manager import (
1818
_get_battery_inverter_mappings,
@@ -104,8 +104,10 @@ def __init__(
104104
self._update_event = asyncio.Event()
105105
self._cached_metrics: dict[int, ComponentMetricsData] = {}
106106

107-
self._update_task = asyncio.create_task(self._update_and_notify())
108-
self._send_task = asyncio.create_task(self._send_on_update(min_update_interval))
107+
self._update_task = asyncio.create_task(run_forever(self._update_and_notify))
108+
self._send_task = asyncio.create_task(
109+
run_forever(lambda: self._send_on_update(min_update_interval))
110+
)
109111
self._pending_data_fetchers: set[asyncio.Task[ComponentMetricsData | None]] = (
110112
set()
111113
)

src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,17 @@ def calculate(
397397
#
398398
# Therefore, the variables are named with a `_x100` suffix.
399399
usable_capacity_x100 = capacity * (soc_upper_bound - soc_lower_bound)
400-
soc_scaled = (
401-
(soc - soc_lower_bound) / (soc_upper_bound - soc_lower_bound) * 100.0
402-
)
400+
if math.isclose(soc_upper_bound, soc_lower_bound):
401+
if soc < soc_lower_bound:
402+
soc_scaled = 0.0
403+
else:
404+
soc_scaled = 100.0
405+
else:
406+
soc_scaled = (
407+
(soc - soc_lower_bound)
408+
/ (soc_upper_bound - soc_lower_bound)
409+
* 100.0
410+
)
403411
# we are clamping here because the SoC might be out of bounds
404412
soc_scaled = min(max(soc_scaled, 0.0), 100.0)
405413
timestamp = max(timestamp, metrics.timestamp)

src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py

+2-15
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,18 @@
55

66

77
import asyncio
8-
import logging
98
from collections import abc
109

1110
from frequenz.channels import Receiver, Sender, merge, select, selected_from
1211
from frequenz.client.microgrid import EVChargerData
1312

13+
from ..._internal._asyncio import run_forever
1414
from ...actor import BackgroundService
1515
from ...microgrid import connection_manager
1616
from ...microgrid._power_distributing._component_status import ComponentPoolStatus
1717
from .. import Power
1818
from .._base_types import Bounds, SystemBounds
1919

20-
_logger = logging.getLogger(__name__)
21-
2220

2321
class EVCSystemBoundsTracker(BackgroundService):
2422
"""Track the system bounds for the EV chargers.
@@ -55,7 +53,7 @@ def __init__(
5553

5654
def start(self) -> None:
5755
"""Start the EV charger system bounds tracker."""
58-
self._tasks.add(asyncio.create_task(self._run_forever()))
56+
self._tasks.add(asyncio.create_task(run_forever(self._run)))
5957

6058
async def _send_bounds(self) -> None:
6159
"""Calculate and send the aggregate system bounds if they have changed."""
@@ -104,17 +102,6 @@ async def _send_bounds(self) -> None:
104102
)
105103
await self._bounds_sender.send(self._last_sent_bounds)
106104

107-
async def _run_forever(self) -> None:
108-
"""Run the status tracker forever."""
109-
while True:
110-
try:
111-
await self._run()
112-
except Exception: # pylint: disable=broad-except
113-
_logger.exception(
114-
"Restarting after exception in EVChargerSystemBoundsTracker.run()"
115-
)
116-
await asyncio.sleep(1.0)
117-
118105
async def _run(self) -> None:
119106
"""Run the system bounds tracker."""
120107
api_client = connection_manager.get().api_client

src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py

+2-15
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,18 @@
44
"""System bounds tracker for PV inverters."""
55

66
import asyncio
7-
import logging
87
from collections import abc
98

109
from frequenz.channels import Receiver, Sender, merge, select, selected_from
1110
from frequenz.client.microgrid import InverterData
1211

12+
from ..._internal._asyncio import run_forever
1313
from ...actor import BackgroundService
1414
from ...microgrid import connection_manager
1515
from ...microgrid._power_distributing._component_status import ComponentPoolStatus
1616
from .._base_types import Bounds, SystemBounds
1717
from .._quantities import Power
1818

19-
_logger = logging.getLogger(__name__)
20-
2119

2220
class PVSystemBoundsTracker(BackgroundService):
2321
"""Track the system bounds for PV inverters.
@@ -54,7 +52,7 @@ def __init__(
5452

5553
def start(self) -> None:
5654
"""Start the PV inverter system bounds tracker."""
57-
self._tasks.add(asyncio.create_task(self._run_forever()))
55+
self._tasks.add(asyncio.create_task(run_forever(self._run)))
5856

5957
async def _send_bounds(self) -> None:
6058
"""Calculate and send the aggregate system bounds if they have changed."""
@@ -103,17 +101,6 @@ async def _send_bounds(self) -> None:
103101
)
104102
await self._bounds_sender.send(self._last_sent_bounds)
105103

106-
async def _run_forever(self) -> None:
107-
"""Run the system bounds tracker."""
108-
while True:
109-
try:
110-
await self._run()
111-
except Exception: # pylint: disable=broad-except
112-
_logger.exception(
113-
"Restarting after exception in PVSystemBoundsTracker.run()"
114-
)
115-
await asyncio.sleep(1.0)
116-
117104
async def _run(self) -> None:
118105
"""Run the system bounds tracker."""
119106
api_client = connection_manager.get().api_client

0 commit comments

Comments
 (0)