
Add a Parallel Task Scheduler #967

Closed
wants to merge 18 commits

Conversation

jsfreischuetz
Collaborator

@jsfreischuetz jsfreischuetz commented Apr 23, 2025

Pull Request

Add a parallel task scheduler and associated test cases.


Description

This PR adds a parallel task scheduler, associated schema additions, and test cases.
This requires minimal changes to the trial runner to ensure the results are properly registered with the storage backend. Importantly, tasks need to be registered in the main thread (https://stackoverflow.com/questions/6297404/multi-threaded-use-of-sqlalchemy), requiring a callback for asynchronous tasks. This does not change how anything needs to be done for synchronous schedulers.

Closes #380
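
As a minimal sketch of the callback/main-thread pattern described above (not the PR's actual code; `run_trial` and `storage.register_result` are hypothetical placeholders for the real trial-execution and bookkeeping calls), worker threads execute the trials while results are registered with storage only from the main thread:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_trial(trial):
    # Hypothetical stand-in for the real per-trial execution logic.
    return "SUCCEEDED", {"score": 0.0}


def run_pending_trials(trials, storage, max_workers: int = 4) -> None:
    """Execute trials in worker threads; register results only from the main thread."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_trial, trial): trial for trial in trials}
        for future in as_completed(futures):
            trial = futures[future]
            status, results = future.result()
            # The sqlalchemy-backed storage is touched only here, in the main
            # thread, which is the constraint described above.
            storage.register_result(trial, status, results)
```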


Type of Change

  • ✨ New feature

Testing

I have added a test case to ensure that the scheduler properly schedules all of the tasks.
This test case is modeled on the existing tests for the synchronous scheduler.


Additional Notes (optional)

mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json was formatted when I opened it in the dev container. I'm guessing there is no check for the format of the JSONC file. However, black will format it if given the opportunity. It is possible, but potentially somewhat difficult, to revert some of these changes without having black ignore the file.


@bpkroth
Contributor

bpkroth commented Apr 23, 2025

@jsfreischuetz leaving a few of my notes here from our chat.

  1. Preference for multiprocessing.Pool over threads, similar to WIP: Parallel trial scheduler stub using multiprocessing.Pool #939

    In the future we will likely also add async status polling as a separate thread within each TrialRunner (see: mlos_bench: implement optional early abort logic for time based trials #542 (comment))

  2. We think that we should be able to have the individual (forked) TrialRunner instances connect to the same Storage (e.g., local sqlite.db via sqlalchemy), but it may have some backend nuances (e.g., does sqlite require file locking, or does it manage that for us? do we need to make sure that the engine is disconnected prior to forking so it can reconnect individually after? etc.) See the sketch right after this list.
  3. There seems to be a bug where the trial_runner_id is not restored when pending_trials are fetched from the Storage backend. We should fix that independently and add a test for it (new PR).
  4. We discussed why we want to separate assign_trial_runners and run_schedule (which just runs the Trials which are ready to be run and assigned a runner).
  5. However, you pointed out that there are advantages to allowing some Trials to be scheduled late (e.g., in the context of TUNA, when the budget is 1 we could run them on the very first TrialRunner that becomes available). For that, we may want to extend support for a special case of e.g., -1 to indicate any TrialRunner and -2 for any TrialRunner that hasn't already run this config (and assign the true value late), or else adjust what runnable_trials should mean (see 6). The point of this is to reduce the impact of stragglers in the overall runtime (though we could alleviate that later with support for early abort - mlos_bench: implement optional early abort logic for time based trials #542).
  6. We discussed fixing _run_schedule in the base Scheduler class to use a storage.runnable_trials() call after running assign_trial_runners to further filter for Trials which have had a TrialRunner assigned (or can accept any TrialRunner late as described above).
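
Regarding note 2, here is a minimal illustration (not the PR's code) of the usual SQLAlchemy-plus-fork hygiene, assuming a local sqlite file and a hypothetical `child_task`: dispose of the parent's pooled connections before forking so each child process lazily reconnects with its own connections.

```python
import multiprocessing

from sqlalchemy import create_engine, text

# Example URL only; the real Storage config supplies its own connection string.
engine = create_engine("sqlite:///mlos_bench.sqlite")


def child_task(trial_id: int) -> None:
    # Each forked worker gets fresh connections from the (now empty) pool.
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))  # stand-in for real trial bookkeeping


if __name__ == "__main__":
    # Drop pooled connections before forking so children don't inherit and
    # share sockets or sqlite file handles from the parent process.
    engine.dispose()
    with multiprocessing.Pool(processes=2) as pool:
        pool.map(child_task, [1, 2, 3, 4])
```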

Did I miss anything?

My local diff notes while we were talking in case it helps:

diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py
index eaa5527c6d..8bd4a7455f 100644
--- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py
+++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py
@@ -423,6 +423,7 @@ class Scheduler(ContextManager, metaclass=ABCMeta):
         TrialRunner
         """
         if trial.trial_runner_id is None:
+            # TODO: verify: *should* not be the common path
             self.assign_trial_runners([trial])
         assert trial.trial_runner_id is not None
         trial_runner = self._trial_runners.get(trial.trial_runner_id)
@@ -444,7 +445,8 @@ class Scheduler(ContextManager, metaclass=ABCMeta):
         # Make sure that any pending trials have a TrialRunner assigned.
         pending_trials = list(self.experiment.pending_trials(datetime.now(UTC), running=running))
         self.assign_trial_runners(pending_trials)
-        for trial in pending_trials:
+        # TODO: Change this part for ParallelScheduler?:
+        for trial in self.runnable_trials():  # i.e., only pick up ones that have a runner assigned
             self.run_trial(trial)
 
     def not_done(self) -> bool:
diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py
index 62daa0232c..25de6c7ef5 100644
--- a/mlos_bench/mlos_bench/storage/sql/experiment.py
+++ b/mlos_bench/mlos_bench/storage/sql/experiment.py
@@ -238,6 +238,7 @@ class Experiment(Storage.Experiment):
     def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:
         timestamp = utcify_timestamp(timestamp, origin="local")
         _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp)
+        # FIXME? Separate call?
         if running:
             pending_status = [Status.PENDING.name, Status.READY.name, Status.RUNNING.name]
         else:
@@ -251,6 +252,7 @@ class Experiment(Storage.Experiment):
                         | (self._schema.trial.c.ts_start <= timestamp)
                     ),
                     self._schema.trial.c.ts_end.is_(None),
+                    # Is this the filter problem?
                     self._schema.trial.c.status.in_(pending_status),
                 )
             )
@@ -278,6 +280,8 @@ class Experiment(Storage.Experiment):
                     config_id=trial.config_id,
                     opt_targets=self._opt_targets,
                     config=config,
+                    # FIXME: Bug:
+                    trial_runner_id=trial.trial_runner_id,
                 )
 
     def _get_config_id(self, conn: Connection, tunables: TunableGroups) -> int:

bpkroth added a commit that referenced this pull request Apr 23, 2025
# Pull Request

## Title

Bug fix: restore trial_runner_id on reload from Storage

______________________________________________________________________

## Description

@jsfreischuetz noticed that `pending_trials` was returning `Trial`
objects without a `trial_runner_id` even though they were previously
set.

This change fixes that and closes #968.

- Requires explicit `trial_runner_id` on `Trial` object instantiation
- Adds it in places where it was missing.

______________________________________________________________________

## Type of Change

- 🛠️ Bug fix
- 🧪 Tests

______________________________________________________________________

## Testing

New and existing tests.

______________________________________________________________________

## Additional Notes (optional)

Found in the course of work on #967

______________________________________________________________________
@jsfreischuetz
Collaborator Author

I have made changes related to the refactor that separates the logic of assigning tasks to workers from actually scheduling the workers.

Using processes as the main unit of parallelism has significant problems because certain objects deep within the codebase are unpicklable. I attempted a few solutions that recreate these objects inside every function that references them, so that they are not replicated into the new processes. However, this causes significant issues with ensuring the schema is created, since we can no longer guarantee that in the constructors. It may be possible, but it would require a significant refactor of the storage engine.

Because this implementation uses a thread pool rather than a process pool, I have used concurrent.futures, as the documentation for multiprocessing suggests that its thread-pool interface doesn't make much sense given that the API was designed around process pools (https://docs.python.org/3/library/multiprocessing.html).
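
For illustration only (not from the PR), the core pickling constraint can be reproduced in a couple of lines: multiprocessing pickles task arguments, so any object holding live OS resources (locks, threads, sockets, DB connections) fails like this.

```python
import pickle
import threading

# Objects wrapping live OS resources cannot be pickled, so they cannot be
# shipped to forked/spawned workers via multiprocessing.
try:
    pickle.dumps(threading.Lock())
except TypeError as exc:
    print(f"cannot pickle: {exc}")
```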

@jsfreischuetz jsfreischuetz requested a review from bpkroth April 24, 2025 20:44
@jsfreischuetz jsfreischuetz marked this pull request as ready for review April 24, 2025 20:45
@jsfreischuetz jsfreischuetz requested a review from a team as a code owner April 24, 2025 20:45
"examples": [
3,
5
]
Contributor

Did these get automatically reformatted?

Collaborator Author

yes

@@ -26,6 +27,7 @@ def is_good(self) -> bool:
Status.READY,
Status.RUNNING,
Status.SUCCEEDED,
Status.SCHEDULED,
Contributor

Why isn't PENDING good enough for that?

Collaborator Author

No, you need to differentiate between pending and scheduled: scheduled implies that a worker has been assigned and the trial has been handed off to a process, but hasn't necessarily started yet (at which point it would be switched to running).

Contributor

Seems to me that status=PENDING AND trial_runner_id != NULL is the same as SCHEDULED, no?
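
A small illustrative sketch of that equivalent filter, using SQLAlchemy's lightweight table()/column() constructs with hypothetical column names (modeled on the trial table seen in the diff above); this is an assumption about shape, not the project's actual query code.

```python
from sqlalchemy import and_, column, select, table

# Hypothetical trial table with just the two relevant columns.
trial = table("trial", column("status"), column("trial_runner_id"))

# "Scheduled" expressed without a new status value: a PENDING trial that
# already has a runner assigned.
scheduled_trials = select(trial).where(
    and_(
        trial.c.status == "PENDING",
        trial.c.trial_runner_id.is_not(None),
    )
)
print(scheduled_trials)  # prints the compiled SELECT ... WHERE ... statement
```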

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
#
"""A simple single-threaded synchronous optimization loop implementation."""
Contributor

update docstring

Contributor

Also, please add some more complete docstrings that explain what this module can help with and how it does that, including some executable usage details of at least the config instantiation. There should be some other examples elsewhere in the source tree that you can crib off of, or I can help cook up an example in a bit.

timestamp = utcify_timestamp(timestamp, origin="local")
_LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp)
if running:
pending_status = [Status.PENDING.name, Status.READY.name, Status.RUNNING.name]
Contributor

What I don't love about the new API is that it requires us to list all of these now. Maybe we can add an alias for the "runnable" statuses in the Status module?

Collaborator Author

This was already done in the previous version. I copied these lines

@@ -281,6 +279,13 @@ def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Stor
config=config,
)

def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:
Contributor

Suggested change
def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:
def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:

I'm OK to get rid of this one and replace the others with filter_trials_by_status and feed it Status.RUNNABLE_STATUSES which returns that list as a literal or something.

Maybe separate that out to a different PR for cleanliness.
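
A hedged sketch of what that alias could look like; `filter_trials_by_status` and the runnable-statuses helper are only names floated in this thread, not existing mlos_bench API, and the enum members below are abbreviated for illustration.

```python
import enum


class Status(enum.Enum):
    """Abbreviated sketch of a trial status enum with a 'runnable' alias."""

    PENDING = 1
    READY = 2
    RUNNING = 3
    SUCCEEDED = 4
    FAILED = 5

    @classmethod
    def runnable_statuses(cls) -> list["Status"]:
        """Statuses a scheduler would still consider for execution."""
        return [cls.PENDING, cls.READY, cls.RUNNING]


# Hypothetical call site, per the suggestion above:
# experiment.filter_trials_by_status(timestamp, Status.runnable_statuses())
```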

self,
timestamp: datetime,
status: list[Status],
) -> Iterator["Storage.Trial"]:
Contributor

You could potentially also extend this with an optional trial_runner_id >= 0 type of filter as well.

Collaborator Author

Possible, but probably just a different function?

jsfreischuetz and others added 4 commits April 25, 2025 17:26
bpkroth added a commit to bpkroth/MLOS that referenced this pull request Apr 28, 2025
This is part of an attempt to try and see if we can work around issues with
`multiprocessing.Pool` needing to pickle certain objects when forking.

For instance, if the Environment is using an SshServer, we need to start an
EventLoopContext in the background to handle the SSH connections, and threads are
not picklable.

Nor are file handles, DB connections, etc., so there may be other things we also
need to adjust to make this work.

See Also microsoft#967
@bpkroth
Contributor

bpkroth commented May 9, 2025

Moving this effort to #971

@bpkroth bpkroth closed this May 9, 2025
bpkroth added a commit that referenced this pull request May 12, 2025
# Pull Request

## Title

Delay entering `TrialRunner` context until `run_trial`.

______________________________________________________________________

## Description

This is part of an attempt to try and see if we can work around issues with
`multiprocessing.Pool` needing to pickle certain objects when forking.

For instance, if the Environment is using an SshServer, we need to start
an EventLoopContext in the background to handle the SSH connections, and
threads are not picklable.

Nor are file handles, DB connections, etc., so there may be other things
we also need to adjust to make this work.

See Also #967

______________________________________________________________________

## Type of Change


- 🛠️ Bug fix
- 🔄 Refactor

______________________________________________________________________

## Testing

- Light so far (still in draft mode)
- Just basic existing CI tests (seems to not break anything)

______________________________________________________________________

## Additional Notes (optional)

I think this is incomplete. To support forking inside the Scheduler and
*then* entering the context of the given TrialRunner, we may also need
to do something about the Scheduler's Storage object.

That was true, those PRs are now forthcoming.  See Also #971 

For now this is a draft PR to allow @jsfreischuetz and I to play with
alternative organizations of #967.

______________________________________________________________________