From 4226b961957668f33ca24e98e16272da9f9858d3 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 17 Jan 2018 13:49:42 +0100 Subject: [PATCH] Add option to replace failed elements on job rerun Replacing failed collection elements was already possible when dependent jobs were found (https://github.com/galaxyproject/galaxy/pull/5247). This commit restructures the remapping so that remapping is possible when no dependent jobs are available. This also simplifies the replacement of HDAs between old and new jobs. --- client/galaxy/scripts/mvc/tool/tool-form.js | 12 +++- lib/galaxy/tools/__init__.py | 3 + lib/galaxy/tools/actions/__init__.py | 61 ++++++++++++--------- 3 files changed, 47 insertions(+), 29 deletions(-) diff --git a/client/galaxy/scripts/mvc/tool/tool-form.js b/client/galaxy/scripts/mvc/tool/tool-form.js index 3c027a0cbb3b..931e091e2cc6 100644 --- a/client/galaxy/scripts/mvc/tool/tool-form.js +++ b/client/galaxy/scripts/mvc/tool/tool-form.js @@ -136,16 +136,22 @@ var View = Backbone.View.extend({ // remap feature if (options.job_id && options.job_remap) { + if (options.job_remap === "job_produced_collection_elements") { + var label = "Replace elements in collection ?"; + var help = "The previous run of this tool failed. Use this option to replace the failed element(s) in the dataset collectio that were produced during the previous tool run."; + } else { + var label = "Resume dependencies from this job ?"; + var help = "The previous run of this tool failed and other tools were waiting for it to finish successfully. Use this option to resume those tools using the new output(s) of this tool run."; + } options.inputs.push({ - label: "Resume dependencies from this job", + label: label, name: "rerun_remap_job_id", type: "select", display: "radio", ignore: "__ignore__", value: "__ignore__", options: [["Yes", options.job_id], ["No", "__ignore__"]], - help: - "The previous run of this tool failed and other tools were waiting for it to finish successfully. Use this option to resume those tools using the new output(s) of this tool run." + help: help, }); } diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index d9b3dc74dfc6..d334598d0ed8 100755 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -1890,6 +1890,9 @@ def _get_job_remap(self, job): try: if [hda.dependent_jobs for hda in [jtod.dataset for jtod in job.output_datasets] if hda.dependent_jobs]: return True + elif job.output_dataset_collection_instances: + # We'll want to replace this item + return 'job_produced_collection_elements' except Exception as exception: log.error(str(exception)) pass diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py index c248141f4334..75da4a3199e1 100644 --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -534,43 +534,52 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current for p in old_job.parameters: if p.name.endswith('|__identifier__'): current_job.parameters.append(p.copy()) - remapped_hdas = {} - input_hdcas = set() + remapped_hdas = self.__remap_data_inputs(old_job=old_job, current_job=current_job) for jtod in old_job.output_datasets: for (job_to_remap, jtid) in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]: if (trans.user is not None and job_to_remap.user_id == trans.user.id) or ( trans.user is None and job_to_remap.session_id == galaxy_session.id): - if job_to_remap.state == job_to_remap.states.PAUSED: - job_to_remap.state = job_to_remap.states.NEW - for hda in [dep_jtod.dataset for dep_jtod in job_to_remap.output_datasets]: - if hda.state == hda.states.PAUSED: - hda.state = hda.states.NEW - hda.info = None - input_values = dict([(p.name, json.loads(p.value)) for p in job_to_remap.parameters]) - remapped_hdas[jtod.dataset] = out_data[jtod.name] - for jtidca in job_to_remap.input_dataset_collections: - input_hdcas.add(jtidca.dataset_collection) - old_dataset_id = jtod.dataset_id - new_dataset_id = out_data[jtod.name].id - input_values = update_dataset_ids(input_values, {old_dataset_id: new_dataset_id}, src='hda') - for p in job_to_remap.parameters: - p.value = json.dumps(input_values[p.name]) - jtid.dataset = out_data[jtod.name] - jtid.dataset.hid = jtod.dataset.hid - log.info('Job %s input HDA %s remapped to new HDA %s' % (job_to_remap.id, jtod.dataset.id, jtid.dataset.id)) + self.__remap_parameters(job_to_remap, jtid, jtod, out_data) trans.sa_session.add(job_to_remap) trans.sa_session.add(jtid) - for hdca in input_hdcas: - hdca.collection.replace_failed_elements(remapped_hdas) - if hdca.implicit_collection_jobs: - for job in hdca.implicit_collection_jobs.jobs: - if job.job_id == old_job.id: - job.job_id = current_job.id jtod.dataset.visible = False trans.sa_session.add(jtod) + for jtodc in old_job.output_dataset_collection_instances: + hdca = jtodc.dataset_collection_instance + hdca.collection.replace_failed_elements(remapped_hdas) + if hdca.implicit_collection_jobs: + for job in hdca.implicit_collection_jobs.jobs: + if job.job_id == old_job.id: + job.job_id = current_job.id except Exception: log.exception('Cannot remap rerun dependencies.') + def __remap_data_inputs(self, old_job, current_job): + """Record output datasets from old_job and build a dictionary that maps the old output HDAs to the new output HDAs.""" + remapped_hdas = {} + old_output_datasets = {jtod.name: jtod.dataset for jtod in old_job.output_datasets} + for jtod in current_job.output_datasets: + remapped_hdas[old_output_datasets[jtod.name]] = jtod.dataset + return remapped_hdas + + def __remap_parameters(self, job_to_remap, jtid, jtod, out_data): + if job_to_remap.state == job_to_remap.states.PAUSED: + job_to_remap.state = job_to_remap.states.NEW + for hda in [dep_jtod.dataset for dep_jtod in job_to_remap.output_datasets]: + if hda.state == hda.states.PAUSED: + hda.state = hda.states.NEW + hda.info = None + input_values = dict([(p.name, json.loads(p.value)) for p in job_to_remap.parameters]) + old_dataset_id = jtod.dataset_id + new_dataset_id = out_data[jtod.name].id + input_values = update_dataset_ids(input_values, {old_dataset_id: new_dataset_id}, src='hda') + for p in job_to_remap.parameters: + p.value = json.dumps(input_values[p.name]) + jtid.dataset = out_data[jtod.name] + jtid.dataset.hid = jtod.dataset.hid + log.info('Job %s input HDA %s remapped to new HDA %s' % (job_to_remap.id, jtod.dataset.id, jtid.dataset.id)) + + def _wrapped_params(self, trans, tool, incoming, input_datasets=None): wrapped_params = WrappedParameters(trans, tool, incoming, input_datasets=input_datasets) return wrapped_params