Skip to content

Commit

Permalink
Async: Display export tasks on project dashboard (#1801)
Browse files Browse the repository at this point in the history
* Display export tasks on project dashboard
  • Loading branch information
alukach authored and Anthony Lukach committed Oct 17, 2017
1 parent 9673dd5 commit 1b2f863
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 31 deletions.
1 change: 1 addition & 0 deletions cadasta/config/settings/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
'django.contrib.staticfiles',
'django.contrib.sites',
'django.contrib.gis',
'django.contrib.humanize',
'corsheaders',

'core',
Expand Down
7 changes: 7 additions & 0 deletions cadasta/organization/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.conf import settings
from django.db import models
from django_countries.fields import CountryField
from django.contrib.contenttypes.fields import GenericRelation
from django.contrib.postgres.fields import JSONField, ArrayField
from django.dispatch import receiver
from django.utils.translation import ugettext as _
Expand Down Expand Up @@ -196,6 +197,12 @@ class Project(ResourceModelMixin, SlugModel, RandomIDModel):
)
area = models.FloatField(default=0)

tasks = GenericRelation(
'tasks.BackgroundTask',
content_type_field='related_content_type',
object_id_field='related_object_id',
)

# Audit history
created_date = models.DateTimeField(auto_now_add=True)
last_updated = models.DateTimeField(auto_now=True)
Expand Down
9 changes: 8 additions & 1 deletion cadasta/organization/views/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from django.db.models import Sum, When, Case, IntegerField
from django.shortcuts import get_object_or_404, redirect
from django.utils.translation import ugettext as _
from django.utils import timezone

from accounts.models import User
import core.views.generic as generic
Expand All @@ -32,7 +33,7 @@
from .. import forms
from ..importers.exceptions import DataImportError
from ..models import Organization, OrganizationRole, Project, ProjectRole
from ..tasks import schedule_project_export
from ..tasks import schedule_project_export, export


class OrganizationList(PermissionRequiredMixin, generic.ListView):
Expand Down Expand Up @@ -451,6 +452,12 @@ def get_context_data(self, **kwargs):
context['num_parties'] = num_parties
context['num_resources'] = num_resources
context['members'] = members

exports = self.object.tasks.filter(type=export.name)
exports = exports.select_related('result').order_by('-created_date')
last_week = timezone.now() - timezone.timedelta(days=7)
exports = exports.filter(created_date__gte=last_week)[:5]
context['recent_exports'] = exports
try:
context['questionnaire'] = Questionnaire.objects.get(
id=self.object.current_questionnaire)
Expand Down
15 changes: 11 additions & 4 deletions cadasta/tasks/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def process_task(self, body, message):
try:
return self._handle_task(body, message)
except:
logger.exception("Failed to process message: %r", message)
logger.exception(
"Failed to process message: %r", message)
finally:
logger.info("ACKing message %r", message)
if self.connection.as_uri().lower().startswith('sqs://'):
Expand All @@ -54,22 +55,28 @@ def _handle_task(body, message):
logger.debug("Handling task message %r", body)
args, kwargs, options = message.decode()
task_id = message.headers['id']
task_type = message.headers['task']

# Add default properties
# Add additional option data from headers to properties
option_keys = ['eta', 'expires', 'retries', 'timelimit']
message.properties.update(
**{k: v for k, v in message.headers.items()
if k in option_keys and v not in (None, [None, None])})

props = message.properties
_, created = BackgroundTask.objects.get_or_create(
task_id=task_id,
defaults={
'type': message.headers['task'],
'type': task_type,
'input_args': args,
'input_kwargs': kwargs,
'options': message.properties,
'options': props,
'parent_id': message.headers['parent_id'],
'root_id': message.headers['root_id'],
'creator_id': props.get('creator_id'),
'related_content_type_id':
props.get('related_content_type_id'),
'related_object_id': props.get('related_object_id'),
}
)
if created:
Expand Down
10 changes: 5 additions & 5 deletions cadasta/tasks/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.7 on 2017-07-11 22:38
# Generated by Django 1.10.7 on 2017-09-13 05:41
from __future__ import unicode_literals

from django.conf import settings
Expand All @@ -16,8 +16,8 @@ class Migration(migrations.Migration):
initial = True

dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('contenttypes', '0002_remove_content_type_name'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]

