Skip to content

Commit

Permalink
Track workflow step input definitions in our model.
Browse files Browse the repository at this point in the history
We don't track workflow step inputs in any formal way in our model currently. This has resulted in some current hacks and prevents future enhancements. This commit splits WorkflowStepConnection into two models WorkflowStepInput and WorkflowStepConnection - normalizing the previous table workflow_step_connection on input step and input name.

In terms of current hacks forced on it by restricting all of tool state to be confined to a big JSON blob in the database - we have problems distinguishing keys and values when walking tool state. As we store more and more JSON blobs inside of the giant tool state blob - the worse this problem gets. Take for instance checking for runtime parameters or the rules parameter values - these both use JSON blobs that aren't simple values, so it is hard to tell looking at the tool state blob in the database or the workflow export to tell what is a key or what is a value. Tracking state as normalized inputs with default values and explicit attributes runtime values should allow much more percise state definition and construction.

This variant of the models would also potentially allow defining runtime values with non-tool default values (so default values defined for the workflow but still explicitly settable at runtime). The combinations of overriding defaults and defining runtime values were not representable before.

In terms of future enhancements, there is a lot we cannot track with the current models - such as map/reduce options for collection operations (galaxyproject#4623 (comment)). This should enable a lot of that. Obviously there are a lot of attributes defined here that are not yet utilized, but I'm using most (all?) of them downstream in the CWL branch. I'd rather populate this table fully realized and fill in the implementation around it as work continues to stream in from the CWL branch - to keep things simple and avoid extra database migrations. But I understand if this feels like speculative complexity we want to avoid despite the implementation being readily available for inspection downstream.
  • Loading branch information
jmchilton committed Oct 13, 2018
1 parent cb4f615 commit c8b433b
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 84 deletions.
6 changes: 4 additions & 2 deletions lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,14 +1044,16 @@ def __connect_workflow_steps(self, steps, steps_by_external_id):
continue
if not isinstance(conn_list, list): # Older style singleton connection
conn_list = [conn_list]

for conn_dict in conn_list:
step_input = step.get_or_add_input(input_name)

if 'output_name' not in conn_dict or 'id' not in conn_dict:
template = "Invalid connection [%s] - must be dict with output_name and id fields."
message = template % conn_dict
raise exceptions.MessageException(message)
conn = model.WorkflowStepConnection()
conn.input_step = step
conn.input_name = input_name
conn.input_step_input = step_input
conn.output_name = conn_dict['output_name']
external_id = conn_dict['id']
if external_id not in steps_by_external_id:
Expand Down
73 changes: 65 additions & 8 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4058,13 +4058,31 @@ def __init__(self):
self.tool_inputs = None
self.tool_errors = None
self.position = None
self.input_connections = []
self.inputs = []
self.config = None
self.label = None
self.uuid = uuid4()
self.workflow_outputs = []
self._input_connections_by_name = None

def get_or_add_input(self, input_name):
for step_input in self.inputs:
if step_input.name == input_name:
return step_input

step_input = WorkflowStepInput()
step_input.workflow_step = self
step_input.name = input_name
return step_input

@property
def input_connections(self):
connections = []
for step_input in self.inputs:
for connection in step_input.connections:
connections.append(connection)
return connections

@property
def unique_workflow_outputs(self):
# Older Galaxy workflows may have multiple WorkflowOutputs
Expand Down Expand Up @@ -4138,7 +4156,7 @@ def copy_to(self, copied_step, step_mapping):
copied_step.position = self.position
copied_step.config = self.config
copied_step.label = self.label
copied_step.input_connections = copy_list(self.input_connections)
copied_step.inputs = copy_list(self.inputs)

subworkflow_step_mapping = {}
subworkflow = self.subworkflow
Expand All @@ -4149,8 +4167,7 @@ def copy_to(self, copied_step, step_mapping):
subworkflow_step_mapping[subworkflow_step.id] = copied_subworkflow_step

for old_conn, new_conn in zip(self.input_connections, copied_step.input_connections):
# new_conn.input_step = new_
new_conn.input_step = step_mapping[old_conn.input_step_id]
new_conn.input_step_input = copied_step.get_or_add_input(old_conn.input_name)
new_conn.output_step = step_mapping[old_conn.output_step_id]
if old_conn.input_subworkflow_step_id:
new_conn.input_subworkflow_step = subworkflow_step_mapping[old_conn.input_subworkflow_step_id]
Expand All @@ -4165,6 +4182,35 @@ def log_str(self):
return "WorkflowStep[index=%d,type=%s]" % (self.order_index, self.type)


class WorkflowStepInput(object):
default_merge_type = None
default_scatter_type = None

def __init__(self):
self.id = None
self.name = None
self.default_value = None
self.default_value_set = False
self.merge_type = self.default_merge_type
self.scatter_type = self.default_scatter_type

def copy(self):
copied_step_input = WorkflowStepInput()
copied_step_input.name = self.name
copied_step_input.default_value = self.default_value
copied_step_input.default_value_set = self.default_value_set
copied_step_input.merge_type = self.merge_type
copied_step_input.scatter_type = self.scatter_type

copied_step_input.connections = copy_list(self.connections)
return copied_step_input

def log_str(self):
return "WorkflowStepInput[name=%s]" % (
self.name,
)


class WorkflowStepConnection(object):
# Constant used in lieu of output_name and input_name to indicate an
# implicit connection between two steps that is not dependent on a dataset
Expand All @@ -4176,23 +4222,34 @@ class WorkflowStepConnection(object):
def __init__(self):
self.output_step_id = None
self.output_name = None
self.input_step_id = None
self.input_name = None
self.input_step_input_id = None

def set_non_data_connection(self):
self.output_name = WorkflowStepConnection.NON_DATA_CONNECTION
self.input_name = WorkflowStepConnection.NON_DATA_CONNECTION
raise NotImplementedError("implement adding step here somehow...")

@property
def non_data_connection(self):
return (self.output_name == WorkflowStepConnection.NON_DATA_CONNECTION and
self.input_name == WorkflowStepConnection.NON_DATA_CONNECTION)

@property
def input_name(self):
return self.input_step_input.name

@property
def input_step(self):
return self.input_step_input and self.input_step_input.workflow_step

@property
def input_step_id(self):
input_step = self.input_step
return input_step and input_step.id

def copy(self):
# TODO: handle subworkflow ids...
copied_connection = WorkflowStepConnection()
copied_connection.output_name = self.output_name
copied_connection.input_name = self.input_name
return copied_connection


Expand Down
29 changes: 24 additions & 5 deletions lib/galaxy/model/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,21 @@
# Column( "input_connections", JSONType ),
Column("label", Unicode(255)))


