Best way to dynamically add workflows based on unknown number of files in previous firework?

I want a number of output files that are dynamically generated by one firework to each generate a new workflow, where the steps within the workflow have dependencies, but the workflows for each dynamic file are independent. It’s not clear to me how to go from the end of run_task in the output file generator (right now, an empty FWAction) to creating these workflows. I’m not sure that sending the results to ForeachTask is what I want.

If it matters, there will be a step at the end that would dynamically collect results from each of the new workflows. I’d also like the firework that generates the output files to be as separate as possible from generating the new workflows.

Hey Eric,

My recommendation would be to use detours. For example, in your firetask, you'll return FWAction(detours=<some fireworks/workflow>). This inserts fireworks between the current firework and the next one: the inserted fireworks depend on the current firework, and the next firework depends on them - i.e., all of the dynamically created fireworks must complete before the next firework runs.

An example of how I would see this working with what you’re doing is:

import os

from fireworks import Firework, Workflow, FireTaskBase, FWAction

class DynamicTaskSpawner(FireTaskBase):
    def run_task(self, fw_spec):
        # collect the dynamically generated output files in the launch dir
        output_files = os.listdir('.')
        fws = []
        for file in output_files:
            # parse data here and create new fireworks
            fw = <Some code here>
            fws.append(fw)

        wf = Workflow(fws)
        return FWAction(detours=wf)

Just a note that the fireworks you dynamically add do not have to be parallel to each other. You can link them with parent-child relationships as well.

Eric


Thanks, this makes a lot of sense. I have the basic framework down for what I want each spawned workflow to be, but am having a problem with file passing.

I would like DynamicTaskSpawner to pass a file to each new firework, all with the same key, so that it's done automatically as in a non-dynamic workflow. Setting _files_in properly on the spawned FW is fine, and I know how to use mod_spec to modify _files_out for DynamicTaskSpawner's return action, but this would only work for a single instance of the key. Here is a conceptual MWE:

import uuid

from fireworks import Firework, LaunchPad, Workflow
from fireworks.core.firework import FiretaskBase, FWAction
from fireworks.core.rocket_launcher import rapidfire
from fireworks.user_objects.firetasks.script_task import ScriptTask
from fireworks.utilities.fw_utilities import explicit_serialize


@explicit_serialize
class UUIDWriterTask(FiretaskBase):
    """Write some UUIDs to a dynamic number of files."""

    def run_task(self, fw_spec):
        fws = []
        for i in range(fw_spec["num_files"]):
            filename = f"{i}.txt"
            with open(filename, "w", encoding="utf-8") as handle:
                handle.write(f"{uuid.uuid1()}\n")
            fw = Firework(
                [ScriptTask.from_str(f"cat {filename}")], {"_files_in": {"uuidfile": filename}}
            )
            fws.append(fw)
        # How to show that this task produces 1.txt, 2.txt, ..., as "uuidfile"?
        return FWAction(detours=Workflow(fws))


if __name__ == "__main__":
    fw1 = Firework([UUIDWriterTask()], spec={"num_files": 3})
    wf = Workflow([fw1])

    launchpad = LaunchPad.auto_load()
    launchpad.reset("", require_password=False)
    launchpad.add_wf(wf)
    rapidfire(launchpad, pdb_on_exception=True)

Eric, I do this with a queue of workflows going through a custom FireWorks firetask. When a batch is done, I programmatically create a new dynamic queue and submit all of the new workflows again for all nodes to execute. FWAction is used to return a JSON object, and from there new workflows and tasks are created and resubmitted.
Hope this helps