Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass single argument string to ShellTasks #72

Merged
merged 11 commits into from
Dec 21, 2024
Prev Previous commit
Next Next commit
Pass arguments as list.
GeigerJ2 committed Dec 19, 2024
commit 3994ccc12563ef6a7b8e9b4c5fd11a0dd7c2a415
16 changes: 9 additions & 7 deletions src/sirocco/parsing/_yaml_data_models.py
Original file line number Diff line number Diff line change
@@ -280,14 +280,16 @@ class ConfigShellTaskSpecs:


class ConfigShellTask(ConfigBaseTask, ConfigShellTaskSpecs):
pass

command: str = ""

# PR(COMMENT) tmp hack to make script work, need to find better solution than PWD for tests
# @field_validator("command", "src")
# @classmethod
# def expand_var(cls, value: str) -> str:
# """Expand environemnt variables"""
# # TODO this might be not intended if we want to use environment variables on remote HPC
# return expandvars(value)
@field_validator("command", "src")
@classmethod
def expand_var(cls, value: str) -> str:
"""Expand environemnt variables"""
# TODO this might be not intended if we want to use environment variables on remote HPC
return expandvars(value)


@dataclass
63 changes: 32 additions & 31 deletions src/sirocco/workgraph.py
Original file line number Diff line number Diff line change
@@ -165,7 +165,7 @@ def _add_aiida_task_nodes(self):
self._add_aiida_task_node(task)
# after creation we can link the wait_on tasks
# TODO check where this is now
#for cycle in self._core_workflow.cycles:
# for cycle in self._core_workflow.cycles:
# for task in cycle.tasks:
# self._link_wait_on_to_task(task)

@@ -183,22 +183,23 @@ def _add_aiida_task_node(self, task: graph_items.Task):
env_source_files = [env_source_files] if isinstance(env_source_files, str) else env_source_files
prepend_text = '\n'.join([f"source {env_source_file}" for env_source_file in env_source_files])

# Note: We don't pass the `nodes` dictionary here, as then we would need to have the sockets available when
# we create the task. Instead, they are being updated via the WG internals when linking inputs/outputs to
# tasks
argument_list = task.cli_argument.split()
# breakpoint()
workgraph_task = self._workgraph.tasks.new(
"ShellJob",
name=label,
command=command,
arguments=task.cli_argument,
# ! Do we still need to add nodes here, as in `aiida-shell`, or WG does that automatically from the
# argument if it finds them?
arguments=argument_list,
metadata={
'options': {
'prepend_text': prepend_text
}
}
)

# workgraph_task.set({"arguments": []})
# workgraph_task.set({"nodes": {}})
self._aiida_task_nodes[label] = workgraph_task

elif isinstance(task, IconTask):
@@ -239,33 +240,33 @@ def _link_input_to_task(self, task: graph_items.Task, input_: graph_items.Data):
task_label = AiidaWorkGraph.get_aiida_label_from_unrolled_task(task)
input_label = AiidaWorkGraph.get_aiida_label_from_unrolled_data(input_)
workgraph_task = self._aiida_task_nodes[task_label]
try:
workgraph_task.inputs.new("Any", f"nodes.{input_label}")
workgraph_task.kwargs.append(f"nodes.{input_label}")

# resolve data
if (data_node := self._aiida_data_nodes.get(input_label)) is not None:
if (nodes := workgraph_task.inputs.get("nodes")) is None:
msg = f"Workgraph task {workgraph_task.name!r} did not initialize input nodes in the workgraph before linking. This is a bug in the code, please contact the developers by making an issue."
raise ValueError(msg)
nodes.value.update({f"{input_label}": data_node})
elif (output_socket := self._aiida_socket_nodes.get(input_label)) is not None:
self._workgraph.links.new(output_socket, workgraph_task.inputs[f"nodes.{input_label}"])
else:
msg = f"Input data node {input_label!r} was neither found in socket nodes nor in data nodes. The task {task_label!r} must have dependencies on inputs before they are created."
raise ValueError(msg)
workgraph_task.inputs.new("Any", f"nodes.{input_label}")
workgraph_task.kwargs.append(f"nodes.{input_label}")

# resolve arg_option
if (workgraph_task_arguments := workgraph_task.inputs.get("arguments")) is None:
msg = f"Workgraph task {workgraph_task.name!r} did not initialize arguments nodes in the workgraph before linking. This is a bug in the code, please contact devevlopers."
# resolve data
if (data_node := self._aiida_data_nodes.get(input_label)) is not None:
if (nodes := workgraph_task.inputs.get("nodes")) is None:
msg = f"Workgraph task {workgraph_task.name!r} did not initialize input nodes in the workgraph before linking. This is a bug in the code, please contact the developers by making an issue."
raise ValueError(msg)
# TODO think about that the yaml file should have aiida valid labels
# if (arg_option := task.input_arg_options.get(input_.name, None)) is not None:
# workgraph_task_arguments.value.append(f"{arg_option}")
workgraph_task_arguments.value.append(f"{{{input_label}}}")
except Exception:
pass
# breakpoint()
nodes.value.update({f"{input_label}": data_node})
elif (output_socket := self._aiida_socket_nodes.get(input_label)) is not None:
self._workgraph.links.new(output_socket, workgraph_task.inputs[f"nodes.{input_label}"])
else:
msg = f"Input data node {input_label!r} was neither found in socket nodes nor in data nodes. The task {task_label!r} must have dependencies on inputs before they are created."
raise ValueError(msg)

# resolve arg_option
if (workgraph_task_arguments := workgraph_task.inputs.get("arguments")) is None:
msg = f"Workgraph task {workgraph_task.name!r} did not initialize arguments nodes in the workgraph before linking. This is a bug in the code, please contact devevlopers."
raise ValueError(msg)
# TODO think about that the yaml file should have aiida valid labels
# if (arg_option := task.input_arg_options.get(input_.name, None)) is not None:
# workgraph_task_arguments.value.append(f"{arg_option}")

# Avoid appending the same argument twice
argument_placeholder = f"{{{input_label}}}"
if argument_placeholder not in workgraph_task_arguments.value:
workgraph_task_arguments.value.append()

def _link_output_to_task(self, task: graph_items.Task, output: graph_items.Data):
"""
4 changes: 2 additions & 2 deletions tests/files/configs/test_config_small_no_icon.yml
Original file line number Diff line number Diff line change
@@ -42,10 +42,10 @@ data:
available:
- initial_conditions:
type: file
src: tests/files/data/initial_conditions
src: tests/files/data/initial_conditions2
- data1:
type: file
src: tests/files/data/data1
src: tests/files/data/data-xyz
generated:
- restart:
type: file
File renamed without changes.
File renamed without changes.