model.WorkflowStepInput.table = Table(
"workflow_step_input", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("name", Unicode(255)),
Column("merge_type", TEXT),
Column("scatter_type", TEXT),
Column("value_from", JSONType),
Column("value_from_type", TEXT),
Column("default_value", JSONType),
Column("default_value_set", Boolean),
Column("runtime_value", Boolean))


model.WorkflowRequestStepState.table = Table(
"workflow_request_step_states", metadata,
Column("id", Integer, primary_key=True),
Expand Down Expand Up @@ -953,9 +968,8 @@
"workflow_step_connection", metadata,
Column("id", Integer, primary_key=True),
Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True),
Column("output_name", TEXT),
Column("input_name", TEXT),
Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
)

Expand Down Expand Up @@ -2218,17 +2232,22 @@ def simple_mapping(model, **kwds):
backref="workflow_steps")
))

mapper(model.WorkflowStepInput, model.WorkflowStepInput.table, properties=dict(
workflow_step=relation(model.WorkflowStep,
backref="inputs"),
))

mapper(model.WorkflowOutput, model.WorkflowOutput.table, properties=dict(
workflow_step=relation(model.WorkflowStep,
backref='workflow_outputs',
primaryjoin=(model.WorkflowStep.table.c.id == model.WorkflowOutput.table.c.workflow_step_id))
))

