-
Notifications
You must be signed in to change notification settings - Fork 73
Add a Parallel Task Schedular #967
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8b6e172
1373ff1
5ff16c3
b45c71e
a326cdf
8d34686
8c29920
4bbb534
123ecd3
3357fcf
7a65629
2c9ecb8
40c7f83
1f9e466
ee9e7e0
d619999
3dfb7de
bb239e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
// Mock optimizer to test the benchmarking framework. | ||
{ | ||
"$schema": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json", | ||
|
||
"class": "mlos_bench.schedulers.ParallelScheduler", | ||
|
||
"config": { | ||
"trial_config_repeat_count": 3, | ||
"max_trials": -1, // Limited only in the Optimizer logic/config. | ||
"teardown": false | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ class Status(enum.Enum): | |
CANCELED = 5 | ||
FAILED = 6 | ||
TIMED_OUT = 7 | ||
SCHEDULED = 8 | ||
|
||
def is_good(self) -> bool: | ||
"""Check if the status of the benchmark/environment is good.""" | ||
|
@@ -26,6 +27,7 @@ def is_good(self) -> bool: | |
Status.READY, | ||
Status.RUNNING, | ||
Status.SUCCEEDED, | ||
Status.SCHEDULED, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why isn't PENDING good enough for that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, you need to differentiate between pending and scheduled. Scheduled implies that you have assigned a worker, scheduled implies that it has been given a process, but hasn't necessarily started yet (where it would get switched to running) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems to me that |
||
} | ||
|
||
def is_completed(self) -> bool: | ||
|
@@ -74,3 +76,9 @@ def is_timed_out(self) -> bool: | |
TIMED_OUT. | ||
""" | ||
return self == Status.FAILED | ||
|
||
def is_scheduled(self) -> bool: | ||
"""Check if the status of the benchmark/environment Trial or Experiment is | ||
SCHEDULED. | ||
""" | ||
return self == Status.SCHEDULED |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
# | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
# | ||
"""A simple multi-threaded asynchronous optimization loop implementation.""" | ||
|
||
import asyncio | ||
import logging | ||
from collections.abc import Callable | ||
from concurrent.futures import Future, ProcessPoolExecutor | ||
from datetime import datetime | ||
from typing import Any | ||
|
||
from pytz import UTC | ||
|
||
from mlos_bench.environments.status import Status | ||
from mlos_bench.schedulers.base_scheduler import Scheduler | ||
from mlos_bench.schedulers.trial_runner import TrialRunner | ||
from mlos_bench.storage.base_storage import Storage | ||
from mlos_bench.tunables.tunable_groups import TunableGroups | ||
|
||
_LOG = logging.getLogger(__name__) | ||
|
||
|
||
class ParallelScheduler(Scheduler): | ||
"""A simple multi-process asynchronous optimization loop implementation.""" | ||
|
||
def __init__(self, *args: Any, **kwargs: Any) -> None: | ||
|
||
super().__init__(*args, **kwargs) | ||
self.pool = ProcessPoolExecutor(max_workers=len(self._trial_runners)) | ||
|
||
def start(self) -> None: | ||
"""Start the optimization loop.""" | ||
super().start() | ||
|
||
is_warm_up: bool = self.optimizer.supports_preload | ||
if not is_warm_up: | ||
_LOG.warning("Skip pending trials and warm-up: %s", self.optimizer) | ||
|
||
not_done: bool = True | ||
while not_done: | ||
_LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id) | ||
self._run_callbacks() | ||
self._run_schedule(is_warm_up) | ||
not_done = self._schedule_new_optimizer_suggestions() | ||
is_warm_up = False | ||
|
||
def teardown(self) -> None: | ||
"""Stop the optimization loop.""" | ||
# Shutdown the thread pool and wait for all tasks to finish | ||
self.pool.shutdown(wait=True) | ||
self._run_callbacks() | ||
super().teardown() | ||
|
||
def schedule_trial(self, tunables: TunableGroups) -> None: | ||
"""Assign a trial to a trial runner.""" | ||
assert self.experiment is not None | ||
|
||
super().schedule_trial(tunables) | ||
|
||
pending_trials: list[Storage.Trial] = list( | ||
self.experiment.pending_trials(datetime.now(UTC), running=False) | ||
) | ||
|
||
idle_runner_ids = [ | ||
id for id, runner in self.trial_runners.items() if not runner.is_running | ||
] | ||
|
||
# Assign pending trials to idle runners | ||
for trial, runner_id in zip(pending_trials, idle_runner_ids): | ||
trial.update(status=Status.SCHEDULED, timestamp=datetime.now(UTC)) | ||
trial.set_trial_runner(runner_id) | ||
|
||
def _run_schedule(self, running: bool = False) -> None: | ||
""" | ||
Scheduler part of the loop. | ||
|
||
Check for pending trials in the queue and run them. | ||
""" | ||
assert self.experiment is not None | ||
|
||
scheduled_trials: list[Storage.Trial] = list( | ||
self.experiment.filter_trials_by_status(datetime.now(UTC), [Status.SCHEDULED]) | ||
) | ||
|
||
for trial in scheduled_trials: | ||
trial.update(status=Status.READY, timestamp=datetime.now(UTC)) | ||
self.deferred_run_trial(trial) | ||
|
||
def _on_trial_finished_closure( | ||
self, trial: Storage.Trial | ||
) -> Callable[["ParallelScheduler", Future], None]: | ||
""" | ||
Generate a closure to handle the callback for when a trial is finished. | ||
|
||
Parameters | ||
---------- | ||
trial : Storage.Trial | ||
The trial to finish. | ||
""" | ||
|
||
def _on_trial_finished(self: ParallelScheduler, result: Future) -> None: | ||
""" | ||
Callback to be called when a trial is finished. | ||
|
||
This must always be called from the main thread. Exceptions can also be | ||
handled here | ||
""" | ||
try: | ||
(status, timestamp, results, telemetry) = result.result() | ||
self.get_trial_runner(trial).finalize_run_trial( | ||
trial, status, timestamp, results, telemetry | ||
) | ||
except Exception as exception: # pylint: disable=broad-except | ||
_LOG.error("Trial failed: %s", exception) | ||
|
||
return _on_trial_finished | ||
|
||
@staticmethod | ||
def _run_callbacks() -> None: | ||
"""Run all pending callbacks in the main thread.""" | ||
loop = asyncio.get_event_loop() | ||
pending = asyncio.all_tasks(loop) | ||
loop.run_until_complete(asyncio.gather(*pending)) | ||
|
||
def run_trial(self, trial: Storage.Trial) -> None: | ||
""" | ||
Parallel Scheduler does not support run_trial. Use async_run_trial instead. | ||
|
||
Parameters | ||
---------- | ||
trial : Storage.Trial | ||
The trial to run. | ||
|
||
Raises | ||
------ | ||
NotImplementedError | ||
Error to indicate that this method is not supported in ParallelScheduler. | ||
""" | ||
raise NotImplementedError( | ||
"ParallelScheduler does not support run_trial. Use async_run_trial instead." | ||
) | ||
|
||
def deferred_run_trial(self, trial: Storage.Trial) -> None: | ||
""" | ||
Set up and run a single trial asynchronously. | ||
|
||
Returns a callback to save the results in the storage. | ||
""" | ||
super().run_trial(trial) | ||
# In the sync scheduler we run each trial on its own TrialRunner in sequence. | ||
trial_runner = self.get_trial_runner(trial) | ||
trial_runner.prepare_run_trial(trial, self.global_config) | ||
|
||
task = self.pool.submit(TrialRunner.execute_run_trial, trial_runner.environment) | ||
# This is required to ensure that the callback happens on the main thread | ||
asyncio.get_event_loop().call_soon_threadsafe( | ||
self._on_trial_finished_closure(trial), self, task | ||
) | ||
|
||
_LOG.info("QUEUE: Finished trial: %s on %s", trial, trial_runner) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did these get automatically reformatted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes