Skip to content

Commit

Permalink
Improved locking mechanism. Removing former sendlogs before next disp…
Browse files Browse the repository at this point in the history
…atching retry.
  • Loading branch information
tblock79 committed Mar 4, 2022
1 parent ade3ede commit dc0a83f
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 28 deletions.
2 changes: 1 addition & 1 deletion common/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class FileLock:
def __init__(self, path_for_lockfile: Path):
self.lockCreated = True
self.lockfile = path_for_lockfile
self.lockfile.touch()
self.lockfile.touch(exist_ok=False)

# Destructor to ensure that the lock file gets deleted
# if the calling function is left somewhere as result
Expand Down
24 changes: 20 additions & 4 deletions dispatch/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,16 @@ def execute(
logger.error(error_message)
send_event(m_events.PROCESSING, severity.WARNING, error_message)

# Create a .sending file to indicate that this folder is being sent,
# otherwise the dispatcher would pick it up again if the transfer is
# still going on
# Create a .processing file to indicate that this folder is being sent,
# otherwise another dispatcher instance would pick it up again
lock_file = Path(source_folder) / mercure_names.PROCESSING

# Double check that the case has not been picked up by another instance already
if lock_file.exists():
logger.info(f"Case {source_folder} already processed by other instance")
return
try:
lock_file.touch()
lock_file.touch(exist_ok=False)
except:
# TODO: Put a limit on these error messages -- log will run full at some point
send_event(m_events.PROCESSING, severity.ERROR, f"Error sending {uid} to {target_name}")
Expand All @@ -146,6 +150,18 @@ def execute(
_move_sent_directory(source_folder, error_folder)
_trigger_notification(task_content.info, mercure_events.ERROR)

# Check if a sendlog file from a previous try exists. If so, remove it
sendlog = Path(source_folder) / mercure_names.SENDLOG
if sendlog.exists():
try:
sendlog.unlink()
except:
send_event(m_events.PROCESSING, severity.ERROR, f"Error sending {uid} to {target_name}")
error_message = f"Unable to remove former sendlog {sendlog}"
send_series_event(s_events.ERROR, uid, 0, target_name, error_message)
logger.exception(error_message)
return

logger.debug(f"Running command {command}")
logger.info(f"Sending {source_folder} to target {target_name}")
try:
Expand Down
5 changes: 0 additions & 5 deletions dispatch/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ def is_ready_for_sending(folder) -> Optional[Task]:
return None


def has_been_send(folder) -> bool:
"""Checks if the given folder has already been sent."""
return (Path(folder) / mercure_names.SENDLOG).exists()


def is_target_json_valid(folder) -> Optional[Task]:
"""
Checks if the task.json file exists and is valid. Returns the content
Expand Down
4 changes: 2 additions & 2 deletions dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import common.helper as helper
import common.monitor as monitor
from common.constants import mercure_names
from dispatch.status import has_been_send, is_ready_for_sending
from dispatch.status import is_ready_for_sending
from dispatch.send import execute
from common.constants import mercure_defs

Expand Down Expand Up @@ -96,7 +96,7 @@ def dispatch(args) -> None:
logger.info("Dispatching resumed")

# Now process the folders that are ready for dispatching
if entry.is_dir() and not has_been_send(entry) and is_ready_for_sending(entry):
if entry.is_dir() and is_ready_for_sending(entry):
logger.info(f"Sending folder {entry}")
execute(Path(entry), success_folder, error_folder, retry_max, retry_delay)

Expand Down
4 changes: 2 additions & 2 deletions process/process_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def process_series(folder) -> None:
lock = None
try:
try:
lock_file.touch()
lock_file.touch(exist_ok=False)
# lock = helper.FileLock(lock_file)
except Exception as e:
# Can't create lock file, so something must be seriously wrong
Expand Down Expand Up @@ -348,7 +348,7 @@ def move_results(
logger.error(f"Folder already contains lockfile {folder}/" + mercure_names.LOCK)
return
try:
lock_file.touch()
lock_file.touch(exist_ok=False)
except:
logger.info(f"Error locking folder to be moved {folder}")
logger.error(traceback.format_exc())
Expand Down
13 changes: 1 addition & 12 deletions tests/dispatch/test_status.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from common.types import Task

from dispatch.status import has_been_send, is_ready_for_sending, is_target_json_valid
from dispatch.status import is_ready_for_sending, is_target_json_valid
from common.constants import mercure_names

pytest_plugins = ("pyfakefs",)
Expand Down Expand Up @@ -43,17 +43,6 @@ def test_is_read_for_sending(fs):
assert is_ready_for_sending("/var/data")


def test_has_been_send(fs):
fs.create_dir("/var/data/")
fs.create_file("/var/data/" + mercure_names.SENDLOG)
assert has_been_send("/var/data/")


def test_has_been_send_not(fs):
fs.create_dir("/var/data/")
assert not has_been_send("/var/data/")


def test_read_target(fs):
target = {"info": dummy_info, "dispatch": {"target_name": "test_target"}}
fs.create_file("/var/data/" + mercure_names.TASKFILE, contents=json.dumps(target))
Expand Down
2 changes: 1 addition & 1 deletion tests/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def fake_processor(tag=None, meta=None, **kwargs):
logger.info(k)
for k in Path("/var/success").rglob("*"):
logger.info(k)
# (processor_path / ".complete").touch()
# (processor_path / ".complete").touch(exist_ok=False)
# processor.run_processor()

assert (Path("/var/success") / processor_path.name).exists(), f"{processor_path.name} missing from success dir"
Expand Down
2 changes: 1 addition & 1 deletion webinterface/templates/configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ <h5 class="title is-5 configtitle"><i class="fas fa-server has-text-success"></i
<td>Accept Compressed Images:</td>
<td>{{config['accept_compressed_images']}}
{% if config['accept_compressed_images']=='True' %}
<a href="https://mercure-imaging.org/docs/advanced.html" target="_blank" title="This setting can be problematic. Carefully read the documentation"><i class="fas fa-exclamation-circle has-text-warning" style="margin-left: 1px;"></i></a>
<a href="https://mercure-imaging.org/docs/advanced.html#additional-settings" target="_blank" title="This setting can be problematic. Carefully read the documentation"><i class="fas fa-exclamation-circle has-text-warning" style="margin-left: 1px;"></i></a>
{% endif %}
</td>
</tr>
Expand Down

0 comments on commit dc0a83f

Please sign in to comment.