mapper(model.WorkflowStepConnection, model.WorkflowStepConnection.table, properties=dict(
input_step=relation(model.WorkflowStep,
backref="input_connections",
input_step_input=relation(model.WorkflowStepInput,
backref="connections",
cascade="all",
primaryjoin=(model.WorkflowStepConnection.table.c.input_step_id == model.WorkflowStep.table.c.id)),
primaryjoin=(model.WorkflowStepConnection.table.c.input_step_input_id == model.WorkflowStepInput.table.c.id)),
input_subworkflow_step=relation(model.WorkflowStep,
backref=backref("parent_workflow_input_connections", uselist=True),
primaryjoin=(model.WorkflowStepConnection.table.c.input_subworkflow_step_id == model.WorkflowStep.table.c.id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ def upgrade(migrate_engine):
for table in tables.values():
__create(table)

def nextval(table, col='id'):
if migrate_engine.name in ['postgres', 'postgresql']:
return "nextval('%s_%s_seq')" % (table, col)
elif migrate_engine.name in ['mysql', 'sqlite']:
return "null"
else:
raise Exception("Unhandled database type")

# Set default for creation to scheduled, actual mapping has new as default.
workflow_invocation_step_state_column = Column("state", TrimmedString(64), default="scheduled")
if migrate_engine.name in ['postgres', 'postgresql']:
Expand Down
105 changes: 105 additions & 0 deletions lib/galaxy/model/migrate/versions/0144_add_workflow_step_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
Migration script for workflow step input table.
"""
from __future__ import print_function

import logging

from sqlalchemy import Boolean, Column, ForeignKey, Integer, MetaData, Table, TEXT

from galaxy.model.custom_types import JSONType

log = logging.getLogger(__name__)
metadata = MetaData()


def get_new_tables():

WorkflowStepInput_table = Table(
"workflow_step_input", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("name", TEXT),
Column("merge_type", TEXT),
Column("scatter_type", TEXT),
Column("value_from", JSONType),
Column("value_from_type", TEXT),
Column("default_value", JSONType),
Column("default_value_set", Boolean, default=False),
Column("runtime_value", Boolean, default=False),
)

WorkflowStepConnection_table = Table(
"workflow_step_connection", metadata,
Column("id", Integer, primary_key=True),
Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True),
Column("output_name", TEXT),
Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
)

return [
WorkflowStepInput_table, WorkflowStepConnection_table
]


def upgrade(migrate_engine):
metadata.bind = migrate_engine
print(__doc__)
metadata.reflect()

LegacyWorkflowStepConnection_table = Table("workflow_step_connection", metadata, autoload=True)
for index in LegacyWorkflowStepConnection_table.indexes:
index.drop()
LegacyWorkflowStepConnection_table.rename("workflow_step_connection_premigrate144")
# Try to deregister that table to work around some caching problems it seems.
LegacyWorkflowStepConnection_table.deregister()
metadata._remove_table("workflow_step_connection", metadata.schema)

metadata.reflect()
tables = get_new_tables()
for table in tables:
__create(table)

insert_step_inputs_cmd = \
"INSERT INTO workflow_step_input (workflow_step_id, name) " + \
"SELECT id, input_name FROM workflow_step_connection_premigrate144"

migrate_engine.execute(insert_step_inputs_cmd)

# TODO: verify order here.
insert_step_connections_cmd = \
"INSERT INTO workflow_step_connection (output_step_id, input_step_input_id, output_name, input_subworkflow_step_id) " + \
"SELECT wsc.output_step_id, wsi.id, wsc.output_name, wsc.input_subworkflow_step_id " + \
"FROM workflow_step_connection_premigrate144 as wsc left outer join workflow_step_input as wsi on wsc.input_step_id = wsi.workflow_step_id and wsc.input_name = wsi.name ORDER BY wsc.id"

migrate_engine.execute(insert_step_connections_cmd)


def downgrade(migrate_engine):
metadata.bind = migrate_engine

tables = get_new_tables()
for table in tables:
__drop(table)

metadata._remove_table("workflow_step_connection", metadata.schema)
metadata.reflect()

# Drop new workflow invocation step and job association table and restore legacy data.
LegacyWorkflowStepConnection_table = Table("workflow_step_connection_premigrate144", metadata, autoload=True)
LegacyWorkflowStepConnection_table.rename("workflow_step_connection")


def __create(table):
try:
table.create()
except Exception:
log.exception("Creating %s table failed.", table.name)


def __drop(table):
try:
table.drop()
except Exception:
log.exception("Dropping %s table failed.", table.name)
4 changes: 2 additions & 2 deletions lib/galaxy/workflow/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ def extract_steps(trans, history=None, job_ids=None, dataset_ids=None, dataset_c
else:
log.info("Cannot find implicit input collection for %s" % input_name)
if other_hid in hid_to_output_pair:
step_input = step.get_or_add_input(input_name)
other_step, other_name = hid_to_output_pair[other_hid]
conn = model.WorkflowStepConnection()
conn.input_step = step
conn.input_name = input_name
conn.input_step_input = step_input
# Should always be connected to an earlier step
conn.output_step = other_step
conn.output_name = other_name
Expand Down
4 changes: 2 additions & 2 deletions lib/tool_shed/util/workflow_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,10 @@ def get_workflow_from_dict(trans, workflow_dict, tools_metadata, repository_id,
# Input connections.
for input_name, conn_dict in step.temp_input_connections.items():
if conn_dict:
step_input = step.get_or_add_input(input_name)
output_step = steps_by_external_id[conn_dict['id']]
conn = trans.model.WorkflowStepConnection()
conn.input_step = step
conn.input_name = input_name
conn.input_step_input = step_input
conn.output_step = output_step
conn.output_name = conn_dict['output_name']
step.input_connections.append(conn)
Expand Down
27 changes: 15 additions & 12 deletions test/unit/workflows/test_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,28 @@ def test_tool_version_same():
label: "input2"
- type: "tool"
tool_id: "cat1"
input_connections:
- input_name: "input1"
"@output_step": 0
output_name: "output"
inputs:
input1:
connections:
- "@output_step": 0
output_name: "output"
- type: "tool"
tool_id: "cat1"
input_connections:
- input_name: "input1"
"@output_step": 0
output_name: "output"
inputs:
input1:
connections:
- "@output_step": 0
output_name: "output"
workflow_outputs:
- output_name: "out_file1"
label: "out1"
- type: "tool"
tool_id: "cat1"
input_connections:
- input_name: "input1"
"@output_step": 2
output_name: "out_file1"
inputs:
input1:
connections:
- "@output_step": 2
output_name: "out_file1"
workflow_outputs:
- output_name: "out_file1"
"""
Expand Down
Loading

0 comments on commit c8b433b

Please sign in to comment.