diff --git a/changelog.d/20241218_120228_maria_fix_export_job_lock_releasing.md b/changelog.d/20241218_120228_maria_fix_export_job_lock_releasing.md
new file mode 100644
index 00000000000..28115fe7f6c
--- /dev/null
+++ b/changelog.d/20241218_120228_maria_fix_export_job_lock_releasing.md
@@ -0,0 +1,6 @@
+### Fixed
+
+- Exporting datasets could start significantly later than expected, both for a
+  single user and for several users in the same project/task/job ()
+- Scheduled RQ jobs could not be restarted due to incorrect handling and
+  updating of the RQ job status ()
diff --git a/cvat/apps/dataset_manager/apps.py b/cvat/apps/dataset_manager/apps.py
index 3e62d078171..2d2a03c5164 100644
--- a/cvat/apps/dataset_manager/apps.py
+++ b/cvat/apps/dataset_manager/apps.py
@@ -7,12 +7,3 @@ class DatasetManagerConfig(AppConfig):
     name = "cvat.apps.dataset_manager"
-
-    def ready(self) -> None:
-        from django.conf import settings
-
-        from . import default_settings
-
-        for key in dir(default_settings):
-            if key.isupper() and not hasattr(settings, key):
-                setattr(settings, key, getattr(default_settings, key))
diff --git a/cvat/apps/dataset_manager/default_settings.py b/cvat/apps/dataset_manager/default_settings.py
deleted file mode 100644
index a4dd53b0f52..00000000000
--- a/cvat/apps/dataset_manager/default_settings.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Copyright (C) 2024 CVAT.ai Corporation
-#
-# SPDX-License-Identifier: MIT
-
-import os
-
-DATASET_CACHE_TTL = int(os.getenv("CVAT_DATASET_CACHE_TTL", 60 * 60 * 24))
-"Base lifetime for cached exported datasets, in seconds"
-
-DATASET_CACHE_LOCK_TIMEOUT = int(os.getenv("CVAT_DATASET_CACHE_LOCK_TIMEOUT", 10))
-"Timeout for cache lock acquiring, in seconds"
-
-DATASET_EXPORT_LOCKED_RETRY_INTERVAL = int(os.getenv("CVAT_DATASET_EXPORT_LOCKED_RETRY_INTERVAL", 60))
-"Retry interval for cases the export cache lock was unavailable, in seconds"
diff --git a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py
index 059a45f6df2..50883826b5a 100644
--- a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py
+++ b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py
@@ -1318,6 +1318,33 @@ def get(self) -> str:
     class _LockTimeoutError(Exception):
         pass
 
+    def setUp(self):
+        self.export_cache_lock = multiprocessing.Lock()
+
+    @contextmanager
+    def patched_get_export_cache_lock(self, export_path, *, ttl: int | timedelta, block: bool = True, acquire_timeout: int | timedelta):
+        # a fakeredis lock acquired in a subprocess won't be visible to other processes,
+        # so just implement the lock here
+        from cvat.apps.dataset_manager.util import LockNotAvailableError
+
+        assert acquire_timeout
+        assert ttl
+
+        if isinstance(acquire_timeout, timedelta):
+            acquire_timeout = acquire_timeout.total_seconds()
+
+        acquired = self.export_cache_lock.acquire(
+            block=block, timeout=acquire_timeout
+        )
+
+        if not acquired:
+            raise LockNotAvailableError
+
+        try:
+            yield
+        finally:
+            self.export_cache_lock.release()
+
     @overload
     @classmethod
     def set_condition(cls, var: SharedBool, value: bool = True): ...
@@ -1340,7 +1367,7 @@ def set_condition(cls, var: SharedBase, value: Any = _not_set): @classmethod def wait_condition(cls, var: SharedBase, timeout: Optional[int] = 5): with var.condition: - if not var.condition.wait(timeout): + if not var.get() and not var.condition.wait(timeout): raise cls._LockTimeoutError @staticmethod @@ -1387,6 +1414,20 @@ def process_closing(process: multiprocessing.Process, *, timeout: Optional[int] process.join(timeout=timeout) process.close() + def _setup_task_with_annotations( + self, + *, + number_of_images: int = 3, + format_name: str | None = None, + name_ann: str | None = None, + ): + assert format_name or name_ann + images = self._generate_task_images(number_of_images) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, name_ann or f"{format_name} many jobs", "default") + + return task + def test_concurrent_export_and_cleanup(self): side_effect = self.side_effect chain_side_effects = self.chain_side_effects @@ -1397,195 +1438,188 @@ def test_concurrent_export_and_cleanup(self): format_name = "CVAT for images 1.1" - export_cache_lock = multiprocessing.Lock() - - export_checked_the_file = self.SharedBool() - export_created_the_file = self.SharedBool() export_file_path = self.SharedString() + export_checked_the_file = self.SharedBool() + clear_has_been_finished = self.SharedBool() clear_removed_the_file = self.SharedBool() + export_outdated_after = timedelta(seconds=1) - @contextmanager - def patched_get_export_cache_lock(export_path, *, ttl, block=True, acquire_timeout=None): - # fakeredis lock acquired in a subprocess won't be visible to other processes - # just implement the lock here - from cvat.apps.dataset_manager.util import LockNotAvailableError - - if isinstance(acquire_timeout, timedelta): - acquire_timeout = acquire_timeout.total_seconds() - if acquire_timeout is None: - acquire_timeout = -1 - - acquired = export_cache_lock.acquire( - block=block, - timeout=acquire_timeout if acquire_timeout > -1 else None - ) - - if not acquired: - raise LockNotAvailableError - - try: - yield - finally: - export_cache_lock.release() + EXPORT_CACHE_LOCK_TTL = 4 + EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT = EXPORT_CACHE_LOCK_TTL * 2 def _export(*_, task_id: int): - from os.path import exists as original_exists - from os import replace as original_replace - from cvat.apps.dataset_manager.views import log_exception as original_log_exception import sys + from os import replace as original_replace + from os.path import exists as original_exists + from cvat.apps.dataset_manager.task import export_task as original_export_task - def os_replace_dst_recorder(_: str, dst: str): - set_condition(export_file_path, dst) - return MOCK_DEFAULT + from cvat.apps.dataset_manager.views import log_exception as original_log_exception def patched_log_exception(logger=None, exc_info=True): cur_exc_info = sys.exc_info() if exc_info is True else exc_info - if cur_exc_info and cur_exc_info[1] and isinstance(cur_exc_info[1], _LockTimeoutError): - return # don't spam in logs with expected errors + if ( + cur_exc_info + and cur_exc_info[1] + and isinstance(cur_exc_info[1], _LockTimeoutError) + ): + return # don't spam in logs with expected errors original_log_exception(logger, exc_info) with ( - patch('cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TIMEOUT', new=5), + patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TTL", new=EXPORT_CACHE_LOCK_TTL), + patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", + 
new=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT), patch( - 'cvat.apps.dataset_manager.views.get_export_cache_lock', - new=patched_get_export_cache_lock + "cvat.apps.dataset_manager.views.get_export_cache_lock", + new=self.patched_get_export_cache_lock, ), - patch('cvat.apps.dataset_manager.views.osp.exists') as mock_osp_exists, - patch('cvat.apps.dataset_manager.views.os.replace') as mock_os_replace, - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), - patch('cvat.apps.dataset_manager.views.log_exception', new=patched_log_exception), + # We need to mock the function directly imported into the module + # to ensure that the `export_checked_the_file` condition is set + # only after checking whether a file exists inside an acquired lock + patch("cvat.apps.dataset_manager.views.osp_exists") as mock_osp_exists, + patch( + "cvat.apps.dataset_manager.views.os.replace", side_effect=original_replace + ) as mock_os_replace, + patch("cvat.apps.dataset_manager.views.log_exception", new=patched_log_exception), + patch("cvat.apps.dataset_manager.views.task.export_task") as mock_export_fn, ): mock_osp_exists.side_effect = chain_side_effects( original_exists, side_effect(set_condition, export_checked_the_file), ) - - mock_os_replace.side_effect = chain_side_effects( - original_replace, - os_replace_dst_recorder, - side_effect(set_condition, export_created_the_file), - side_effect(wait_condition, clear_removed_the_file), + mock_export_fn.side_effect = chain_side_effects( + original_export_task, + side_effect(wait_condition, clear_has_been_finished), ) - - mock_rq_get_current_job.return_value = MagicMock(timeout=5) - - exited_by_timeout = False - try: - export(dst_format=format_name, task_id=task_id) - except _LockTimeoutError: - # should come from waiting for clear_removed_the_file - exited_by_timeout = True - - assert exited_by_timeout - mock_os_replace.assert_called_once() - + result_file = export(dst_format=format_name, task_id=task_id) + set_condition(export_file_path, result_file) + mock_os_replace.assert_not_called() def _clear(*_, file_path: str, file_ctime: str): from os import remove as original_remove - from cvat.apps.dataset_manager.util import LockNotAvailableError + + from cvat.apps.dataset_manager.views import FileIsBeingUsedError with ( - patch('cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TIMEOUT', new=5), + patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TTL", new=EXPORT_CACHE_LOCK_TTL), + patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", new=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT), + patch( + "cvat.apps.dataset_manager.views.get_export_cache_lock", + new=self.patched_get_export_cache_lock, + ), patch( - 'cvat.apps.dataset_manager.views.get_export_cache_lock', - new=patched_get_export_cache_lock + "cvat.apps.dataset_manager.views.os.remove" + ) as mock_os_remove, + patch( + "cvat.apps.dataset_manager.views.rq.get_current_job" + ) as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), + patch( + "cvat.apps.dataset_manager.views.TTL_CONSTS", + new={"task": export_outdated_after}, ), - patch('cvat.apps.dataset_manager.views.os.remove') as mock_os_remove, - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), - patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), ): + 
                mock_rq_get_current_job.return_value = MagicMock(timeout=5)
                mock_os_remove.side_effect = chain_side_effects(
-                    side_effect(wait_condition, export_created_the_file),
                     original_remove,
                     side_effect(set_condition, clear_removed_the_file),
                 )
-                mock_rq_get_current_job.return_value = MagicMock(timeout=5)
-
-                exited_by_timeout = False
                 try:
                     clear_export_cache(
                         file_path=file_path, file_ctime=file_ctime, logger=MagicMock()
                     )
-                except LockNotAvailableError:
-                    # should come from waiting for get_export_cache_lock
-                    exited_by_timeout = True
-
-                assert exited_by_timeout
+                except FileIsBeingUsedError:
+                    set_condition(clear_has_been_finished)
+                mock_os_remove.assert_not_called()
 
         # The problem checked is TOCTOU / race condition for file existence check and
-        # further file creation / removal. There are several possible variants of the problem.
+        # further file update / removal. There are several possible variants of the problem.
        # An example:
-        # 1. export checks the file exists, but outdated
+        # 1. export checks the file exists -> the file is not outdated -> the file's
+        #    modification date needs to be updated
         # 2. clear checks the file exists, and matches the creation timestamp
-        # 3. export creates the new export file
-        # 4. remove removes the new export file (instead of the one that it checked)
+        # 3. export updates the file's modification date and does not run the actual export
+        # 4. clear removes the actual export file
         # Thus, we have no exported file after the successful export.
-        #
+
+        # Note: it is not possible for the clear process to delete a file newly
+        # re-created by the export process instead of the checked one, since
+        # file names contain a timestamp.
+
         # Other variants can be variations on the intermediate calls, such as getmtime:
         # - export: exists()
         # - clear: remove()
         # - export: getmtime() -> an exception
+
+        # - clear_1: exists()
+        # - clear_2: remove()
+        # - clear_1: getmtime() -> an exception
         # etc.
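The comment block above describes the race this patch closes: instead of one long-held lock, the export is split into two short critical sections. Below is a minimal sketch of that two-phase pattern, mirroring the new `export()` flow in `cvat/apps/dataset_manager/views.py`; the helper names (`acquire_lock`, `do_export`) are hypothetical stand-ins, not part of the patch:

```python
import os
import tempfile
from contextlib import contextmanager
from multiprocessing import Lock

_lock = Lock()

@contextmanager
def acquire_lock(timeout: float):
    # Stand-in for get_export_cache_lock(): fail fast instead of waiting forever
    if not _lock.acquire(timeout=timeout):
        raise TimeoutError("export cache lock is not available")
    try:
        yield
    finally:
        _lock.release()

def export(output_path: str, do_export) -> str:
    # Phase 1: the check-and-touch must be atomic w.r.t. the cleanup job
    with acquire_lock(timeout=10):
        if os.path.exists(output_path):
            os.utime(output_path, None)  # extend the cached file's lifetime
            return output_path

    # The expensive export itself runs with no lock held...
    with tempfile.TemporaryDirectory(dir=os.path.dirname(output_path)) as tmp_dir:
        tmp_file = os.path.join(tmp_dir, "result")
        do_export(tmp_file)
        # ...and phase 2 only protects the cheap, final rename
        with acquire_lock(timeout=10):
            os.replace(tmp_file, output_path)
    return output_path
```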
-        images = self._generate_task_images(3)
-        task = self._create_task(tasks["main"], images)
-        self._create_annotations(task, f'{format_name} many jobs', "default")
+        task = self._setup_task_with_annotations(format_name=format_name)
         task_id = task["id"]
 
         with (
-            patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job,
-            patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'),
+            patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job,
+            patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"),
         ):
             mock_rq_job = MagicMock(timeout=5)
             mock_rq_get_current_job.return_value = mock_rq_job
 
+            # create a file in the export cache
             first_export_path = export(dst_format=format_name, task_id=task_id)
 
-        export_instance_timestamp = parse_export_file_path(first_export_path).instance_timestamp
+        initial_file_modification_time = os.path.getmtime(first_export_path)
+        # make sure the file in the export cache is outdated
+        # and would have to be deleted if the export were not running in parallel
+        sleep(export_outdated_after.seconds + 1)
 
-        self._create_annotations(task, f'{format_name} many jobs', "default")
+        export_instance_timestamp = parse_export_file_path(first_export_path).instance_timestamp
 
         processes_finished_correctly = False
         with ExitStack() as es:
             # Run both operations concurrently
             # Threads could be faster, but they can't be terminated
-            export_process = es.enter_context(process_closing(multiprocessing.Process(
-                target=_export,
-                args=(
-                    export_cache_lock,
-                    export_checked_the_file, export_created_the_file,
-                    export_file_path, clear_removed_the_file,
-                ),
-                kwargs=dict(task_id=task_id),
-            )))
-            clear_process = es.enter_context(process_closing(multiprocessing.Process(
-                target=_clear,
-                args=(
-                    export_cache_lock,
-                    export_checked_the_file, export_created_the_file,
-                    export_file_path, clear_removed_the_file,
-                ),
-                kwargs=dict(file_path=first_export_path, file_ctime=export_instance_timestamp),
-            )))
+            export_process = es.enter_context(
+                process_closing(
+                    multiprocessing.Process(
+                        target=_export,
+                        args=(
+                            self.export_cache_lock,
+                            export_checked_the_file,
+                        ),
+                        kwargs=dict(task_id=task_id),
+                    )
+                )
+            )
+            clear_process = es.enter_context(
+                process_closing(
+                    multiprocessing.Process(
+                        target=_clear,
+                        args=(
+                            self.export_cache_lock,
+                            export_checked_the_file,
+                        ),
+                        kwargs=dict(
+                            file_path=first_export_path, file_ctime=export_instance_timestamp
+                        ),
+                    )
+                )
+            )
 
             export_process.start()
-            wait_condition(export_checked_the_file) # ensure the expected execution order
+            wait_condition(export_checked_the_file)  # ensure the expected execution order
             clear_process.start()
 
             # A deadlock (interrupted by a timeout error) is the positive outcome in this test,
             # if the problem is fixed.
             # clear() must wait for the export cache lock release (acquired by export()).
            # It must be finished by a timeout, as export() holds it, waiting
-            clear_process.join(timeout=10)
-
-            # export() must wait for the clear() file existence check and fail because of timeout
-            export_process.join(timeout=10)
+            clear_process.join(timeout=15)
+            export_process.join(timeout=15)
 
             self.assertFalse(export_process.is_alive())
             self.assertFalse(clear_process.is_alive())
@@ -1598,17 +1632,17 @@ def _clear(*_, file_path: str, file_ctime: str):
             processes_finished_correctly = True
 
         self.assertTrue(processes_finished_correctly)
-
-        # terminate() may break the locks, don't try to acquire
-        # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate
-        self.assertTrue(export_checked_the_file.get())
-        self.assertTrue(export_created_the_file.get())
-        self.assertFalse(clear_removed_the_file.get())
 
         new_export_path = export_file_path.get()
         self.assertGreater(len(new_export_path), 0)
         self.assertTrue(osp.isfile(new_export_path))
 
+        self.assertTrue(osp.isfile(first_export_path))
+        self.assertGreater(os.path.getmtime(first_export_path), initial_file_modification_time)
+
+        # terminate() may break the locks, don't try to acquire
+        # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate
+        self.assertTrue(export_checked_the_file.get())
 
     def test_concurrent_download_and_cleanup(self):
         side_effect = self.side_effect
@@ -1619,14 +1653,10 @@ def test_concurrent_download_and_cleanup(self):
 
         format_name = "CVAT for images 1.1"
 
-        export_cache_lock = multiprocessing.Lock()
-
         download_checked_the_file = self.SharedBool()
         clear_removed_the_file = self.SharedBool()
 
-        images = self._generate_task_images(3)
-        task = self._create_task(tasks["main"], images)
-        self._create_annotations(task, f'{format_name} many jobs', "default")
+        task = self._setup_task_with_annotations(format_name=format_name)
         task_id = task["id"]
 
         download_url = self._generate_url_dump_tasks_annotations(task_id)
@@ -1634,30 +1664,6 @@ def test_concurrent_download_and_cleanup(self):
             "format": format_name,
         }
 
-        @contextmanager
-        def patched_get_export_cache_lock(export_path, *, ttl, block=True, acquire_timeout=None):
-            # fakeredis lock acquired in a subprocess won't be visible to other processes
-            # just implement the lock here
-            from cvat.apps.dataset_manager.util import LockNotAvailableError
-
-            if isinstance(acquire_timeout, timedelta):
-                acquire_timeout = acquire_timeout.total_seconds()
-            if acquire_timeout is None:
-                acquire_timeout = -1
-
-            acquired = export_cache_lock.acquire(
-                block=block,
-                timeout=acquire_timeout if acquire_timeout > -1 else None
-            )
-
-            if not acquired:
-                raise LockNotAvailableError
-
-            try:
-                yield
-            finally:
-                export_cache_lock.release()
-
         def _download(*_, task_id: int, export_path: str):
             from os.path import exists as original_exists
 
             def patched_osp_exists(path: str):
                 set_condition(download_checked_the_file)
                 wait_condition(
                     clear_removed_the_file, timeout=20
-                ) # wait more than the process timeout
+                )  # wait more than the process timeout
 
                 return result
 
             with (
                 patch(
-                    'cvat.apps.engine.views.dm.util.get_export_cache_lock',
-                    new=patched_get_export_cache_lock
+                    "cvat.apps.engine.views.dm.util.get_export_cache_lock",
+                    new=self.patched_get_export_cache_lock,
                 ),
-                patch('cvat.apps.dataset_manager.views.osp.exists') as mock_osp_exists,
+                patch("cvat.apps.dataset_manager.views.osp.exists") as mock_osp_exists,
                 TemporaryDirectory() as temp_dir,
             ):
                 mock_osp_exists.side_effect = patched_osp_exists
@@ -1691,18 +1697,23 @@ def patched_osp_exists(path:
str): def _clear(*_, file_path: str, file_ctime: str): from os import remove as original_remove + from cvat.apps.dataset_manager.util import LockNotAvailableError with ( - patch('cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TIMEOUT', new=5), + patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", new=3), + patch( + "cvat.apps.dataset_manager.views.get_export_cache_lock", + new=self.patched_get_export_cache_lock, + ), + patch("cvat.apps.dataset_manager.views.os.remove") as mock_os_remove, patch( - 'cvat.apps.dataset_manager.views.get_export_cache_lock', - new=patched_get_export_cache_lock + "cvat.apps.dataset_manager.views.rq.get_current_job" + ) as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), + patch( + "cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)} ), - patch('cvat.apps.dataset_manager.views.os.remove') as mock_os_remove, - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), - patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), ): mock_os_remove.side_effect = chain_side_effects( original_remove, @@ -1722,7 +1733,6 @@ def _clear(*_, file_path: str, file_ctime: str): assert exited_by_timeout - # The problem checked is TOCTOU / race condition for file existence check and # further file reading / removal. There are several possible variants of the problem. # An example: @@ -1748,7 +1758,7 @@ def patched_export(*args, **kwargs): return result - with patch('cvat.apps.dataset_manager.views.export', new=patched_export): + with patch("cvat.apps.dataset_manager.views.export", new=patched_export): response = self._get_request_with_data(download_url, download_params, self.admin) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) @@ -1763,27 +1773,35 @@ def patched_export(*args, **kwargs): with ExitStack() as es: # Run both operations concurrently # Threads could be faster, but they can't be terminated - download_process = es.enter_context(process_closing(multiprocessing.Process( - target=_download, - args=(download_checked_the_file, clear_removed_the_file, export_cache_lock), - kwargs=dict(task_id=task_id, export_path=export_path), - ))) - clear_process = es.enter_context(process_closing(multiprocessing.Process( - target=_clear, - args=(download_checked_the_file, clear_removed_the_file, export_cache_lock), - kwargs=dict(file_path=export_path, file_ctime=export_instance_time), - ))) + download_process = es.enter_context( + process_closing( + multiprocessing.Process( + target=_download, + args=(download_checked_the_file, clear_removed_the_file), + kwargs=dict(task_id=task_id, export_path=export_path), + ) + ) + ) + clear_process = es.enter_context( + process_closing( + multiprocessing.Process( + target=_clear, + args=(download_checked_the_file, clear_removed_the_file), + kwargs=dict(file_path=export_path, file_ctime=export_instance_time), + ) + ) + ) download_process.start() - wait_condition(download_checked_the_file) # ensure the expected execution order + wait_condition(download_checked_the_file) # ensure the expected execution order clear_process.start() # A deadlock (interrupted by a timeout error) is the positive outcome in this test, # if the problem is fixed. # clear() must wait for the export cache lock release (acquired by download()). 
# It must be finished by a timeout, as download() holds it, waiting - clear_process.join(timeout=5) + clear_process.join(timeout=10) # download() must wait for the clear() file existence check and fail because of timeout download_process.join(timeout=5) @@ -1796,7 +1814,7 @@ def patched_export(*args, **kwargs): # All the expected exceptions should be handled in the process callbacks. # This is to avoid passing the test with unexpected errors - self.assertEqual(download_process.exitcode, -15) # sigterm + self.assertEqual(download_process.exitcode, -15) # sigterm self.assertEqual(clear_process.exitcode, 0) processes_finished_correctly = True @@ -1811,15 +1829,15 @@ def patched_export(*args, **kwargs): def test_export_can_create_file_and_cleanup_job(self): format_name = "CVAT for images 1.1" - images = self._generate_task_images(3) - task = self._create_task(tasks["main"], images) - self._create_annotations(task, f'{format_name} many jobs', "default") + task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler') as mock_rq_get_scheduler, - patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch( + "cvat.apps.dataset_manager.views.django_rq.get_scheduler" + ) as mock_rq_get_scheduler, + patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)}), ): mock_rq_job = MagicMock(timeout=5) mock_rq_get_current_job.return_value = mock_rq_job @@ -1837,24 +1855,23 @@ def test_export_cache_lock_can_raise_on_releasing_expired_lock(self): with self.assertRaises(ReleaseUnlockedLock): lock_time = 2 - with get_export_cache_lock('test_export_path', ttl=lock_time, acquire_timeout=5): + with get_export_cache_lock("test_export_path", ttl=lock_time, acquire_timeout=5): sleep(lock_time + 1) def test_export_can_request_retry_on_locking_failure(self): format_name = "CVAT for images 1.1" - images = self._generate_task_images(3) - task = self._create_task(tasks["main"], images) - self._create_annotations(task, f'{format_name} many jobs', "default") + task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] from cvat.apps.dataset_manager.util import LockNotAvailableError + with ( patch( - 'cvat.apps.dataset_manager.views.get_export_cache_lock', - side_effect=LockNotAvailableError + "cvat.apps.dataset_manager.views.get_export_cache_lock", + side_effect=LockNotAvailableError, ) as mock_get_export_cache_lock, - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), self.assertRaises(LockNotAvailableError), ): mock_rq_job = MagicMock(timeout=5) @@ -1867,25 +1884,26 @@ def test_export_can_request_retry_on_locking_failure(self): def test_export_can_reuse_older_file_if_still_relevant(self): format_name = "CVAT for images 1.1" - images = self._generate_task_images(3) - task = self._create_task(tasks["main"], images) - self._create_annotations(task, f'{format_name} many jobs', "default") + task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] with ( - 
patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), ): mock_rq_get_current_job.return_value = MagicMock(timeout=5) first_export_path = export(dst_format=format_name, task_id=task_id) from os.path import exists as original_exists + with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), - patch('cvat.apps.dataset_manager.views.osp.exists', side_effect=original_exists) as mock_osp_exists, - patch('cvat.apps.dataset_manager.views.os.replace') as mock_os_replace, + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), + patch( + "cvat.apps.dataset_manager.views.osp_exists", side_effect=original_exists + ) as mock_osp_exists, + patch("cvat.apps.dataset_manager.views.os.replace") as mock_os_replace, ): mock_rq_get_current_job.return_value = MagicMock(timeout=5) @@ -1895,25 +1913,176 @@ def test_export_can_reuse_older_file_if_still_relevant(self): mock_osp_exists.assert_called_with(first_export_path) mock_os_replace.assert_not_called() + def test_initiate_concurrent_export_by_different_users(self): + side_effect = self.side_effect + chain_side_effects = self.chain_side_effects + process_closing = self.process_closing + wait_condition = self.wait_condition + set_condition = self.set_condition + + export_1_checked_file = self.SharedBool() + export_1_made_export = self.SharedBool() + export_1_replaced_file = self.SharedBool() + + export_2_checked_file = self.SharedBool() + export_2_made_export = self.SharedBool() + export_2_replaced_file = self.SharedBool() + + format_name = "CVAT for images 1.1" + + LOCK_TTL = 4 + LOCK_ACQUISITION_TIMEOUT = LOCK_TTL * 2 + + def _export_1( + *_, + task_id: int, + result_queue: multiprocessing.Queue, + ): + from os import replace as original_replace + + from cvat.apps.dataset_manager.task import export_task as original_export_task + + with ( + patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TTL", new=LOCK_TTL), + patch( + "cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", + new=LOCK_ACQUISITION_TIMEOUT, + ), + patch( + "cvat.apps.dataset_manager.views.get_export_cache_lock", + new=self.patched_get_export_cache_lock, + ), + patch("cvat.apps.dataset_manager.views.os.replace") as mock_os_replace, + patch("cvat.apps.dataset_manager.views.task.export_task") as mock_export_fn, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), + ): + mock_export_fn.side_effect = chain_side_effects( + side_effect(set_condition, export_1_checked_file), + original_export_task, + side_effect(wait_condition, export_2_checked_file), + side_effect(set_condition, export_1_made_export), + ) + + mock_os_replace.side_effect = chain_side_effects( + original_replace, + side_effect(set_condition, export_1_replaced_file), + ) + result_file_path = export(dst_format=format_name, task_id=task_id) + result_queue.put(result_file_path) + + mock_export_fn.assert_called_once() + mock_os_replace.assert_called_once() + + def _export_2( + *_, + task_id: int, + result_queue: multiprocessing.Queue, + ): + from os import replace as original_replace + + from cvat.apps.dataset_manager.task import export_task 
as original_export_task + + with ( + patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TTL", new=LOCK_TTL), + patch( + "cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", + new=LOCK_ACQUISITION_TIMEOUT, + ), + patch( + "cvat.apps.dataset_manager.views.get_export_cache_lock", + new=self.patched_get_export_cache_lock, + ), + patch("cvat.apps.dataset_manager.views.os.replace") as mock_os_replace, + patch("cvat.apps.dataset_manager.views.task.export_task") as mock_export_fn, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), + ): + mock_export_fn.side_effect = chain_side_effects( + side_effect(set_condition, export_2_checked_file), + original_export_task, + side_effect(wait_condition, export_1_replaced_file), + side_effect(set_condition, export_2_made_export), + ) + + mock_os_replace.side_effect = chain_side_effects( + original_replace, + side_effect(set_condition, export_2_replaced_file), + ) + result_file_path = export(dst_format=format_name, task_id=task_id) + result_queue.put(result_file_path) + + mock_export_fn.assert_called_once() + mock_os_replace.assert_called_once() + + task = self._setup_task_with_annotations(format_name=format_name) + + with ExitStack() as es: + result_queue = multiprocessing.Queue() + number_of_processes = 2 + export_process_1 = es.enter_context( + process_closing( + multiprocessing.Process( + target=_export_1, + kwargs=dict( + task_id=task["id"], + result_queue=result_queue, + ), + ) + ) + ) + export_process_2 = es.enter_context( + process_closing( + multiprocessing.Process( + target=_export_2, + kwargs=dict( + task_id=task["id"], + result_queue=result_queue, + ), + ) + ) + ) + + export_process_1.start() + wait_condition(export_1_checked_file) + + export_process_2.start() + export_process_2.join(timeout=20) + export_process_1.join(timeout=20) + + self.assertFalse(export_process_1.is_alive()) + self.assertFalse(export_process_2.is_alive()) + + self.assertEqual(export_process_1.exitcode, 0) + self.assertEqual(export_process_2.exitcode, 0) + paths = {result_queue.get() for _ in range(number_of_processes)} + result_queue.close() + + self.assertTrue(len(paths) == 1) + self.assertNotEqual(paths, {None}) + self.assertTrue(osp.isfile(list(paths)[0])) + + for cond in ( + export_1_checked_file, export_1_made_export, export_1_replaced_file, + export_2_checked_file, export_2_made_export, export_2_replaced_file + ): + self.assertTrue(cond.get()) + def test_cleanup_can_remove_file(self): format_name = "CVAT for images 1.1" - images = self._generate_task_images(3) - task = self._create_task(tasks["main"], images) - self._create_annotations(task, f'{format_name} many jobs', "default") + task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), ): mock_rq_get_current_job.return_value = MagicMock(timeout=5) export_path = export(dst_format=format_name, task_id=task_id) with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), - patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as 
mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), + patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)}), ): mock_rq_get_current_job.return_value = MagicMock(timeout=5) @@ -1925,15 +2094,14 @@ def test_cleanup_can_remove_file(self): def test_cleanup_can_request_retry_on_locking_failure(self): format_name = "CVAT for images 1.1" - images = self._generate_task_images(3) - task = self._create_task(tasks["main"], images) - self._create_annotations(task, f'{format_name} many jobs', "default") + task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] from cvat.apps.dataset_manager.util import LockNotAvailableError + with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), ): mock_rq_get_current_job.return_value = MagicMock(timeout=5) @@ -1941,11 +2109,11 @@ def test_cleanup_can_request_retry_on_locking_failure(self): with ( patch( - 'cvat.apps.dataset_manager.views.get_export_cache_lock', - side_effect=LockNotAvailableError + "cvat.apps.dataset_manager.views.get_export_cache_lock", + side_effect=LockNotAvailableError, ) as mock_get_export_cache_lock, - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), self.assertRaises(LockNotAvailableError), ): mock_rq_job = MagicMock(timeout=5) @@ -1960,8 +2128,8 @@ def test_cleanup_can_request_retry_on_locking_failure(self): def test_cleanup_can_fail_if_no_file(self): with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), self.assertRaises(FileNotFoundError), ): mock_rq_job = MagicMock(timeout=5) @@ -1971,23 +2139,22 @@ def test_cleanup_can_fail_if_no_file(self): def test_cleanup_can_defer_removal_if_file_is_used_recently(self): format_name = "CVAT for images 1.1" - images = self._generate_task_images(3) - task = self._create_task(tasks["main"], images) - self._create_annotations(task, f'{format_name} many jobs', "default") + task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), ): mock_rq_get_current_job.return_value = MagicMock(timeout=5) export_path = export(dst_format=format_name, task_id=task_id) from cvat.apps.dataset_manager.views import FileIsBeingUsedError + with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(hours=1)}), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as 
mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(hours=1)}), self.assertRaises(FileIsBeingUsedError), ): mock_rq_job = MagicMock(timeout=5) @@ -2006,14 +2173,12 @@ def test_cleanup_can_be_called_with_old_signature_and_values(self): # Jobs referring to the old API can exist in the redis queues after the server is updated format_name = "CVAT for images 1.1" - images = self._generate_task_images(3) - task = self._create_task(tasks["main"], images) - self._create_annotations(task, f'{format_name} many jobs', "default") + task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), ): mock_rq_get_current_job.return_value = MagicMock(timeout=5) @@ -2027,14 +2192,14 @@ def test_cleanup_can_be_called_with_old_signature_and_values(self): shutil.move(new_export_path, old_export_path) old_kwargs = { - 'file_path': old_export_path, - 'file_ctime': file_ctime, - 'logger': MagicMock(), + "file_path": old_export_path, + "file_ctime": file_ctime, + "logger": MagicMock(), } with ( - patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, - patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), + patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, + patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)}), ): mock_rq_get_current_job.return_value = MagicMock(timeout=5) diff --git a/cvat/apps/dataset_manager/util.py b/cvat/apps/dataset_manager/util.py index 2f1029049bb..6d814ec2c67 100644 --- a/cvat/apps/dataset_manager/util.py +++ b/cvat/apps/dataset_manager/util.py @@ -111,14 +111,16 @@ def get_export_cache_lock( *, ttl: int | timedelta, block: bool = True, - acquire_timeout: Optional[int | timedelta] = None, + acquire_timeout: int | timedelta, ) -> Generator[Lock, Any, Any]: + assert acquire_timeout is not None, "Endless waiting for the lock should be avoided" + if isinstance(acquire_timeout, timedelta): acquire_timeout = acquire_timeout.total_seconds() - if acquire_timeout is not None and acquire_timeout < 0: + + if acquire_timeout < 0: raise ValueError("acquire_timeout must be a non-negative number") - elif acquire_timeout is None: - acquire_timeout = -1 + if isinstance(ttl, timedelta): ttl = ttl.total_seconds() @@ -233,3 +235,9 @@ def parse_export_file_path(file_path: os.PathLike[str]) -> ParsedExportFilename: format_repr=basename_match.group('format_tag'), file_ext=basename_match.group('file_ext'), ) + +def extend_export_file_lifetime(file_path: str): + # Update the last modification time to extend the export's lifetime, + # as the last access time is not available on every filesystem. + # As a result, file deletion by the cleaning job will be postponed. 
+ os.utime(file_path, None) diff --git a/cvat/apps/dataset_manager/views.py b/cvat/apps/dataset_manager/views.py index d056869adb3..52bc9cd15f7 100644 --- a/cvat/apps/dataset_manager/views.py +++ b/cvat/apps/dataset_manager/views.py @@ -11,6 +11,7 @@ import django_rq import rq +from os.path import exists as osp_exists from django.conf import settings from django.utils import timezone from rq_scheduler import Scheduler @@ -27,21 +28,23 @@ LockNotAvailableError, current_function_name, get_export_cache_lock, get_export_cache_dir, make_export_filename, - parse_export_file_path + parse_export_file_path, extend_export_file_lifetime ) from .util import EXPORT_CACHE_DIR_NAME # pylint: disable=unused-import + slogger = ServerLogManager(__name__) _MODULE_NAME = __package__ + '.' + osp.splitext(osp.basename(__file__))[0] -def log_exception(logger=None, exc_info=True): + +def log_exception(logger: logging.Logger | None = None, exc_info: bool = True): if logger is None: - logger = slogger + logger = slogger.glob logger.exception("[%s @ %s]: exception occurred" % \ (_MODULE_NAME, current_function_name(2)), exc_info=exc_info) -DEFAULT_CACHE_TTL = timedelta(seconds=settings.DATASET_CACHE_TTL) +DEFAULT_CACHE_TTL = timedelta(seconds=settings.EXPORT_CACHE_TTL) PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL TASK_CACHE_TTL = DEFAULT_CACHE_TTL JOB_CACHE_TTL = DEFAULT_CACHE_TTL @@ -51,8 +54,9 @@ def log_exception(logger=None, exc_info=True): 'job': JOB_CACHE_TTL, } -EXPORT_CACHE_LOCK_TIMEOUT = timedelta(seconds=settings.DATASET_CACHE_LOCK_TIMEOUT) -EXPORT_LOCKED_RETRY_INTERVAL = timedelta(seconds=settings.DATASET_EXPORT_LOCKED_RETRY_INTERVAL) +EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT = timedelta(seconds=settings.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT) +EXPORT_CACHE_LOCK_TTL = timedelta(seconds=settings.EXPORT_CACHE_LOCK_TTL) +EXPORT_LOCKED_RETRY_INTERVAL = timedelta(seconds=settings.EXPORT_LOCKED_RETRY_INTERVAL) def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta: @@ -61,6 +65,14 @@ def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta: return TTL_CONSTS[db_instance.lower()] +def _patch_scheduled_job_status(job: rq.job.Job): + # NOTE: rq scheduler < 0.14 does not set the appropriate + # job status (SCHEDULED). This has been fixed in the 0.14 version. 
+ # https://github.com/rq/rq-scheduler/blob/f7d5787c5f94b5517e209c612ef648f4bfc44f9e/rq_scheduler/scheduler.py#L148 + # FUTURE-TODO: delete manual status setting after upgrading to 0.14 + if job.get_status(refresh=False) != rq.job.JobStatus.SCHEDULED: + job.set_status(rq.job.JobStatus.SCHEDULED) + def _retry_current_rq_job(time_delta: timedelta) -> rq.job.Job: # TODO: implement using retries once we move from rq_scheduler to builtin RQ scheduler # for better reliability and error reporting @@ -79,7 +91,7 @@ def _patched_retry(*_1, **_2): user_id = current_rq_job.meta.get('user', {}).get('id') or -1 with get_rq_lock_by_user(settings.CVAT_QUEUES.EXPORT_DATA.value, user_id): - scheduler.enqueue_in( + scheduled_rq_job: rq.job.Job = scheduler.enqueue_in( time_delta, current_rq_job.func, *current_rq_job.args, @@ -92,12 +104,21 @@ def _patched_retry(*_1, **_2): on_success=current_rq_job.success_callback, on_failure=current_rq_job.failure_callback, ) + _patch_scheduled_job_status(scheduled_rq_job) current_rq_job.retries_left = 1 setattr(current_rq_job, 'retry', _patched_retry) return current_rq_job -def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False): +def export( + *, + dst_format: str, + project_id: int | None = None, + task_id: int | None = None, + job_id: int | None = None, + server_url: str | None = None, + save_images: bool = False, +): try: if task_id is not None: logger = slogger.task[task_id] @@ -134,41 +155,50 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No os.makedirs(cache_dir, exist_ok=True) + # acquire a lock 2 times instead of using one long lock: + # 1. to check whether the file exists or not + # 2. to create a file when it doesn't exist with get_export_cache_lock( output_path, - block=True, - acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT, - ttl=rq.get_current_job().timeout, + ttl=EXPORT_CACHE_LOCK_TTL, + acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, ): - if not osp.exists(output_path): - with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir: - temp_file = osp.join(temp_dir, 'result') - export_fn(db_instance.id, temp_file, dst_format, - server_url=server_url, save_images=save_images) - os.replace(temp_file, output_path) - - scheduler: Scheduler = django_rq.get_scheduler( - settings.CVAT_QUEUES.EXPORT_DATA.value - ) - cleaning_job = scheduler.enqueue_in( - time_delta=cache_ttl, - func=clear_export_cache, - file_path=output_path, - file_ctime=instance_update_time.timestamp(), - logger=logger - ) - logger.info( - "The {} '{}' is exported as '{}' at '{}' " - "and available for downloading for the next {}. 
" - "Export cache cleaning job is enqueued, id '{}'".format( - db_instance.__class__.__name__.lower(), - db_instance.name if isinstance( - db_instance, (Project, Task) - ) else db_instance.id, - dst_format, output_path, cache_ttl, - cleaning_job.id - ) - ) + if osp_exists(output_path): + extend_export_file_lifetime(output_path) + return output_path + + with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir: + temp_file = osp.join(temp_dir, 'result') + export_fn(db_instance.id, temp_file, dst_format, + server_url=server_url, save_images=save_images) + with get_export_cache_lock( + output_path, + ttl=EXPORT_CACHE_LOCK_TTL, + acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, + ): + os.replace(temp_file, output_path) + + scheduler: Scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value) + cleaning_job = scheduler.enqueue_in( + time_delta=cache_ttl, + func=clear_export_cache, + file_path=output_path, + file_ctime=instance_update_time.timestamp(), + logger=logger, + ) + _patch_scheduled_job_status(cleaning_job) + logger.info( + "The {} '{}' is exported as '{}' at '{}' " + "and available for downloading for the next {}. " + "Export cache cleaning job is enqueued, id '{}'".format( + db_instance.__class__.__name__.lower(), + db_instance.id, + dst_format, + output_path, + cache_ttl, + cleaning_job.id, + ) + ) return output_path except LockNotAvailableError: @@ -184,23 +214,23 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No log_exception(logger) raise -def export_job_annotations(job_id, dst_format=None, server_url=None): - return export(dst_format,job_id=job_id, server_url=server_url, save_images=False) +def export_job_annotations(job_id: int, dst_format: str, *, server_url: str | None = None): + return export(dst_format=dst_format, job_id=job_id, server_url=server_url, save_images=False) -def export_job_as_dataset(job_id, dst_format=None, server_url=None): - return export(dst_format, job_id=job_id, server_url=server_url, save_images=True) +def export_job_as_dataset(job_id: int, dst_format: str, *, server_url: str | None = None): + return export(dst_format=dst_format, job_id=job_id, server_url=server_url, save_images=True) -def export_task_as_dataset(task_id, dst_format=None, server_url=None): - return export(dst_format, task_id=task_id, server_url=server_url, save_images=True) +def export_task_as_dataset(task_id: int, dst_format: str, *, server_url: str | None = None): + return export(dst_format=dst_format, task_id=task_id, server_url=server_url, save_images=True) -def export_task_annotations(task_id, dst_format=None, server_url=None): - return export(dst_format,task_id=task_id, server_url=server_url, save_images=False) +def export_task_annotations(task_id: int, dst_format: str, *, server_url: str | None = None): + return export(dst_format=dst_format, task_id=task_id, server_url=server_url, save_images=False) -def export_project_as_dataset(project_id, dst_format=None, server_url=None): - return export(dst_format, project_id=project_id, server_url=server_url, save_images=True) +def export_project_as_dataset(project_id: int, dst_format: str, *, server_url: str | None = None): + return export(dst_format=dst_format, project_id=project_id, server_url=server_url, save_images=True) -def export_project_annotations(project_id, dst_format=None, server_url=None): - return export(dst_format, project_id=project_id, server_url=server_url, save_images=False) +def export_project_annotations(project_id: int, dst_format: str, *, server_url: str | None 
= None): + return export(dst_format=dst_format, project_id=project_id, server_url=server_url, save_images=False) class FileIsBeingUsedError(Exception): @@ -213,8 +243,8 @@ def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger with get_export_cache_lock( file_path, block=True, - acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT, - ttl=rq.get_current_job().timeout, + acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, + ttl=EXPORT_CACHE_LOCK_TTL, ): if not osp.exists(file_path): raise FileNotFoundError("Export cache file '{}' doesn't exist".format(file_path)) diff --git a/cvat/apps/engine/background.py b/cvat/apps/engine/background.py index 441d4702014..d9f9237e6d2 100644 --- a/cvat/apps/engine/background.py +++ b/cvat/apps/engine/background.py @@ -14,7 +14,7 @@ from django.conf import settings from django.http.response import HttpResponseBadRequest from django.utils import timezone -from django_rq.queues import DjangoRQ +from django_rq.queues import DjangoRQ, DjangoScheduler from rest_framework import serializers, status from rest_framework.request import Request from rest_framework.response import Response @@ -52,6 +52,11 @@ slogger = ServerLogManager(__name__) +REQUEST_TIMEOUT = 60 +# it's better to return LockNotAvailableError instead of response with 504 status +LOCK_TTL = REQUEST_TIMEOUT - 5 +LOCK_ACQUIRE_TIMEOUT = LOCK_TTL - 5 + class _ResourceExportManager(ABC): QUEUE_NAME = settings.CVAT_QUEUES.EXPORT_DATA.value @@ -89,7 +94,7 @@ def setup_background_job(self, queue: DjangoRQ, rq_id: str) -> None: def _handle_rq_job_v1(self, rq_job: Optional[RQJob], queue: DjangoRQ) -> Optional[Response]: pass - def _handle_rq_job_v2(self, rq_job: Optional[RQJob], *args, **kwargs) -> Optional[Response]: + def _handle_rq_job_v2(self, rq_job: Optional[RQJob], queue: DjangoRQ) -> Optional[Response]: if not rq_job: return None @@ -101,17 +106,23 @@ def _handle_rq_job_v2(self, rq_job: Optional[RQJob], *args, **kwargs) -> Optiona status=status.HTTP_409_CONFLICT, ) - if rq_job_status in (RQJobStatus.SCHEDULED, RQJobStatus.DEFERRED): + if rq_job_status == RQJobStatus.DEFERRED: + rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) + + if rq_job_status == RQJobStatus.SCHEDULED: + scheduler: DjangoScheduler = django_rq.get_scheduler(queue.name, queue=queue) + # remove the job id from the set with scheduled keys + scheduler.cancel(rq_job) rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) rq_job.delete() return None - def handle_rq_job(self, *args, **kwargs) -> Optional[Response]: + def handle_rq_job(self, rq_job: RQJob | None, queue: DjangoRQ) -> Optional[Response]: if self.version == 1: - return self._handle_rq_job_v1(*args, **kwargs) + return self._handle_rq_job_v1(rq_job, queue) elif self.version == 2: - return self._handle_rq_job_v2(*args, **kwargs) + return self._handle_rq_job_v2(rq_job, queue) raise ValueError("Unsupported version") @@ -220,7 +231,9 @@ def is_result_outdated() -> bool: return rq_job.meta[RQJobMetaField.REQUEST]["timestamp"] < instance_update_time def handle_local_download() -> Response: - with dm.util.get_export_cache_lock(file_path, ttl=REQUEST_TIMEOUT): + with dm.util.get_export_cache_lock( + file_path, ttl=LOCK_TTL, acquire_timeout=LOCK_ACQUIRE_TIMEOUT + ): if not osp.exists(file_path): return Response( "The exported file has expired, please retry exporting", @@ -295,8 +308,6 @@ def handle_local_download() -> Response: instance_update_time = self.get_instance_update_time() instance_timestamp = 
self.get_timestamp(instance_update_time) - REQUEST_TIMEOUT = 60 - if rq_job_status == RQJobStatus.FINISHED: if self.export_args.location == Location.CLOUD_STORAGE: rq_job.delete() @@ -313,11 +324,13 @@ def handle_local_download() -> Response: if action == "download": return handle_local_download() else: - with dm.util.get_export_cache_lock(file_path, ttl=REQUEST_TIMEOUT): + with dm.util.get_export_cache_lock( + file_path, + ttl=LOCK_TTL, + acquire_timeout=LOCK_ACQUIRE_TIMEOUT, + ): if osp.exists(file_path) and not is_result_outdated(): - # Update last update time to prolong the export lifetime - # as the last access time is not available on every filesystem - os.utime(file_path, None) + dm.util.extend_export_file_lifetime(file_path) return Response(status=status.HTTP_201_CREATED) @@ -422,7 +435,7 @@ def setup_background_job( user_id = self.request.user.id func = self.export_callback - func_args = (self.db_instance.id, self.export_args.format, server_address) + func_args = (self.db_instance.id, self.export_args.format) result_url = None if self.export_args.location == Location.CLOUD_STORAGE: @@ -467,6 +480,9 @@ def setup_background_job( queue.enqueue_call( func=func, args=func_args, + kwargs={ + "server_url": server_address, + }, job_id=rq_id, meta=get_rq_job_meta( request=self.request, db_obj=self.db_instance, result_url=result_url diff --git a/cvat/apps/engine/default_settings.py b/cvat/apps/engine/default_settings.py index 15e1b3fd8c3..f853d3bc821 100644 --- a/cvat/apps/engine/default_settings.py +++ b/cvat/apps/engine/default_settings.py @@ -2,9 +2,13 @@ # # SPDX-License-Identifier: MIT +import logging as log import os from attrs.converters import to_bool +from django.core.exceptions import ImproperlyConfigured + +logger = log.getLogger("cvat") MEDIA_CACHE_ALLOW_STATIC_CACHE = to_bool(os.getenv("CVAT_ALLOW_STATIC_CACHE", False)) """ @@ -24,3 +28,66 @@ """ Sets the frequency of checking the readiness of the chunk """ +default_export_cache_ttl = 60 * 60 * 24 +default_export_cache_lock_ttl = 30 +default_export_cache_lock_acquisition_timeout = 50 +default_export_locked_retry_interval = 60 + +EXPORT_CACHE_TTL = os.getenv("CVAT_DATASET_CACHE_TTL") +"Base lifetime for cached export files, in seconds" + +if EXPORT_CACHE_TTL is not None: + EXPORT_CACHE_TTL = int(EXPORT_CACHE_TTL) + logger.warning( + "The CVAT_DATASET_CACHE_TTL is deprecated, use CVAT_EXPORT_CACHE_TTL instead", + ) +else: + EXPORT_CACHE_TTL = int(os.getenv("CVAT_EXPORT_CACHE_TTL", default_export_cache_ttl)) + + +EXPORT_CACHE_LOCK_TTL = os.getenv("CVAT_DATASET_EXPORT_LOCK_TTL") +"Default lifetime for the export cache lock, in seconds." 
+ +if EXPORT_CACHE_LOCK_TTL is not None: + EXPORT_CACHE_LOCK_TTL = int(EXPORT_CACHE_LOCK_TTL) + logger.warning( + "The CVAT_DATASET_EXPORT_LOCK_TTL is deprecated, use CVAT_EXPORT_CACHE_LOCK_TTL instead", + ) +else: + EXPORT_CACHE_LOCK_TTL = int( + os.getenv("CVAT_EXPORT_CACHE_LOCK_TTL", default_export_cache_lock_ttl) + ) + +EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT = os.getenv("CVAT_DATASET_CACHE_LOCK_TIMEOUT") +"Timeout for cache lock acquiring, in seconds" + +if EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT is not None: + EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT = int(EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT) + logger.warning( + "The CVAT_DATASET_CACHE_LOCK_TIMEOUT is deprecated, " + "use CVAT_EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT instead", + ) +else: + EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT = int( + os.getenv( + "CVAT_EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", + default_export_cache_lock_acquisition_timeout, + ) + ) + +if EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT <= EXPORT_CACHE_LOCK_TTL: + raise ImproperlyConfigured("Lock acquisition timeout must be more than lock TTL") + +EXPORT_LOCKED_RETRY_INTERVAL = os.getenv("CVAT_DATASET_EXPORT_LOCKED_RETRY_INTERVAL") +"Retry interval for cases the export cache lock was unavailable, in seconds" + +if EXPORT_LOCKED_RETRY_INTERVAL is not None: + EXPORT_LOCKED_RETRY_INTERVAL = int(EXPORT_LOCKED_RETRY_INTERVAL) + logger.warning( + "The CVAT_DATASET_EXPORT_LOCKED_RETRY_INTERVAL is deprecated, " + "use CVAT_EXPORT_LOCKED_RETRY_INTERVAL instead", + ) +else: + EXPORT_LOCKED_RETRY_INTERVAL = int( + os.getenv("CVAT_EXPORT_LOCKED_RETRY_INTERVAL", default_export_locked_retry_interval) + ) diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index 3d605d6c8a2..629ce51c13a 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -3301,7 +3301,8 @@ def get_message(self, rq_job: RQJob) -> str: def to_representation(self, rq_job: RQJob) -> Dict[str, Any]: representation = super().to_representation(rq_job) - if representation["status"] == RQJobStatus.DEFERRED: + # FUTURE-TODO: support such statuses on UI + if representation["status"] in (RQJobStatus.DEFERRED, RQJobStatus.SCHEDULED): representation["status"] = RQJobStatus.QUEUED if representation["status"] == RQJobStatus.FINISHED: diff --git a/cvat/rqworker.py b/cvat/rqworker.py index 8a3e187b74b..309c4d5ce71 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -9,7 +9,6 @@ import cvat.utils.remote_debugger as debug - DefaultWorker = Worker diff --git a/tests/python/rest_api/utils.py b/tests/python/rest_api/utils.py index aa747d169e9..5d32f07c9ed 100644 --- a/tests/python/rest_api/utils.py +++ b/tests/python/rest_api/utils.py @@ -44,7 +44,7 @@ def initialize_export(endpoint: Endpoint, *, expect_forbidden: bool = False, **k def wait_and_download_v1( endpoint: Endpoint, *, - max_retries: int = 30, + max_retries: int = 50, interval: float = 0.1, download_result: bool = True, **kwargs, @@ -75,7 +75,7 @@ def wait_and_download_v1( def export_v1( endpoint: Endpoint, *, - max_retries: int = 30, + max_retries: int = 50, interval: float = 0.1, expect_forbidden: bool = False, wait_result: bool = True, @@ -115,7 +115,7 @@ def wait_and_download_v2( api_client: ApiClient, rq_id: str, *, - max_retries: int = 30, + max_retries: int = 50, interval: float = 0.1, download_result: bool = True, ) -> Optional[bytes]: @@ -153,7 +153,7 @@ def wait_and_download_v2( def export_v2( endpoint: Endpoint, *, - max_retries: int = 30, + max_retries: int = 50, interval: float = 0.1, expect_forbidden: bool = False, 
wait_result: bool = True, @@ -196,7 +196,7 @@ def export_dataset( ], # make this parameter required to be sure that all tests was updated and both API versions are used *, save_images: bool, - max_retries: int = 30, + max_retries: int = 50, interval: float = 0.1, format: str = "CVAT for images 1.1", # pylint: disable=redefined-builtin **kwargs, @@ -278,7 +278,7 @@ def export_backup( int, tuple[int] ], # make this parameter required to be sure that all tests was updated and both API versions are used *, - max_retries: int = 30, + max_retries: int = 50, interval: float = 0.1, **kwargs, ) -> Optional[bytes]: @@ -326,7 +326,7 @@ def export_task_backup( def import_resource( endpoint: Endpoint, *, - max_retries: int = 30, + max_retries: int = 50, interval: float = 0.1, expect_forbidden: bool = False, wait_result: bool = True, @@ -372,7 +372,7 @@ def import_resource( def import_backup( api: Union[ProjectsApi, TasksApi], *, - max_retries: int = 30, + max_retries: int = 50, interval: float = 0.1, **kwargs, ) -> None:
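A side note on the `cvat/apps/engine/default_settings.py` hunk above: it repeats the same read-deprecated-name-then-fall-back dance four times, once per setting. If that pattern grows, it could be factored into a helper along these lines (a hypothetical consolidation sketch, not part of this patch):

```python
import logging
import os

logger = logging.getLogger("cvat")

def get_int_setting(new_name: str, deprecated_name: str, default: int) -> int:
    # Prefer the deprecated variable if it is still set, but warn about it;
    # otherwise read the new variable, falling back to the default.
    deprecated_value = os.getenv(deprecated_name)
    if deprecated_value is not None:
        logger.warning("The %s is deprecated, use %s instead", deprecated_name, new_name)
        return int(deprecated_value)
    return int(os.getenv(new_name, default))

EXPORT_CACHE_TTL = get_int_setting(
    "CVAT_EXPORT_CACHE_TTL", "CVAT_DATASET_CACHE_TTL", 60 * 60 * 24
)
```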