diff --git a/src/middlewared/middlewared/job.py b/src/middlewared/middlewared/job.py index bd3caf271302b..e09a7e3227994 100644 --- a/src/middlewared/middlewared/job.py +++ b/src/middlewared/middlewared/job.py @@ -50,7 +50,7 @@ class State(enum.Enum): ABORTED = 5 -class JobSharedLock(object): +class JobSharedLock: """ Shared lock for jobs. Each job method can specify a lock which will be shared @@ -83,8 +83,7 @@ def release(self): return self.lock.release() -class JobsQueue(object): - +class JobsQueue: def __init__(self, middleware): self.middleware = middleware self.deque = JobsDeque() @@ -105,7 +104,7 @@ def __getitem__(self, item): def get(self, item): return self.deque.get(item) - def all(self): + def all(self) -> dict[int, "Job"]: return self.deque.all() def for_username(self, username): @@ -124,19 +123,24 @@ def for_username(self, username): def add(self, job): self.handle_lock(job) if job.options["lock_queue_size"] is not None: - queued_jobs = [another_job for another_job in self.queue if another_job.lock is job.lock] - if len(queued_jobs) >= job.options["lock_queue_size"]: - for queued_job in reversed(queued_jobs): - if not credential_is_limited_to_own_jobs(job.credentials): - return queued_job - if ( - job.credentials.is_user_session and - queued_job.credentials.is_user_session and - job.credentials.user['username'] == queued_job.credentials.user['username'] - ): - return queued_job - - raise CallError('This job is already being performed by another user', errno.EBUSY) + if job.options["lock_queue_size"] == 0: + for another_job in self.all().values(): + if another_job.state == State.RUNNING and another_job.lock is job.lock: + raise CallError("This job is already being performed", errno.EBUSY) + else: + queued_jobs = [another_job for another_job in self.queue if another_job.lock is job.lock] + if len(queued_jobs) >= job.options["lock_queue_size"]: + for queued_job in reversed(queued_jobs): + if not credential_is_limited_to_own_jobs(job.credentials): + return queued_job + if ( + job.credentials.is_user_session and + queued_job.credentials.is_user_session and + job.credentials.user['username'] == queued_job.credentials.user['username'] + ): + return queued_job + + raise CallError('This job is already being performed by another user', errno.EBUSY) self.deque.add(job) self.queue.append(job) diff --git a/src/middlewared/middlewared/plugins/system/debug.py b/src/middlewared/middlewared/plugins/system/debug.py index e90787d9736f0..88a272d1de1e8 100644 --- a/src/middlewared/middlewared/plugins/system/debug.py +++ b/src/middlewared/middlewared/plugins/system/debug.py @@ -18,7 +18,7 @@ class SystemService(Service): @private - @job(lock='system.debug_generate') + @job(lock='system.debug_generate', lock_queue_size=1) def debug_generate(self, job): """ Generate system debug file. @@ -64,7 +64,7 @@ def progress_callback(percent, desc): @accepts(roles=['READONLY_ADMIN']) @returns() - @job(lock='system.debug', pipes=['output']) + @job(lock='system.debug', lock_queue_size=0, pipes=['output']) def debug(self, job): """ Download a debug file. diff --git a/src/middlewared/middlewared/plugins/test/rest.py b/src/middlewared/middlewared/plugins/test/rest.py index 4e126b4bbce9f..54f38ee5026d8 100644 --- a/src/middlewared/middlewared/plugins/test/rest.py +++ b/src/middlewared/middlewared/plugins/test/rest.py @@ -45,3 +45,10 @@ def test_download_slow_pipe(self, job, arg): time.sleep(2) job.pipes.output.w.write(json.dumps(arg).encode("utf-8")) job.pipes.output.w.close() + + @accepts(Any("arg")) + @job(lock="test_download_slow_pipe_with_lock", lock_queue_size=0, pipes=["output"]) + def test_download_slow_pipe_with_lock(self, job, arg): + time.sleep(5) + job.pipes.output.w.write(json.dumps(arg).encode("utf-8")) + job.pipes.output.w.close() diff --git a/src/middlewared/middlewared/service/decorators.py b/src/middlewared/middlewared/service/decorators.py index 1f6ffbd882b20..84c639659c98d 100644 --- a/src/middlewared/middlewared/service/decorators.py +++ b/src/middlewared/middlewared/service/decorators.py @@ -94,6 +94,9 @@ def scrub(self, pool_name): If lock queue size is exceeded then the new job is discarded and the `id` of the last job in the queue is returned. + If lock queue size is zero, then launching a job when another job with the same lock is running will raise an + `EBUSY` error. + Default value is `5`. `None` would mean that lock queue is infinite. :param logs: If `True` then `job.logs_fd` object will be available. It is an unbuffered file opened in binary mode; diff --git a/src/middlewared/middlewared/service_exception.py b/src/middlewared/middlewared/service_exception.py index 6b02f18d30c59..f002a836d391c 100644 --- a/src/middlewared/middlewared/service_exception.py +++ b/src/middlewared/middlewared/service_exception.py @@ -14,7 +14,7 @@ class CallException(ErrnoMixin, Exception): class CallError(CallException): - def __init__(self, errmsg: str, errno: int=errno.EFAULT, extra=None): + def __init__(self, errmsg: str, errno: int = errno.EFAULT, extra=None): self.errmsg = errmsg self.errno = errno self.extra = extra @@ -30,7 +30,7 @@ class ValidationError(CallException): attribute of a middleware method is invalid/not allowed. """ - def __init__(self, attribute, errmsg, errno: int=errno.EINVAL): + def __init__(self, attribute, errmsg, errno: int = errno.EINVAL): self.attribute = attribute self.errmsg = errmsg self.errno = errno @@ -53,11 +53,11 @@ class ValidationErrors(CallException): CallException with a collection of ValidationError """ - def __init__(self, errors: typing.List[ValidationError]=None): + def __init__(self, errors: typing.List[ValidationError] = None): self.errors = errors or [] super().__init__(self.errors) - def add(self, attribute, errmsg: str, errno: int=errno.EINVAL): + def add(self, attribute, errmsg: str, errno: int = errno.EINVAL): self.errors.append(ValidationError(attribute, errmsg, errno)) def add_validation_error(self, validation_error: ValidationError): diff --git a/tests/api2/test_rest_api_download.py b/tests/api2/test_rest_api_download.py index 92b22fc77301d..5657380d1338d 100644 --- a/tests/api2/test_rest_api_download.py +++ b/tests/api2/test_rest_api_download.py @@ -6,7 +6,7 @@ from middlewared.service_exception import CallError from middlewared.test.integration.assets.account import unprivileged_user -from middlewared.test.integration.utils import client, session, url +from middlewared.test.integration.utils import call, client, session, url @pytest.mark.parametrize("method", ["test_download_pipe", "test_download_unchecked_pipe"]) @@ -79,6 +79,14 @@ def test_buffered_download_from_slow_download_endpoint(buffered, sleep, result): assert r.text == result +def test_download_duplicate_job(): + call("core.download", "resttest.test_download_slow_pipe_with_lock", [{"key": "value"}], "file.bin") + with pytest.raises(CallError) as ve: + call("core.download", "resttest.test_download_slow_pipe_with_lock", [{"key": "value"}], "file.bin") + + assert ve.value.errno == errno.EBUSY + + def test_download_authorization_ok(): with unprivileged_user( username="unprivileged",