Fix an issue of releasing lock for rq export job when the worker subprocess is killed #8721

Merged · 38 commits · Dec 19, 2024 (changes shown from 35 commits)
Commits (all by Marishka17):

- bcfef06 Draft fix (Nov 19, 2024)
- 15c3cb9 Add a separate thread to prolong export lock (Nov 26, 2024)
- c4ef502 Set job status to SCHEDULED manually && remove job from rq:scheduler:… (Nov 26, 2024)
- 26fce86 Format code (Nov 26, 2024)
- 7f80ff1 Drop SIGTERM/SIGINT handling (Nov 28, 2024)
- e1c9de1 Update default settings (Nov 28, 2024)
- 607a839 Refactor code (Nov 28, 2024)
- a6b6f0f [REST API tests] Increase waiting timeout (Nov 28, 2024)
- c61a74b t (Nov 28, 2024)
- 80af002 Switch to short locks (Dec 3, 2024)
- 1e2fe5f update unit tests (Dec 5, 2024)
- 48d8b97 Merge branch 'develop' into mk/fix_export_job_lock_releasing (Dec 6, 2024)
- 58b2a94 Small improvements (Dec 6, 2024)
- 015c39c Mask scheduled status as queued (Dec 6, 2024)
- 7c78e1e Merge branch 'develop' into mk/fix_export_job_lock_releasing (Dec 6, 2024)
- 2bf9a0b Update cvat/apps/dataset_manager/views.py (Dec 9, 2024)
- fd8658e Apply some comments (Dec 11, 2024)
- 8d050c0 Fix pylint issue (Dec 11, 2024)
- 9038741 Update test_concurrent_export_and_cleanup (Dec 12, 2024)
- a52751b Update test_initiate_concurrent_export_by_different_users (Dec 12, 2024)
- 9dcc135 Fix tests (Dec 13, 2024)
- 116a9bc Try to fix test_initiate_concurrent_export_by_different_users on CI (Dec 13, 2024)
- 15b2ffa Merge branch 'develop' into mk/fix_export_job_lock_releasing (Dec 13, 2024)
- 39e38c4 t (Dec 13, 2024)
- 98a829b Update tests (Dec 18, 2024)
- 80c0259 Merge develop (Dec 18, 2024)
- 458e330 Add changelog (Dec 18, 2024)
- da399e6 Rename settings (Dec 18, 2024)
- 0376d35 Move settings into engine (Dec 18, 2024)
- 6f09d46 Fix formatting (Dec 18, 2024)
- 2a3c19d Update test_concurrent_export_and_cleanup (Dec 18, 2024)
- 2a2111e Update changelog.d/20241218_120228_maria_fix_export_job_lock_releasin… (Dec 18, 2024)
- 76928b8 Fix formatting (Dec 18, 2024)
- 7ede7d0 Merge branch 'develop' into mk/fix_export_job_lock_releasing (Dec 18, 2024)
- 3c5f9b4 DATASET_CACHE_TTL -> EXPORT_CACHE_TTL (Dec 18, 2024)
- 788edb8 Remove dataset_manager/default_settings.py (Dec 19, 2024)
- 522d5a1 Fix missing warnings (Dec 19, 2024)
- b28ea57 Merge branch 'develop' into mk/fix_export_job_lock_releasing (Dec 19, 2024)
6 changes: 6 additions & 0 deletions changelog.d/20241218_120228_maria_fix_export_job_lock_releasin…
@@ -0,0 +1,6 @@
### Fixed

- Exporting datasets could start significantly later than expected, both for one
and for several users in the same project/task/job (<https://github.com/cvat-ai/cvat/pull/8721>)
- Scheduled RQ jobs could not be restarted due to incorrect updating and
handling of the RQ job status (<https://github.com/cvat-ai/cvat/pull/8721>)
11 changes: 0 additions & 11 deletions cvat/apps/dataset_manager/default_settings.py
@@ -1,14 +1,3 @@
# Copyright (C) 2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import os

DATASET_CACHE_TTL = int(os.getenv("CVAT_DATASET_CACHE_TTL", 60 * 60 * 24))
"Base lifetime for cached exported datasets, in seconds"

DATASET_CACHE_LOCK_TIMEOUT = int(os.getenv("CVAT_DATASET_CACHE_LOCK_TIMEOUT", 10))
"Timeout for cache lock acquiring, in seconds"

DATASET_EXPORT_LOCKED_RETRY_INTERVAL = int(os.getenv("CVAT_DATASET_EXPORT_LOCKED_RETRY_INTERVAL", 60))
"Retry interval for cases the export cache lock was unavailable, in seconds"
635 changes: 400 additions & 235 deletions cvat/apps/dataset_manager/tests/test_rest_api_formats.py

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions cvat/apps/dataset_manager/util.py
@@ -111,14 +111,16 @@ def get_export_cache_lock(
*,
ttl: int | timedelta,
block: bool = True,
acquire_timeout: Optional[int | timedelta] = None,
acquire_timeout: int | timedelta,
) -> Generator[Lock, Any, Any]:
assert acquire_timeout is not None, "Endless waiting for the lock should be avoided"

if isinstance(acquire_timeout, timedelta):
acquire_timeout = acquire_timeout.total_seconds()
if acquire_timeout is not None and acquire_timeout < 0:

if acquire_timeout < 0:
raise ValueError("acquire_timeout must be a non-negative number")
elif acquire_timeout is None:
acquire_timeout = -1


if isinstance(ttl, timedelta):
ttl = ttl.total_seconds()
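With this signature, callers can no longer wait on the lock indefinitely: `acquire_timeout` is now required, and `ttl` bounds how long a dead holder can keep the lock. A usage sketch (the timeout values here are placeholders, not the project defaults):

```python
from datetime import timedelta

# ttl: the lock expires by itself if the holding worker is killed;
# acquire_timeout: bounded wait, after which LockNotAvailableError is raised
with get_export_cache_lock(
    output_path,
    ttl=timedelta(seconds=30),
    acquire_timeout=timedelta(seconds=50),
):
    ...  # keep the critical section short
```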
@@ -233,3 +235,9 @@ def parse_export_file_path(file_path: os.PathLike[str]) -> ParsedExportFilename:
format_repr=basename_match.group('format_tag'),
file_ext=basename_match.group('file_ext'),
)

def extend_export_file_lifetime(file_path: str):
# Update the last modification time to extend the export's lifetime,
# as the last access time is not available on every filesystem.
# As a result, file deletion by the cleaning job will be postponed.
os.utime(file_path, None)
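Because `extend_export_file_lifetime()` bumps `st_mtime`, a cached export effectively expires relative to its last use rather than its creation. Roughly, the cleanup side can then test staleness as below (a sketch; the exact check in `clear_export_cache()` may differ):

```python
import os
import time
from datetime import timedelta

def is_export_stale(file_path: str, cache_ttl: timedelta) -> bool:
    # Every cache hit re-touches the file via extend_export_file_lifetime(),
    # so mtime + TTL is the earliest moment the cleanup job may delete it
    return time.time() > os.path.getmtime(file_path) + cache_ttl.total_seconds()
```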
138 changes: 84 additions & 54 deletions cvat/apps/dataset_manager/views.py
@@ -11,6 +11,7 @@

import django_rq
import rq
from os.path import exists as osp_exists
from django.conf import settings
from django.utils import timezone
from rq_scheduler import Scheduler
@@ -27,21 +28,23 @@
LockNotAvailableError,
current_function_name, get_export_cache_lock,
get_export_cache_dir, make_export_filename,
parse_export_file_path
parse_export_file_path, extend_export_file_lifetime
)
from .util import EXPORT_CACHE_DIR_NAME # pylint: disable=unused-import


slogger = ServerLogManager(__name__)

_MODULE_NAME = __package__ + '.' + osp.splitext(osp.basename(__file__))[0]
def log_exception(logger=None, exc_info=True):

def log_exception(logger: logging.Logger | None = None, exc_info: bool = True):
if logger is None:
logger = slogger
logger = slogger.glob
logger.exception("[%s @ %s]: exception occurred" % \
(_MODULE_NAME, current_function_name(2)),
exc_info=exc_info)

DEFAULT_CACHE_TTL = timedelta(seconds=settings.DATASET_CACHE_TTL)
DEFAULT_CACHE_TTL = timedelta(seconds=settings.EXPORT_CACHE_TTL)
PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL
TASK_CACHE_TTL = DEFAULT_CACHE_TTL
JOB_CACHE_TTL = DEFAULT_CACHE_TTL
@@ -51,8 +54,9 @@ def log_exception(logger=None, exc_info=True):
'job': JOB_CACHE_TTL,
}

EXPORT_CACHE_LOCK_TIMEOUT = timedelta(seconds=settings.DATASET_CACHE_LOCK_TIMEOUT)
EXPORT_LOCKED_RETRY_INTERVAL = timedelta(seconds=settings.DATASET_EXPORT_LOCKED_RETRY_INTERVAL)
EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT = timedelta(seconds=settings.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT)
EXPORT_CACHE_LOCK_TTL = timedelta(seconds=settings.EXPORT_CACHE_LOCK_TTL)
EXPORT_LOCKED_RETRY_INTERVAL = timedelta(seconds=settings.EXPORT_LOCKED_RETRY_INTERVAL)


def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta:
@@ -61,6 +65,14 @@ def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta:

return TTL_CONSTS[db_instance.lower()]

def _patch_scheduled_job_status(job: rq.job.Job):
# NOTE: rq scheduler < 0.14 does not set the appropriate
# job status (SCHEDULED). This has been fixed in the 0.14 version.
# https://github.com/rq/rq-scheduler/blob/f7d5787c5f94b5517e209c612ef648f4bfc44f9e/rq_scheduler/scheduler.py#L148
# FUTURE-TODO: delete manual status setting after upgrading to 0.14
if job.get_status(refresh=False) != rq.job.JobStatus.SCHEDULED:
job.set_status(rq.job.JobStatus.SCHEDULED)
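This patch matters because other code branches on `RQJobStatus.SCHEDULED` (see `_handle_rq_job_v2()` in `background.py` below). A sketch of the failure mode, assuming the pre-0.14 rq-scheduler behavior described in the NOTE:

```python
import rq
from datetime import timedelta

# 'scheduler' is an rq_scheduler.Scheduler and 'some_func' is any task, both
# hypothetical here. With rq-scheduler < 0.14, enqueue_in() persists the job
# without setting a status, so status-based handling of scheduled jobs falls through:
job = scheduler.enqueue_in(timedelta(minutes=5), some_func)
assert job.get_status(refresh=False) != rq.job.JobStatus.SCHEDULED

_patch_scheduled_job_status(job)  # force the status the new handling relies on
assert job.get_status(refresh=False) == rq.job.JobStatus.SCHEDULED
```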

def _retry_current_rq_job(time_delta: timedelta) -> rq.job.Job:
# TODO: implement using retries once we move from rq_scheduler to builtin RQ scheduler
# for better reliability and error reporting
@@ -79,7 +91,7 @@ def _patched_retry(*_1, **_2):
user_id = current_rq_job.meta.get('user', {}).get('id') or -1

with get_rq_lock_by_user(settings.CVAT_QUEUES.EXPORT_DATA.value, user_id):
scheduler.enqueue_in(
scheduled_rq_job: rq.job.Job = scheduler.enqueue_in(
time_delta,
current_rq_job.func,
*current_rq_job.args,
@@ -92,12 +104,21 @@ def _patched_retry(*_1, **_2):
on_success=current_rq_job.success_callback,
on_failure=current_rq_job.failure_callback,
)
_patch_scheduled_job_status(scheduled_rq_job)

current_rq_job.retries_left = 1
setattr(current_rq_job, 'retry', _patched_retry)
return current_rq_job

def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):
def export(
*,
dst_format: str,
project_id: int | None = None,
task_id: int | None = None,
job_id: int | None = None,
server_url: str | None = None,
save_images: bool = False,
):
try:
if task_id is not None:
logger = slogger.task[task_id]
@@ -134,41 +155,50 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):

os.makedirs(cache_dir, exist_ok=True)

# acquire a lock 2 times instead of using one long lock:
# 1. to check whether the file exists or not
# 2. to create a file when it doesn't exist
with get_export_cache_lock(
output_path,
block=True,
acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT,
ttl=rq.get_current_job().timeout,
ttl=EXPORT_CACHE_LOCK_TTL,
acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
):
if not osp.exists(output_path):
with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
temp_file = osp.join(temp_dir, 'result')
export_fn(db_instance.id, temp_file, dst_format,
server_url=server_url, save_images=save_images)
os.replace(temp_file, output_path)

scheduler: Scheduler = django_rq.get_scheduler(
settings.CVAT_QUEUES.EXPORT_DATA.value
)
cleaning_job = scheduler.enqueue_in(
time_delta=cache_ttl,
func=clear_export_cache,
file_path=output_path,
file_ctime=instance_update_time.timestamp(),
logger=logger
)
logger.info(
"The {} '{}' is exported as '{}' at '{}' "
"and available for downloading for the next {}. "
"Export cache cleaning job is enqueued, id '{}'".format(
db_instance.__class__.__name__.lower(),
db_instance.name if isinstance(
db_instance, (Project, Task)
) else db_instance.id,
dst_format, output_path, cache_ttl,
cleaning_job.id
)
)
if osp_exists(output_path):
extend_export_file_lifetime(output_path)
return output_path

with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
temp_file = osp.join(temp_dir, 'result')
export_fn(db_instance.id, temp_file, dst_format,
server_url=server_url, save_images=save_images)
with get_export_cache_lock(
output_path,
ttl=EXPORT_CACHE_LOCK_TTL,
acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
):
os.replace(temp_file, output_path)

scheduler: Scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value)
cleaning_job = scheduler.enqueue_in(
time_delta=cache_ttl,
func=clear_export_cache,
file_path=output_path,
file_ctime=instance_update_time.timestamp(),
logger=logger,
)
_patch_scheduled_job_status(cleaning_job)
logger.info(
"The {} '{}' is exported as '{}' at '{}' "
"and available for downloading for the next {}. "
"Export cache cleaning job is enqueued, id '{}'".format(
db_instance.__class__.__name__.lower(),
db_instance.id,
dst_format,
output_path,
cache_ttl,
cleaning_job.id,
)
)

return output_path
except LockNotAvailableError:
@@ -184,23 +214,23 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):
log_exception(logger)
raise

def export_job_annotations(job_id, dst_format=None, server_url=None):
return export(dst_format,job_id=job_id, server_url=server_url, save_images=False)
def export_job_annotations(job_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, job_id=job_id, server_url=server_url, save_images=False)

def export_job_as_dataset(job_id, dst_format=None, server_url=None):
return export(dst_format, job_id=job_id, server_url=server_url, save_images=True)
def export_job_as_dataset(job_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, job_id=job_id, server_url=server_url, save_images=True)

def export_task_as_dataset(task_id, dst_format=None, server_url=None):
return export(dst_format, task_id=task_id, server_url=server_url, save_images=True)
def export_task_as_dataset(task_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, task_id=task_id, server_url=server_url, save_images=True)

def export_task_annotations(task_id, dst_format=None, server_url=None):
return export(dst_format,task_id=task_id, server_url=server_url, save_images=False)
def export_task_annotations(task_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, task_id=task_id, server_url=server_url, save_images=False)

def export_project_as_dataset(project_id, dst_format=None, server_url=None):
return export(dst_format, project_id=project_id, server_url=server_url, save_images=True)
def export_project_as_dataset(project_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, project_id=project_id, server_url=server_url, save_images=True)

def export_project_annotations(project_id, dst_format=None, server_url=None):
return export(dst_format, project_id=project_id, server_url=server_url, save_images=False)
def export_project_annotations(project_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, project_id=project_id, server_url=server_url, save_images=False)


class FileIsBeingUsedError(Exception):
@@ -213,8 +243,8 @@ def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger
with get_export_cache_lock(
file_path,
block=True,
acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT,
ttl=rq.get_current_job().timeout,
acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
ttl=EXPORT_CACHE_LOCK_TTL,
):
if not osp.exists(file_path):
raise FileNotFoundError("Export cache file '{}' doesn't exist".format(file_path))
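The restructured `export()` above is the heart of the fix: instead of one long lock held for the entire export (which a killed worker subprocess could never release), the lock is taken twice for short, TTL-bounded critical sections, and the slow export itself runs unlocked. Distilled to a sketch (simplified from the diff; error handling and cleanup scheduling omitted):

```python
def export_with_short_locks(output_path: str, cache_dir: str, do_export) -> str:
    # Lock 1: briefly, to check the cache and prolong a cached file's lifetime
    with get_export_cache_lock(
        output_path,
        ttl=EXPORT_CACHE_LOCK_TTL,
        acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
    ):
        if osp.exists(output_path):
            extend_export_file_lifetime(output_path)
            return output_path

    # The expensive export runs with no lock held: if the worker is killed
    # here, there is nothing left to release
    with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
        temp_file = osp.join(temp_dir, "result")
        do_export(temp_file)

        # Lock 2: briefly, to publish the result atomically
        with get_export_cache_lock(
            output_path,
            ttl=EXPORT_CACHE_LOCK_TTL,
            acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
        ):
            os.replace(temp_file, output_path)

    return output_path
```

Even if a worker dies inside one of the short sections, the lock's TTL guarantees it frees itself, which is exactly the "lock not released when the worker subprocess is killed" problem from the PR title.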
44 changes: 30 additions & 14 deletions cvat/apps/engine/background.py
@@ -14,7 +14,7 @@
from django.conf import settings
from django.http.response import HttpResponseBadRequest
from django.utils import timezone
from django_rq.queues import DjangoRQ
from django_rq.queues import DjangoRQ, DjangoScheduler
from rest_framework import serializers, status
from rest_framework.request import Request
from rest_framework.response import Response
@@ -52,6 +52,11 @@

slogger = ServerLogManager(__name__)

REQUEST_TIMEOUT = 60
# it's better to return a LockNotAvailableError than a response with a 504 status
LOCK_TTL = REQUEST_TIMEOUT - 5
LOCK_ACQUIRE_TIMEOUT = LOCK_TTL - 5
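The arithmetic behind these constants, spelled out: the lock TTL is kept inside the HTTP request budget, and the acquisition wait inside the TTL, so a request raises `LockNotAvailableError` before the gateway would answer with a 504:

```python
REQUEST_TIMEOUT = 60                 # overall request budget, in seconds
LOCK_TTL = REQUEST_TIMEOUT - 5       # 55s: a dead holder's lock expires within the budget
LOCK_ACQUIRE_TIMEOUT = LOCK_TTL - 5  # 50s: stop waiting before the TTL and budget run out

assert LOCK_ACQUIRE_TIMEOUT < LOCK_TTL < REQUEST_TIMEOUT  # 50 < 55 < 60
```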


class _ResourceExportManager(ABC):
QUEUE_NAME = settings.CVAT_QUEUES.EXPORT_DATA.value
@@ -89,7 +94,7 @@ def setup_background_job(self, queue: DjangoRQ, rq_id: str) -> None:
def _handle_rq_job_v1(self, rq_job: Optional[RQJob], queue: DjangoRQ) -> Optional[Response]:
pass

def _handle_rq_job_v2(self, rq_job: Optional[RQJob], *args, **kwargs) -> Optional[Response]:
def _handle_rq_job_v2(self, rq_job: Optional[RQJob], queue: DjangoRQ) -> Optional[Response]:
if not rq_job:
return None

@@ -101,17 +106,23 @@ def _handle_rq_job_v2(self, rq_job: Optional[RQJob], *args, **kwargs) -> Optional[Response]:
status=status.HTTP_409_CONFLICT,
)

if rq_job_status in (RQJobStatus.SCHEDULED, RQJobStatus.DEFERRED):
if rq_job_status == RQJobStatus.DEFERRED:
rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)

if rq_job_status == RQJobStatus.SCHEDULED:
scheduler: DjangoScheduler = django_rq.get_scheduler(queue.name, queue=queue)
# remove the job id from the set with scheduled keys
scheduler.cancel(rq_job)
rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)

rq_job.delete()
return None
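Why the SCHEDULED branch calls both `scheduler.cancel()` and `rq_job.cancel()`: rq-scheduler tracks scheduled jobs in its own Redis sorted set, which plain RQ cancellation does not touch (commit c4ef502 refers to it as `rq:scheduler:…`; the full key name below is an assumption):

```python
# Sketch of the two-step cancellation of a scheduled job
scheduler.cancel(rq_job)  # drop the job id from rq-scheduler's sorted set
                          # (e.g. "rq:scheduler:scheduled_jobs", name assumed)
rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)
rq_job.delete()           # remove the job data itself from Redis
```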

def handle_rq_job(self, *args, **kwargs) -> Optional[Response]:
def handle_rq_job(self, rq_job: RQJob | None, queue: DjangoRQ) -> Optional[Response]:
if self.version == 1:
return self._handle_rq_job_v1(*args, **kwargs)
return self._handle_rq_job_v1(rq_job, queue)
elif self.version == 2:
return self._handle_rq_job_v2(*args, **kwargs)
return self._handle_rq_job_v2(rq_job, queue)

raise ValueError("Unsupported version")

@@ -220,7 +231,9 @@ def is_result_outdated() -> bool:
return rq_job.meta[RQJobMetaField.REQUEST]["timestamp"] < instance_update_time

def handle_local_download() -> Response:
with dm.util.get_export_cache_lock(file_path, ttl=REQUEST_TIMEOUT):
with dm.util.get_export_cache_lock(
file_path, ttl=LOCK_TTL, acquire_timeout=LOCK_ACQUIRE_TIMEOUT
):
if not osp.exists(file_path):
return Response(
"The exported file has expired, please retry exporting",
@@ -295,8 +308,6 @@ def handle_local_download() -> Response:
instance_update_time = self.get_instance_update_time()
instance_timestamp = self.get_timestamp(instance_update_time)

REQUEST_TIMEOUT = 60

if rq_job_status == RQJobStatus.FINISHED:
if self.export_args.location == Location.CLOUD_STORAGE:
rq_job.delete()
@@ -313,11 +324,13 @@ def handle_local_download() -> Response:
if action == "download":
return handle_local_download()
else:
with dm.util.get_export_cache_lock(file_path, ttl=REQUEST_TIMEOUT):
with dm.util.get_export_cache_lock(
file_path,
ttl=LOCK_TTL,
acquire_timeout=LOCK_ACQUIRE_TIMEOUT,
):
if osp.exists(file_path) and not is_result_outdated():
# Update last update time to prolong the export lifetime
# as the last access time is not available on every filesystem
os.utime(file_path, None)
dm.util.extend_export_file_lifetime(file_path)

return Response(status=status.HTTP_201_CREATED)

@@ -422,7 +435,7 @@ def setup_background_job(
user_id = self.request.user.id

func = self.export_callback
func_args = (self.db_instance.id, self.export_args.format, server_address)
func_args = (self.db_instance.id, self.export_args.format)
result_url = None

if self.export_args.location == Location.CLOUD_STORAGE:
@@ -467,6 +480,9 @@ def setup_background_job(
queue.enqueue_call(
func=func,
args=func_args,
kwargs={
"server_url": server_address,
},
job_id=rq_id,
meta=get_rq_job_meta(
request=self.request, db_obj=self.db_instance, result_url=result_url