Skip to content

Commit

Permalink
minor refactoring in updating fail stage
Browse files Browse the repository at this point in the history
  • Loading branch information
Aman-saimbhi committed Nov 19, 2024
1 parent 0d7c3be commit 1639100
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 23 deletions.
27 changes: 8 additions & 19 deletions dispatch/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,7 @@ def execute(
task_content.id,
target="",
)
if not update_fail_stage(source_folder, FailStage.DISPATCHING):
logger.error( # handle_error
f"Error updating fail stage for task {uid}",
task_content.id,
)
_move_sent_directory(task_content.id, source_folder, error_folder)
_move_sent_directory(task_content.id, source_folder, error_folder, FailStage.DISPATCHING)
_trigger_notification(task_content, mercure_events.ERROR)
return

Expand All @@ -154,12 +149,7 @@ def execute(
task_content.id,
target=target_item,
)
if not update_fail_stage(source_folder, FailStage.DISPATCHING):
logger.error( # handle_error
f"Error updating fail stage for task {uid}",
task_content.id,
)
_move_sent_directory(task_content.id, source_folder, error_folder)
_move_sent_directory(task_content.id, source_folder, error_folder, FailStage.DISPATCHING)
_trigger_notification(task_content, mercure_events.ERROR)
return

Expand Down Expand Up @@ -269,19 +259,14 @@ def execute(
else:
logger.info(f"Max retries reached, moving to {error_folder}")
monitor.send_task_event(task_event.SUSPEND, task_content.id, 0, ",".join(dispatch_info.target_name), "Max retries reached")
if not update_fail_stage(source_folder, FailStage.DISPATCHING):
logger.error( # handle_error
f"Error updating fail stage for task {uid}",
task_content.id,
)
_move_sent_directory(task_content.id, source_folder, error_folder)
_move_sent_directory(task_content.id, source_folder, error_folder, FailStage.DISPATCHING)
monitor.send_task_event(task_event.MOVE, task_content.id, 0, str(error_folder), "Moved to error folder")
monitor.send_event(m_events.PROCESSING, severity.ERROR, f"Series suspended after reaching max retries")
_trigger_notification(task_content, mercure_events.ERROR)
logger.info(f"Dispatching folder {source_folder} not successful")


def _move_sent_directory(task_id, source_folder, destination_folder) -> None:
def _move_sent_directory(task_id, source_folder, destination_folder, fail_stage=None) -> None:
"""
This check is needed if there is already a folder with the same name
in the success folder. If so a new directory is create with a timestamp
Expand All @@ -292,10 +277,14 @@ def _move_sent_directory(task_id, source_folder, destination_folder) -> None:
target_folder = destination_folder / (source_folder.name + "_" + datetime.now().isoformat())
logger.debug(f"Moving {source_folder} to {target_folder}")
shutil.move(source_folder, target_folder, copy_function=shutil.copy2)
if fail_stage and not update_fail_stage(target_folder, fail_stage):
logger.error(f"Error updating fail stage for task {task_id}")
(Path(target_folder) / mercure_names.PROCESSING).unlink()
else:
logger.debug(f"Moving {source_folder} to {destination_folder / source_folder.name}")
shutil.move(source_folder, destination_folder / source_folder.name)
if fail_stage and not update_fail_stage(destination_folder / source_folder.name, fail_stage):
logger.error(f"Error updating fail stage for task {task_id}")
(destination_folder / source_folder.name / mercure_names.PROCESSING).unlink()
except:
logger.error(f"Error moving folder {source_folder} to {destination_folder}", task_id) # handle_error
Expand Down
8 changes: 4 additions & 4 deletions process/process_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,9 +585,7 @@ def move_results(
lock.free()
if not processing_success:
logger.debug(f"Failing: {folder}")
if not update_fail_stage(folder, FailStage.PROCESSING):
logger.error( f"Error updating fail stage for task {task_id}")
move_out_folder(task_id, folder, Path(config.mercure.error_folder), move_all=True)
move_out_folder(task_id, folder, Path(config.mercure.error_folder), move_all=True, fail_stage=FailStage.PROCESSING)
else:
if needs_dispatching:
logger.debug(f"Dispatching: {folder}")
Expand All @@ -597,7 +595,7 @@ def move_results(
move_out_folder(task_id, folder, Path(config.mercure.success_folder))


def move_out_folder(task_id: str, source_folder: Path, destination_folder: Path, move_all=False) -> None:
def move_out_folder(task_id: str, source_folder: Path, destination_folder: Path, move_all=False, fail_stage=None) -> None:
# source_folder = Path(source_folder_str)
# destination_folder = Path(destination_folder_str)

Expand All @@ -613,6 +611,8 @@ def move_out_folder(task_id: str, source_folder: Path, destination_folder: Path,
try:
if move_all:
shutil.move(str(source_folder), target_folder)
if fail_stage and not update_fail_stage(target_folder, FailStage.PROCESSING):
logger.error( f"Error updating fail stage for task {task_id}")
else:
shutil.move(str(source_folder / "out"), target_folder)
lockfile = source_folder / mercure_names.LOCK
Expand Down

0 comments on commit 1639100

Please sign in to comment.