Commit

Add backend-tooling for asynchronous tasks (#1624)
Anthony Lukach committed Jan 24, 2018
1 parent e48537e commit cec43b3
Showing 31 changed files with 930 additions and 1 deletion.
3 changes: 3 additions & 0 deletions cadasta/config/settings/default.py
@@ -54,6 +54,7 @@
'party',
'xforms',
'search',
'tasks',

'django_filters',
'crispy_forms',
@@ -599,3 +600,5 @@
TWILIO_ACCOUNT_SID = os.environ.get('TWILIO_ACCOUNT_SID')
TWILIO_AUTH_TOKEN = os.environ.get('TWILIO_AUTH_TOKEN')
TWILIO_PHONE = '+123'

CELERY_QUEUES = ['export']
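
For context, a task can be routed onto the 'export' queue declared above when it is dispatched; a minimal sketch (the module, task name, and arguments are illustrative, not part of this commit):

    # Hypothetical tasks module in an installed app, e.g. search/tasks.py.
    from tasks.celery import app

    @app.task
    def export_project_data(project_id):
        # Placeholder body; a real task would build and upload the export.
        return {'project_id': project_id, 'status': 'scheduled'}

    # Dispatch onto the 'export' queue listed in CELERY_QUEUES:
    export_project_data.apply_async(args=['project-id'], queue='export')
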
10 changes: 10 additions & 0 deletions cadasta/config/settings/dev.py
@@ -1,3 +1,4 @@
import os
from .default import * # NOQA

DEBUG = True
@@ -116,3 +117,12 @@
}

ES_PORT = '8000'

# Async Tooling
CELERY_BROKER_TRANSPORT = 'sqs' if os.environ.get('SQS') else 'memory'
CELERY_BROKER_TRANSPORT_OPTIONS = {
'region': 'us-west-2',
'queue_name_prefix': '{}-'.format(os.environ.get('QUEUE-PREFIX', 'dev')),
'wait_time_seconds': 20,
'visibility_timeout': 20,
} if CELERY_BROKER_TRANSPORT.lower() == 'sqs' else {}
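
Kombu's SQS transport prepends queue_name_prefix to every queue name it uses, so with these defaults the 'export' queue maps to an SQS queue named 'dev-export'; a rough illustration:

    import os

    # Illustration only: how the prefix above combines with a queue name.
    prefix = '{}-'.format(os.environ.get('QUEUE-PREFIX', 'dev'))  # e.g. 'dev-'
    sqs_queue_name = prefix + 'export'                            # 'dev-export'
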
9 changes: 9 additions & 0 deletions cadasta/config/settings/production.py
@@ -148,3 +148,12 @@

SMS_GATEWAY = 'accounts.gateways.TwilioGateway'
TWILIO_PHONE = os.environ['TWILIO_PHONE']

# Async Tooling
CELERY_BROKER_TRANSPORT = 'sqs'
CELERY_BROKER_TRANSPORT_OPTIONS = {
'region': 'us-west-2',
'queue_name_prefix': '{}-'.format(os.environ['QUEUE-PREFIX']),
'wait_time_seconds': 20,
'visibility_timeout': 20,
}
6 changes: 6 additions & 0 deletions cadasta/config/settings/travis.py
@@ -86,3 +86,9 @@
SASS_PROCESSOR_INCLUDE_DIRS = (
os.path.join(os.path.dirname(BASE_DIR), 'core/node_modules'),
)


# Async Tooling
CELERY_ALWAYS_EAGER = True
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
CELERY_BROKER_TRANSPORT = 'memory'
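
With eager execution enabled, .delay() runs tasks synchronously in the calling process, so the test run needs neither a broker nor a worker; a minimal sketch with a hypothetical task:

    from tasks.celery import app

    @app.task
    def add(x, y):  # illustrative task, not part of this commit
        return x + y

    result = add.delay(2, 3)   # executes immediately, in-process
    assert result.get() == 5   # no broker or worker involved
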
1 change: 1 addition & 0 deletions cadasta/tasks/__init__.py
@@ -0,0 +1 @@
default_app_config = 'tasks.apps.TasksConfig'
11 changes: 11 additions & 0 deletions cadasta/tasks/apps.py
@@ -0,0 +1,11 @@
from celery import signals
from django.apps import AppConfig


class TasksConfig(AppConfig):
name = 'tasks'

def ready(self):
from .celery import app
app.autodiscover_tasks(force=True)
signals.worker_init.send(sender=None)
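
Because ready() forces task autodiscovery, any installed app can expose Celery tasks simply by defining a tasks.py module; a hypothetical sketch (app and function names are illustrative):

    # party/tasks.py (hypothetical); picked up by autodiscover_tasks()
    # with no explicit registration.
    from tasks.celery import app

    @app.task
    def recalculate_party_stats(party_id):
        return party_id  # placeholder body for illustration
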
13 changes: 13 additions & 0 deletions cadasta/tasks/celery.py
@@ -0,0 +1,13 @@
from celery import Celery
from django.conf import settings

from cadasta.workertoolbox.conf import Config

conf = Config(
queues=settings.CELERY_QUEUES,
broker_transport=settings.CELERY_BROKER_TRANSPORT,
broker_transport_options=getattr(
settings, 'CELERY_BROKER_TRANSPORT_OPTIONS', {}),
)
app = Celery()
app.config_from_object(conf)
78 changes: 78 additions & 0 deletions cadasta/tasks/consumer.py
@@ -0,0 +1,78 @@
import logging

import boto3
from django.conf import settings
from kombu.mixins import ConsumerMixin

from .models import BackgroundTask


logger = logging.getLogger(__name__)


class Worker(ConsumerMixin):

def __init__(self, connection, queues):
self.connection = connection
self.queues = queues
super(Worker, self).__init__()
logger.info("Started worker %r for queues %r", self, self.queues)

def get_consumers(self, Consumer, channel):
return [Consumer(queues=self.queues,
accept=['pickle', 'json'],
callbacks=[self.process_task])]

def process_task(self, body, message):
logger.info('Processing message: %r', message)
try:
return self._handle_task(body, message)
except:
logger.exception("Failed to process message: %r", message)
finally:
logger.info("ACKing message %r", message)
if self.connection.as_uri().lower().startswith('sqs://'):
# HACK: Can't seem to get message.ack() to work for SQS
# backend. Without this hack, messages will keep
# re-appearing after the visibility_timeout expires.
# See https://github.com/celery/kombu/issues/758
return self._sqs_ack(message)
return message.ack()

def _sqs_ack(self, message):
logger.debug("Manually ACKing SQS message %r", message)
region = settings.CELERY_BROKER_TRANSPORT_OPTIONS['region']
boto3.client('sqs', region).delete_message(
QueueUrl=message.delivery_info['sqs_queue'],
ReceiptHandle=message.delivery_info['sqs_message']['ReceiptHandle']
)
message._state = 'ACK'
message.channel.qos.ack(message.delivery_tag)

@staticmethod
def _handle_task(body, message):
logger.debug("Handling task message %r", body)
args, kwargs, options = message.decode()
task_id = message.headers['id']

# Add default properties
option_keys = ['eta', 'expires', 'retries', 'timelimit']
message.properties.update(
**{k: v for k, v in message.headers.items()
if k in option_keys and v not in (None, [None, None])})

_, created = BackgroundTask.objects.get_or_create(
task_id=task_id,
defaults={
'type': message.headers['task'],
'input_args': args,
'input_kwargs': kwargs,
'options': message.properties,
'parent_id': message.headers['parent_id'],
'root_id': message.headers['root_id'],
}
)
if created:
logger.debug("Processed task: %r", message)
else:
logger.warn("Task already existed in db: %r", message)
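
For reference, the Worker is driven with a plain Kombu connection, which is essentially what the sync_tasks management command below does; a sketch with an illustrative broker URL and queue name:

    from kombu import Connection, Queue

    from tasks.consumer import Worker

    # run() blocks and consumes messages until interrupted.
    with Connection('memory://') as conn:
        Worker(conn, queues=[Queue('celery')]).run()
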
35 changes: 35 additions & 0 deletions cadasta/tasks/fields.py
@@ -0,0 +1,35 @@
import pickle

from django.db import models


class PickledObjectField(models.BinaryField):
"""
A simplified version of the more popular PickleField designed to work
with binary-encoded data.
https://github.com/gintas/django-picklefield
"""

def to_python(self, value):
if value is None:
return
return pickle.loads(value.tobytes())

def from_db_value(self, value, expression, connection, context):
return self.to_python(value)

def get_db_prep_value(self, value, connection=None, prepared=False):
value = pickle.dumps(value)
return super().get_db_prep_value(value, connection, prepared)

def value_to_string(self, obj):
value = self.value_from_object(obj)
return self.get_db_prep_value(value)

def get_lookup(self, lookup_name):
"""
We need to limit the lookup types.
"""
if lookup_name not in ['exact', 'in', 'isnull']:
raise TypeError('Lookup type %s is not supported.' % lookup_name)
return super(PickledObjectField, self).get_lookup(lookup_name)
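
As a usage sketch, PickledObjectField behaves like any other model field; the TaskResult model created in the migration below uses it for its result column (the model here is purely illustrative):

    from django.db import models

    from tasks.fields import PickledObjectField

    class ExampleResult(models.Model):  # illustrative only
        payload = PickledObjectField(null=True)

        class Meta:
            app_label = 'tasks'  # only needed because this sketch lives outside an app

    # Any picklable Python object round-trips through the column:
    # ExampleResult.objects.create(payload={'rows': 42, 'ok': True})
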
Empty file.
Empty file.
34 changes: 34 additions & 0 deletions cadasta/tasks/management/commands/sync_tasks.py
@@ -0,0 +1,34 @@
import logging

from django.core.management.base import BaseCommand
from kombu import Queue
from kombu.async import Hub, set_event_loop

from tasks.celery import app, conf
from tasks.consumer import Worker


logger = logging.getLogger(__name__)


class Command(BaseCommand):
help = "Sync task and result messages with database."

def add_arguments(self, parser):
parser.add_argument('--queue', '-q', default=conf.PLATFORM_QUEUE_NAME)

def handle(self, queue, *args, **options):
fmt = '%(asctime)s %(name)-12s: %(levelname)-8s %(message)s'
log_level = 40 - (options['verbosity'] * 10)
logging.basicConfig(level=log_level, format=fmt)

# TODO: Ensure that failed processing does not requeue task into
# work queue
set_event_loop(Hub())
with app.connection() as conn:
try:
logger.info("Launching worker")
worker = Worker(conn, queues=[Queue(queue)])
worker.run()
except KeyboardInterrupt:
logger.info("KeyboardInterrupt, exiting. Bye!")
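
The command can be run as python manage.py sync_tasks --queue <name>, or invoked programmatically; a minimal sketch with an illustrative queue name:

    from django.core.management import call_command

    # Consume task/result messages from the 'export' queue with more
    # verbose logging than the default.
    call_command('sync_tasks', queue='export', verbosity=2)
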
59 changes: 59 additions & 0 deletions cadasta/tasks/migrations/0001_initial.py
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.7 on 2017-07-11 22:38
from __future__ import unicode_literals

from django.conf import settings
import django.contrib.postgres.fields.jsonb
from django.db import migrations, models
import django.db.models.deletion
import functools
import tasks.fields
import tasks.utils.fields


class Migration(migrations.Migration):

initial = True

dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('contenttypes', '0002_remove_content_type_name'),
]

operations = [
migrations.CreateModel(
name='BackgroundTask',
fields=[
('id', models.CharField(max_length=24, primary_key=True, serialize=False)),
('task_id', models.CharField(editable=False, max_length=155, unique=True, verbose_name='UUID')),
('type', models.CharField(max_length=128, verbose_name='Task function')),
('created_date', models.DateTimeField(auto_now_add=True)),
('last_updated', models.DateTimeField(auto_now=True)),
('input', django.contrib.postgres.fields.jsonb.JSONField(blank=True, default=tasks.utils.fields.input_field_default, validators=[functools.partial(tasks.utils.fields.validate_type, *(dict,), **{}), tasks.utils.fields.validate_input_field])),
('options', django.contrib.postgres.fields.jsonb.JSONField(blank=True, default=dict, validators=[functools.partial(tasks.utils.fields.validate_type, *(dict,), **{})], verbose_name='Task scheduling options')),
('related_object_id', models.PositiveIntegerField(blank=True, null=True)),
('immutable', models.NullBooleanField(verbose_name='If arguments are immutable (only applies to chained tasks).')),
('creator', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
('parent', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='children', to='tasks.BackgroundTask', to_field='task_id')),
('related_content_type', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType')),
('root', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='descendents', to='tasks.BackgroundTask', to_field='task_id')),
],
options={
'ordering': ['created_date'],
},
),
migrations.CreateModel(
name='TaskResult',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('status', models.CharField(max_length=50, verbose_name='State')),
('result', tasks.fields.PickledObjectField(null=True)),
('date_done', models.DateTimeField(null=True)),
('traceback', models.TextField(null=True)),
('task', models.OneToOneField(db_constraint=False, editable=False, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='result', to='tasks.BackgroundTask', to_field='task_id')),
],
options={
'db_table': 'celery_taskmeta',
},
),
]
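
Once the migration is applied and the consumer is recording messages, task metadata can be inspected through the ORM; a sketch using field names from the migration above:

    from tasks.models import BackgroundTask

    # Ten most recently created background tasks and their Celery task names.
    for task in BackgroundTask.objects.order_by('-created_date')[:10]:
        print(task.task_id, task.type, task.created_date)
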
Empty file.