operations = [
Expand All @@ -31,12 +31,12 @@ class Migration(migrations.Migration):
('last_updated', models.DateTimeField(auto_now=True)),
('input', django.contrib.postgres.fields.jsonb.JSONField(blank=True, default=tasks.utils.fields.input_field_default, validators=[functools.partial(tasks.utils.fields.validate_type, *(dict,), **{}), tasks.utils.fields.validate_input_field])),
('options', django.contrib.postgres.fields.jsonb.JSONField(blank=True, default=dict, validators=[functools.partial(tasks.utils.fields.validate_type, *(dict,), **{})], verbose_name='Task scheduling options')),
('related_object_id', models.PositiveIntegerField(blank=True, null=True)),
('related_object_id', models.CharField(blank=True, max_length=24, null=True)),
('immutable', models.NullBooleanField(verbose_name='If arguments are immutable (only applies to chained tasks).')),
('creator', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
('parent', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='children', to='tasks.BackgroundTask', to_field='task_id')),
('related_content_type', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType')),
('root', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='descendents', to='tasks.BackgroundTask', to_field='task_id')),
('related_content_type', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='+', to='contenttypes.ContentType')),
('root', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='descendents', to='tasks.BackgroundTask', to_field='task_id')),
],
options={
'ordering': ['created_date'],
Expand Down
42 changes: 38 additions & 4 deletions cadasta/tasks/models.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from celery import states
from core.models import RandomIDModel
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.fields import JSONField
from django.db import models, transaction
from django.db.models.expressions import F
from django.utils.translation import ugettext_lazy as _
from django.utils.functional import lazy

from core.util import ID_FIELD_LENGTH
from core.models import RandomIDModel

from .celery import app
from .utils import fields as utils
from .fields import PickledObjectField
Expand Down Expand Up @@ -61,8 +64,10 @@ class BackgroundTask(RandomIDModel):
validators=[utils.is_type(dict)])

related_content_type = models.ForeignKey(
ContentType, on_delete=models.CASCADE, null=True, blank=True)
related_object_id = models.PositiveIntegerField(null=True, blank=True)
ContentType, on_delete=models.CASCADE, related_name='+',
null=True, blank=True)
related_object_id = models.CharField(
max_length=ID_FIELD_LENGTH, null=True, blank=True)
related_object = GenericForeignKey(
'related_content_type', 'related_object_id')

Expand All @@ -71,7 +76,7 @@ class BackgroundTask(RandomIDModel):
on_delete=models.CASCADE, blank=True, null=True)
root = models.ForeignKey(
'self', related_name='descendents', to_field='task_id',
on_delete=models.CASCADE, blank=True, null=True)
on_delete=models.CASCADE)
immutable = models.NullBooleanField(
_("If arguments are immutable (only applies to chained tasks)."))

Expand Down Expand Up @@ -110,3 +115,32 @@ def input_kwargs(self):
@input_kwargs.setter
def input_kwargs(self, value):
self.input['kwargs'] = value

@property
def family(self):
""" Return all tasks with matching root id (including self) """
return self.__class__._default_manager.filter(root_id=self.root_id)

@property
def overall_status(self):
# chord_unlock doesn't use the result backed, so best to ignore
subtasks = self.family.exclude(type='celery.chord_unlock')
subtasks = subtasks.annotate(_status=F('result__status'))
# If we are to get distinct _status values, we must order_by('_status')
subtasks = subtasks.order_by('_status')
statuses = subtasks.values_list('_status', flat=True)
statuses = statuses.distinct()
num_statuses = len(statuses)
if num_statuses > 1:
if 'FAILURE' in statuses:
return 'FAILURE'
return 'STARTED'
# It's possible for all statuses to equal None, in which case we
# can call them 'PENDING'.
return statuses[0] or 'PENDING'

@property
def overall_results(self):
""" Return results of any tasks where is_result is set to True """
results = self.family.filter(options__is_result=True)
return results.values_list('result__result', flat=True)
11 changes: 8 additions & 3 deletions cadasta/tasks/tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
from ..utils.fields import input_field_default


def gen_id(length=24):
return str(uuid.uuid4())[:length]


class BackgroundTaskFactory(ExtendedFactory):

class Meta:
model = BackgroundTask

id = str(uuid.uuid1())
id = factory.LazyFunction(gen_id)
task_id = factory.LazyFunction(gen_id)
root_id = factory.LazyAttribute(lambda o: o.task_id)
type = 'foo.bar'
input = factory.LazyFunction(input_field_default)
creator = factory.SubFactory(UserFactory)
Expand All @@ -24,7 +30,6 @@ class TaskResultFactory(ExtendedFactory):
class Meta:
model = TaskResult

id = str(uuid.uuid1())
task_id = str(uuid.uuid1())
task = factory.SubFactory(BackgroundTaskFactory)
result = 'complete'
status = 'PENDING'
32 changes: 26 additions & 6 deletions cadasta/tasks/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def get_task_msg(self, chain=None):
'lang': 'py',
'root_id': '486e8738-a9ef-475a-b8e1-158e987f4ae6',
'argsrepr': '()',
'id': '486e8738-a9ef-475a-b8e1-158e987f4ae6'
'id': '486e8738-a9ef-475a-b8e1-158e987f4ae6',
}
return MagicMock(
headers=headers,
Expand All @@ -40,7 +40,11 @@ def get_task_msg(self, chain=None):
'errbacks': None, 'chord': None}
],
decode=MagicMock(return_value=(args, kwargs, headers)),
properties={}
properties={
'creator_id': 1,
'related_content_type_id': 2,
'related_object_id': 3,
}
)

