Hi, and thanks for the complete example.
I think this is actually a bug in FileTransferTask and/or the tutorial. Possibly the definition of FileTransferTask changed at some point and the tutorial was not updated? The docstring of FileTransferTask also says that dest may be defined inside the files parameter, but as far as I know any Firetask will fail at runtime if a required parameter is not set.
class FileTransferTask(FiretaskBase):
    """
    A Firetask to Transfer files. Note that
    Required params:
        - mode: (str) - move, mv, copy, cp, copy2, copytree, copyfile, rtransfer
        - files: ([str]) or ([(str, str)]) - list of source files, or dictionary containing
                'src' and 'dest' keys
        - dest: (str) destination directory, if not specified within files parameter (else optional)
    Optional params:
        - server: (str) server host for remote transfer
        - user: (str) user to authenticate with on remote server
        - key_filename: (str) optional SSH key location for remote transfer
        - max_retry: (int) number of times to retry failed transfers; defaults to `0` (no retries)
        - retry_delay: (int) number of seconds to wait between retries; defaults to `10`
    """
    _fw_name = 'FileTransferTask'
    required_params = ["mode", "files", "dest"]
    optional_params = ["server", "user", "key_filename", "max_retry", "retry_delay"]
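To illustrate why a missing required parameter is a problem, here is a minimal stand-in, not the FireWorks implementation: a dict-based task class that checks required_params when it is constructed. MiniTask, MiniTransferTask and can_build are hypothetical names, and both the exception type and the exact point at which FireWorks performs its own check are assumptions on my part.

```python
class MiniTask(dict):
    """Hypothetical stand-in for a Firetask: a dict that validates required keys."""

    required_params = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Assumption: fail as soon as any required parameter is missing.
        missing = [k for k in self.required_params if k not in self]
        if missing:
            raise RuntimeError(f"Required parameter(s) not specified: {missing}")


class MiniTransferTask(MiniTask):
    # Same required list as in the FileTransferTask excerpt above.
    required_params = ["mode", "files", "dest"]


def can_build(params):
    """Return True if the stand-in task accepts the given parameters."""
    try:
        MiniTransferTask(params)
        return True
    except RuntimeError:
        return False
```

With this stand-in, a parameter dict that puts dest only inside the files entries is rejected, while the same dict with an extra top-level dest is accepted. That is the situation the docstring and the required_params list disagree about.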
Further along in the FileTransferTask code you can see that the actual destination is first read from the files parameter; only if it is not present there does the task look at the dest parameter.
for f in self["files"]:
    try:
        if 'src' in f:
            src = os.path.abspath(expanduser(expandvars(f['src']))) if shell_interpret else f['src']
        else:
            src = abspath(expanduser(expandvars(f))) if shell_interpret else f
        if mode == 'rtransfer':
            dest = self['dest']
            if os.path.isdir(src):
                if not self._rexists(sftp, dest):
                    sftp.mkdir(dest)
                for f in os.listdir(src):
                    if os.path.isfile(os.path.join(src, f)):
                        sftp.put(os.path.join(src, f), os.path.join(dest, f))
            else:
                if not self._rexists(sftp, dest):
                    sftp.mkdir(dest)
                sftp.put(src, os.path.join(dest, os.path.basename(src)))
        else:
            if 'dest' in f:
                dest = abspath(expanduser(expandvars(f['dest']))) if shell_interpret else f['dest']
            else:
                dest = abspath(expanduser(expandvars(self['dest']))) if shell_interpret else self['dest']
            FileTransferTask.fn_list[mode](src, dest)
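The precedence in the non-rtransfer branch can be isolated in a few lines. This is only a sketch of the logic in the excerpt above: resolve_dest is a hypothetical helper, not part of FireWorks, and since the excerpt does not show where shell_interpret comes from, it is an explicit argument here. Unlike the excerpt, the sketch only looks for a 'dest' key when the entry is a dict.

```python
from os.path import abspath, expanduser, expandvars


def resolve_dest(file_entry, task_dest, shell_interpret=False):
    """Pick the destination for one entry of 'files' (non-rtransfer modes).

    Hypothetical helper mirroring the 'dest' branch of the excerpt above.
    """
    if isinstance(file_entry, dict) and "dest" in file_entry:
        raw = file_entry["dest"]  # a per-file destination wins
    else:
        raw = task_dest           # otherwise fall back to the task-level 'dest'
    # Expand ~ and environment variables only when asked to, as in the excerpt.
    return abspath(expanduser(expandvars(raw))) if shell_interpret else raw
```

For an entry such as {'src': './words.txt', 'dest': './testwords.txt'} the task-level value is never consulted, which is why a dummy dest is harmless in that case.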
To conclude, you can simply set a dummy dest parameter; the task will use the destination from the files parameter anyhow.
The code below runs for me and correctly copies the files into the launch folder, not to the home folder as would happen if dest = ~/ were used. Note that I slightly modified the code so that it does not reset my launchpad and copies the files in place. I also put the Firetasks in a Firework and a Workflow to test whether it runs, which it does with rlaunch rapidfire.
from fireworks import Firework, FWorker, LaunchPad, ScriptTask,\
    TemplateWriterTask, FileTransferTask, Workflow
from fireworks.core.rocket_launcher import launch_rocket

# Load the existing launchpad configuration instead of resetting it
lpad = LaunchPad.auto_load()

# Write inputs.txt from the template
firetask1 = TemplateWriterTask(
    {
        'context': {'opt1': 5.0, 'opt2': 'fast method'},
        'template_file': 'simple_template.txt',
        'output_file': 'inputs.txt'
    }
)

# Count the words in inputs.txt
firetask2 = ScriptTask.from_str('wc -w < inputs.txt > words.txt')

# Copy both files in place; each entry carries its own destination
firetask3 = FileTransferTask(
    {
        'files': [
            {'src': './words.txt', 'dest': './testwords.txt'},
            {'src': './inputs.txt', 'dest': './testfile2.txt'}
        ],
        'mode': 'copy',
        # Dummy value: only there to satisfy required_params
        'dest': '~/'
    }
)

FW = Firework([firetask1, firetask2, firetask3], name='TestFW')
WF = Workflow.from_Firework(FW, name='TestWF')
lpad.add_wf(WF)
I hope that helps for now. Maybe you can contact one of the developers to get FileTransferTask fixed, as the docstring says it should work differently than it actually does.
Cheers, Michael