How to dynamically insert workflow

Hello,

In the detours argument of FWAction, it says you can give a list of FWs/WFs. I am trying to insert a mini-workflow dynamically but something is wrong.

This workflow runs just fine.


from fireworks import Firework, Workflow, LaunchPad, ScriptTask, PyTask
from fireworks.core.rocket_launcher import rapidfire

launchpad = LaunchPad()
launchpad.reset('', require_password=False)

# create the individual FireWorks and Workflow
#fw0 = Firework(ScriptTask.from_str("echo start"), name = "start")
#fw1 = Firework(WorkflowDetourClass(), name = "split")
#fw2 = Firework(ScriptTask.from_str("echo finish"), name = "finish")
#wf = Workflow([fw0, fw1, fw2], {fw0 : [fw1], fw1: [fw2], }, name="test workflow")

links = {}
new_fws_lst = []
for i in range(3):
    hello_fw = Firework(ScriptTask.from_str("echo hello " + str(i)))
    goodbye_fw = Firework(ScriptTask.from_str("echo goodbye " + str(i)))
    new_fws_lst.append(hello_fw)
    new_fws_lst.append(goodbye_fw)
    links.update({hello_fw : [goodbye_fw]})

wf = Workflow(new_fws_lst, links)
launchpad.add_wf(wf)
rapidfire(launchpad)


But if the same workflow is inserted in another workflow through detours, this error is thrown:

  File ".local/lib/python3.5/site-packages/fireworks/core/launchpad.py", line 1467, in _refresh_wf
    updated_ids = wf.refresh(fw_id)
  File ".local/lib/python3.5/site-packages/fireworks/core/firework.py", line 1005, in refresh
    updated_ids = updated_ids.union(self.apply_action(m_action, fw.fw_id))
  File ".local/lib/python3.5/site-packages/fireworks/core/firework.py", line 834, in apply_action
    new_updates = self.append_wf(wf, [fw_id], detour=True, pull_spec_mods=False)
  File ".local/lib/python3.5/site-packages/fireworks/core/firework.py", line 947, in append_wf
    updated_ids = self.refresh(new_fw.fw_id, set(updated_ids))
  File ".local/lib/python3.5/site-packages/fireworks/core/firework.py", line 979, in refresh
    if self.fw_states[parent] not in completed_parent_states:
KeyError: -8


from fireworks import explicit_serialize, FiretaskBase, FWAction, ScriptTask
import sys
from fireworks import Workflow, Firework

@explicit_serialize
class WorkflowDetourClass(FiretaskBase):
    _fw_name = 'WorkflowDetourTask'
    required_params = []
    optional_params = []

    def run_task(self, fw_spec):
        links = {}
        new_fws_lst = []
        for i in range(3):
            hello_fw = Firework(ScriptTask.from_str("echo hello " + str(i)))
            goodbye_fw = Firework(ScriptTask.from_str("echo goodbye " + str(i)))
            new_fws_lst.append(hello_fw)
            new_fws_lst.append(goodbye_fw)
            links.update({hello_fw : [goodbye_fw]})

        wf = Workflow(new_fws_lst, links)
        return FWAction(detours = wf)


fw0 = Firework(ScriptTask.from_str("echo start"), name = "start")
fw1 = Firework(WorkflowDetourClass(), name = "split")
fw2 = Firework(ScriptTask.from_str("echo finish"), name = "finish")
wf = Workflow([fw0, fw1, fw2], {fw0 : [fw1], fw1: [fw2], }, name="test workflow")


What am I doing wrong here? Does anyone have a working example of an inserted workflow?

Cheers

Marc

Hi Marc,

Thanks for including detailed code for your problem. I ran your code (specifically the code pasted below) with the latest FWS v1.8.7 and did not hit any errors. I’ve also attached a screenshot of the web gui after executing the code, which shows everything executing OK. So, I am not sure what might be wrong.


========

from fireworks import explicit_serialize, FiretaskBase, FWAction, ScriptTask, \
    LaunchPad
from fireworks import Workflow, Firework
from fireworks.core.rocket_launcher import rapidfire

@explicit_serialize
class WorkflowDetourClass(FiretaskBase):
    _fw_name = 'WorkflowDetourTask'
    required_params = []
    optional_params = []

    def run_task(self, fw_spec):
        links = {}
        new_fws_lst = []
        for i in range(3):
            hello_fw = Firework(ScriptTask.from_str("echo hello " + str(i)))
            goodbye_fw = Firework(ScriptTask.from_str("echo goodbye " + str(i)))
            new_fws_lst.append(hello_fw)
            new_fws_lst.append(goodbye_fw)
            links.update({hello_fw : [goodbye_fw]})

        wf = Workflow(new_fws_lst, links)
        return FWAction(detours = wf)

fw0 = Firework(ScriptTask.from_str("echo start"), name = "start")
fw1 = Firework(WorkflowDetourClass(), name = "split")
fw2 = Firework(ScriptTask.from_str("echo finish"), name = "finish")
wf = Workflow([fw0, fw1, fw2], {fw0 : [fw1], fw1: [fw2], }, name="test workflow")

launchpad = LaunchPad()
launchpad.reset('', require_password=False)
launchpad.add_wf(wf)
rapidfire(launchpad)

========

On Tuesday, March 12, 2019 at 8:51:25 AM UTC-7, Marc Jäger wrote:

[original post quoted in full; see the top of the thread]

Hi Anubhav,

thanks for running and checking my code, it helped to find a solution. I managed to get it working on python 3.6.7 and 3.6.8.

As soon as python is downgraded to 3.5.3 or 3.5.6, though, the error posted above occurs. It is tested on 2 different machines.

I will investigate this and post a solution for python 3.5 when I find out what is happening here. In the meantime, could you verify that you get the same error with python 3.5?

Cheers

Marc

P.S: Everything is run in a virtualenv

Problem solved for Python 3.5, FireWorks 1.8.7.

FireWorks uses OrderedDict, but not everywhere. The behaviour differs between Python versions because of how Python treats regular dictionaries: from Python 3.6 onwards they preserve insertion order (an implementation detail of CPython 3.6, guaranteed by the language from 3.7), whereas on 3.5 the iteration order is arbitrary. FireWorks usually does not depend on that order, except when a Workflow is added dynamically. That is where Python 3.5 failed: it tried to add a FW whose parent had not been added yet (in the example: goodbye before hello).

An easy fix is to enforce insertion order when initializing Workflow(). I added one line, the OrderedDict assignment to self.id_fw, in core/firework.py:


# main dict containing mapping of an id to a Firework object
self.id_fw = {}
for fw in fireworks:
    if fw.fw_id in self.id_fw:
        raise ValueError('FW ids must be unique!')
    self.id_fw[fw.fw_id] = fw
    if fw.fw_id not in links_dict and fw not in links_dict:
        links_dict[fw.fw_id] = []

# added line: rebuild id_fw so that it keeps the order of `fireworks`
self.id_fw = OrderedDict([(fw.fw_id, fw) for fw in fireworks])
self.links = Workflow.Links(links_dict)
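
The failure mode described in this post (a child Firework visited before its parent) can be illustrated with a small toy model. This is only a sketch and not FireWorks code: the function `register_in_order`, the ids -8 and -9, and the state string "WAITING" are assumptions made up for illustration. It mimics a loop that looks up the state of every parent before registering a Firework.

```python
from collections import OrderedDict


def register_in_order(fw_ids, parents):
    """Toy stand-in for appending Fireworks one at a time.

    The state of every parent is looked up before an id is registered,
    so a child that is visited before its parent raises KeyError.
    """
    states = {}
    for fw_id in fw_ids:
        for parent in parents.get(fw_id, []):
            states[parent]  # KeyError if the parent is not registered yet
        states[fw_id] = "WAITING"  # hypothetical state label
    return states


# Illustrative ids: "goodbye" (-9) depends on "hello" (-8).
parents = {-9: [-8]}

# Creation order (parent first), preserved by an OrderedDict on any Python 3.
creation_order = [-8, -9]
ordered = OrderedDict((fw_id, None) for fw_id in creation_order)
states_ok = register_in_order(ordered, parents)

# Child-first order, which an unordered dict may yield on Python 3.5.
try:
    register_in_order([-9, -8], parents)
    failed_with = None
except KeyError as exc:
    failed_with = exc.args[0]
```

In this model the parent-first order succeeds, while the child-first order stops at the lookup of the missing parent, which is the same kind of lookup that fails in the traceback from the first post.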


On Friday, March 29, 2019 at 7:37:05 PM UTC+2, Marc Jäger wrote:

[previous message quoted in full; see above]

Hi Marc

Thanks!

Two questions:

  1. I understand you are trying to enforce insertion order for “id_fw” in Workflow, but it’s unclear to me which portion of the code requires that Workflow.id_fw is in a particular order (vs any unordered dict). Do you happen to know what part of the code is actually assuming that Workflow.id_fw is in a specific order?

  2. For a fix, does the below work? i.e., instead of reordering self.id_fw afterwards, to just maintain insertion order at the time of constructing it.

···
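
The snippet that accompanied question 2 is not preserved in this thread. Purely as a hypothetical illustration of the two ideas being compared (rebuilding the mapping afterwards versus keeping it ordered from the start), here is a toy sketch. `ToyFW`, `build_then_reorder` and `build_ordered` are invented names, and none of this is the actual FireWorks code or the fix that was released.

```python
from collections import OrderedDict


class ToyFW:
    """Minimal stand-in for a Firework: it only carries an id."""

    def __init__(self, fw_id):
        self.fw_id = fw_id


def build_then_reorder(fireworks):
    """Fill a plain dict, then rebuild it as an OrderedDict afterwards."""
    id_fw = {}
    for fw in fireworks:
        if fw.fw_id in id_fw:
            raise ValueError("FW ids must be unique!")
        id_fw[fw.fw_id] = fw
    return OrderedDict([(fw.fw_id, fw) for fw in fireworks])


def build_ordered(fireworks):
    """Keep insertion order from the start by filling an OrderedDict."""
    id_fw = OrderedDict()
    for fw in fireworks:
        if fw.fw_id in id_fw:
            raise ValueError("FW ids must be unique!")
        id_fw[fw.fw_id] = fw
    return id_fw


fws = [ToyFW(i) for i in (-4, -5, -6)]
order_a = list(build_then_reorder(fws))
order_b = list(build_ordered(fws))
```

Both variants end up with the ids in the order of the input list; the second simply avoids building the mapping twice.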

Best,
Anubhav

I replied to you by email, I only saw this post here now.

Thanks! The issue should be fixed (with your help) in FWS v1.8.8, let me know if you still see problems


On Wednesday, April 3, 2019 at 4:42:37 AM UTC-7, Marc Jäger wrote:

I replied to you by email, I only saw this post here now.