From dc0a83f140589a73586c20849d5d82fcb0ad6e2f Mon Sep 17 00:00:00 2001 From: Tobias Block Date: Fri, 4 Mar 2022 23:26:59 +0000 Subject: [PATCH] Improved locking mechanism. Removing former sendlogs before next dispatching retry. --- common/helper.py | 2 +- dispatch/send.py | 24 +++++++++++++++++++---- dispatch/status.py | 5 ----- dispatcher.py | 4 ++-- process/process_series.py | 4 ++-- tests/dispatch/test_status.py | 13 +----------- tests/test_processor.py | 2 +- webinterface/templates/configuration.html | 2 +- 8 files changed, 28 insertions(+), 28 deletions(-) diff --git a/common/helper.py b/common/helper.py index 2c0a410f..88041295 100755 --- a/common/helper.py +++ b/common/helper.py @@ -91,7 +91,7 @@ class FileLock: def __init__(self, path_for_lockfile: Path): self.lockCreated = True self.lockfile = path_for_lockfile - self.lockfile.touch() + self.lockfile.touch(exist_ok=False) # Destructor to ensure that the lock file gets deleted # if the calling function is left somewhere as result diff --git a/dispatch/send.py b/dispatch/send.py index 7258b0d9..e4a053f1 100755 --- a/dispatch/send.py +++ b/dispatch/send.py @@ -121,12 +121,16 @@ def execute( logger.error(error_message) send_event(m_events.PROCESSING, severity.WARNING, error_message) - # Create a .sending file to indicate that this folder is being sent, - # otherwise the dispatcher would pick it up again if the transfer is - # still going on + # Create a .processing file to indicate that this folder is being sent, + # otherwise another dispatcher instance would pick it up again lock_file = Path(source_folder) / mercure_names.PROCESSING + + # Double check that the case has not been picked up by another instance already + if lock_file.exists(): + logger.info(f"Case {source_folder} already processed by other instance") + return try: - lock_file.touch() + lock_file.touch(exist_ok=False) except: # TODO: Put a limit on these error messages -- log will run full at some point send_event(m_events.PROCESSING, severity.ERROR, f"Error sending {uid} to {target_name}") @@ -146,6 +150,18 @@ def execute( _move_sent_directory(source_folder, error_folder) _trigger_notification(task_content.info, mercure_events.ERROR) + # Check if a sendlog file from a previous try exists. If so, remove it + sendlog = Path(source_folder) / mercure_names.SENDLOG + if sendlog.exists(): + try: + sendlog.unlink() + except: + send_event(m_events.PROCESSING, severity.ERROR, f"Error sending {uid} to {target_name}") + error_message = f"Unable to remove former sendlog {sendlog}" + send_series_event(s_events.ERROR, uid, 0, target_name, error_message) + logger.exception(error_message) + return + logger.debug(f"Running command {command}") logger.info(f"Sending {source_folder} to target {target_name}") try: diff --git a/dispatch/status.py b/dispatch/status.py index 53fd0e14..04bf8a1d 100755 --- a/dispatch/status.py +++ b/dispatch/status.py @@ -40,11 +40,6 @@ def is_ready_for_sending(folder) -> Optional[Task]: return None -def has_been_send(folder) -> bool: - """Checks if the given folder has already been sent.""" - return (Path(folder) / mercure_names.SENDLOG).exists() - - def is_target_json_valid(folder) -> Optional[Task]: """ Checks if the task.json file exists and is valid. Returns the content diff --git a/dispatcher.py b/dispatcher.py index c8345ca3..fded111b 100755 --- a/dispatcher.py +++ b/dispatcher.py @@ -19,7 +19,7 @@ import common.helper as helper import common.monitor as monitor from common.constants import mercure_names -from dispatch.status import has_been_send, is_ready_for_sending +from dispatch.status import is_ready_for_sending from dispatch.send import execute from common.constants import mercure_defs @@ -96,7 +96,7 @@ def dispatch(args) -> None: logger.info("Dispatching resumed") # Now process the folders that are ready for dispatching - if entry.is_dir() and not has_been_send(entry) and is_ready_for_sending(entry): + if entry.is_dir() and is_ready_for_sending(entry): logger.info(f"Sending folder {entry}") execute(Path(entry), success_folder, error_folder, retry_max, retry_delay) diff --git a/process/process_series.py b/process/process_series.py index 5300dd0e..8d379169 100755 --- a/process/process_series.py +++ b/process/process_series.py @@ -235,7 +235,7 @@ def process_series(folder) -> None: lock = None try: try: - lock_file.touch() + lock_file.touch(exist_ok=False) # lock = helper.FileLock(lock_file) except Exception as e: # Can't create lock file, so something must be seriously wrong @@ -348,7 +348,7 @@ def move_results( logger.error(f"Folder already contains lockfile {folder}/" + mercure_names.LOCK) return try: - lock_file.touch() + lock_file.touch(exist_ok=False) except: logger.info(f"Error locking folder to be moved {folder}") logger.error(traceback.format_exc()) diff --git a/tests/dispatch/test_status.py b/tests/dispatch/test_status.py index 56e2763c..84182834 100755 --- a/tests/dispatch/test_status.py +++ b/tests/dispatch/test_status.py @@ -1,7 +1,7 @@ import json from common.types import Task -from dispatch.status import has_been_send, is_ready_for_sending, is_target_json_valid +from dispatch.status import is_ready_for_sending, is_target_json_valid from common.constants import mercure_names pytest_plugins = ("pyfakefs",) @@ -43,17 +43,6 @@ def test_is_read_for_sending(fs): assert is_ready_for_sending("/var/data") -def test_has_been_send(fs): - fs.create_dir("/var/data/") - fs.create_file("/var/data/" + mercure_names.SENDLOG) - assert has_been_send("/var/data/") - - -def test_has_been_send_not(fs): - fs.create_dir("/var/data/") - assert not has_been_send("/var/data/") - - def test_read_target(fs): target = {"info": dummy_info, "dispatch": {"target_name": "test_target"}} fs.create_file("/var/data/" + mercure_names.TASKFILE, contents=json.dumps(target)) diff --git a/tests/test_processor.py b/tests/test_processor.py index fc2168c8..5b2a3e4c 100755 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -142,7 +142,7 @@ def fake_processor(tag=None, meta=None, **kwargs): logger.info(k) for k in Path("/var/success").rglob("*"): logger.info(k) - # (processor_path / ".complete").touch() + # (processor_path / ".complete").touch(exist_ok=False) # processor.run_processor() assert (Path("/var/success") / processor_path.name).exists(), f"{processor_path.name} missing from success dir" diff --git a/webinterface/templates/configuration.html b/webinterface/templates/configuration.html index f0b4ef88..6ff6cfcd 100755 --- a/webinterface/templates/configuration.html +++ b/webinterface/templates/configuration.html @@ -41,7 +41,7 @@
Accept Compressed Images: {{config['accept_compressed_images']}} {% if config['accept_compressed_images']=='True' %} - + {% endif %}