diff --git a/lib/galaxy/dependencies/pipfiles/default/pinned-requirements.txt b/lib/galaxy/dependencies/pipfiles/default/pinned-requirements.txt index e8f3fc253eaf..7aaf3aa91a4e 100644 --- a/lib/galaxy/dependencies/pipfiles/default/pinned-requirements.txt +++ b/lib/galaxy/dependencies/pipfiles/default/pinned-requirements.txt @@ -56,7 +56,7 @@ functools32==3.2.3.post2 ; python_version == '2.7' future==0.16.0 futures==3.2.0 ; python_version == '2.6' or python_version == '2.7' galaxy-sequence-utils==1.1.3 -gxformat2==0.7.1 +gxformat2 h5py==2.8.0 idna==2.7 ipaddress==1.0.22 ; python_version < '3.3' diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index c447a0b4eb65..b981d724b770 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -998,6 +998,18 @@ def __module_from_dict(self, trans, steps, steps_by_external_id, step_dict, **kw label=label, ) trans.sa_session.add(m) + + if "in" in step_dict: + for input_name, input_dict in step_dict["in"].items(): + step_input = step.get_or_add_input(input_name) + NO_DEFAULT_DEFINED = object() + default = input_dict.get("default", NO_DEFAULT_DEFINED) + if default is not NO_DEFAULT_DEFINED: + step_input.default_value = default + step_input.default_value_set = True + + step.get_or_add_input(input_name) + return module, step def __load_subworkflow_from_step_dict(self, trans, step_dict, subworkflow_id_map, **kwds): @@ -1041,23 +1053,21 @@ def __connect_workflow_steps(self, steps, steps_by_external_id): continue if not isinstance(conn_list, list): # Older style singleton connection conn_list = [conn_list] + for conn_dict in conn_list: if 'output_name' not in conn_dict or 'id' not in conn_dict: template = "Invalid connection [%s] - must be dict with output_name and id fields." message = template % conn_dict raise exceptions.MessageException(message) - conn = model.WorkflowStepConnection() - conn.input_step = step - conn.input_name = input_name - conn.output_name = conn_dict['output_name'] external_id = conn_dict['id'] if external_id not in steps_by_external_id: raise KeyError("Failed to find external id %s in %s" % (external_id, steps_by_external_id.keys())) - conn.output_step = steps_by_external_id[external_id] + output_step = steps_by_external_id[external_id] + output_name = conn_dict["output_name"] input_subworkflow_step_index = conn_dict.get('input_subworkflow_step_id', None) - if input_subworkflow_step_index is not None: - conn.input_subworkflow_step = step.subworkflow.step_by_index(input_subworkflow_step_index) + + step.add_connection(input_name, output_name, output_step, input_subworkflow_step_index) del step.temp_input_connections diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 88659a4d3add..f15ce10de56f 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -4058,13 +4058,49 @@ def __init__(self): self.tool_inputs = None self.tool_errors = None self.position = None - self.input_connections = [] + self.inputs = [] self.config = None self.label = None self.uuid = uuid4() self.workflow_outputs = [] self._input_connections_by_name = None + def get_input(self, input_name): + for step_input in self.inputs: + if step_input.name == input_name: + return step_input + + return None + + def get_or_add_input(self, input_name): + step_input = self.get_input(input_name) + + if step_input is None: + step_input = WorkflowStepInput(self) + step_input.name = input_name + # self.inputs.append(step_input) + return step_input + + def add_connection(self, input_name, output_name, output_step, input_subworkflow_step_index=None): + step_input = self.get_or_add_input(input_name) + + conn = WorkflowStepConnection() + conn.input_step_input = step_input + conn.output_name = output_name + conn.output_step = output_step + if input_subworkflow_step_index is not None: + input_subworkflow_step = self.subworkflow.step_by_index(input_subworkflow_step_index) + conn.input_subworkflow_step = input_subworkflow_step + return conn + + @property + def input_connections(self): + connections = [] + for step_input in self.inputs: + for connection in step_input.connections: + connections.append(connection) + return connections + @property def unique_workflow_outputs(self): # Older Galaxy workflows may have multiple WorkflowOutputs @@ -4138,7 +4174,7 @@ def copy_to(self, copied_step, step_mapping): copied_step.position = self.position copied_step.config = self.config copied_step.label = self.label - copied_step.input_connections = copy_list(self.input_connections) + copied_step.inputs = copy_list(self.inputs, copied_step) subworkflow_step_mapping = {} subworkflow = self.subworkflow @@ -4149,8 +4185,7 @@ def copy_to(self, copied_step, step_mapping): subworkflow_step_mapping[subworkflow_step.id] = copied_subworkflow_step for old_conn, new_conn in zip(self.input_connections, copied_step.input_connections): - # new_conn.input_step = new_ - new_conn.input_step = step_mapping[old_conn.input_step_id] + new_conn.input_step_input = copied_step.get_or_add_input(old_conn.input_name) new_conn.output_step = step_mapping[old_conn.output_step_id] if old_conn.input_subworkflow_step_id: new_conn.input_subworkflow_step = subworkflow_step_mapping[old_conn.input_subworkflow_step_id] @@ -4165,6 +4200,35 @@ def log_str(self): return "WorkflowStep[index=%d,type=%s]" % (self.order_index, self.type) +class WorkflowStepInput(object): + default_merge_type = None + default_scatter_type = None + + def __init__(self, workflow_step): + self.workflow_step = workflow_step + self.name = None + self.default_value = None + self.default_value_set = False + self.merge_type = self.default_merge_type + self.scatter_type = self.default_scatter_type + + def copy(self, copied_step): + copied_step_input = WorkflowStepInput(copied_step) + copied_step_input.name = self.name + copied_step_input.default_value = self.default_value + copied_step_input.default_value_set = self.default_value_set + copied_step_input.merge_type = self.merge_type + copied_step_input.scatter_type = self.scatter_type + + copied_step_input.connections = copy_list(self.connections) + return copied_step_input + + def log_str(self): + return "WorkflowStepInput[name=%s]" % ( + self.name, + ) + + class WorkflowStepConnection(object): # Constant used in lieu of output_name and input_name to indicate an # implicit connection between two steps that is not dependent on a dataset @@ -4176,18 +4240,29 @@ class WorkflowStepConnection(object): def __init__(self): self.output_step_id = None self.output_name = None - self.input_step_id = None - self.input_name = None + self.input_step_input_id = None @property def non_data_connection(self): return (self.output_name == self.input_name == WorkflowStepConnection.NON_DATA_CONNECTION) + @property + def input_name(self): + return self.input_step_input.name + + @property + def input_step(self): + return self.input_step_input and self.input_step_input.workflow_step + + @property + def input_step_id(self): + input_step = self.input_step + return input_step and input_step.id + def copy(self): # TODO: handle subworkflow ids... copied_connection = WorkflowStepConnection() copied_connection.output_name = self.output_name - copied_connection.input_name = self.input_name return copied_connection diff --git a/lib/galaxy/model/mapping.py b/lib/galaxy/model/mapping.py index 0fc1203b8411..6f3e62dcd7f3 100644 --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -908,6 +908,21 @@ # Column( "input_connections", JSONType ), Column("label", Unicode(255))) + +model.WorkflowStepInput.table = Table( + "workflow_step_input", metadata, + Column("id", Integer, primary_key=True), + Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True), + Column("name", Unicode(255)), + Column("merge_type", TEXT), + Column("scatter_type", TEXT), + Column("value_from", JSONType), + Column("value_from_type", TEXT), + Column("default_value", JSONType), + Column("default_value_set", Boolean), + Column("runtime_value", Boolean)) + + model.WorkflowRequestStepState.table = Table( "workflow_request_step_states", metadata, Column("id", Integer, primary_key=True), @@ -953,9 +968,8 @@ "workflow_step_connection", metadata, Column("id", Integer, primary_key=True), Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True), - Column("input_step_id", Integer, ForeignKey("workflow_step.id"), index=True), + Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True), Column("output_name", TEXT), - Column("input_name", TEXT), Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True), ) @@ -2218,6 +2232,13 @@ def simple_mapping(model, **kwds): backref="workflow_steps") )) +mapper(model.WorkflowStepInput, model.WorkflowStepInput.table, properties=dict( + workflow_step=relation(model.WorkflowStep, + backref=backref("inputs", uselist=True), + cascade="all", + primaryjoin=(model.WorkflowStep.table.c.id == model.WorkflowStepInput.table.c.workflow_step_id)) +)) + mapper(model.WorkflowOutput, model.WorkflowOutput.table, properties=dict( workflow_step=relation(model.WorkflowStep, backref='workflow_outputs', @@ -2225,10 +2246,10 @@ def simple_mapping(model, **kwds): )) mapper(model.WorkflowStepConnection, model.WorkflowStepConnection.table, properties=dict( - input_step=relation(model.WorkflowStep, - backref="input_connections", + input_step_input=relation(model.WorkflowStepInput, + backref="connections", cascade="all", - primaryjoin=(model.WorkflowStepConnection.table.c.input_step_id == model.WorkflowStep.table.c.id)), + primaryjoin=(model.WorkflowStepConnection.table.c.input_step_input_id == model.WorkflowStepInput.table.c.id)), input_subworkflow_step=relation(model.WorkflowStep, backref=backref("parent_workflow_input_connections", uselist=True), primaryjoin=(model.WorkflowStepConnection.table.c.input_subworkflow_step_id == model.WorkflowStep.table.c.id), diff --git a/lib/galaxy/model/migrate/versions/0136_collection_and_workflow_state.py b/lib/galaxy/model/migrate/versions/0136_collection_and_workflow_state.py index 0a787a5f7bb1..d2597359041a 100644 --- a/lib/galaxy/model/migrate/versions/0136_collection_and_workflow_state.py +++ b/lib/galaxy/model/migrate/versions/0136_collection_and_workflow_state.py @@ -93,14 +93,6 @@ def upgrade(migrate_engine): for table in tables.values(): __create(table) - def nextval(table, col='id'): - if migrate_engine.name in ['postgres', 'postgresql']: - return "nextval('%s_%s_seq')" % (table, col) - elif migrate_engine.name in ['mysql', 'sqlite']: - return "null" - else: - raise Exception("Unhandled database type") - # Set default for creation to scheduled, actual mapping has new as default. workflow_invocation_step_state_column = Column("state", TrimmedString(64), default="scheduled") if migrate_engine.name in ['postgres', 'postgresql']: diff --git a/lib/galaxy/model/migrate/versions/0145_add_workflow_step_input.py b/lib/galaxy/model/migrate/versions/0145_add_workflow_step_input.py new file mode 100644 index 000000000000..bca3259b9db3 --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0145_add_workflow_step_input.py @@ -0,0 +1,105 @@ +""" +Migration script for workflow step input table. +""" +from __future__ import print_function + +import logging + +from sqlalchemy import Boolean, Column, ForeignKey, Integer, MetaData, Table, TEXT + +from galaxy.model.custom_types import JSONType + +log = logging.getLogger(__name__) +metadata = MetaData() + + +def get_new_tables(): + + WorkflowStepInput_table = Table( + "workflow_step_input", metadata, + Column("id", Integer, primary_key=True), + Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True), + Column("name", TEXT), + Column("merge_type", TEXT), + Column("scatter_type", TEXT), + Column("value_from", JSONType), + Column("value_from_type", TEXT), + Column("default_value", JSONType), + Column("default_value_set", Boolean, default=False), + Column("runtime_value", Boolean, default=False), + ) + + WorkflowStepConnection_table = Table( + "workflow_step_connection", metadata, + Column("id", Integer, primary_key=True), + Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True), + Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True), + Column("output_name", TEXT), + Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True), + ) + + return [ + WorkflowStepInput_table, WorkflowStepConnection_table + ] + + +def upgrade(migrate_engine): + metadata.bind = migrate_engine + print(__doc__) + metadata.reflect() + + LegacyWorkflowStepConnection_table = Table("workflow_step_connection", metadata, autoload=True) + for index in LegacyWorkflowStepConnection_table.indexes: + index.drop() + LegacyWorkflowStepConnection_table.rename("workflow_step_connection_premigrate144") + # Try to deregister that table to work around some caching problems it seems. + LegacyWorkflowStepConnection_table.deregister() + metadata._remove_table("workflow_step_connection", metadata.schema) + + metadata.reflect() + tables = get_new_tables() + for table in tables: + __create(table) + + insert_step_inputs_cmd = \ + "INSERT INTO workflow_step_input (workflow_step_id, name) " + \ + "SELECT id, input_name FROM workflow_step_connection_premigrate144" + + migrate_engine.execute(insert_step_inputs_cmd) + + # TODO: verify order here. + insert_step_connections_cmd = \ + "INSERT INTO workflow_step_connection (output_step_id, input_step_input_id, output_name, input_subworkflow_step_id) " + \ + "SELECT wsc.output_step_id, wsi.id, wsc.output_name, wsc.input_subworkflow_step_id " + \ + "FROM workflow_step_connection_premigrate144 as wsc left outer join workflow_step_input as wsi on wsc.input_step_id = wsi.workflow_step_id and wsc.input_name = wsi.name ORDER BY wsc.id" + + migrate_engine.execute(insert_step_connections_cmd) + + +def downgrade(migrate_engine): + metadata.bind = migrate_engine + + tables = get_new_tables() + for table in tables: + __drop(table) + + metadata._remove_table("workflow_step_connection", metadata.schema) + metadata.reflect() + + # Drop new workflow invocation step and job association table and restore legacy data. + LegacyWorkflowStepConnection_table = Table("workflow_step_connection_premigrate144", metadata, autoload=True) + LegacyWorkflowStepConnection_table.rename("workflow_step_connection") + + +def __create(table): + try: + table.create() + except Exception: + log.exception("Creating %s table failed.", table.name) + + +def __drop(table): + try: + table.drop() + except Exception: + log.exception("Dropping %s table failed.", table.name) diff --git a/lib/galaxy/webapps/tool_shed/model/__init__.py b/lib/galaxy/webapps/tool_shed/model/__init__.py index e2720dc25186..19c1d60460d5 100644 --- a/lib/galaxy/webapps/tool_shed/model/__init__.py +++ b/lib/galaxy/webapps/tool_shed/model/__init__.py @@ -479,10 +479,39 @@ def __init__(self): self.tool_inputs = None self.tool_errors = None self.position = None - self.input_connections = [] + self.inputs = [] self.config = None self.label = None + self._input_connections_by_name = None + + def get_or_add_input(self, input_name): + for step_input in self.inputs: + if step_input.name == input_name: + return step_input + + step_input = WorkflowStepInput() + step_input.workflow_step = self + step_input.name = input_name + self.inputs.append(step_input) + return step_input + + @property + def input_connections(self): + connections = [] + for step_input in self.inputs: + for connection in step_input.connections: + connections.append(connection) + return connections + + +class WorkflowStepInput(object): + + def __init__(self): + self.id = None + self.name = None + self.connections = [] + class WorkflowStepConnection(object): diff --git a/lib/galaxy/workflow/extract.py b/lib/galaxy/workflow/extract.py index 44bdda300a21..bf2f41657871 100644 --- a/lib/galaxy/workflow/extract.py +++ b/lib/galaxy/workflow/extract.py @@ -124,10 +124,10 @@ def extract_steps(trans, history=None, job_ids=None, dataset_ids=None, dataset_c else: log.info("Cannot find implicit input collection for %s" % input_name) if other_hid in hid_to_output_pair: + step_input = step.get_or_add_input(input_name) other_step, other_name = hid_to_output_pair[other_hid] conn = model.WorkflowStepConnection() - conn.input_step = step - conn.input_name = input_name + conn.input_step_input = step_input # Should always be connected to an earlier step conn.output_step = other_step conn.output_name = other_name diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index 9743d4f42c9a..0865345fc67a 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -197,7 +197,7 @@ def get_runtime_inputs(self, **kwds): """ return {} - def compute_runtime_state(self, trans, step_updates=None): + def compute_runtime_state(self, trans, step=None, step_updates=None): """ Determine the runtime state (potentially different from self.state which describes configuration state). This (again unlike self.state) is currently always a `DefaultToolState` object. @@ -211,6 +211,21 @@ def compute_runtime_state(self, trans, step_updates=None): """ state = self.get_runtime_state() step_errors = {} + + if step is not None: + + def update_value(input, context, prefixed_name, **kwargs): + step_input = step.get_input(prefixed_name) + if step_input is None: + return NO_REPLACEMENT + + if step_input.default_value_set: + return step_input.default_value + + return NO_REPLACEMENT + + visit_input_values(self.get_runtime_inputs(), state.inputs, update_value, no_replacement_value=NO_REPLACEMENT) + if step_updates: def update_value(input, context, prefixed_name, **kwargs): @@ -972,7 +987,7 @@ def recover_state(self, state, **kwds): """ super(ToolModule, self).recover_state(state, **kwds) if kwds.get("fill_defaults", False) and self.tool: - self.compute_runtime_state(self.trans, step_updates=None) + self.compute_runtime_state(self.trans, step=None, step_updates=None) self.augment_tool_state_for_input_connections(**kwds) self.tool.check_and_update_param_values(self.state.inputs, self.trans, workflow_building_mode=True) @@ -1042,14 +1057,14 @@ def get_runtime_state(self): def get_runtime_inputs(self, **kwds): return self.get_inputs() - def compute_runtime_state(self, trans, step_updates=None): + def compute_runtime_state(self, trans, step, step_updates=None): # Warning: This method destructively modifies existing step state. if self.tool: step_errors = {} state = self.state self.runtime_post_job_actions = {} + state, step_errors = super(ToolModule, self).compute_runtime_state(trans, step, step_updates) if step_updates: - state, step_errors = super(ToolModule, self).compute_runtime_state(trans, step_updates) self.runtime_post_job_actions = step_updates.get(RUNTIME_POST_JOB_ACTIONS_KEY, {}) step_metadata_runtime_state = self.__step_meta_runtime_state() if step_metadata_runtime_state: @@ -1404,7 +1419,7 @@ def inject(self, step, step_args=None, steps=None, **kwargs): subworkflow = step.subworkflow populate_module_and_state(self.trans, subworkflow, param_map=unjsonified_subworkflow_param_map) - state, step_errors = module.compute_runtime_state(self.trans, step_args) + state, step_errors = module.compute_runtime_state(self.trans, step, step_args) step.state = state # Fix any missing parameters diff --git a/lib/tool_shed/util/workflow_util.py b/lib/tool_shed/util/workflow_util.py index 9de41eb748b2..976220cfc3e4 100644 --- a/lib/tool_shed/util/workflow_util.py +++ b/lib/tool_shed/util/workflow_util.py @@ -306,13 +306,12 @@ def get_workflow_from_dict(trans, workflow_dict, tools_metadata, repository_id, # Input connections. for input_name, conn_dict in step.temp_input_connections.items(): if conn_dict: + step_input = step.get_or_add_input(input_name) output_step = steps_by_external_id[conn_dict['id']] conn = trans.model.WorkflowStepConnection() - conn.input_step = step - conn.input_name = input_name + conn.input_step_input = step_input conn.output_step = output_step conn.output_name = conn_dict['output_name'] - step.input_connections.append(conn) del step.temp_input_connections # Order the steps if possible. attach_ordered_steps(workflow, steps) diff --git a/test/api/test_workflows.py b/test/api/test_workflows.py index 3b013564e4af..56c98e0956d6 100644 --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -1394,13 +1394,14 @@ def test_subworkflow_recover_mapping_2(self): steps: random_lines: tool_id: random_lines1 - state: - num_lines: 2 - input: - $link: inner_input - seed_source: - seed_source_selector: set_seed - seed: asdf + in: + input: inner_input + num_lines: + default: 2 + seed_source|seed_source_selector: + default: set_seed + seed_source|seed: + default: asdf split: tool_id: split in: @@ -1419,7 +1420,7 @@ def test_subworkflow_recover_mapping_2(self): outer_input: value: 1.bed type: File -""", history_id=history_id, wait=True, round_trip_format_conversion=True) +""", history_id=history_id, wait=True) self.assertEqual("chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id)) @skip_without_tool("cat_list") diff --git a/test/unit/test_galaxy_mapping.py b/test/unit/test_galaxy_mapping.py index 9bddc973a22f..fcadf9095c04 100644 --- a/test/unit/test_galaxy_mapping.py +++ b/test/unit/test_galaxy_mapping.py @@ -485,6 +485,11 @@ def workflow_from_steps(steps): workflow_step_2.type = "subworkflow" workflow_step_2.subworkflow = child_workflow + workflow_step_1_input_1 = workflow_step_1.get_or_add_input("moo1") + workflow_step_1_input_2 = workflow_step_1.get_or_add_input("moo2") + workflow_step_2_input = workflow_step_2.get_or_add_input("moo") + workflow_step_1.add_connection("foo", "cow", workflow_step_2) + workflow = workflow_from_steps([workflow_step_1, workflow_step_2]) self.persist(workflow) diff --git a/test/unit/workflows/test_modules.py b/test/unit/workflows/test_modules.py index 1394a39645e8..f382c5c47a45 100644 --- a/test/unit/workflows/test_modules.py +++ b/test/unit/workflows/test_modules.py @@ -166,25 +166,28 @@ def test_tool_version_same(): label: "input2" - type: "tool" tool_id: "cat1" - input_connections: - - input_name: "input1" - "@output_step": 0 - output_name: "output" + inputs: + input1: + connections: + - "@output_step": 0 + output_name: "output" - type: "tool" tool_id: "cat1" - input_connections: - - input_name: "input1" - "@output_step": 0 - output_name: "output" + inputs: + input1: + connections: + - "@output_step": 0 + output_name: "output" workflow_outputs: - output_name: "out_file1" label: "out1" - type: "tool" tool_id: "cat1" - input_connections: - - input_name: "input1" - "@output_step": 2 - output_name: "out_file1" + inputs: + input1: + connections: + - "@output_step": 2 + output_name: "out_file1" workflow_outputs: - output_name: "out_file1" """ diff --git a/test/unit/workflows/test_render.py b/test/unit/workflows/test_render.py index b7afcd28cd16..04ee10c791cf 100644 --- a/test/unit/workflows/test_render.py +++ b/test/unit/workflows/test_render.py @@ -6,28 +6,28 @@ - type: "data_input" order_index: 0 tool_inputs: {"name": "input1"} - input_connections: [] position: {"top": 3, "left": 3} - type: "data_input" order_index: 1 tool_inputs: {"name": "input2"} - input_connections: [] position: {"top": 6, "left": 4} - type: "tool" tool_id: "cat1" order_index: 2 - input_connections: - - input_name: "input1" - "@output_step": 0 - output_name: "di1" + inputs: + input1: + connection: + - "@output_step": 0 + output_name: "di1" position: {"top": 13, "left": 10} - type: "tool" tool_id: "cat1" order_index: 3 - input_connections: - - input_name: "input1" - "@output_step": 0 - output_name: "di1" + inputs: + input1: + connection: + - "@output_step": 0 + output_name: "di1" position: {"top": 33, "left": 103} """ diff --git a/test/unit/workflows/test_workflow_progress.py b/test/unit/workflows/test_workflow_progress.py index 1cede6d09aae..028483f1640c 100644 --- a/test/unit/workflows/test_workflow_progress.py +++ b/test/unit/workflows/test_workflow_progress.py @@ -12,22 +12,25 @@ tool_inputs: {"name": "input2"} - type: "tool" tool_id: "cat1" - input_connections: - - input_name: "input1" - "@output_step": 0 - output_name: "output" + inputs: + "input1": + connections: + - "@output_step": 0 + output_name: "output" - type: "tool" tool_id: "cat1" - input_connections: - - input_name: "input1" - "@output_step": 0 - output_name: "output" + inputs: + input1: + connections: + - "@output_step": 0 + output_name: "output" - type: "tool" tool_id: "cat1" - input_connections: - - input_name: "input1" - "@output_step": 2 - output_name: "out_file1" + inputs: + "input1": + connections: + - "@output_step": 2 + output_name: "out_file1" """ TEST_SUBWORKFLOW_YAML = """ @@ -41,15 +44,17 @@ tool_inputs: {"name": "inner_input"} - type: "tool" tool_id: "cat1" - input_connections: - - input_name: "input1" - "@output_step": 0 - output_name: "output" - input_connections: - - input_name: "inner_input" - "@output_step": 0 - output_name: "output" - "@input_subworkflow_step": 0 + inputs: + "input1": + connections: + - "@output_step": 0 + output_name: "output" + inputs: + inner_input: + connections: + - "@output_step": 0 + output_name: "output" + "@input_subworkflow_step": 0 """ UNSCHEDULED_STEP = object() diff --git a/test/unit/workflows/workflow_support.py b/test/unit/workflows/workflow_support.py index 6f4f567cb66d..1e1392cfd94c 100644 --- a/test/unit/workflows/workflow_support.py +++ b/test/unit/workflows/workflow_support.py @@ -99,20 +99,28 @@ def yaml_to_model(has_dict, id_offset=100): for key, value in step.items(): if key == "input_connections": - connections = [] - for conn_dict in value: - conn = model.WorkflowStepConnection() - for conn_key, conn_value in conn_dict.items(): - if conn_key == "@output_step": - target_step = workflow.steps[conn_value] - conn_value = target_step - conn_key = "output_step" - if conn_key == "@input_subworkflow_step": - conn_value = step["subworkflow"].step_by_index(conn_value) - conn_key = "input_subworkflow_step" - setattr(conn, conn_key, conn_value) - connections.append(conn) - value = connections + raise NotImplementedError() + if key == "inputs": + inputs = [] + for input_name, input_def in value.items(): + step_input = model.WorkflowStepInput() + step_input.name = input_name + connections = [] + for conn_dict in input_def.get("connections", []): + conn = model.WorkflowStepConnection() + for conn_key, conn_value in conn_dict.items(): + if conn_key == "@output_step": + target_step = workflow.steps[conn_value] + conn_value = target_step + conn_key = "output_step" + if conn_key == "@input_subworkflow_step": + conn_value = step["subworkflow"].step_by_index(conn_value) + conn_key = "input_subworkflow_step" + setattr(conn, conn_key, conn_value) + connections.append(conn) + step_input.connections = connections + inputs.append(step_input) + value = inputs if key == "workflow_outputs": value = [partial(_dict_to_workflow_output, workflow_step)(_) for _ in value] setattr(workflow_step, key, value)