Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NAS-130653 / 25.04 / Updated debug generate lock queue size #14301

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions src/middlewared/middlewared/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class State(enum.Enum):
ABORTED = 5


class JobSharedLock(object):
class JobSharedLock:
"""
Shared lock for jobs.
Each job method can specify a lock which will be shared
Expand Down Expand Up @@ -83,8 +83,7 @@ def release(self):
return self.lock.release()


class JobsQueue(object):

class JobsQueue:
def __init__(self, middleware):
self.middleware = middleware
self.deque = JobsDeque()
Expand All @@ -105,7 +104,7 @@ def __getitem__(self, item):
def get(self, item):
return self.deque.get(item)

def all(self):
def all(self) -> dict[int, "Job"]:
return self.deque.all()

def for_username(self, username):
Expand All @@ -124,19 +123,24 @@ def for_username(self, username):
def add(self, job):
self.handle_lock(job)
if job.options["lock_queue_size"] is not None:
queued_jobs = [another_job for another_job in self.queue if another_job.lock is job.lock]
if len(queued_jobs) >= job.options["lock_queue_size"]:
for queued_job in reversed(queued_jobs):
if not credential_is_limited_to_own_jobs(job.credentials):
return queued_job
if (
job.credentials.is_user_session and
queued_job.credentials.is_user_session and
job.credentials.user['username'] == queued_job.credentials.user['username']
):
return queued_job

raise CallError('This job is already being performed by another user', errno.EBUSY)
if job.options["lock_queue_size"] == 0:
for another_job in self.all().values():
if another_job.state == State.RUNNING and another_job.lock is job.lock:
raise CallError("This job is already being performed", errno.EBUSY)
else:
queued_jobs = [another_job for another_job in self.queue if another_job.lock is job.lock]
if len(queued_jobs) >= job.options["lock_queue_size"]:
for queued_job in reversed(queued_jobs):
if not credential_is_limited_to_own_jobs(job.credentials):
return queued_job
if (
job.credentials.is_user_session and
queued_job.credentials.is_user_session and
job.credentials.user['username'] == queued_job.credentials.user['username']
):
return queued_job

raise CallError('This job is already being performed by another user', errno.EBUSY)

self.deque.add(job)
self.queue.append(job)
Expand Down
4 changes: 2 additions & 2 deletions src/middlewared/middlewared/plugins/system/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
class SystemService(Service):

@private
@job(lock='system.debug_generate')
@job(lock='system.debug_generate', lock_queue_size=1)
def debug_generate(self, job):
"""
Generate system debug file.
Expand Down Expand Up @@ -64,7 +64,7 @@ def progress_callback(percent, desc):

@accepts(roles=['READONLY_ADMIN'])
@returns()
@job(lock='system.debug', pipes=['output'])
@job(lock='system.debug', lock_queue_size=0, pipes=['output'])
def debug(self, job):
"""
Download a debug file.
Expand Down
7 changes: 7 additions & 0 deletions src/middlewared/middlewared/plugins/test/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,10 @@ def test_download_slow_pipe(self, job, arg):
time.sleep(2)
job.pipes.output.w.write(json.dumps(arg).encode("utf-8"))
job.pipes.output.w.close()

@accepts(Any("arg"))
@job(lock="test_download_slow_pipe_with_lock", lock_queue_size=0, pipes=["output"])
def test_download_slow_pipe_with_lock(self, job, arg):
time.sleep(5)
job.pipes.output.w.write(json.dumps(arg).encode("utf-8"))
job.pipes.output.w.close()
3 changes: 3 additions & 0 deletions src/middlewared/middlewared/service/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def scrub(self, pool_name):
If lock queue size is exceeded then the new job is discarded and the `id` of the last job in the queue is
returned.

If lock queue size is zero, then launching a job when another job with the same lock is running will raise an
`EBUSY` error.

Default value is `5`. `None` would mean that lock queue is infinite.

:param logs: If `True` then `job.logs_fd` object will be available. It is an unbuffered file opened in binary mode;
Expand Down
8 changes: 4 additions & 4 deletions src/middlewared/middlewared/service_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class CallException(ErrnoMixin, Exception):


class CallError(CallException):
def __init__(self, errmsg: str, errno: int=errno.EFAULT, extra=None):
def __init__(self, errmsg: str, errno: int = errno.EFAULT, extra=None):
self.errmsg = errmsg
self.errno = errno
self.extra = extra
Expand All @@ -30,7 +30,7 @@ class ValidationError(CallException):
attribute of a middleware method is invalid/not allowed.
"""

def __init__(self, attribute, errmsg, errno: int=errno.EINVAL):
def __init__(self, attribute, errmsg, errno: int = errno.EINVAL):
self.attribute = attribute
self.errmsg = errmsg
self.errno = errno
Expand All @@ -53,11 +53,11 @@ class ValidationErrors(CallException):
CallException with a collection of ValidationError
"""

def __init__(self, errors: typing.List[ValidationError]=None):
def __init__(self, errors: typing.List[ValidationError] = None):
self.errors = errors or []
super().__init__(self.errors)

def add(self, attribute, errmsg: str, errno: int=errno.EINVAL):
def add(self, attribute, errmsg: str, errno: int = errno.EINVAL):
self.errors.append(ValidationError(attribute, errmsg, errno))

def add_validation_error(self, validation_error: ValidationError):
Expand Down
10 changes: 9 additions & 1 deletion tests/api2/test_rest_api_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from middlewared.service_exception import CallError
from middlewared.test.integration.assets.account import unprivileged_user
from middlewared.test.integration.utils import client, session, url
from middlewared.test.integration.utils import call, client, session, url


@pytest.mark.parametrize("method", ["test_download_pipe", "test_download_unchecked_pipe"])
Expand Down Expand Up @@ -79,6 +79,14 @@ def test_buffered_download_from_slow_download_endpoint(buffered, sleep, result):
assert r.text == result


def test_download_duplicate_job():
call("core.download", "resttest.test_download_slow_pipe_with_lock", [{"key": "value"}], "file.bin")
with pytest.raises(CallError) as ve:
call("core.download", "resttest.test_download_slow_pipe_with_lock", [{"key": "value"}], "file.bin")

assert ve.value.errno == errno.EBUSY


def test_download_authorization_ok():
with unprivileged_user(
username="unprivileged",
Expand Down
Loading