Batch Optimization in Rocketsled

Hi @alex

I want to run batch optimization, but it currently does not work. Below is the relevant code, which simply finds a minimum of the Rosenbrock test function. Any batch_size greater than 1 causes FireWorks to go into a sleep loop:

from fireworks import FireTaskBase, Firework, FWAction, LaunchPad, Workflow
from fireworks.core.rocket_launcher import rapidfire
from fireworks.utilities.fw_utilities import explicit_serialize
from rocketsled import MissionControl, OptTask

import numpy as np
# Setting up the FireWorks LaunchPad
launchpad = LaunchPad(name="rsled")
opt_label = "opt_default"
db_info = {"launchpad": launchpad, "opt_label": opt_label}
x_dim = [(-5.0, 5.0), (-5.0, 5.0)]

@explicit_serialize
class ROSENBROCKTask(FireTaskBase):
    _fw_name = "ROSENBROCKTask"
    def run_task(self, fw_spec):
        x = fw_spec["_x"]
        y = (1 - x[0])** 2 + 100*(x[1] - x[0] ** 2) ** 2
        return FWAction(update_spec={"_y": y})

def wf_creator_ROSENBROCK(x):
    spec = {"_x": x}
    # ROSENBROCKTask writes the _y field to the spec internally.
    firework1 = Firework([ROSENBROCKTask(), OptTask(**db_info)], spec=spec)
    return Workflow([firework1])


def execute(n_evaluation, predictor_Selected, acquisition_function):
    mc = MissionControl(**db_info)
    launchpad.reset(password=date_, require_password=True)
    mc.reset(hard=True)
    mc.configure(wf_creator=wf_creator_ROSENBROCK, dimensions=x_dim, acq=acquisition_function,
                 predictor=predictor_Selected,batch_size=2)
    launchpad.add_wf(wf_creator_ROSENBROCK([a, b]))
    rapidfire(launchpad, nlaunches=n_evaluation, sleep_time=0)
    mc.results()

if __name__ == "__main__":
    date_='2021-11-28'
    a = np.random.uniform(-5,5)
    b = np.random.uniform(-5,5)

    predictor_Selected = 'GaussianProcessRegressor'
    acquisition_function = 'lcb'
    n_evaluation = 100
    execute(n_evaluation, predictor_Selected, acquisition_function)

My objective is to use batch optimization to run several parallel experiments based on the suggestions, implement other heuristics such as Kriging Believer, and compare the results.

Best,
Abdul Wahab


Hey Abdul,

I think I’m the alex you’re looking for, not @alex

Executive summary

I’ve released a new version, 1.1.0.20211129, of rocketsled on PyPI and on the GitHub page.

This issue should be resolved there as I’ve written a test as well as another batch example to test explicitly for this use case. I’ve also updated the code with some fixes for your use case we discussed over email.

Batch optimization

However, note that launching fewer than batch_size fireworks will never trigger an OptTask optimization. This is not a bug, but rather how batch optimization works.

Batch optimization runs each time the number of completed OptTask-containing workflows reaches a multiple of batch_size. When it runs, it submits another batch_size workflows. For example:

  1. You want to do batch optimization with a batch size of 5.
  2. You submit 1 workflow. The workflow runs but no more workflows run after it. This is intended.
  3. You submit 3 more workflows. Those 3 workflows run but again, no more workflows run after them. This is again intended, as only 4 workflows have run and the batch_size of 5 has not been met.
  4. You submit one more workflow. That workflow runs and suddenly another 5 workflows have been added to the launchpad. This is because batch_size=5 workflows have been run, so OptTask runs in the 5th workflow and automatically submits another 5 workflows to the launchpad.
  5. You can keep running workflows by rapidfiring as many as you want, but 5 new workflows will be added to the launchpad only on each 5th OptTask run.
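
The five steps above can be mimicked with a toy model. This is only a sketch and not rocketsled code: `ToyLaunchpad` and its methods are hypothetical names, and the trigger is the simplified rule from the list (a new batch is queued whenever the number of completed workflows reaches a multiple of batch_size).

```python
from collections import deque


class ToyLaunchpad:
    """Hypothetical stand-in for a launchpad plus the OptTask batch rule."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.queue = deque()  # workflows waiting to run
        self.n_completed = 0  # workflows whose OptTask has finished

    def submit(self, n):
        """Manually add n workflows to the queue."""
        self.queue.extend(["wf"] * n)

    def run_all_queued(self):
        """Run the workflows queued right now; return how many new ones were added."""
        added = 0
        for _ in range(len(self.queue)):
            self.queue.popleft()
            self.n_completed += 1
            # Simplified trigger: every batch_size-th completed workflow
            # runs the optimization and queues a whole new batch.
            if self.n_completed % self.batch_size == 0:
                self.queue.extend(["wf"] * self.batch_size)
                added += self.batch_size
        return added


lp = ToyLaunchpad(batch_size=5)
lp.submit(1)
print(lp.run_all_queued(), len(lp.queue))  # step 2 -> 0 0
lp.submit(3)
print(lp.run_all_queued(), len(lp.queue))  # step 3 -> 0 0
lp.submit(1)
print(lp.run_all_queued(), len(lp.queue))  # step 4 -> 5 5
```

Only the fifth completed workflow leaves anything in the queue, which matches step 4.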

You can check out a new example (an adapted version of the code you put here) in examples/batch.py.

Your specific use case

  • I also made a few bug fixes which should help your specific use case of using a custom predictor function with batch optimization. You can find an example for it in the new examples/batch.py by setting USE_CUSTOM_PREDICTOR=True.

  • One thing to note is that your custom predictor must return batch_size samples. I.e., for a batch size of 5, your predictor function must return 5 new guesses. The example in batch.py shows how to do this by passing predictor_kwargs to specify the number of predictions to return.

  • Another thing to note is that if you want duplicate checking when using a custom predictor (batch or not), you’ll need to specify that in MissionControl with the duplicate_checking argument.
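
To illustrate the "must return batch_size guesses" point, here is a rough sketch of a predictor that fills a whole batch. The argument names (`XZ_explored`, `Y`, `x_dims`, `XZ_unexplored`) and the `n_predictions` keyword are assumptions made for this illustration and are not taken from this thread; examples/batch.py has the signature rocketsled actually expects.

```python
import random


def random_batch_predictor(XZ_explored, Y, x_dims, XZ_unexplored, n_predictions=1):
    """Toy predictor: ignore the data and pick n_predictions distinct unexplored points.

    The signature is an assumption for illustration, not a confirmed rocketsled API.
    """
    if n_predictions > len(XZ_unexplored):
        raise ValueError("not enough unexplored points to fill the batch")
    # random.sample draws without replacement, so no position is picked twice.
    return random.sample(list(XZ_unexplored), n_predictions)


# A coarse grid over the same [-5, 5] x [-5, 5] box used in the thread.
candidates = [[x / 2.0, y / 2.0] for x in range(-10, 11) for y in range(-10, 11)]
guesses = random_batch_predictor([], [], [(-5.0, 5.0)] * 2, candidates, n_predictions=5)
print(len(guesses))  # 5
```

In this sketch, `n_predictions` is the value that would be set to the batch size through predictor_kwargs.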

Other things to consider with new version

  • A lot of requirements for the package have been updated (along with a lot of git magic to resolve merge conflicts), so I’d recommend a clean installation of rocketsled if you installed it from PyPI, or a clean clone if you installed it from GitHub.

Let me know if you have other problems with the new version!


Hi @ardunn,

Thanks for the update. Somehow, I couldn’t find the option to fix the tag for your name. I shall go through your summary and example in detail and update you should I have any further concerns.


I have looked at the code. I would like to request if you could point me to the part of the code where OptTask automatically creates n-fireworks for n-batches. I was trying to find it in task.py

@Awahab I think these are the relevant sections:

In the following code, don’t worry about .pop_lock() calls, self.c.find* calls, or the formats of documents having stuff like reserved in them. Those parts are not really relevant to your question.

Determining if batch is ready, then running optimization and inserting documents into the OptTask collection

self.optimize figures out whether a batch is ready or not based on the current state of the OptTask collection.

        # check if optimization should be done, if in batch mode
        batch_mode = False if self.batch_size == 1 else True
        batch_ready = (
            n_completed not in (0, 1) and (n_completed + 1) % self.batch_size == 0
        )

        x = convert_native(x)
        y = convert_native(y)
        z = convert_native(z)

        if batch_mode and not batch_ready:
            # 'None' predictor means this job was not used for
            # an optimization run.
            if self.c.find_one({"x": x}):
                if self.c.find_one({"x": x, "y": "reserved"}):
                    # For reserved guesses: update everything
                    self.c.find_one_and_update(
                        {"x": x, "y": "reserved"},
                        {
                            "$set": {
                                "y": y,
                                "z": z,
                                "z_new": [],
                                "x_new": [],
                                "predictor": None,
                                "index": n_completed + 1,
                            }
                        },
                    )
                else:
                    # For completed guesses (ie, this workflow
                    # is a forced duplicate), do not update
                    # index, but update everything else
                    self.c.find_one_and_update(
                        {"x": x},
                        {
                            "$set": {
                                "y": y,
                                "z": z,
                                "z_new": [],
                                "x_new": [],
                                "predictor": None,
                            }
                        },
                    )
            else:
                # For new guesses: insert x, y, z, index,
                # predictor, and dummy new guesses
                self.c.insert_one(
                    {
                        "x": x,
                        "y": y,
                        "z": z,
                        "x_new": [],
                        "z_new": [],
                        "predictor": None,
                        "index": n_completed + 1,
                    }
                )
            self.pop_lock(manager_id)
            raise BatchNotReadyError
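
To see which runs satisfy the batch_ready expression in the excerpt, the condition can be pulled out into a standalone function. This is a small sketch that assumes n_completed counts the runs already recorded before the current one, which is how `"index": n_completed + 1` in the excerpt reads; the function name is introduced here only for illustration.

```python
def batch_ready(n_completed, batch_size):
    """Same expression as in the excerpt, with self.batch_size passed as an argument."""
    return n_completed not in (0, 1) and (n_completed + 1) % batch_size == 0


# For a batch size of 5, list the values of n_completed at which a batch is ready.
ready_at = [n for n in range(12) if batch_ready(n, 5)]
print(ready_at)  # [4, 9]
```

Under that assumption, n_completed == 4 means the current workflow is the 5th, so the optimization runs in the 5th and 10th workflows.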

Determining whether workflows should be submitted

If the batch is not ready, self.optimize will throw a BatchNotReadyError which .run catches and the task doesn’t submit anything.

elif not self.enforce_sequential or (
    self.enforce_sequential and lock == pid
):
    try:
        x, y, z, all_xz_new, n_completed = self.optimize(
            fw_spec, manager_id
        )
    except BatchNotReadyError:
        return None
    except Exception:
        self.pop_lock(manager_id)
        raise

Actually submitting new workflows

If the batch is ready, self.optimize will not throw a BatchNotReadyError, and the batch_size guesses it returns will be used to submit batch_size new workflows.

new_wfs = [
    self.wf_creator(
        x_new, *self.wf_creator_args, **self.wf_creator_kwargs
    )
    for x_new in all_x_new
]
self.lpad.bulk_add_wfs(new_wfs)
return FWAction(
    update_spec={"_optimization_id": opt_id},
    stored_data={"_optimization_id": opt_id},
)
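
The three excerpts above fit together roughly as in the following toy version. It is a heavily simplified sketch, not the real OptTask: `ToyOptTask`, its list-based storage, and the placeholder "optimizer" are all hypothetical, and the readiness check is reduced to a plain multiple-of-batch_size test.

```python
class BatchNotReadyError(Exception):
    """Raised by the toy optimize() when the batch is incomplete."""


class ToyOptTask:
    """Hypothetical, heavily simplified stand-in for OptTask."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.records = []    # stands in for the optimization collection
        self.submitted = []  # stands in for lpad.bulk_add_wfs

    def optimize(self, x, y):
        self.records.append((x, y))  # the result is always recorded
        if len(self.records) % self.batch_size != 0:
            raise BatchNotReadyError
        # Placeholder "optimizer": step away from the best point seen so far.
        best_x, _ = min(self.records, key=lambda r: r[1])
        return [best_x + 0.1 * (i + 1) for i in range(self.batch_size)]

    def run_task(self, x, y):
        try:
            all_x_new = self.optimize(x, y)
        except BatchNotReadyError:
            return None  # nothing is submitted for this run
        self.submitted.extend(all_x_new)  # one new workflow per guess
        return {"n_submitted": len(all_x_new)}


task = ToyOptTask(batch_size=3)
results = [task.run_task(x, x ** 2) for x in (2.0, -1.0, 0.5)]
print(results)  # [None, None, {'n_submitted': 3}]
```

The first two runs only record their results; the third completes the batch and submits three new guesses at once.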

@ardunn This is quite explanatory indeed! I really appreciate the depth of insight you provided. I took some time to run the batch experiments and do some benchmarking. It appears that single experiments take longer than a batch of, say, 15 parallel evaluations. This is quite bewildering to me, as the difference is consistent across multiple runs:

Time for Batch of 1 vs 15 is 3.57 and 2.43

Shortened code for testing:

import datetime
import time

import numpy as np
from fireworks.core.firework import FireTaskBase, Firework, FWAction, Workflow
from fireworks.core.launchpad import LaunchPad
from fireworks.core.rocket_launcher import rapidfire
from fireworks.utilities.fw_utilities import explicit_serialize

from rocketsled.control import MissionControl
from rocketsled.task import OptTask


# Setting up the FireWorks LaunchPad
launchpad = LaunchPad(name="rsled")
opt_label = "opt_default"
db_info = {"launchpad": launchpad, "opt_label": opt_label}
x_dim = [(-5.0, 5.0), (-5.0, 5.0)]

@explicit_serialize
class RosenbrockTask(FireTaskBase):
    _fw_name = "RosenbrockTask"

    def run_task(self, fw_spec):
        x = fw_spec["_x"]
        y = (1 - x[0]) ** 2 + 100 * (x[1] - x[0] ** 2) ** 2
        return FWAction(update_spec={"_y": y})

def wf_creator_rosenbrock(x):
    spec = {"_x": x}
    # RosenbrockTask writes the _y field to the spec internally.
    firework1 = Firework([RosenbrockTask(), OptTask(**db_info)], spec=spec)
    return Workflow([firework1])


if __name__ == "__main__":
    mc = MissionControl(**db_info)

    # The reset password is today's date as YYYY-MM-DD; strftime zero-pads month and day.
    launchpad.reset(
        password=datetime.datetime.now().strftime("%Y-%m-%d"),
        require_password=True,
    )

    #########################BATCH SIZE A ###################

    batch_size_a = 1
    mc.reset(hard=True)
    mc.configure(
        wf_creator=wf_creator_rosenbrock,
        dimensions=x_dim,
        predictor="GaussianProcessRegressor",
        batch_size=batch_size_a,
        acq='ei',
    )
    for bs in range(batch_size_a):
        launchpad.add_wf(
            wf_creator_rosenbrock(
                [np.random.uniform(-5, 5), np.random.uniform(-5, 5)]
            )
        )
    batch_of_a_initial = time.time()
    rapidfire(launchpad, nlaunches=30, sleep_time=0)
    batch_of_a_final = time.time()
    plt = mc.plot()

    ######################### BATCH SIZE B ###################

    batch_size_b = 115
    # The reset password is today's date as YYYY-MM-DD; strftime zero-pads month and day.
    launchpad.reset(
        password=datetime.datetime.now().strftime("%Y-%m-%d"),
        require_password=True,
    )
    mc.reset(hard=True)
    mc.configure(
        wf_creator=wf_creator_rosenbrock,
        dimensions=x_dim,
        predictor="GaussianProcessRegressor",
        batch_size=batch_size_b,
        acq='ei',
    )

    for bs in range(batch_size_b):
        launchpad.add_wf(
            wf_creator_rosenbrock(
                [np.random.uniform(-5, 5), np.random.uniform(-5, 5)]
            )
        )
    batch_of_b_initial = time.time()
    rapidfire(launchpad, nlaunches=30, sleep_time=0)
    batch_of_b_final = time.time()
    plt = mc.plot()
    #########################################################

    print("Time for Batch of {} vs {} is {} and {}".format(batch_size_a,batch_size_b, (batch_of_a_final - batch_of_a_initial),(batch_of_b_final - batch_of_b_initial)))
    # plt.show()


Thus, my final question on this is:

After the workflows are added in bulk, do they run in the next iteration of the optimization, with the FWAction here merely updating the unique IDs for the current optimization run?

@Awahab

Discrepancy in timings

First, we should clear up exactly what you are timing. It seems from your code the timing is comparing 30 launches of the sequential (non-batch, or batch=1) workflow to 30 launches of the batch=15 workflow.
I’ll assume the batch_size_b=115 is a typo and you meant batch_size_b=15. Your objective function is very fast and has a basically negligible time of evaluation. So what we are really comparing in your example is the timing internally for FireWorks and Rocketsled to process two different workflows.

If the above is correct and what you intended, then the timings are pretty explainable. There are several reasons why single experiments run sequentially take longer than batches.

  1. Sequential experiments run an optimization on every workflow. Batches run an optimization only on every batch_size-th workflow. So if you are running 30 in total, the sequential case will run 30 optimizations whereas the batch=15 case will run only 2. In this case, the optimization time is not trivial compared to the objective function (Rosenbrock), so the optimization itself is the expensive step. So in one case you’re running 30 computations and in the other you’re really only running 2. This is probably the main reason for the discrepancy in timings.
  2. Submitting workflows to the launchpad and executing them in bulk (as the larger batch size does) is likely more efficient than submitting them and processing them sequentially. Though I wouldn’t expect this to have a large effect, likely maybe a few milliseconds difference in timing.
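
The first point can be put into a back-of-the-envelope cost model. This is a sketch under simplifying assumptions: launches run one after another (as with rapidfire), FireWorks overhead is ignored, and the per-step timings are hypothetical numbers chosen only for illustration.

```python
def total_time(nlaunches, batch_size, t_objective, t_optimization):
    """Toy cost model: every launch pays for the objective function,
    and every batch_size-th launch also pays for one optimization."""
    n_opt = nlaunches // batch_size
    return nlaunches * t_objective + n_opt * t_optimization


# Hypothetical timings in seconds, not measurements.
cheap = dict(t_objective=0.001, t_optimization=0.05)
expensive = dict(t_objective=3600.0, t_optimization=0.05)

for label, t in (("cheap objective", cheap), ("expensive objective", expensive)):
    sequential = total_time(30, 1, **t)
    batched = total_time(30, 15, **t)
    print(label, "sequential/batched time ratio:", round(sequential / batched, 3))
```

With a cheap objective the ratio is well above 1 because the 30 optimizations dominate; with an expensive objective it is essentially 1, since the optimizations no longer matter.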

In an actual use case where your objective function is much more expensive than the optimization, I expect the optimization/fireworks internal run time to be negligibly small.

FWAction/submitting in bulk

Yes, you are correct that the bulk add is what actually adds fireworks to the launchpad, and the FWAction just updates the spec to correlate them with an optimization document. This optimization id is never used again by rocketsled, or even by FireWorks. It can basically be used by a user to see which optimization produced a specific workflow.

Some general guidance

I’d recommend not getting too in depth with how Rocketsled or Fireworks actually processes workflows since it can get quite complex. Managing optimization workflows in parallel is a parallel programming challenge which causes a lot of crazy logic (hence the need for stuff like pop_lock, "queue" and manager documents, submitting bulk workflows, etc.).

But if you must have an explanation of how rocketsled and fireworks manage workflows, this is a high level overview that might help:

  • Fireworks and rocketsled are mostly separate except for a few key steps. Fireworks manages the workflows which have run and the workflows which are scheduled to run via the launchpad; this is just a bunch of mongo collections managing individual fireworks and such.

  • Rocketsled keeps its own separate mongo collection specifically for optimization. It identifies points in the search space as documents in this collection and has logic for figuring out which points haven’t been tried and which ones have been tried. It also has logic for figuring out when a corresponding workflow has been submitted to the launchpad but hasn’t been run yet. It can’t trigger runs on its own unless you specifically implement that in the wf_creator (not recommended).

  • Fireworks and rocketsled only interact when rocketsled submits new workflows via the workflow creator function. One or many may be created depending on the batch_size. After submitting them to the launchpad, rocketsled has no way to trigger them or work with them - that is all done in fireworks. The only time rocketsled can actually know a workflow is running is when OptTask itself is actually running.

@ardunn

First, we should clear up exactly what you are timing. It seems from your code the timing is comparing 30 launches of the sequential (non-batch, or batch=1) workflow to 30 launches of the batch=15 workflow.

Yes, that’s correct. Although I initially had batch_size = 15, I changed it to batch_size = 115 to test a slightly more extreme case, which we can afford since n_search_pts is 1000. My understanding is that both are doing 30 sequential launches or iterations. However, batch_size = 1 takes only 1 suggestion and its evaluation (x and y data) per iteration, while batch_size = 115 takes 115 suggestions and their evaluations per iteration into the data used to train the surrogate function. By that understanding, in control.py inside def plot(.. at line 395 (docset.count()), the size of the data should in principle be 115 x 30 for batch_size = 115 and 1 x 30 for batch_size = 1. However, its size is 30 in both cases, which makes me wonder whether the suggestions are actually added to the database.

I’ll assume the batch_size_b=115 is a typo and you meant batch_size_b=15 .

Well, I get no time difference between batch_size_b=15 and batch_size_b=115, so I wanted to test the extreme scenario. For the surrogate function I am using a GP, which scales as O(n^3), so I was expecting slow performance, not because of the function evaluations but because of GP training and prediction as the data grows 115 times larger for the batch in each iteration.

Also, thanks for providing the high-level view of Fireworks and Rocketsled, especially the role of MongoDB in both, as it will help me in a future port of the GPyOpt library, which implements González et al.

This is the source of the misunderstanding. In either case, only 30 total fireworks are being run. The number of firework/workflows being run is only determined by FireWorks. Rocketsled just submits new workflows to the launchpad. Changing the batch_size doesn’t change the total number of guesses or function evaluations - it just changes when (in which workflow) and how the optimizations are done.

  • Regardless of batch size, all available data is used to run the optimizer whenever it is run. So if you have run 28 function evaluations, 28 points will be used in the next optimization.

  • The batch size only specifies on which iterations an optimization is done, and it allows multiple new suggestions to be submitted to the launchpad as workflows at once. When you had a batch size of 15 and ran 30 launches, only the 15th and 30th OptTasks actually did any optimizations. When you had a batch size of 115 and ran 30 launches, an optimization literally never ran.
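
The two bullets above can be turned into numbers. The sketch below assumes the simplified rule that an optimization runs on every batch_size-th launch and trains on all evaluations recorded so far; the n**3 sum is only a rough proxy for exact GP training cost, following the O(n^3) scaling mentioned earlier in the thread. Both function names are introduced here for illustration.

```python
def optimization_schedule(nlaunches, batch_size):
    """Number of data points available at each optimization that runs."""
    return [n for n in range(1, nlaunches + 1) if n % batch_size == 0]


def relative_gp_cost(points_per_fit):
    """Sum of n**3 over all fits, a rough proxy for exact GP training cost."""
    return sum(n ** 3 for n in points_per_fit)


for bs in (1, 15, 115):
    sizes = optimization_schedule(30, bs)
    # batch size, number of optimizations, largest training set, cost proxy
    print(bs, len(sizes), max(sizes, default=0), relative_gp_cost(sizes))
# 1 30 30 216225
# 15 2 30 30375
# 115 0 0 0
```

In every case the training set never exceeds 30 points, because only 30 fireworks are ever run; a larger batch size just means fewer fits.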

So let’s take the above case, a workflow with 1 firework and 30 launches, and examine each step.

Batch size of 1

  1. You submit a workflow to the launchpad.
  2. You run a workflow using launch_rocket or rapidfire.
  3. Fireworks runs your objective function firework (or task). In our case it’s just a task.
  4. OptTask runs.
    a. It reads all existing x and y from the opt database. In this first firework’s case, there’s nothing there yet.
    b. It reads the current x and y from the spec.
    c. It does an optimization, writes the current x and y to the optimization db (separate from fws) and submits a new workflow with the suggested x.
  5. The launchpad accepts the new workflow and waits for the next launch_rocket/rapidfire iteration.

Repeat steps 2-5 30 times. A total of 30 fireworks are run.

Batch size of 3

  1. You submit 3 workflows to the launchpad.

  2. You run a workflow using launch_rocket or rapidfire

  3. Fireworks runs your objective function firework (or task)

  4. OptTask runs.
    a. Batch isn’t ready. Doesn’t do anything, just records the current x and y into the optimization database. Doesn’t submit a new workflow.

  5. Repeat 2-4 again. Now, on the third workflow…

  6. You run the third workflow with launch_rocket/rapidfire

  7. Fireworks runs your objective function firework/task.

  8. OptTask runs.
    a. Batch IS ready. Reads all existing x and y from the database.
    b. Gets the current wf’s x and y from the spec.
    c. Runs an optimization to produce 3 new guesses. Submits all 3 to the launchpad and waits for the next launch_rocket or rapidfire iteration

Repeat 2-8 10 times. A total of 30 fireworks are run.

@ardunn

When you had batch size of 15 and ran 30 launches, only the 15th and 30th OptTasks actually did any optimizations. When you had batch size of 115 and ran 30 launches, an optimization literally never ran.

Now it makes a lot of sense. So if I were to compare the optimization performance of 30 optimizations for batch_size=1 and batch_size=15, then for the batch size of 15 nlaunches should be 15x30 (keeping the number of optimizations constant). I wanted to do this comparison to determine the speed-up and convergence we would get if we ran the actual DFT calculations on a cluster. So should we run 30 DFT calculations sequentially (significantly longer, but perhaps better convergence), or 15 in a batch and then, based on the suggestions, 15 more in a batch (almost 2x faster, but convergence/accuracy might not be as good)? Here I’m keeping the number of evaluations/DFT computations constant. I am trying to find the trade-off between computation time and convergence/accuracy.
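
The two comparison designs mentioned here (constant number of optimizations versus constant number of evaluations) come down to simple arithmetic. The sketch below is idealized: it assumes every workflow in a batch runs fully in parallel and takes the same time, and both helper names are hypothetical.

```python
import math


def launches_for_fixed_optimizations(n_optimizations, batch_size):
    """Launches needed so that n_optimizations batches complete."""
    return n_optimizations * batch_size


def rounds_for_fixed_evaluations(n_evaluations, batch_size):
    """Idealized number of sequential rounds when a whole batch runs in parallel."""
    return math.ceil(n_evaluations / batch_size)


print(launches_for_fixed_optimizations(30, 15))  # 450
print(rounds_for_fixed_evaluations(30, 1))       # 30
print(rounds_for_fixed_evaluations(30, 15))      # 2
```

Real speed-ups depend on how many workers are available and on how uneven the individual run times are, so these counts are an upper bound on the benefit of batching rather than a prediction.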

Yes exactly!

Though you should note that rocketsled can also handle workflows that run in parallel while still doing the optimizations in a purely sequential (highly efficient) manner. This “normal” mode is the best option if your workflows are highly asynchronous (and expensive relative to optimization), because all the DFT can compute in parallel while the optimizations run strictly in sequence.

Here’s a figure from the rocketsled paper SI which might make it a bit clearer:

The blue f(x) here is your objective function and the orange is the optimization. The red time is what you want to minimize (the waiting time). Which one is best for you will depend on your use case specifically.

As yet another option, you can run “normal” mode rocketsled with enforce_sequential=False in order to run optimizations in parallel. This case isn’t shown in the diagram, but it means all of your workflows could run 100% in parallel all the time and never have any waiting time. This is not the most efficient thing from an optimization standpoint but it miiiiight be worth considering.

@ardunn

This is really helpful! Thanks a lot for such considerable effort to clarify things.

I ran several experiments and the results do make sense now.