Skip to content

Commit

Permalink
Support lock_queue_size=0
Browse files Browse the repository at this point in the history
  • Loading branch information
themylogin committed Sep 4, 2024
1 parent 7269140 commit b8dc188
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 22 deletions.
38 changes: 21 additions & 17 deletions src/middlewared/middlewared/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class State(enum.Enum):
ABORTED = 5


class JobSharedLock(object):
class JobSharedLock:
"""
Shared lock for jobs.
Each job method can specify a lock which will be shared
Expand Down Expand Up @@ -83,8 +83,7 @@ def release(self):
return self.lock.release()


class JobsQueue(object):

class JobsQueue:
def __init__(self, middleware):
self.middleware = middleware
self.deque = JobsDeque()
Expand All @@ -105,7 +104,7 @@ def __getitem__(self, item):
def get(self, item):
return self.deque.get(item)

def all(self):
def all(self) -> dict[int, "Job"]:
return self.deque.all()

def for_username(self, username):
Expand All @@ -124,19 +123,24 @@ def for_username(self, username):
def add(self, job):
self.handle_lock(job)
if job.options["lock_queue_size"] is not None:
queued_jobs = [another_job for another_job in self.queue if another_job.lock is job.lock]
if len(queued_jobs) >= job.options["lock_queue_size"]:
for queued_job in reversed(queued_jobs):
if not credential_is_limited_to_own_jobs(job.credentials):
return queued_job
if (
job.credentials.is_user_session and
queued_job.credentials.is_user_session and
job.credentials.user['username'] == queued_job.credentials.user['username']
):
return queued_job

raise CallError('This job is already being performed by another user', errno.EBUSY)
if job.options["lock_queue_size"] == 0:
for another_job in self.all().values():
if another_job.state == State.RUNNING and another_job.lock is job.lock:
raise CallError("This job is already being performed", errno.EBUSY)
else:
queued_jobs = [another_job for another_job in self.queue if another_job.lock is job.lock]
if len(queued_jobs) >= job.options["lock_queue_size"]:
for queued_job in reversed(queued_jobs):
if not credential_is_limited_to_own_jobs(job.credentials):
return queued_job
if (
job.credentials.is_user_session and
queued_job.credentials.is_user_session and
job.credentials.user['username'] == queued_job.credentials.user['username']
):
return queued_job

raise CallError('This job is already being performed by another user', errno.EBUSY)

self.deque.add(job)
self.queue.append(job)
Expand Down
7 changes: 7 additions & 0 deletions src/middlewared/middlewared/plugins/test/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,10 @@ def test_download_slow_pipe(self, job, arg):
time.sleep(2)
job.pipes.output.w.write(json.dumps(arg).encode("utf-8"))
job.pipes.output.w.close()

@accepts(Any("arg"))
@job(lock="test_download_slow_pipe_with_lock", lock_queue_size=0, pipes=["output"])
def test_download_slow_pipe_with_lock(self, job, arg):
time.sleep(5)
job.pipes.output.w.write(json.dumps(arg).encode("utf-8"))
job.pipes.output.w.close()
3 changes: 3 additions & 0 deletions src/middlewared/middlewared/service/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def scrub(self, pool_name):
If lock queue size is exceeded then the new job is discarded and the `id` of the last job in the queue is
returned.
If lock queue size is zero, then launching a job when another job with the same lock is running will raise an
`EBUSY` error.
Default value is `5`. `None` would mean that lock queue is infinite.
:param logs: If `True` then `job.logs_fd` object will be available. It is an unbuffered file opened in binary mode;
Expand Down
8 changes: 4 additions & 4 deletions src/middlewared/middlewared/service_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class CallException(ErrnoMixin, Exception):


class CallError(CallException):
def __init__(self, errmsg: str, errno: int=errno.EFAULT, extra=None):
def __init__(self, errmsg: str, errno: int = errno.EFAULT, extra=None):
self.errmsg = errmsg
self.errno = errno
self.extra = extra
Expand All @@ -30,7 +30,7 @@ class ValidationError(CallException):
attribute of a middleware method is invalid/not allowed.
"""

def __init__(self, attribute, errmsg, errno: int=errno.EINVAL):
def __init__(self, attribute, errmsg, errno: int = errno.EINVAL):
self.attribute = attribute
self.errmsg = errmsg
self.errno = errno
Expand All @@ -53,11 +53,11 @@ class ValidationErrors(CallException):
CallException with a collection of ValidationError
"""

def __init__(self, errors: typing.List[ValidationError]=None):
def __init__(self, errors: typing.List[ValidationError] = None):
self.errors = errors or []
super().__init__(self.errors)

def add(self, attribute, errmsg: str, errno: int=errno.EINVAL):
def add(self, attribute, errmsg: str, errno: int = errno.EINVAL):
self.errors.append(ValidationError(attribute, errmsg, errno))

def add_validation_error(self, validation_error: ValidationError):
Expand Down
10 changes: 9 additions & 1 deletion tests/api2/test_rest_api_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from middlewared.service_exception import CallError
from middlewared.test.integration.assets.account import unprivileged_user
from middlewared.test.integration.utils import client, session, url
from middlewared.test.integration.utils import call, client, session, url


@pytest.mark.parametrize("method", ["test_download_pipe", "test_download_unchecked_pipe"])
Expand Down Expand Up @@ -79,6 +79,14 @@ def test_buffered_download_from_slow_download_endpoint(buffered, sleep, result):
assert r.text == result


def test_download_duplicate_job():
call("core.download", "resttest.test_download_slow_pipe_with_lock", [{"key": "value"}], "file.bin")
with pytest.raises(CallError) as ve:
call("core.download", "resttest.test_download_slow_pipe_with_lock", [{"key": "value"}], "file.bin")

assert ve.value.errno == errno.EBUSY


def test_download_authorization_ok():
with unprivileged_user(
username="unprivileged",
Expand Down

0 comments on commit b8dc188

Please sign in to comment.