Fix an issue of releasing lock for rq export job when the worker subprocess is killed #8721

Merged · 38 commits into develop from mk/fix_export_job_lock_releasing · Dec 19, 2024

Commits
bcfef06 Draft fix (Marishka17, Nov 19, 2024)
15c3cb9 Add a separate thread to prolong export lock (Marishka17, Nov 26, 2024)
c4ef502 Set job status to SCHEDULED manually && remove job from rq:scheduler:… (Marishka17, Nov 26, 2024)
26fce86 Format code (Marishka17, Nov 26, 2024)
7f80ff1 Drop SIGTERM/SIGINT handling (Marishka17, Nov 28, 2024)
e1c9de1 Update default settings (Marishka17, Nov 28, 2024)
607a839 Refactor code (Marishka17, Nov 28, 2024)
a6b6f0f [REST API tests] Increase waiting timeout (Marishka17, Nov 28, 2024)
c61a74b t (Marishka17, Nov 28, 2024)
80af002 Switch to short locks (Marishka17, Dec 3, 2024)
1e2fe5f update unit tests (Marishka17, Dec 5, 2024)
48d8b97 Merge branch 'develop' into mk/fix_export_job_lock_releasing (Marishka17, Dec 6, 2024)
58b2a94 Small improvements (Marishka17, Dec 6, 2024)
015c39c Mask scheduled status as queued (Marishka17, Dec 6, 2024)
7c78e1e Merge branch 'develop' into mk/fix_export_job_lock_releasing (Marishka17, Dec 6, 2024)
2bf9a0b Update cvat/apps/dataset_manager/views.py (Marishka17, Dec 9, 2024)
fd8658e Apply some comments (Marishka17, Dec 11, 2024)
8d050c0 Fix pylint issue (Marishka17, Dec 11, 2024)
9038741 Update test_concurrent_export_and_cleanup (Marishka17, Dec 12, 2024)
a52751b Update test_initiate_concurrent_export_by_different_users (Marishka17, Dec 12, 2024)
9dcc135 Fix tests (Marishka17, Dec 13, 2024)
116a9bc Try to fix test_initiate_concurrent_export_by_different_users on CI (Marishka17, Dec 13, 2024)
15b2ffa Merge branch 'develop' into mk/fix_export_job_lock_releasing (Marishka17, Dec 13, 2024)
39e38c4 t (Marishka17, Dec 13, 2024)
98a829b Update tests (Marishka17, Dec 18, 2024)
80c0259 Merge develop (Marishka17, Dec 18, 2024)
458e330 Add changelog (Marishka17, Dec 18, 2024)
da399e6 Rename settings (Marishka17, Dec 18, 2024)
0376d35 Move settings into engine (Marishka17, Dec 18, 2024)
6f09d46 Fix formatting (Marishka17, Dec 18, 2024)
2a3c19d Update test_concurrent_export_and_cleanup (Marishka17, Dec 18, 2024)
2a2111e Update changelog.d/20241218_120228_maria_fix_export_job_lock_releasin… (Marishka17, Dec 18, 2024)
76928b8 Fix formatting (Marishka17, Dec 18, 2024)
7ede7d0 Merge branch 'develop' into mk/fix_export_job_lock_releasing (Marishka17, Dec 18, 2024)
3c5f9b4 DATASET_CACHE_TTL -> EXPORT_CACHE_TTL (Marishka17, Dec 18, 2024)
788edb8 Remove dataset_manager/default_settings.py (Marishka17, Dec 19, 2024)
522d5a1 Fix missing warnings (Marishka17, Dec 19, 2024)
b28ea57 Merge branch 'develop' into mk/fix_export_job_lock_releasing (Marishka17, Dec 19, 2024)
9 changes: 9 additions & 0 deletions changelog.d/20241218_120228_maria_fix_export_job_lock_releasin…
@@ -0,0 +1,9 @@
### Fixed

- Acquiring a long lock with a worker TTL could result in the lock not being released,
  blocking further exports with the same parameters (<https://github.com/cvat-ai/cvat/pull/8721>)
- Scheduled RQ jobs could not be restarted due to incorrect RQ job status
  updating and handling (<https://github.com/cvat-ai/cvat/pull/8721>)
- Parallel exports with the same parameters by different users caused
  a LockNotAvailableError to be raised for all users except the one
  who initiated the export first (<https://github.com/cvat-ai/cvat/pull/8721>)
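To make the first changelog entry concrete, here is a minimal before/after sketch of the locking change. This is an illustration only: the helper and constant names are taken from the diffs below, and the path value is hypothetical.

```python
from datetime import timedelta

from cvat.apps.dataset_manager.util import get_export_cache_lock

output_path = "/data/cache/export/task_42-annotations.zip"  # hypothetical path

# Before: ttl=rq.get_current_job().timeout tied the lock lifetime to the RQ
# job timeout (potentially hours), so a killed worker subprocess could leave
# the lock held for that whole period, failing identical exports meanwhile.
# After: a short TTL bounds how long a dead process can keep the lock.
with get_export_cache_lock(
    output_path,
    ttl=timedelta(seconds=30),              # EXPORT_LOCK_TTL default
    acquire_timeout=timedelta(seconds=50),  # must exceed the lock TTL
):
    pass  # check the cache or publish the exported file
```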
24 changes: 23 additions & 1 deletion cvat/apps/dataset_manager/default_settings.py
@@ -3,12 +3,34 @@
# SPDX-License-Identifier: MIT

import os
import warnings
from django.core.exceptions import ImproperlyConfigured


DATASET_CACHE_TTL = int(os.getenv("CVAT_DATASET_CACHE_TTL", 60 * 60 * 24))
"Base lifetime for cached exported datasets, in seconds"

DATASET_CACHE_LOCK_TIMEOUT = int(os.getenv("CVAT_DATASET_CACHE_LOCK_TIMEOUT", 10))
default_dataset_export_lock_ttl = 30

DATASET_EXPORT_LOCK_TTL = int(os.getenv("CVAT_DATASET_EXPORT_LOCK_TTL", default_dataset_export_lock_ttl))
"Default lifetime for the export cache lock, in seconds."

DATASET_CACHE_LOCK_ACQUISITION_TIMEOUT = os.getenv("CVAT_DATASET_CACHE_LOCK_TIMEOUT")
"Timeout for cache lock acquiring, in seconds"

if DATASET_CACHE_LOCK_ACQUISITION_TIMEOUT is not None:
    DATASET_CACHE_LOCK_ACQUISITION_TIMEOUT = int(DATASET_CACHE_LOCK_ACQUISITION_TIMEOUT)
    warnings.warn(
        "The CVAT_DATASET_CACHE_LOCK_TIMEOUT is deprecated, "
        "use CVAT_DATASET_CACHE_LOCK_ACQUISITION_TIMEOUT instead", DeprecationWarning)
else:
    default_dataset_lock_acquire_timeout = 50
    DATASET_CACHE_LOCK_ACQUISITION_TIMEOUT = int(os.getenv("CVAT_DATASET_CACHE_LOCK_ACQUISITION_TIMEOUT", default_dataset_lock_acquire_timeout))

if DATASET_CACHE_LOCK_ACQUISITION_TIMEOUT <= DATASET_EXPORT_LOCK_TTL:
    raise ImproperlyConfigured(
        "Lock acquisition timeout must be more than lock TTL"
    )

DATASET_EXPORT_LOCKED_RETRY_INTERVAL = int(os.getenv("CVAT_DATASET_EXPORT_LOCKED_RETRY_INTERVAL", 60))
"Retry interval for cases the export cache lock was unavailable, in seconds"
630 changes: 395 additions & 235 deletions cvat/apps/dataset_manager/tests/test_rest_api_formats.py

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions cvat/apps/dataset_manager/util.py
@@ -111,14 +111,16 @@ def get_export_cache_lock(
    *,
    ttl: int | timedelta,
    block: bool = True,
    acquire_timeout: Optional[int | timedelta] = None,
    acquire_timeout: int | timedelta,
) -> Generator[Lock, Any, Any]:
    assert acquire_timeout is not None, "Endless waiting for the lock should be avoided"

    if isinstance(acquire_timeout, timedelta):
        acquire_timeout = acquire_timeout.total_seconds()
    if acquire_timeout is not None and acquire_timeout < 0:

    if acquire_timeout < 0:
        raise ValueError("acquire_timeout must be a non-negative number")
    elif acquire_timeout is None:
        acquire_timeout = -1

    if isinstance(ttl, timedelta):
        ttl = ttl.total_seconds()
@@ -233,3 +235,9 @@ def parse_export_file_path(file_path: os.PathLike[str]) -> ParsedExportFilename:
        format_repr=basename_match.group('format_tag'),
        file_ext=basename_match.group('file_ext'),
    )

def extend_export_file_lifetime(file_path: str):
    # Update the last modification time to extend the export's lifetime,
    # as the last access time is not available on every filesystem.
    # As a result, file deletion by the cleaning job will be postponed.
    os.utime(file_path, None)
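A minimal usage sketch of the updated helpers (the import path matches this module; the file path is hypothetical). Note that acquire_timeout is now required, so callers can no longer wait on the lock indefinitely:

```python
from datetime import timedelta

from cvat.apps.dataset_manager.util import (
    extend_export_file_lifetime,
    get_export_cache_lock,
)

export_path = "/data/cache/export/project_7-dataset.zip"  # hypothetical path

with get_export_cache_lock(
    export_path,
    ttl=timedelta(seconds=30),
    acquire_timeout=timedelta(seconds=50),  # mandatory: no endless waiting
):
    # Bump mtime so the scheduled cleanup job postpones deleting a file
    # that a download may still be using.
    extend_export_file_lifetime(export_path)
```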
134 changes: 82 additions & 52 deletions cvat/apps/dataset_manager/views.py
@@ -11,6 +11,7 @@

import django_rq
import rq
from os.path import exists as osp_exists
from django.conf import settings
from django.utils import timezone
from rq_scheduler import Scheduler
@@ -27,16 +28,18 @@
    LockNotAvailableError,
    current_function_name, get_export_cache_lock,
    get_export_cache_dir, make_export_filename,
    parse_export_file_path
    parse_export_file_path, extend_export_file_lifetime
)
from .util import EXPORT_CACHE_DIR_NAME # pylint: disable=unused-import


slogger = ServerLogManager(__name__)

_MODULE_NAME = __package__ + '.' + osp.splitext(osp.basename(__file__))[0]
def log_exception(logger=None, exc_info=True):

def log_exception(logger: logging.Logger | None = None, exc_info: bool = True):
    if logger is None:
        logger = slogger
        logger = slogger.glob
    logger.exception("[%s @ %s]: exception occurred" % \
        (_MODULE_NAME, current_function_name(2)),
        exc_info=exc_info)
@@ -51,8 +54,9 @@ def log_exception(logger=None, exc_info=True):
    'job': JOB_CACHE_TTL,
}

EXPORT_CACHE_LOCK_TIMEOUT = timedelta(seconds=settings.DATASET_CACHE_LOCK_TIMEOUT)
EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT = timedelta(seconds=settings.DATASET_CACHE_LOCK_ACQUISITION_TIMEOUT)
EXPORT_LOCKED_RETRY_INTERVAL = timedelta(seconds=settings.DATASET_EXPORT_LOCKED_RETRY_INTERVAL)
EXPORT_LOCK_TTL = timedelta(seconds=settings.DATASET_EXPORT_LOCK_TTL)


def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta:
Expand All @@ -61,6 +65,14 @@ def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta:

return TTL_CONSTS[db_instance.lower()]

def _patch_scheduled_job_status(job: rq.job.Job):
    # NOTE: rq scheduler < 0.14 does not set the appropriate
    # job status (SCHEDULED). This has been fixed in the 0.14 version.
    # https://github.com/rq/rq-scheduler/blob/f7d5787c5f94b5517e209c612ef648f4bfc44f9e/rq_scheduler/scheduler.py#L148
    # FUTURE-TODO: delete manual status setting after upgrading to 0.14
    if job.get_status(refresh=False) != rq.job.JobStatus.SCHEDULED:
        job.set_status(rq.job.JobStatus.SCHEDULED)

def _retry_current_rq_job(time_delta: timedelta) -> rq.job.Job:
    # TODO: implement using retries once we move from rq_scheduler to builtin RQ scheduler
    # for better reliability and error reporting
@@ -79,7 +91,7 @@ def _patched_retry(*_1, **_2):
        user_id = current_rq_job.meta.get('user', {}).get('id') or -1

        with get_rq_lock_by_user(settings.CVAT_QUEUES.EXPORT_DATA.value, user_id):
            scheduler.enqueue_in(
            scheduled_rq_job: rq.job.Job = scheduler.enqueue_in(
                time_delta,
                current_rq_job.func,
                *current_rq_job.args,
@@ -92,12 +104,21 @@
                on_success=current_rq_job.success_callback,
                on_failure=current_rq_job.failure_callback,
            )
            _patch_scheduled_job_status(scheduled_rq_job)

    current_rq_job.retries_left = 1
    setattr(current_rq_job, 'retry', _patched_retry)
    return current_rq_job

def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):
def export(
    *,
    dst_format: str,
    project_id: int | None = None,
    task_id: int | None = None,
    job_id: int | None = None,
    server_url: str | None = None,
    save_images: bool = False,
):
    try:
        if task_id is not None:
            logger = slogger.task[task_id]
@@ -134,41 +155,50 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No

        os.makedirs(cache_dir, exist_ok=True)

        # acquire a lock 2 times instead of using one long lock:
        # 1. to check whether the file exists or not
        # 2. to create a file when it doesn't exist
        with get_export_cache_lock(
            output_path,
            block=True,
            acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT,
            ttl=rq.get_current_job().timeout,
            ttl=EXPORT_LOCK_TTL,
            acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
        ):
            if not osp.exists(output_path):
                with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
                    temp_file = osp.join(temp_dir, 'result')
                    export_fn(db_instance.id, temp_file, dst_format,
                        server_url=server_url, save_images=save_images)
                    os.replace(temp_file, output_path)

                scheduler: Scheduler = django_rq.get_scheduler(
                    settings.CVAT_QUEUES.EXPORT_DATA.value
                )
                cleaning_job = scheduler.enqueue_in(
                    time_delta=cache_ttl,
                    func=clear_export_cache,
                    file_path=output_path,
                    file_ctime=instance_update_time.timestamp(),
                    logger=logger
                )
                logger.info(
                    "The {} '{}' is exported as '{}' at '{}' "
                    "and available for downloading for the next {}. "
                    "Export cache cleaning job is enqueued, id '{}'".format(
                        db_instance.__class__.__name__.lower(),
                        db_instance.name if isinstance(
                            db_instance, (Project, Task)
                        ) else db_instance.id,
                        dst_format, output_path, cache_ttl,
                        cleaning_job.id
                    )
                )
            if osp_exists(output_path):
                extend_export_file_lifetime(output_path)
                return output_path

        with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
            temp_file = osp.join(temp_dir, 'result')
            export_fn(db_instance.id, temp_file, dst_format,
                server_url=server_url, save_images=save_images)
            with get_export_cache_lock(
                output_path,
                ttl=EXPORT_LOCK_TTL,
                acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
            ):
                os.replace(temp_file, output_path)

        scheduler: Scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value)
        cleaning_job = scheduler.enqueue_in(
            time_delta=cache_ttl,
            func=clear_export_cache,
            file_path=output_path,
            file_ctime=instance_update_time.timestamp(),
            logger=logger,
        )
        _patch_scheduled_job_status(cleaning_job)
        logger.info(
            "The {} '{}' is exported as '{}' at '{}' "
            "and available for downloading for the next {}. "
            "Export cache cleaning job is enqueued, id '{}'".format(
                db_instance.__class__.__name__.lower(),
                db_instance.id,
                dst_format,
                output_path,
                cache_ttl,
                cleaning_job.id,
            )
        )

        return output_path
    except LockNotAvailableError:
@@ -184,23 +214,23 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
        log_exception(logger)
        raise

def export_job_annotations(job_id, dst_format=None, server_url=None):
    return export(dst_format,job_id=job_id, server_url=server_url, save_images=False)
def export_job_annotations(job_id: int, dst_format: str, *, server_url: str | None = None):
    return export(dst_format=dst_format, job_id=job_id, server_url=server_url, save_images=False)

def export_job_as_dataset(job_id, dst_format=None, server_url=None):
    return export(dst_format, job_id=job_id, server_url=server_url, save_images=True)
def export_job_as_dataset(job_id: int, dst_format: str, *, server_url: str | None = None):
    return export(dst_format=dst_format, job_id=job_id, server_url=server_url, save_images=True)

def export_task_as_dataset(task_id, dst_format=None, server_url=None):
    return export(dst_format, task_id=task_id, server_url=server_url, save_images=True)
def export_task_as_dataset(task_id: int, dst_format: str, *, server_url: str | None = None):
    return export(dst_format=dst_format, task_id=task_id, server_url=server_url, save_images=True)

def export_task_annotations(task_id, dst_format=None, server_url=None):
    return export(dst_format,task_id=task_id, server_url=server_url, save_images=False)
def export_task_annotations(task_id: int, dst_format: str, *, server_url: str | None = None):
    return export(dst_format=dst_format, task_id=task_id, server_url=server_url, save_images=False)

def export_project_as_dataset(project_id, dst_format=None, server_url=None):
    return export(dst_format, project_id=project_id, server_url=server_url, save_images=True)
def export_project_as_dataset(project_id: int, dst_format: str, *, server_url: str | None = None):
    return export(dst_format=dst_format, project_id=project_id, server_url=server_url, save_images=True)

def export_project_annotations(project_id, dst_format=None, server_url=None):
    return export(dst_format, project_id=project_id, server_url=server_url, save_images=False)
def export_project_annotations(project_id: int, dst_format: str, *, server_url: str | None = None):
    return export(dst_format=dst_format, project_id=project_id, server_url=server_url, save_images=False)


class FileIsBeingUsedError(Exception):
@@ -213,8 +243,8 @@ def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger
    with get_export_cache_lock(
        file_path,
        block=True,
        acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT,
        ttl=rq.get_current_job().timeout,
        acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
        ttl=EXPORT_LOCK_TTL,
    ):
        if not osp.exists(file_path):
            raise FileNotFoundError("Export cache file '{}' doesn't exist".format(file_path))
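Putting the pieces together, here is a condensed sketch of the two-short-locks flow that export() now follows. It assumes the helpers and constants defined in the diffs above (get_export_cache_lock, extend_export_file_lifetime, EXPORT_LOCK_TTL, EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT); error handling, logging, and the cleanup-job scheduling are omitted, and export_with_short_locks/run_export are illustrative names.

```python
import os
import os.path as osp
import tempfile


def export_with_short_locks(output_path: str, cache_dir: str, run_export) -> str:
    # First short lock: only to check whether a cached result already exists.
    with get_export_cache_lock(
        output_path,
        ttl=EXPORT_LOCK_TTL,
        acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
    ):
        if osp.exists(output_path):
            extend_export_file_lifetime(output_path)
            return output_path

    # The (potentially long) export runs outside any lock, so a killed worker
    # subprocess can no longer pin the lock for the whole RQ job timeout.
    with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
        temp_file = osp.join(temp_dir, "result")
        run_export(temp_file)

        # Second short lock: only to publish the finished file atomically.
        with get_export_cache_lock(
            output_path,
            ttl=EXPORT_LOCK_TTL,
            acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
        ):
            os.replace(temp_file, output_path)

    return output_path
```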