Skip to content

Commit

Permalink
Add option to replace failed elements on job rerun
Browse files Browse the repository at this point in the history
Replacing failed collection elements was already possible when dependent jobs
were found (galaxyproject#5247).
This commit restructures the remapping so that remapping is possible when no
dependent jobs are available. This also simplifies the replacement of HDAs
between old and new jobs.
  • Loading branch information
mvdbeek committed Jan 17, 2018
1 parent a2f8ff2 commit 4226b96
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 29 deletions.
12 changes: 9 additions & 3 deletions client/galaxy/scripts/mvc/tool/tool-form.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,22 @@ var View = Backbone.View.extend({

// remap feature
if (options.job_id && options.job_remap) {
if (options.job_remap === "job_produced_collection_elements") {
var label = "Replace elements in collection ?";
var help = "The previous run of this tool failed. Use this option to replace the failed element(s) in the dataset collectio that were produced during the previous tool run.";
} else {
var label = "Resume dependencies from this job ?";
var help = "The previous run of this tool failed and other tools were waiting for it to finish successfully. Use this option to resume those tools using the new output(s) of this tool run.";
}
options.inputs.push({
label: "Resume dependencies from this job",
label: label,
name: "rerun_remap_job_id",
type: "select",
display: "radio",
ignore: "__ignore__",
value: "__ignore__",
options: [["Yes", options.job_id], ["No", "__ignore__"]],
help:
"The previous run of this tool failed and other tools were waiting for it to finish successfully. Use this option to resume those tools using the new output(s) of this tool run."
help: help,
});
}

Expand Down
3 changes: 3 additions & 0 deletions lib/galaxy/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1890,6 +1890,9 @@ def _get_job_remap(self, job):
try:
if [hda.dependent_jobs for hda in [jtod.dataset for jtod in job.output_datasets] if hda.dependent_jobs]:
return True
elif job.output_dataset_collection_instances:
# We'll want to replace this item
return 'job_produced_collection_elements'
except Exception as exception:
log.error(str(exception))
pass
Expand Down
61 changes: 35 additions & 26 deletions lib/galaxy/tools/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,43 +534,52 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current
for p in old_job.parameters:
if p.name.endswith('|__identifier__'):
current_job.parameters.append(p.copy())
remapped_hdas = {}
input_hdcas = set()
remapped_hdas = self.__remap_data_inputs(old_job=old_job, current_job=current_job)
for jtod in old_job.output_datasets:
for (job_to_remap, jtid) in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]:
if (trans.user is not None and job_to_remap.user_id == trans.user.id) or (
trans.user is None and job_to_remap.session_id == galaxy_session.id):
if job_to_remap.state == job_to_remap.states.PAUSED:
job_to_remap.state = job_to_remap.states.NEW
for hda in [dep_jtod.dataset for dep_jtod in job_to_remap.output_datasets]:
if hda.state == hda.states.PAUSED:
hda.state = hda.states.NEW
hda.info = None
input_values = dict([(p.name, json.loads(p.value)) for p in job_to_remap.parameters])
remapped_hdas[jtod.dataset] = out_data[jtod.name]
for jtidca in job_to_remap.input_dataset_collections:
input_hdcas.add(jtidca.dataset_collection)
old_dataset_id = jtod.dataset_id
new_dataset_id = out_data[jtod.name].id
input_values = update_dataset_ids(input_values, {old_dataset_id: new_dataset_id}, src='hda')
for p in job_to_remap.parameters:
p.value = json.dumps(input_values[p.name])
jtid.dataset = out_data[jtod.name]
jtid.dataset.hid = jtod.dataset.hid
log.info('Job %s input HDA %s remapped to new HDA %s' % (job_to_remap.id, jtod.dataset.id, jtid.dataset.id))
self.__remap_parameters(job_to_remap, jtid, jtod, out_data)
trans.sa_session.add(job_to_remap)
trans.sa_session.add(jtid)
for hdca in input_hdcas:
hdca.collection.replace_failed_elements(remapped_hdas)
if hdca.implicit_collection_jobs:
for job in hdca.implicit_collection_jobs.jobs:
if job.job_id == old_job.id:
job.job_id = current_job.id
jtod.dataset.visible = False
trans.sa_session.add(jtod)
for jtodc in old_job.output_dataset_collection_instances:
hdca = jtodc.dataset_collection_instance
hdca.collection.replace_failed_elements(remapped_hdas)
if hdca.implicit_collection_jobs:
for job in hdca.implicit_collection_jobs.jobs:
if job.job_id == old_job.id:
job.job_id = current_job.id
except Exception:
log.exception('Cannot remap rerun dependencies.')

def __remap_data_inputs(self, old_job, current_job):
    """Map each failed output HDA of ``old_job`` to the replacement HDA of ``current_job``.

    Outputs are matched by their association name; the returned dict is keyed
    by the old job's output datasets, with the new job's same-named output
    datasets as values.
    """
    outputs_of_old_job = {jtod.name: jtod.dataset for jtod in old_job.output_datasets}
    return {
        outputs_of_old_job[jtod.name]: jtod.dataset
        for jtod in current_job.output_datasets
    }

def __remap_parameters(self, job_to_remap, jtid, jtod, out_data):
    """Rewire a paused dependent job to consume the rerun job's new output HDA.

    ``jtod`` is the old (failed) job's output association whose dataset the
    dependent job ``job_to_remap`` consumed via the input association ``jtid``;
    ``out_data`` maps output names to the new job's replacement HDAs.
    After this call ``jtid`` references the replacement dataset, the dependent
    job's recorded parameter values point at the new dataset id, and any
    paused state on the job and its outputs is reset so it can run again.
    """
    if job_to_remap.state == job_to_remap.states.PAUSED:
        job_to_remap.state = job_to_remap.states.NEW
    # Un-pause the dependent job's own outputs so they will be produced again.
    for hda in [dep_jtod.dataset for dep_jtod in job_to_remap.output_datasets]:
        if hda.state == hda.states.PAUSED:
            hda.state = hda.states.NEW
            hda.info = None
    # Substitute the failed dataset's id with the replacement dataset's id in
    # the dependent job's stored (JSON-encoded) parameter values.
    input_values = {p.name: json.loads(p.value) for p in job_to_remap.parameters}
    old_dataset_id = jtod.dataset_id
    new_dataset_id = out_data[jtod.name].id
    input_values = update_dataset_ids(input_values, {old_dataset_id: new_dataset_id}, src='hda')
    for p in job_to_remap.parameters:
        p.value = json.dumps(input_values[p.name])
    # Point the input association at the new HDA; keep the original hid so the
    # history numbering stays stable for the user.
    jtid.dataset = out_data[jtod.name]
    jtid.dataset.hid = jtod.dataset.hid
    # Lazy %-args: formatting is deferred until the record is actually emitted.
    log.info('Job %s input HDA %s remapped to new HDA %s', job_to_remap.id, jtod.dataset.id, jtid.dataset.id)


def _wrapped_params(self, trans, tool, incoming, input_datasets=None):
    """Return a ``WrappedParameters`` instance for this tool invocation."""
    return WrappedParameters(trans, tool, incoming, input_datasets=input_datasets)
Expand Down

0 comments on commit 4226b96

Please sign in to comment.