From 3c451b5503901c9aebb849bebe0197542238a39d Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Tue, 24 Aug 2021 15:50:40 -0700 Subject: [PATCH] Modifying task id to include directory --- merlin/common/tasks.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index f55395fcd..8d27e0e2c 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -31,6 +31,8 @@ """Test tasks.""" from __future__ import absolute_import, unicode_literals +from datetime import datetime + import logging import os @@ -70,6 +72,11 @@ STOP_COUNTDOWN = 60 +def log_result(result, step_dir, result_file): + now = datetime.now().strftime("%c") + cmd = f"flock --timeout 60 {result_file} echo '{now},{step_dir},{result}' >> {result_file}" + _ = os.system(cmd) + @shared_task( bind=True, autoretry_for=retry_exceptions, @@ -104,6 +111,7 @@ def merlin_step(self, *args, **kwargs): self.max_retries = step.max_retries step_name = step.name() step_dir = step.get_workspace() + LOG.error(self.request.id) LOG.debug(f"merlin_step: step_name '{step_name}' step_dir '{step_dir}'") finished_filename = os.path.join(step_dir, "MERLIN_FINISHED") # if we've already finished this task, skip it @@ -112,6 +120,7 @@ def merlin_step(self, *args, **kwargs): result = ReturnCode.OK else: result = step.execute(config) + log_result(result, step_dir, f'{step_dir}/../results_log.txt') if result == ReturnCode.OK: LOG.info(f"Step '{step_name}' in '{step_dir}' finished successfully.") # touch a file indicating we're done with this step @@ -296,6 +305,7 @@ def add_merlin_expanded_chain_to_chord( adapter_config=adapter_config, ) new_step.set(queue=step.get_task_queue()) + new_step.set(task_id=os.path.join(workspace, relative_paths[sample_id])) new_chain.append(new_step) all_chains.append(new_chain) @@ -355,7 +365,8 @@ def add_simple_chain_to_chord(self, task_type, chain_, adapter_config): new_steps = [ task_type.s(step, adapter_config=adapter_config).set( - queue=step.get_task_queue() + queue=step.get_task_queue(), + task_id=step.get_workspace(), ) ] all_chains.append(new_steps)