Skip to content

WIP: Parallel trial scheduler stub using multiprocessing.Pool #939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 243 additions & 0 deletions mlos_bench/mlos_bench/schedulers/forking_worker_pool_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
#
"""Simple parallel trial scheduler and optimization loop implementation stub code."""
import json
import random
from multiprocessing.pool import AsyncResult, Pool
from time import sleep
from typing import Any


class TrialRunner: # pylint: disable=too-few-public-methods
"""Stub TrialRunner."""

def __init__(self, runner_id: int):
self.runner_id = runner_id

def run_trial(self, iteration: int, suggestion: int) -> dict[str, int | float]:
"""Stub run_trial."""
# In the real system we'd run the Trial on the Environment and whatnot.
# Most of that *should* remain unchanged, but we'll need some testing to verify.
sleep_time = random.uniform(0, 1) + 0.01
print(
(
f"Trial {iteration} is running on {self.runner_id} "
f"with suggestion {suggestion} with sleep time {sleep_time}"
),
flush=True,
)
# Wait a moment to simulate the time it takes to run the trial.
sleep(sleep_time)
print(f"Trial {iteration} on {self.runner_id} is done.", flush=True)
return {
"runner_id": self.runner_id,
"iteration": iteration,
"suggestion": suggestion,
"sleep_time": sleep_time,
}


class ParallelTrialScheduler:
"""Stub ParallelTrialScheduler."""

def __init__(self, num_trial_runners: int, max_iterations: int):
self._max_iterations = max_iterations
self._trial_runners = [TrialRunner(i) for i in range(num_trial_runners)]

# Track the current status of a TrialRunner.
# In a real system we might need to track which TrialRunner is busy in
# the backend Storage in case of failures of the main process or else
# just treat their state as idempotent such that we could resume and
# check on their status at any time.
# That would also require a deterministic scheduling algorithm so that
# we restart the same Trial on the same TrialRunner rather than picking
# a new one.
self._trial_runners_status: dict[int, AsyncResult | None] = {
runner.runner_id: None for runner in self._trial_runners
}

# Simple trial schedule: maps a trial id to a TrialRunner.
# In the real system we'd store everything in the Storage backend.
self._trial_schedule: dict[int, tuple[int, int]] = {}
self._current_runner_id = 0

# Store all the results in a dictionary.
# In the real system we'd submit them to the Storage and the Optimizer.
self._results: dict[int, dict[str, int | float]] = {}

def get_last_trial_id(self) -> int:
"""Very simple method of tracking the last trial id assigned."""
return max(list(self._results.keys()) + list(self._trial_schedule.keys()), default=-1)

def is_done_scheduling(self) -> bool:
"""Check if the scheduler loop is done."""
# This is a simple stopping condition to check and see if we've
# scheduled enough trials.
return self.get_last_trial_id() + 1 >= self._max_iterations

def is_done_running(self) -> bool:
"""Check if the scheduler run loop is done."""
# This is a simple stopping condition to check and see if we've
# run all the trials.
return len(self._results) >= self._max_iterations

def assign_trial_runner(self, trial_id: int, suggestion: int) -> None:
"""Stub assign_trial_runner."""
# In a real system we'd have a more sophisticated way of assigning
# trials to TrialRunners.
# Here we just round-robin the suggestions to the available TrialRunners.
next_runner_id = self._current_runner_id
self._current_runner_id = (self._current_runner_id + 1) % len(self._trial_runners)
self._trial_schedule[trial_id] = (next_runner_id, suggestion)
print(
f"Assigned trial {trial_id} to runner {next_runner_id} with suggestion {suggestion}",
flush=True,
)

def schedule_new_trials(self, num_new_trials: int = 1) -> None:
"""Stub schedule_new_trial(s)."""

# Accept more than one new suggestion at a time to simulate a real
# system that might be doing multi-objective pareto frontier
# optimization.

while num_new_trials > 0 and not self.is_done_scheduling():
# Generate one (or more) new suggestion(s).
# In the real system we'd get these from the Optimizer.
suggestion = random.randint(0, 100)

# Note: it might be also be the case that we want to repeat that
# suggestion multiple times on different TrialRunners.

# Schedule it to a TrialRunner.
next_trial_id = self.get_last_trial_id() + 1
self.assign_trial_runner(next_trial_id, suggestion)
num_new_trials -= 1

def _run_trial_failed_callback(self, obj: Any) -> None: # pylint: disable=no-self-use
"""Stub callback to run when run_trial fails in pool process."""
raise RuntimeError(f"Trial failed: {obj}")

def _run_trial_finished_callback(self, result: dict[str, int | float]) -> None:
"""Stub callback to run when run_trial finishes in pool process."""