@staticmethod
Expand Down Expand Up @@ -120,8 +124,16 @@ def test_handle_new_task(self, BackgroundTask, logger):
'root_id': '486e8738-a9ef-475a-b8e1-158e987f4ae6',
'input_kwargs': {},
'input_args': [],
'options': {'retries': 0},
'parent_id': None
'options': {
'retries': 0,
'creator_id': 1,
'related_content_type_id': 2,
'related_object_id': 3,
},
'parent_id': None,
'creator_id': 1,
'related_content_type_id': 2,
'related_object_id': 3,
},
task_id='486e8738-a9ef-475a-b8e1-158e987f4ae6'
)
Expand All @@ -147,8 +159,16 @@ def test_handle_existing_task(self, BackgroundTask, logger):
'root_id': '486e8738-a9ef-475a-b8e1-158e987f4ae6',
'input_kwargs': {},
'input_args': [],
'options': {'retries': 0},
'parent_id': None
'options': {
'retries': 0,
'creator_id': 1,
'related_content_type_id': 2,
'related_object_id': 3,
},
'parent_id': None,
'creator_id': 1,
'related_content_type_id': 2,
'related_object_id': 3,
},
task_id='486e8738-a9ef-475a-b8e1-158e987f4ae6'
)
Expand Down
52 changes: 45 additions & 7 deletions cadasta/tasks/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ def test_save_invalid_input_args(self):
""" Ensure that invalid input args will raise validation error """
with self.assertRaises(ValidationError) as context:
BackgroundTaskFactory.build(
id=None, type='foo.bar', input={'args': None},
type='foo.bar', input={'args': None},
creator=self.user
).save()
assert context.exception.error_dict.get('input')

with self.assertRaises(ValidationError) as context:
BackgroundTaskFactory.build(
id=None, type='foo.bar', input={'args': {}},
type='foo.bar', input={'args': {}},
creator=self.user
).save()
assert context.exception.error_dict.get('input')
Expand All @@ -44,14 +44,14 @@ def test_save_invalid_input_kwargs(self):
""" Ensure that invalid input kwargs will raise validation error """
with self.assertRaises(ValidationError) as context:
BackgroundTaskFactory.build(
id=None, type='foo.bar', input={'kwargs': None},
type='foo.bar', input={'kwargs': None},
creator=self.user
).save()
assert context.exception.error_dict.get('input')

with self.assertRaises(ValidationError) as context:
BackgroundTaskFactory.build(
id=None, type='foo.bar', input={'kwargs': []},
type='foo.bar', input={'kwargs': []},
creator=self.user
).save()
assert context.exception.error_dict.get('input')
Expand All @@ -62,22 +62,22 @@ def test_save_valid_input(self):
# Missing kwargs
with self.assertRaises(ValidationError) as context:
BackgroundTaskFactory.build(
id=None, type='foo.bar', input={'args': []},
type='foo.bar', input={'args': []},
creator=self.user
).save()
assert context.exception.error_dict.get('input')

# Missing args
with self.assertRaises(ValidationError) as context:
BackgroundTaskFactory.build(
id=None, type='foo.bar', input={'kwargs': {}},
type='foo.bar', input={'kwargs': {}},
creator=self.user
).save()
assert context.exception.error_dict.get('input')

# All good
BackgroundTaskFactory.build(
id=None, type='foo.bar', input={'args': [], 'kwargs': {}},
type='foo.bar', input={'args': [], 'kwargs': {}},
creator=self.user
).save()

Expand All @@ -98,3 +98,41 @@ def test_input_kwargs_property(self):
task.input_kwargs = kwargs
assert task.input_kwargs == kwargs
assert task.input == {'args': [], 'kwargs': kwargs}

def test_overall_status_no_result(self):
task1 = BackgroundTaskFactory.create()
# No results, then PENDING
assert task1.overall_status == 'PENDING'

def test_overall_status_many_results(self):
task1 = BackgroundTaskFactory.create()
TaskResultFactory.create(task_id=task1.task_id, status='SUCCESS')
task2 = BackgroundTaskFactory.create(root_id=task1.task_id)
# If multiple statuses and none of which are FAILURE, then STARTED
assert task1.overall_status == task2.overall_status
assert task1.overall_status == 'STARTED'

# If multiple statuses and one of which are FAILURE, then FAILURE
TaskResultFactory.create(task_id=task2.task_id, status='FAILURE')
assert task1.overall_status == task2.overall_status
assert task1.overall_status == 'FAILURE'

def test_overall_results(self):
task1 = BackgroundTaskFactory.create()
TaskResultFactory.create(task_id=task1.task_id, status='SUCCESS')
task2 = BackgroundTaskFactory.create(
root_id=task1.task_id,
options={'is_result': True}
)

assert list(task1.overall_results) == list(task2.overall_results)
assert list(task1.overall_results) == [None]

out = [{"link": "https://google.com", "text": "Google.com"}]
TaskResultFactory.create(
task_id=task2.task_id,
status='SUCCESS',
result=out
)
assert list(task1.overall_results) == list(task2.overall_results)
assert list(task1.overall_results) == [out]
Loading

0 comments on commit 1b2f863

Please sign in to comment.