
Add a Parallel Task Scheduler #967

Closed
wants to merge 18 commits
@@ -0,0 +1,12 @@
// Mock parallel scheduler config to test the benchmarking framework.
{
"$schema": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json",

"class": "mlos_bench.schedulers.ParallelScheduler",

"config": {
"trial_config_repeat_count": 3,
"max_trials": -1, // Limited only in the Optimizer logic/config.
"teardown": false
}
}
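As a rough illustration of what a commented config like the one above resolves to, here is a minimal sketch that parses an equivalent JSON-with-comments snippet using the third-party json5 package (the loader choice is an assumption for the sketch; mlos_bench has its own config loading machinery):

# Sketch only: parse a scheduler config that contains // comments.
# json5 is a stand-in here; mlos_bench's own config loader may differ.
import json5  # pip install json5

CONFIG_JSONC = """
{
    "class": "mlos_bench.schedulers.ParallelScheduler",
    "config": {
        "trial_config_repeat_count": 3,
        "max_trials": -1,   // Limited only in the Optimizer logic/config.
        "teardown": false
    }
}
"""

config = json5.loads(CONFIG_JSONC)
assert config["class"] == "mlos_bench.schedulers.ParallelScheduler"
assert config["config"]["max_trials"] == -1  # the // comment is stripped by the parser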
mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json
@@ -2,12 +2,10 @@
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json",
"title": "mlos_bench Scheduler config",

"$defs": {
"comment": {
"$comment": "This section contains reusable partial schema bits (or just split out for readability)"
},

"config_base_scheduler": {
"$comment": "config properties common to all Scheduler types.",
"type": "object",
@@ -29,18 +27,23 @@
"description": "Max. number of trials to run. Use -1 or 0 for unlimited.",
"type": "integer",
"minimum": -1,
"examples": [50, -1]
"examples": [
50,
-1
]
},
"trial_config_repeat_count": {
"description": "Number of times to repeat a config.",
"type": "integer",
"minimum": 1,
"examples": [3, 5]
"examples": [
3,
5
]
Contributor: Did these get automatically reformatted?

Collaborator (Author): yes

}
}
}
},

"description": "config for the mlos_bench scheduler",
"$comment": "top level schema document rules",
"type": "object",
@@ -51,30 +54,30 @@
"$comment": "This is optional, but if provided, should match the name of this file.",
"pattern": "/schemas/schedulers/scheduler-schema.json$"
},

"description": {
"description": "Optional description of the config.",
"type": "string"
},

"class": {
"description": "The name of the scheduler class to use.",
"$comment": "required",
"enum": [
"mlos_bench.schedulers.SyncScheduler",
"mlos_bench.schedulers.sync_scheduler.SyncScheduler"
"mlos_bench.schedulers.sync_scheduler.SyncScheduler",
"mlos_bench.schedulers.ParallelScheduler",
"mlos_bench.schedulers.parallel_scheduler.ParallelScheduler"
]
},

"config": {
"description": "The scheduler-specific config.",
"$comment": "Stub for scheduler-specific config appended with condition statements below",
"type": "object",
"minProperties": 1
}
},
"required": ["class"],

"required": [
"class"
],
"oneOf": [
{
"$comment": "extensions to the 'config' object properties when synchronous scheduler is being used",
@@ -83,17 +86,25 @@
"class": {
"enum": [
"mlos_bench.schedulers.SyncScheduler",
"mlos_bench.schedulers.sync_scheduler.SyncScheduler"
"mlos_bench.schedulers.sync_scheduler.SyncScheduler",
"mlos_bench.schedulers.ParallelScheduler",
"mlos_bench.schedulers.parallel_scheduler.ParallelScheduler"
]
}
},
"required": ["class"]
"required": [
"class"
]
},
"then": {
"properties": {
"config": {
"type": "object",
"allOf": [{ "$ref": "#/$defs/config_base_scheduler" }],
"allOf": [
{
"$ref": "#/$defs/config_base_scheduler"
}
],
"$comment": "disallow other properties",
"unevaluatedProperties": false
}
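The schema edits above add the new ParallelScheduler class names to the top-level enum and to the if/then branch that pins the config object to config_base_scheduler with "unevaluatedProperties": false. A minimal sketch of exercising that, assuming the jsonschema package and the local schema path implied by the $id above (exact results depend on schema parts not shown in this diff):

# Sketch only: check a ParallelScheduler config against the updated schema.
import json
from jsonschema import Draft202012Validator  # pip install jsonschema

SCHEMA_PATH = "mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json"
with open(SCHEMA_PATH, encoding="utf-8") as fh:
    validator = Draft202012Validator(json.load(fh))

accepted = {
    "class": "mlos_bench.schedulers.ParallelScheduler",
    "config": {"trial_config_repeat_count": 3, "max_trials": -1},
}
rejected = {
    "class": "mlos_bench.schedulers.ParallelScheduler",
    "config": {"not_a_real_option": True},  # hypothetical property name
}

print("accepts ParallelScheduler config:", validator.is_valid(accepted))
print("rejects unknown config property:", not validator.is_valid(rejected))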
8 changes: 8 additions & 0 deletions mlos_bench/mlos_bench/environments/status.py
@@ -18,6 +18,7 @@ class Status(enum.Enum):
CANCELED = 5
FAILED = 6
TIMED_OUT = 7
SCHEDULED = 8

def is_good(self) -> bool:
"""Check if the status of the benchmark/environment is good."""
@@ -26,6 +27,7 @@ def is_good(self) -> bool:
Status.READY,
Status.RUNNING,
Status.SUCCEEDED,
Status.SCHEDULED,
Contributor: Why isn't PENDING good enough for that?

Collaborator (Author): No, you need to differentiate between PENDING and SCHEDULED. SCHEDULED means the trial has been assigned a worker and handed a process, but hasn't necessarily started yet (at which point it would switch to RUNNING).

Contributor: Seems to me that status=PENDING AND trial_runner_id != NULL is the same as SCHEDULED, no?

}

def is_completed(self) -> bool:
@@ -74,3 +76,9 @@ def is_timed_out(self) -> bool:
TIMED_OUT.
"""
return self == Status.TIMED_OUT

def is_scheduled(self) -> bool:
"""Check if the status of the benchmark/environment Trial or Experiment is
SCHEDULED.
"""
return self == Status.SCHEDULED
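The thread above debates whether SCHEDULED adds information over PENDING. A minimal sketch of the two views, using a hypothetical trial object exposing status and trial_runner_id attributes (the attribute names are assumptions taken from the reviewer's comment, not from this PR):

from mlos_bench.environments.status import Status

def is_scheduled_new_status(trial) -> bool:
    # This PR's view: an explicit SCHEDULED member, set once a runner is assigned.
    return trial.status.is_scheduled()

def is_scheduled_pending_plus_runner(trial) -> bool:
    # Reviewer's view: PENDING plus a non-NULL trial_runner_id already encodes
    # "assigned but not yet started", without a new enum member.
    return trial.status == Status.PENDING and trial.trial_runner_id is not None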
2 changes: 2 additions & 0 deletions mlos_bench/mlos_bench/schedulers/__init__.py
@@ -5,9 +5,11 @@
"""Interfaces and implementations of the optimization loop scheduling policies."""

from mlos_bench.schedulers.base_scheduler import Scheduler
from mlos_bench.schedulers.parallel_scheduler import ParallelScheduler
from mlos_bench.schedulers.sync_scheduler import SyncScheduler

__all__ = [
"Scheduler",
"SyncScheduler",
"ParallelScheduler",
]
162 changes: 162 additions & 0 deletions mlos_bench/mlos_bench/schedulers/parallel_scheduler.py
@@ -0,0 +1,162 @@
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
#
"""A simple multi-threaded asynchronous optimization loop implementation."""

import asyncio
import logging
from collections.abc import Callable
from concurrent.futures import Future, ProcessPoolExecutor
from datetime import datetime
from typing import Any

from pytz import UTC

from mlos_bench.environments.status import Status
from mlos_bench.schedulers.base_scheduler import Scheduler
from mlos_bench.schedulers.trial_runner import TrialRunner
from mlos_bench.storage.base_storage import Storage
from mlos_bench.tunables.tunable_groups import TunableGroups

_LOG = logging.getLogger(__name__)


class ParallelScheduler(Scheduler):
"""A simple multi-process asynchronous optimization loop implementation."""

def __init__(self, *args: Any, **kwargs: Any) -> None:

super().__init__(*args, **kwargs)
self.pool = ProcessPoolExecutor(max_workers=len(self._trial_runners))

def start(self) -> None:
"""Start the optimization loop."""
super().start()

is_warm_up: bool = self.optimizer.supports_preload
if not is_warm_up:
_LOG.warning("Skip pending trials and warm-up: %s", self.optimizer)

not_done: bool = True
while not_done:
_LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id)
self._run_callbacks()
self._run_schedule(is_warm_up)
not_done = self._schedule_new_optimizer_suggestions()
is_warm_up = False

def teardown(self) -> None:
"""Stop the optimization loop."""
# Shut down the process pool and wait for all tasks to finish.
self.pool.shutdown(wait=True)
self._run_callbacks()
super().teardown()

def schedule_trial(self, tunables: TunableGroups) -> None:
"""Assign a trial to a trial runner."""
assert self.experiment is not None

super().schedule_trial(tunables)

pending_trials: list[Storage.Trial] = list(
self.experiment.pending_trials(datetime.now(UTC), running=False)
)

idle_runner_ids = [
id for id, runner in self.trial_runners.items() if not runner.is_running
]

# Assign pending trials to idle runners
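# zip() stops at the shorter list, so surplus pending trials (or surplus
# idle runners) are simply left for the next scheduling pass.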
for trial, runner_id in zip(pending_trials, idle_runner_ids):
trial.update(status=Status.SCHEDULED, timestamp=datetime.now(UTC))
trial.set_trial_runner(runner_id)

def _run_schedule(self, running: bool = False) -> None:
"""
Scheduler part of the loop.

Check for pending trials in the queue and run them.
"""
assert self.experiment is not None

scheduled_trials: list[Storage.Trial] = list(
self.experiment.filter_trials_by_status(datetime.now(UTC), [Status.SCHEDULED])
)

for trial in scheduled_trials:
trial.update(status=Status.READY, timestamp=datetime.now(UTC))
self.deferred_run_trial(trial)

def _on_trial_finished_closure(
self, trial: Storage.Trial
) -> Callable[["ParallelScheduler", Future], None]:
"""
Generate a closure to handle the callback for when a trial is finished.

Parameters
----------
trial : Storage.Trial
The trial to finish.
"""

def _on_trial_finished(self: ParallelScheduler, result: Future) -> None:
"""
Callback to be called when a trial is finished.

This must always be called from the main thread. Exceptions can also be
handled here.
"""
try:
(status, timestamp, results, telemetry) = result.result()
self.get_trial_runner(trial).finalize_run_trial(
trial, status, timestamp, results, telemetry
)
except Exception as exception: # pylint: disable=broad-except
_LOG.error("Trial failed: %s", exception)

return _on_trial_finished

@staticmethod
def _run_callbacks() -> None:
"""Run all pending callbacks in the main thread."""
loop = asyncio.get_event_loop()
pending = asyncio.all_tasks(loop)
loop.run_until_complete(asyncio.gather(*pending))

def run_trial(self, trial: Storage.Trial) -> None:
"""
ParallelScheduler does not support run_trial. Use deferred_run_trial instead.

Parameters
----------
trial : Storage.Trial
The trial to run.

Raises
------
NotImplementedError
Error to indicate that this method is not supported in ParallelScheduler.
"""
raise NotImplementedError(
"ParallelScheduler does not support run_trial. Use async_run_trial instead."
)

def deferred_run_trial(self, trial: Storage.Trial) -> None:
"""
Set up and run a single trial asynchronously.

Returns a callback to save the results in the storage.
"""
super().run_trial(trial)
# Each trial runs on its own TrialRunner; the actual execution is submitted to the process pool below.
trial_runner = self.get_trial_runner(trial)
trial_runner.prepare_run_trial(trial, self.global_config)

task = self.pool.submit(TrialRunner.execute_run_trial, trial_runner.environment)
# This is required to ensure that the callback happens on the main thread
asyncio.get_event_loop().call_soon_threadsafe(
self._on_trial_finished_closure(trial), self, task
)

_LOG.info("QUEUE: Finished trial: %s on %s", trial, trial_runner)