Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMP] queue: Store context in order to reuse it #121

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ class DelayableRecordset(object):

def __init__(self, recordset, priority=None, eta=None,
max_retries=None, description=None, channel=None,
identity_key=None):
identity_key=None, job_context=None):
self.recordset = recordset
self.priority = priority
self.eta = eta
self.max_retries = max_retries
self.description = description
self.channel = channel
self.identity_key = identity_key
self.job_context = job_context or {}

def __getattr__(self, name):
if name in self.recordset:
Expand All @@ -87,9 +88,18 @@ def delay(*args, **kwargs):
eta=self.eta,
description=self.description,
channel=self.channel,
identity_key=self.identity_key)
identity_key=self.identity_key,
job_context=self.get_context(recordset_method))
return delay

def get_context(self, method):
original_ctx = self.job_context
ctx = {}
for key in getattr(method, 'allow_context', []):
if key in original_ctx:
ctx[key] = original_ctx[key]
return ctx

def __str__(self):
return "DelayableRecordset(%s%s)" % (
self.recordset._name,
Expand Down Expand Up @@ -241,6 +251,10 @@ class Job(object):
be added to a channel if the existing job with the same key is not yet
started or executed.

.. attribute::job_context

Original context of the job

"""
@classmethod
def load(cls, env, job_uuid):
Expand All @@ -259,7 +273,6 @@ def _load_from_db_record(cls, job_db_record):
args = stored.args
kwargs = stored.kwargs
method_name = stored.method_name

model = env[stored.model_name]

recordset = model.browse(stored.record_ids)
Expand Down Expand Up @@ -297,6 +310,7 @@ def _load_from_db_record(cls, job_db_record):
if stored.company_id:
job_.company_id = stored.company_id.id
job_.identity_key = stored.identity_key
job_.job_context = stored.job_context or {}
return job_

def job_record_with_same_identity_key(self):
Expand All @@ -311,7 +325,7 @@ def job_record_with_same_identity_key(self):
@classmethod
def enqueue(cls, func, args=None, kwargs=None,
priority=None, eta=None, max_retries=None, description=None,
channel=None, identity_key=None):
channel=None, identity_key=None, job_context=None):
"""Create a Job and enqueue it in the queue. Return the job uuid.

This expects the arguments specific to the job to be already extracted
Expand All @@ -324,7 +338,8 @@ def enqueue(cls, func, args=None, kwargs=None,
new_job = cls(func=func, args=args,
kwargs=kwargs, priority=priority, eta=eta,
max_retries=max_retries, description=description,
channel=channel, identity_key=identity_key)
channel=channel, identity_key=identity_key,
job_context=job_context)
if new_job.identity_key:
existing = new_job.job_record_with_same_identity_key()
if existing:
Expand Down Expand Up @@ -355,7 +370,8 @@ def db_record_from_uuid(env, job_uuid):
def __init__(self, func,
args=None, kwargs=None, priority=None,
eta=None, job_uuid=None, max_retries=None,
description=None, channel=None, identity_key=None):
description=None, channel=None, identity_key=None,
job_context=None):
""" Create a Job

:param func: function to execute
Expand Down Expand Up @@ -397,6 +413,7 @@ def __init__(self, func,

recordset = func.__self__
env = recordset.env
self.job_context = job_context or {}
self.model_name = recordset._name
self.method_name = func.__name__
self.recordset = recordset
Expand Down Expand Up @@ -497,6 +514,7 @@ def store(self):
'date_done': False,
'eta': False,
'identity_key': False,
'job_context': self.job_context,
}

dt_to_string = odoo.fields.Datetime.to_string
Expand Down Expand Up @@ -539,7 +557,10 @@ def db_record(self):

@property
def func(self):
recordset = self.recordset.with_context(job_uuid=self.uuid)
recordset = self.recordset.with_context(
**(self.job_context or {})
).with_context(job_uuid=self.uuid, )
# We want to be sure that the job_uuid is not rewritten
recordset = recordset.sudo(self.user_id)
return getattr(recordset, self.method_name)

Expand Down Expand Up @@ -675,7 +696,8 @@ def _is_model_method(func):
isinstance(func.__self__.__class__, odoo.models.MetaModel))


def job(func=None, default_channel='root', retry_pattern=None):
def job(func=None, default_channel='root', retry_pattern=None,
allow_context=None):
"""Decorator for job methods.

It enables the possibility to use a Model's method as a job function.
Expand All @@ -692,6 +714,8 @@ def job(func=None, default_channel='root', retry_pattern=None):
is provided, jobs will be retried after
:const:`RETRY_INTERVAL` seconds.
:type retry_pattern: dict(retry_count,retry_eta_seconds)
:param allow_context: List of allowed context keys.
:type allow_context: array

Indicates that a method of a Model can be delayed in the Job Queue.

Expand Down Expand Up @@ -759,7 +783,8 @@ def retryable_example():
"""
if func is None:
return functools.partial(job, default_channel=default_channel,
retry_pattern=retry_pattern)
retry_pattern=retry_pattern,
allow_context=allow_context)

def delay_from_model(*args, **kwargs):
raise AttributeError(
Expand All @@ -779,6 +804,7 @@ def delay_from_model(*args, **kwargs):
func.delay = delay_func
func.retry_pattern = retry_pattern
func.default_channel = default_channel
func.allow_context = allow_context or []
return func


Expand Down
3 changes: 2 additions & 1 deletion queue_job/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,5 @@ def with_delay(self, priority=None, eta=None,
max_retries=max_retries,
description=description,
channel=channel,
identity_key=identity_key)
identity_key=identity_key,
job_context=self.env.context.copy())
1 change: 1 addition & 0 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class QueueJob(models.Model):
index=True)

identity_key = fields.Char()
job_context = JobSerialized(readonly=True)

@api.model_cr
def init(self):
Expand Down
6 changes: 5 additions & 1 deletion test_queue_job/models/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ class TestQueueJob(models.Model):

name = fields.Char()

@job
@job(allow_context=[
'return_context_from_context', 'expected_element_from_context'
])
@related_action(action='testing_related_method')
@api.multi
def testing_method(self, *args, **kwargs):
Expand All @@ -48,6 +50,8 @@ def testing_method(self, *args, **kwargs):
raise RetryableJobError('Must be retried later')
if kwargs.get('return_context'):
return self.env.context
if self.env.context.get('return_context_from_context', False):
return self.env.context
return args, kwargs

@job
Expand Down
13 changes: 12 additions & 1 deletion test_queue_job/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from datetime import datetime, timedelta
import mock

from odoo import SUPERUSER_ID
import odoo.tests.common as common

Expand Down Expand Up @@ -530,6 +529,18 @@ def test_context_uuid(self):
self.assertTrue(key_present)
self.assertEqual(result['job_uuid'], test_job._uuid)

def test_context_from_context(self):
element = 'EXPECTED VALUE'
delayable = self.env['test.queue.job'].with_context(
return_context_from_context=True,
expected_element_from_context=element,
).with_delay()
test_job = delayable.testing_method(return_context=True)
result = test_job.perform()
key_present = 'expected_element_from_context' in result
self.assertTrue(key_present)
self.assertEqual(result['expected_element_from_context'], element)

def test_override_channel(self):
delayable = self.env['test.queue.job'].with_delay(
channel='root.sub.sub')
Expand Down