Add a Parallel Task Schedular #967
for more information, see https://pre-commit.ci
@jsfreischuetz leaving a few of my notes here from our chat.
Did I miss anything?
My local diff notes while we were talking in case it helps:

```diff
diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py
index eaa5527c6d..8bd4a7455f 100644
--- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py
+++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py
@@ -423,6 +423,7 @@ class Scheduler(ContextManager, metaclass=ABCMeta):
         TrialRunner
         """
         if trial.trial_runner_id is None:
+            # TODO: verify: *should* not be the common path
             self.assign_trial_runners([trial])
         assert trial.trial_runner_id is not None
         trial_runner = self._trial_runners.get(trial.trial_runner_id)
@@ -444,7 +445,8 @@ class Scheduler(ContextManager, metaclass=ABCMeta):
         # Make sure that any pending trials have a TrialRunner assigned.
         pending_trials = list(self.experiment.pending_trials(datetime.now(UTC), running=running))
         self.assign_trial_runners(pending_trials)
-        for trial in pending_trials:
+        # TODO: Change this part for ParallelScheduler?:
+        for trial in self.runnable_trials():  # i.e., only pick up ones that have a runner assigned
             self.run_trial(trial)

     def not_done(self) -> bool:
diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py
index 62daa0232c..25de6c7ef5 100644
--- a/mlos_bench/mlos_bench/storage/sql/experiment.py
+++ b/mlos_bench/mlos_bench/storage/sql/experiment.py
@@ -238,6 +238,7 @@ class Experiment(Storage.Experiment):
     def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:
         timestamp = utcify_timestamp(timestamp, origin="local")
         _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp)
+        # FIXME? Separate call?
         if running:
             pending_status = [Status.PENDING.name, Status.READY.name, Status.RUNNING.name]
         else:
@@ -251,6 +252,7 @@ class Experiment(Storage.Experiment):
                 | (self._schema.trial.c.ts_start <= timestamp)
             ),
             self._schema.trial.c.ts_end.is_(None),
+            # Is this the filter problem?
             self._schema.trial.c.status.in_(pending_status),
         )
     )
@@ -278,6 +280,8 @@ class Experiment(Storage.Experiment):
             config_id=trial.config_id,
             opt_targets=self._opt_targets,
             config=config,
+            # FIXME: Bug:
+            trial_runner_id=trial.trial_runner_id,
         )

     def _get_config_id(self, conn: Connection, tunables: TunableGroups) -> int:
```
# Pull Request

## Title

Bug fix: restore trial_runner_id on reload from Storage

---

## Description

@jsfreischuetz noticed that `pending_trials` was returning `Trial` objects without a `trial_runner_id` even though they were previously set. This change fixes that and closes #968.

- Requires explicit `trial_runner_id` on `Trial` object instantiation
- Adds it in places where it was missing.

---

## Type of Change

- 🛠️ Bug fix
- 🧪 Tests

---

## Testing

New and existing tests.

---

## Additional Notes (optional)

Found in the course of work on #967
…to parallel_schedular
I have made changes related to the refactor separating the logic of assigning tasks to workers from actually scheduling the workers. Using processes as the main unit of parallelism has significant problems because certain objects deep within the codebase are unpicklable. I have attempted a few solutions that recreate these objects in every function, to avoid replicating them into new processes. However, this causes significant issues with ensuring the schema is created, since we can no longer guarantee that in the constructors. It may be possible, but it would require a significant refactor of the storage engine. Because this implementation uses a thread pool rather than a process pool, I have used `concurrent.futures`, as the documentation for `multiprocessing` suggests that its thread-pool interface is awkward, having been designed for process pools (https://docs.python.org/3/library/multiprocessing.html).
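The thread-pool approach described in the comment above can be sketched with `concurrent.futures`. This is a minimal illustration only; `run_trial` and `run_pending` here are hypothetical stand-ins, not the actual mlos_bench API:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_trial(trial_id: int) -> tuple[int, str]:
    # Stand-in for real trial execution. Unpicklable state (DB connections,
    # event loops, file handles) is not a problem here, because threads
    # share memory and nothing needs to be pickled across a fork.
    return trial_id, "SUCCEEDED"


def run_pending(trial_ids: list[int], max_workers: int = 4) -> dict[int, str]:
    """Run trials concurrently on a thread pool and collect their results."""
    results: dict[int, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_trial, tid) for tid in trial_ids]
        for future in as_completed(futures):
            # as_completed() yields in the calling (main) thread, so result
            # handling stays out of the worker threads.
            tid, status = future.result()
            results[tid] = status
    return results
```

`ThreadPoolExecutor` and `ProcessPoolExecutor` share the same `Executor` interface, which is one reason `concurrent.futures` is a more natural fit here than `multiprocessing.pool.ThreadPool`.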
"examples": [ | ||
3, | ||
5 | ||
] |
Did these get automatically reformatted?
yes
```diff
@@ -26,6 +27,7 @@ def is_good(self) -> bool:
     Status.READY,
     Status.RUNNING,
     Status.SUCCEEDED,
+    Status.SCHEDULED,
```
Why isn't PENDING good enough for that?
No, you need to differentiate between pending and scheduled. Pending implies that you have assigned a worker; scheduled implies that it has been given a process, but hasn't necessarily started yet (at which point it would get switched to running).
Seems to me that `status=PENDING AND trial_runner_id != NULL` is the same as `SCHEDULED`, no?
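The reviewer's proposed equivalence can be modeled in a few lines. The `Status` and `Trial` classes below are simplified stand-ins for illustration, not the real mlos_bench types:

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Status(Enum):
    PENDING = 1
    SCHEDULED = 2
    RUNNING = 3


@dataclass
class Trial:
    status: Status
    trial_runner_id: Optional[int]


def is_scheduled(trial: Trial) -> bool:
    # The reviewer's suggestion: a trial counts as "scheduled" iff it is
    # still PENDING but already has a TrialRunner assigned, making a
    # separate SCHEDULED status redundant under this reading.
    return trial.status == Status.PENDING and trial.trial_runner_id is not None
```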
```python
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
#
"""A simple single-threaded synchronous optimization loop implementation."""
```
update docstring
Also, please add some more complex docstrings that explain what this module can help with and how it does that, including some executable usage details of the config instantiation at least. There should be some other examples elsewhere in the source tree that you can crib off of, or I can help cook up an example in a bit.
```python
timestamp = utcify_timestamp(timestamp, origin="local")
_LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp)
if running:
    pending_status = [Status.PENDING.name, Status.READY.name, Status.RUNNING.name]
```
What I don't love about the new API is that it requires us to list all of these now. Maybe we can add an alias for the "runnable" statuses in the `Status` module?
This was already done in the previous version. I copied these lines
```diff
@@ -281,6 +279,13 @@ def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Stor
         config=config,
     )

     def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:
```
I'm OK to get rid of this one and replace the others with `filter_trials_by_status`, feeding it `Status.RUNNABLE_STATUSES`, which returns that list as a literal or something. Maybe separate that out to a different PR for cleanliness.
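A sketch of what such an alias might look like. The names are hypothetical (the real `Status` module may differ); the status list itself is the one used by `pending_trials` in the diff above:

```python
from enum import Enum


class Status(Enum):
    PENDING = "PENDING"
    READY = "READY"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


# Hypothetical alias for the "runnable" statuses, so call sites don't each
# have to enumerate [Status.PENDING.name, Status.READY.name, ...] by hand.
RUNNABLE_STATUSES = (Status.PENDING, Status.READY, Status.RUNNING)


def status_names(statuses: tuple[Status, ...]) -> list[str]:
    # Convert to the .name strings used in the SQL IN(...) filter.
    return [s.name for s in statuses]
```

A call site would then become `filter_trials_by_status(timestamp, status_names(RUNNABLE_STATUSES))` (illustrative usage only).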
```python
self,
timestamp: datetime,
status: list[Status],
) -> Iterator["Storage.Trial"]:
```
You could potentially also extend this to a `trial_runner_id >= 0` type of optional filter as well.
Possible, but probably just a different function?
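One possible shape for such an optional filter, whether as a parameter or a separate function. This is a pure-Python sketch with hypothetical names, standing in for the real SQLAlchemy query:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TrialRecord:
    trial_id: int
    status: str
    trial_runner_id: Optional[int]


def filter_trials(
    trials: list[TrialRecord],
    statuses: list[str],
    require_runner: bool = False,
) -> list[TrialRecord]:
    """Filter trials by status, optionally requiring an assigned TrialRunner."""
    return [
        t
        for t in trials
        if t.status in statuses
        # The suggested optional predicate: only keep trials that already
        # have a TrialRunner assigned.
        and (not require_runner or t.trial_runner_id is not None)
    ]
```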
Co-authored-by: Brian Kroth <bpkroth@users.noreply.github.com>
…to parallel_schedular
This is part of an attempt to work around issues with `multiprocessing.Pool` needing to pickle certain objects when forking. For instance, if the Environment is using an SshServer, we need to start an EventLoopContext in the background to handle the SSH connections, and threads are not picklable. Nor are file handles, DB connections, etc., so there may be other things we also need to adjust to make this work. See Also microsoft#967
Moving this effort to #971
# Pull Request

## Title

Delay entering `TrialRunner` context until `run_trial`.

---

## Description

This is part of an attempt to work around issues with `multiprocessing.Pool` needing to pickle certain objects when forking. For instance, if the Environment is using an SshServer, we need to start an EventLoopContext in the background to handle the SSH connections, and threads are not picklable. Nor are file handles, DB connections, etc., so there may be other things we also need to adjust to make this work.

See Also #967

---

## Type of Change

- 🛠️ Bug fix
- 🔄 Refactor

---

## Testing

- Light so far (still in draft mode)
- Just basic existing CI tests (seems to not break anything)

---

## Additional Notes (optional)

I think this is incomplete. To support forking inside the Scheduler and *then* entering the context of the given TrialRunner, we may also need to do something about the Scheduler's Storage object.

That was true; those PRs are now forthcoming. See Also #971

For now this is a draft PR to allow @jsfreischuetz and I to play with alternative organizations of #967.
Pull Request
Add a parallel task scheduler and associated test cases.
Description
This PR adds a parallel task scheduler, associated schema additions, and test cases.
This requires minimal changes to the trial runner to ensure the results are properly registered with the storage backend. Importantly, tasks need to be registered in the main thread (https://stackoverflow.com/questions/6297404/multi-threaded-use-of-sqlalchemy), requiring a callback for asynchronous tasks. This does not change how anything needs to be done for synchronous schedulers.
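The main-thread registration constraint described above can be illustrated with a small queue-based sketch. The names are hypothetical, and the PR itself uses a callback mechanism rather than this exact pattern; the point is only that worker threads hand results back instead of touching the storage connection themselves:

```python
import queue
import threading

# Results flow from worker threads back to the main thread via a queue, so
# that only the main thread ever talks to the (thread-affine) storage backend.
results_q = queue.Queue()  # holds (trial_id, status) tuples
registered = []  # stand-in for the storage backend's record of results


def run_trial(trial_id: int) -> None:
    # Worker thread: run the trial, then hand the result back rather than
    # registering it with storage directly.
    results_q.put((trial_id, "SUCCEEDED"))


def register_results(expected: int) -> None:
    # Main thread: the only place that "registers" results with storage.
    for _ in range(expected):
        registered.append(results_q.get())


threads = [threading.Thread(target=run_trial, args=(tid,)) for tid in (1, 2)]
for t in threads:
    t.start()
register_results(expected=2)
for t in threads:
    t.join()
```

Note that `Future.add_done_callback` runs its callback in whichever thread completes the future, so a callback-based design still has to route the actual storage write back to the main thread, as this sketch does.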
Closes #380
Type of Change
Testing
I have added a test case to ensure that the scheduler properly schedules all of the tasks.
This test case is similar to how the synchronous scheduler is made.
Additional Notes (optional)
mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json was formatted when I opened it in the dev container. I'm guessing there is no check for the format of the JSONC file. However, black will format it if given the opportunity. It is possible, but potentially somewhat difficult, to revert some of these changes without having black ignore the file.