# Store the result of the trial.
trial_id = result["iteration"]
assert isinstance(trial_id, int)
self._results[trial_id] = result

# Remove it from the schedule.
self._trial_schedule.pop(trial_id)

# And mark the TrialRunner as available.
runner_id = result["runner_id"]
assert isinstance(runner_id, int)
trial_runner_status = self._trial_runners_status.get(runner_id)
assert isinstance(trial_runner_status, AsyncResult)
# assert trial_runner_status.ready()
self._trial_runners_status[runner_id] = None

print(f"Trial {trial_id} on {runner_id} callback is done.", flush=True)

# Schedule more trials.
# Note: this would schedule additional trials everytime one completes.
# An alternative option would be to batch them up and schedule several
# after a few complete.
# The tradeoffs being model retraining time vs. waiting on straggler
# workers vs. optimizer new suggestion accuracy.
# Moreover, we need to handle the edge case and include scheduling in
# the loop anyways, so it's probably better to just leave it all there.
# self.schedule_new_trials(num_new_trials=1)

def get_idle_trial_runners_count(self) -> int:
"""Stub get_idle_trial_runners_count."""
return len([x for x in self._trial_runners_status.values() if x is None])

def start_optimization_loop(self) -> None:
"""Stub start_optimization_loop."""

# Create a pool of processes to run the trials in parallel.
# Recreate the child process for each trial to avoid memory leaks or other issues.
with Pool(processes=len(self._trial_runners), maxtasksperchild=1) as pool:
while not self.is_done_scheduling() or not self.is_done_running():
# Run any existing trials that aren't currently running.
# Do this first in case we're resuming from a previous run
# (e.g., the real system will have remembered which Trials were
# in progress by reloading them from the Storage backend).
#
# In the real system this portion might be replace something
# like _run_schedule(), except for the callbacks method of state
# management and async scheduling of new trials portion (see
# below).
# If we stick with the original design that separates
# _run_schedule() and _schedule_new_optimizer_suggestions() into
# distinct phases we might need to adjust this a bit.

# Avoid modifying the dictionary while iterating over it.
# (shouldn't be an issue if we interact with the Storage backend
# since experiment.pending_trials() query results will already
# be a copy)
trial_schedule = self._trial_schedule.copy()
for trial_id, (runner_id, suggestion) in trial_schedule.items():
# Skip trials that are already running on their assigned TrialRunner.
if self._trial_runners_status[runner_id] is not None:
continue
# Else, start the Trial on the given TrialRunner in the background.
self._trial_runners_status[runner_id] = pool.apply_async(
TrialRunner(runner_id).run_trial,
args=(trial_id, suggestion),
# Use a callback to manage the state management.
# Note: this *may* be a problem if the callback can
# timeout (e.g., Storage backend).
# The alternative would be to wait() on each
# AsyncResult, but that can lead to head of line
# blocking and other issues.
callback=self._run_trial_finished_callback,
error_callback=self._run_trial_failed_callback,
)
# Now all the available TrialRunners that had work to do should be running.

# Wait a moment to check if we have any idle TrialRunners.
# This also allows us a chance to collect multiple results from
# the pool before suggesting new ones.
while len(self._trial_schedule) > 0 and self.get_idle_trial_runners_count() == 0:
# Make the polling interval here configurable.
sleep(0.5)

# Schedule more trials if we can.
# This is the async scheduling of new trials portion.
# As mentioned above, it's somewhat different than the
# _schedule_new_optimizer_suggestions() method in the current
# design.
self.schedule_new_trials(num_new_trials=self.get_idle_trial_runners_count() or 1)

# Should be all done starting new trials.
print("Closing the pool.", flush=True)
pool.close()

print("Waiting for all trials to finish.", flush=True)
# FIXME: This sometimes hangs. Not sure why yet.
pool.join()

print("Optimization loop is done.", flush=True)
print("results: " + json.dumps(self._results, indent=2))
print("trial_schedule: " + json.dumps(self._trial_schedule, indent=2))
print("trial_runner_status: " + json.dumps(self._trial_runners_status, indent=2))
assert len(self._results) == self._max_iterations, "Unexpected number of trials run."
assert not self._trial_schedule, "Some scheduled trials were not started."
assert all(
x is None for x in self._trial_runners_status.values()
), "Some TrialRunners are still running."


def main():
"""Main function."""
print("Starting ParallelTrialScheduler.", flush=True)
scheduler = ParallelTrialScheduler(num_trial_runners=4, max_iterations=15)
scheduler.start_optimization_loop()


if __name__ == "__main__":
main()
Loading