Skip to content

Commit

Permalink
Modifying task id to include directory
Browse files Browse the repository at this point in the history
  • Loading branch information
lucpeterson committed Aug 24, 2021
1 parent 0d340e9 commit 3c451b5
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion merlin/common/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
"""Test tasks."""
from __future__ import absolute_import, unicode_literals

from datetime import datetime

import logging
import os

Expand Down Expand Up @@ -70,6 +72,11 @@
STOP_COUNTDOWN = 60


def log_result(result, step_dir, result_file):
now = datetime.now().strftime("%c")
cmd = f"flock --timeout 60 {result_file} echo '{now},{step_dir},{result}' >> {result_file}"
_ = os.system(cmd)

@shared_task(
bind=True,
autoretry_for=retry_exceptions,
Expand Down Expand Up @@ -104,6 +111,7 @@ def merlin_step(self, *args, **kwargs):
self.max_retries = step.max_retries
step_name = step.name()
step_dir = step.get_workspace()
LOG.error(self.request.id)
LOG.debug(f"merlin_step: step_name '{step_name}' step_dir '{step_dir}'")
finished_filename = os.path.join(step_dir, "MERLIN_FINISHED")
# if we've already finished this task, skip it
Expand All @@ -112,6 +120,7 @@ def merlin_step(self, *args, **kwargs):
result = ReturnCode.OK
else:
result = step.execute(config)
log_result(result, step_dir, f'{step_dir}/../results_log.txt')
if result == ReturnCode.OK:
LOG.info(f"Step '{step_name}' in '{step_dir}' finished successfully.")
# touch a file indicating we're done with this step
Expand Down Expand Up @@ -296,6 +305,7 @@ def add_merlin_expanded_chain_to_chord(
adapter_config=adapter_config,
)
new_step.set(queue=step.get_task_queue())
new_step.set(task_id=os.path.join(workspace, relative_paths[sample_id]))
new_chain.append(new_step)

all_chains.append(new_chain)
Expand Down Expand Up @@ -355,7 +365,8 @@ def add_simple_chain_to_chord(self, task_type, chain_, adapter_config):

new_steps = [
task_type.s(step, adapter_config=adapter_config).set(
queue=step.get_task_queue()
queue=step.get_task_queue(),
task_id=step.get_workspace(),
)
]
all_chains.append(new_steps)
Expand Down

0 comments on commit 3c451b5

Please sign in to comment.