
Add backend-tooling for asynchronous tasks #1624

Merged: 14 commits into feature/async from async/task-tooling-db-backend, Jul 18, 2017

Conversation

@alukach (Contributor) commented Jul 4, 2017

Proposed changes in this pull request

This PR includes the Cadasta Platform backend for calling and tracking asynchronous operations. The Cadasta Platform codebase is intended to be only an interface to the asynchronous tooling; it will not contain code for the actual asynchronous operations. Currently, this PR includes some example tasks and an example worker to help with testing. The example tasks and workers will be removed or fixed up before this code makes its way into master.

Goals

This is an implementation of the planning done in this document: https://docs.google.com/document/d/169fK4G9NkgUulCzs9aHND7aN0GwUKVIP2QPJMi3Jejk/edit

How it works

This backend tooling performs three duties:

  • Scheduling asynchronous work.
  • Storing all scheduled asynchronous tasks within our DB.
  • Storing all task results within our DB.

Scheduling Work

Stub task functions are written to represent the tasks available to the Cadasta Platform. These functions are intended to provide structure around the tasks' signatures so that improperly called tasks throw exceptions before they're scheduled on the webserver, rather than when they are executed on the worker. Additionally, the BackgroundTask model's type field validates against the names of all stub tasks written in the Cadasta Platform codebase (any task located in a {module}/tasks.py file should be autodiscovered when the Cadasta Platform starts up). These stub functions come at the price of introducing some coupling between the task producer (the Cadasta Platform) and the task consumers (the workers). This seemed appropriate, as some structure around the input the asynchronous tasks expect should exist somewhere. If we ever enter into a situation where we want the capability of adding and using new asynchronous workers and tasks without ever updating the Cadasta Platform (e.g. we could schedule tasks via the Cadasta API and the Cadasta Platform would not need to have any knowledge of which tasks are available; it would simply enter the tasks in the queue), then we can remove the validation of the BackgroundTask.type field.
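
For example, a stub in an export module's tasks.py might look like the following (a hypothetical sketch; the task name follows the export.* convention used elsewhere in this PR, and the parameters are made up):

# export/tasks.py
from celery import shared_task

@shared_task(name='export.project_data')
def project_data(org_slug, project_slug, output_type='xls'):
    """Stub defining the signature of the export worker's task.

    The body never runs on the Platform; an improperly called task
    fails here, on the webserver, before it is ever queued.
    """
    raise NotImplementedError('Executed by the export worker, not the Platform')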

Tracking scheduled tasks

The Cadasta Platform should be aware of all tasks scheduled within our system. This is made difficult by the fact that asynchronous tasks can schedule other tasks. In an effort to track all tasks scheduled by our systems, all workers should schedule tasks to a topic exchange. The tasks are placed in both their desired queue and in a special queue designed to keep track of scheduled tasks (platform.fifo). On the Cadasta Platform, a process should be running (python manage.py sync_tasks) to read the scheduled tasks off of this queue and insert them into our database.
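
For illustration, the core of that consumer might look roughly like this (a sketch assuming Celery's v1 message payload, where the body carries id/task/args/kwargs; the exchange name and the model import path are assumptions):

from django.conf import settings
from kombu import Connection, Exchange, Queue

from tasks.models import BackgroundTask  # import path assumed

task_exchange = Exchange('task_exchange', type='topic')  # name assumed
platform_queue = Queue('platform.fifo', task_exchange, routing_key='#')

def record_task(body, message):
    # Record every scheduled task, regardless of who scheduled it
    BackgroundTask.objects.get_or_create(
        task_id=body['id'],
        defaults={
            'type': body['task'],
            'input': {'args': body.get('args', []),
                      'kwargs': body.get('kwargs', {})},
        })
    message.ack()

with Connection(transport=settings.CELERY_BROKER_TRANSPORT,
                transport_options=settings.CELERY_BROKER_TRANSPORT_OPTIONS) as conn:
    with conn.Consumer(platform_queue, callbacks=[record_task]):
        while True:
            conn.drain_events()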

Tracking task results

The database model's design is such that we have a BackgroundTask model to manage the tasks and a TaskResult model to manage the results of the tasks. Celery supports a number of Result Backends. After experimenting with quite a few of them, I've come to the conclusion that the best option is for us to allow each worker to directly insert its results into a table in a database. The upside of using the DB as a Result Backend is that we a) get up-to-date storage of task results as they happen; and b) it allows workers to access the task results for the purpose of chord tasks. The downside is that all workers must have connections to both the message queue and the production database. For security concerns, it would probably be wise to create a worker user that only has permissions to access the results tables (celery_taskmeta and celery_tasksetmeta). We could use a separate DB to handle results with little change to our current system if that's considered desirable; however, we lose a bit of Django's help regarding relating data (not a lot though, since I disabled a bit of this already to avoid issues if Results come into the system before the actual BackgroundTask has been recorded).
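
As a sketch, pointing each worker's result backend at the Platform DB is a one-line Celery setting (the connection string is a placeholder; real credentials would come from provisioning):

# Workers write results straight into the Platform's Postgres DB;
# Celery's SQLAlchemy-based backend manages the celery_taskmeta tables.
app.conf.result_backend = 'db+postgresql://asyncworker:PASSWORD@db-host/cadasta'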

Testing

To test this codebase:

  1. Ensure SQS-enabled AWS credentials are available in the system environment
  2. Copy CELERY_BROKER_TRANSPORT and CELERY_BROKER_TRANSPORT_OPTIONS settings from config/settings/production.py to config/settings/default.py (this is laborious; see Risks below for more thoughts on dev configuration)
  3. Run migrations, install requirements
  4. Start message consumer: python manage.py sync_tasks
  5. Create the example worker virtualenv, install its requirements (pip install -r /vagrant/worker-boilerplate/requirements.txt), and start the worker (/vagrant/worker-boilerplate/start_worker -l INFO)
  6. Schedule tasks via shell (python manage.py shell_plus):
from tasks.celery import app
app.send_task('export.chord')  # Schedule the example task

# Count to 10 (give the worker a moment to run the task)
BackgroundTask.objects.all()  # See results

When should this PR be merged

This PR is only aiming to merge into feature/async, which will be the master branch for all asynchronous work while it's being developed for release, so any time should be fine.

Risks/Room for Improvement

I'm looking for input on the following:

  • What if SQS goes down? Currently, if our system can't connect to SQS, it won't be possible to launch the Cadasta platform. Implementing something like circuit breakers in our codebase is a possible solution to this, however I'm not sure how doable this would be without getting deep into Celery's inner-workings.
  • Currently, the dev setup is configured so that the async tooling is disabled. We may want to consider supporting local development by installing RabbitMQ on the dev machines and having the worker codebases deployed on the machines, however this feels like it could bloat the machine.
  • The manage.py sync_tasks command should be run as a daemon. Not sure what Cadasta's best-practice is to handle daemon processes (upstart/supervisor/?). Upstart script created. Additionally, it is currently single-threaded and synchronous. We may want to look at running it in a threaded/async manner (gevent, asyncio, etc.) or running multiple processes.
  • It is not currently possible to send status updates of in-process tasks as described in the original design spec. This was removed as it looked non-trivial with the DB backend (we'd have to listen for DB writes and then copy those values over to another field), so I left this for future additions.

Follow-up actions

  • The Celery tooling is very configuration-heavy and configuration-sensitive. To make the production of asynchronous workers a bit less error-prone and laborious, I think we should pull out the tooling into a worker-configuration codebase.
  • Create tooling to daemonize sync_tasks process
  • Add circuit breaker for Celery application on Cadasta Platform to prevent an SQS-outage from bringing down our system
  • Make tests importable in Worker Toolbox (Add importable tests cadasta-workertoolbox#1)
  • Build out a separate worker to perform export operations on our data.

Checklist (for reviewing)

General

  • Is this PR explained thoroughly? All code changes must be accounted for in the PR description.
    • Review 1
    • Review 2
  • Is the PR labeled correctly? It should have the migration label if a new migration is added.
    • Review 1
    • Review 2
  • Is the risk level assessment sufficient? The risks section should contain all risks that might be introduced with the PR and which actions we need to take to mitigate these risks. Possible risks are database migrations, new libraries that need to be installed or changes to deployment scripts.
    • Review 1
    • Review 2

Functionality

  • Are all requirements met? Compare implemented functionality with the requirements specification.
    • Review 1
    • Review 2
  • Does the UI work as expected? There should be no Javascript errors in the console; all resources should load. There should be no unexpected errors. Deliberately try to break the feature to find out if there are corner cases that are not handled.
    • Review 1
    • Review 2

Code

  • Do you fully understand the introduced changes to the code? If not ask for clarification, it might uncover ways to solve a problem in a more elegant and efficient way.
    • Review 1
    • Review 2
  • Does the PR introduce any inefficient database requests? Use the debug server to check for duplicate requests.
    • Review 1
    • Review 2
  • Are all necessary strings marked for translation? All strings that are exposed to users via the UI must be marked for translation.
    • Review 1
    • Review 2
  • Is the code documented sufficiently? Large and complex classes, functions or methods must be annotated with comments following our code-style guidelines.
    • Review 1
    • Review 2
  • Has the scalability of this change been evaluated?
    • Review 1
    • Review 2
  • Is there a maintenance plan in place?
    • Review 1
    • Review 2

Tests

  • Are there sufficient test cases? Ensure that all components are tested individually; models, forms, and serializers should be tested in isolation even if a test for a view covers these components.
    • Review 1
    • Review 2
  • If this is a bug fix, are tests for the issue in place? There must be a test case for the bug to ensure the issue won’t regress. Make sure that the tests break without the new code to fix the issue.
    • Review 1
    • Review 2
  • If this is a new feature or a significant change to an existing feature, has the manual testing spreadsheet been updated with instructions for manual testing?
    • Review 1
    • Review 2

Security

  • Confirm this PR doesn't commit any keys, passwords, tokens, usernames, or other secrets.
    • Review 1
    • Review 2
  • Are all UI and API inputs run through forms or serializers?
    • Review 1
    • Review 2
  • Are all external inputs validated and sanitized appropriately?
    • Review 1
    • Review 2
  • Does all branching logic have a default case?
    • Review 1
    • Review 2
  • Does this solution handle outliers and edge cases gracefully?
    • Review 1
    • Review 2
  • Are all external communications secured and restricted to SSL?
    • Review 1
    • Review 2

Documentation

  • Are changes to the UI documented in the platform docs? If this PR introduces new platform site functionality or changes existing ones, the changes must be documented in the Cadasta Platform Documentation.
    • Review 1
    • Review 2
  • Are changes to the API documented in the API docs? If this PR introduces new API functionality or changes existing ones, the changes must be documented in the API docs.
    • Review 1
    • Review 2
  • Are reusable components documented? If this PR introduces components that are relevant to other developers (for instance a mixin for a view or a generic form) they should be documented in the Wiki.
    • Review 1
    • Review 2

@alukach alukach requested review from oliverroick and amplifi July 4, 2017 21:25
@alukach alukach force-pushed the async/task-tooling-db-backend branch 4 times, most recently from 81f5a06 to 8cea06b Compare July 7, 2017 03:20
@alukach (Contributor Author) commented Jul 7, 2017

Broke out worker configuration into Cadasta/cadasta-workertoolbox; however, installing the toolbox is apparently problematic and is causing tests to fail. Will resolve ASAP.

@alukach alukach force-pushed the async/task-tooling-db-backend branch from df14e76 to 60bed9e Compare July 7, 2017 22:13
@oliverroick (Member) left a comment


Overall, this looks like a solid start. I have some comments on the code and some more general questions:

  • I'm still struggling with the overall architecture of the whole processing system. Could you provide some documentation, including an architecture diagram, to illustrate how everything is connected?
  • Regarding the worker toolbox: I still don't fully understand its purpose or what advantages breaking it out into a separate library has. From what I understand, the configuration of Celery tasks is cumbersome and error-prone. But we also only do it once, so we shouldn't have to deal with it often once it's running. Could you explain why you think using the worker toolbox as an external library would be beneficial? We can have a call about it if that's easier for you.

Re your request for feedback:

What if SQS goes down? Currently, if our system can't connect to SQS, it won't be possible to launch the Cadasta platform. Implementing something like circuit breakers in our codebase is a possible solution to this. However, I'm not sure how doable this would be without getting deep into Celery's inner-workings.

I don't know much about the inner workings of Celery; could you fill me in on how connecting to SQS works? From what you describe above, it reads as if a connection is opened when you launch the platform. Is that correct?

Currently, the dev setup is configured so that the async tooling is disabled. We may want to consider supporting local development by installing RabbitMQ on the dev machines and having the worker codebases deployed on the machines. However, this feels like it could bloat the machine.

What would happen if I tested the export on my local machine? Would it just throw an exception? Usually, we've implemented a way to mimic the behavior of external services in the local VM, but I'm not sure how feasible that is. Running RabbitMQ and the worker processes on the local machines sounds like a lot of overhead indeed. I'm not sure what to do about this.

The manage.py sync_tasks command should be run as a daemon. Not sure what Cadasta's best-practice is to handle daemon processes (upstart/supervisor/?). Additionally, it is currently single-threaded and synchronous. We may want to look at running it in a threaded/async manner (gevent, asyncio, etc.) or running multiple processes.

We don't have a policy for this, as there was no need for this before. I'm open to suggestions...

It is not currently possible to send status updates of in-process tasks as described in the original design spec. This was removed as it looked non-trivial with the DB backend (we'd have to listen for DB writes and then copy those values over to another field), so I left this for future additions.

I don't think this is important at this point. We can look into that another time.

@@ -1 +1,2 @@
opbeat==3.5.1
boto3==1.4.1
Member

1.4.4 is the latest for boto3; is there a reason for not using it?

Contributor Author

No reason, will update.


class BackgroundTask(RandomIDModel):

    task_id = models.CharField(
Member

I don't understand the reason for introducing the task_id field. Couldn't we use the object's id?

@alukach (Contributor Author), Jul 11, 2017

The task_id is generated by Celery at the time of scheduling. We could set the DB id to be equal to the Task's id. Alternatively, we could set the Task's id to be equal to the DB id (as it seems you're suggesting); however, I would advise against this, as workers can schedule tasks and we wouldn't want those workers to have to negotiate the id with the DB (basically, the DB should not be the source of authority for Task IDs).

My first approach was to use the Task id as the DB id; however, I later felt that it was a bit of a data leak to expose the internal IDs of the tasks (as I would expect they would be used in the URL of a future API endpoint exposing BackgroundTask instances). I'm not really committed to this idea and wouldn't put up any protest if others would like it changed.

Contributor

Yes, let's avoid leaking internal IDs.

_('Task function'), max_length=128,
choices=choices())

created = models.DateTimeField(auto_now_add=True)
Member

For created and updated, there has been work on ticket #1138. Can you check in with @amplifi to make sure that the naming for these fields aligns with similar fields introduced to other models?

@alukach (Contributor Author), Jul 11, 2017

Reading through #1138 (and the History Log wiki entry), my understanding is that updated should be changed to last_updated. I see no information about the naming of the created or creator fields.

Additionally, I see that:

The last_updated values for each table should be managed via PostgreSQL trigger, which eliminates the need for any platform code changes and results in a more reliable implementation, as the database will be solely responsible for setting and modifying the value (particularly in the case of platform action failure, rollback, transaction queuing, etc).

I'm not certain how this affects the Django model. My interpretation is that we should still include the fields on the model, as it is the Django migration that will be creating the table/columns. We could use auto_now_add=True and auto_now=True (which implicitly mean editable=False and blank=True); however, the dates added by Django will be overwritten by the DB's trigger. Alternatively, we could define them both as models.DateTimeField(editable=False, blank=True, null=False) to prevent Python from setting the values (this may be a bit clearer).

Will wait for confirmation from either @amplifi or @oliverroick before I implement a change.

Contributor

The fields for created_date and last_updated have long existed in some platform models, so establishing a naming convention predates #1138 by quite a while. The existing naming is continued in #1138 to extend these fields across all models. It's also already using Django's auto_now and auto_now_add -- see branch in progress last-updated for reference.

Contributor Author

I don't believe created_date is used in any Cadasta models today (I could only find SpatialResource.time); however, it seems like a fine convention.

So my plan moving forward:

  • created will be renamed to created_date and will be defined as models.DateTimeField(auto_now_add=True)
  • updated will be renamed to last_updated and will be defined as models.DateTimeField(auto_now=True)
  • creator will be unchanged. (See the sketch below.)
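
In Django terms, the plan above amounts to the following (a minimal sketch; everything other than the two field definitions is illustrative):

class BackgroundTask(RandomIDModel):
    # ... existing fields ...
    created_date = models.DateTimeField(auto_now_add=True)
    last_updated = models.DateTimeField(auto_now=True)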

def test_input_args_property(self):
    """ Ensure that input args getter/setter works as expected """
    task = BackgroundTaskFactory.build(input={'args': [], 'kwargs': {}})
    self.assertEqual(task.input_args, [])
Member

We should write this line and all following similar assertions as assert task.input_args == []. Super nitpicky, I know, but it's the way we've always done it. (Can't believe I just wrote that).

Contributor Author

Yeah, I did notice that I was breaking convention. I am willing to change it; however, I must first say that using self.assertEqual provides much more descriptive error messages when assertions fail (exposing the data's values and describing why the assertion failed). For this reason, I think self.assert... is the right move going forward. Assuming I'm not wrong on this, is there any opposition to using self.assert... moving forward? Or does assert ... have a benefit I'm not seeing?

@alukach (Contributor Author) commented Jul 11, 2017

@oliverroick a warning: you may want to pour yourself a cup of coffee and take a bathroom break before reading through these replies.

  • I'm still struggling with the overall architecture of the whole processing system. Could you provide some documentation, including an architecture diagram, to illustrate how everything is connected?

I suppose a diagram is in order. See below:

[Diagram: Cadasta message lifecycle]

We send all tasks through a Topic Exchange. All tasks are assigned a routing_key based on their task name, and queues register with the Topic Exchange stating that they should receive any message with a routing_key matching a given pattern. As currently implemented, we have a Platform Queue that subscribes to all messages (subscribing to routing_key='#'), while service-specific queues subscribe to messages that match their queue name. For example, our queue to handle data-export tasks is named export and subscribes to routing_key='export'. Any task with a name starting with export. (e.g. export.project_data) will be assigned routing_key='export' before being sent to the Topic Exchange. That message will then be routed to both the export queue and the Platform Queue. It's critical that the message make its way to both the Platform Queue (so that it can be logged in our system) and the service-specific queue (so that it can actually be executed).
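
A minimal Kombu sketch of that routing layout (the exchange name and router helper are illustrative; the queue names and routing keys are as described above):

from kombu import Exchange, Queue

task_exchange = Exchange('task_exchange', type='topic')  # name assumed

task_queues = [
    # The Platform Queue sees every message ('#' matches all routing keys)
    Queue('platform.fifo', task_exchange, routing_key='#'),
    # Service-specific queue: only messages routed with key 'export'
    Queue('export', task_exchange, routing_key='export'),
]

def route_task(name, args, kwargs, options, task=None, **kw):
    # 'export.project_data' -> routing_key 'export'
    return {'exchange': task_exchange, 'routing_key': name.split('.', 1)[0]}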


  • Regarding the worker toolbox: I still don't fully understand its purpose or what advantages breaking it out into a separate library has. From what I understand, the configuration of Celery tasks is cumbersome and error-prone. But we also only do it once, so we shouldn't have to deal with it often once it's running. Could you explain why you think using the worker toolbox as an external library would be beneficial? We can have a call about it if that's easier for you.

First, let me describe the nuances of setting up the system. The diagram and comment above describe the flow of a Task through our system. It's important to note that with the SQS backend, the Exchange is virtual. A RabbitMQ server would manage the logic around queues, exchanges, and routing for us, but SQS isn't built with such a feature, so Kombu emulates this logic on the client before sending messages to queues. For this reason, it's critical that all task-producers (which include any workers that may schedule a task) have the exchange set up properly (the Topic Exchange is not enabled by default) and have their queues properly registered with the Exchange. Additionally, it's tricky to ensure that the task exchange is actually set up correctly; the only way I could get it to work was creating a signal handler that manually ensures that all queues are set up when a worker starts up. Finally, to ensure that the results of an executed task make their way back to the central Cadasta Platform, each worker must be set up with the correct Result Backend.

Now, to address your question: the reason why I feel we should break out configuration into a separate module is threefold: ease-of-setup, configuration validation, and maintenance.

Ease-of-setup: It is critical that every worker is configured exactly as expected for the system to work properly. Ideally, we want it to be quick/easy/cheap for developers to spin up new services that work with our async system. The Config class in the Worker Toolbox provides the diff between Celery's default setup and the Cadasta system's required setup. Additionally, when provided with the names of the queues that the worker is interested in, it will generate the correct queue configuration that complies with our routing convention (e.g. task export.foo gets a routing key of export and the export queue is listening for routing key export). Initially, I thought we'd just have some boilerplate that a developer would copy/paste and edit; however, this started to feel very error-prone, as it wouldn't be entirely obvious which settings should be edited, which settings should not be edited, and why. Extensive commenting could help, however this still felt risky to me.
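
For illustration, a worker's setup with the toolbox might look like this (the Config import path and constructor arguments are assumptions, not the final API):

from celery import Celery
from cadasta.workertoolbox.conf import Config  # import path assumed

conf = Config(queues=('export',))  # generates our exchange/queue/routing setup
app = Celery()
app.config_from_object(conf)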

Configuration validation: One of the hardest parts about all of this configuration is that it's not easy to verify that your system is set up correctly. If you forget to set up a signal for registering the queues when the worker starts up, you may be able to schedule tasks to your specific queue but they won't be duplicated to the Cadasta Platform queue. If you forget to enable the Result Backend, the tasks will be executed properly but you'll never see the results in the Cadasta Platform DB. It takes a lot of attention to detail to notice that the system isn't doing everything it's supposed to be doing. We want developers to be able to easily confirm that they have set up their worker correctly. While this isn't a feature of the Worker Toolbox currently (Cadasta/cadasta-workertoolbox#1), the Worker Toolbox could easily offer a suite of tests that could be imported by each worker and run to verify that it's set up with the expected configuration. These tests already exist for the toolbox; they just need to be made installable and importable. Writing these tests would not be intuitive for someone not intimately familiar with the inner workings of Celery and our configuration.
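
Such importable checks might look roughly like this (a sketch only; the real suite is tracked in Cadasta/cadasta-workertoolbox#1):

def test_platform_queue_is_registered(app):
    # Every worker must mirror its messages to the Platform Queue
    queues = {q.name: q for q in app.conf.task_queues}
    assert 'platform.fifo' in queues
    assert queues['platform.fifo'].routing_key == '#'

def test_result_backend_configured(app):
    # Without a result backend, results never reach the Platform DB
    assert app.conf.result_backend and app.conf.result_backend.startswith('db+')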

Maintenance: What if we decide to use a Redis backend rather than a DB backend? Or perhaps we want to upgrade the version of Celery or Kombu. This would require us to alter the configuration or setup on every service worker manually. By storing this data in a versioned module, we can update the module, bump its version, and then bump the pinned version on all service workers. I feel that centralizing the configuration will make the system more manageable.


What if SQS goes down? Currently, if our system can't connect to SQS, it won't be possible to launch the Cadasta platform. Implementing something like circuit breakers in our codebase is a possible solution to this. However, I'm not sure how doable this would be without getting deep into Celery's inner-workings.

I don't know much about the inner workings of Celery; could you fill me in on how connecting to SQS works? From what you describe above, it reads as if a connection is opened when you launch the platform. Is that correct?

Yes, currently when the Cadasta Platform is ready, the tasks.apps.TasksConfig.ready() method runs. This imports the app, which is initialized in the process of being imported. When the app initializes, its Channel (which is an object that manages the interaction between Kombu and SQS) lists all the queues available to it. The reason for this is described in the code:

class Channel(virtual.Channel):
    """SQS Channel."""

    # ...

    def __init__(self, *args, **kwargs):
        # ...

        # SQS blows up if you try to create a new queue when one already
        # exists but with a different visibility_timeout.  This prepopulates
        # the queue_cache to protect us from recreating
        # queues that are known to already exist.
        self._update_queue_cache(self.queue_name_prefix)

        # ...

    def _update_queue_cache(self, queue_name_prefix):
        resp = self.sqs.list_queues(QueueNamePrefix=queue_name_prefix)
        for url in resp.get('QueueUrls', []):
            queue_name = url.split('/')[-1]
            self._queue_cache[queue_name] = url

When experimenting (disconnecting my wifi and running ./manage.py shell_plus), it would hang indefinitely.

Stacktrace:
(env)vagrant@vagrant-ubuntu-trusty-64:/vagrant/cadasta$ SQS=1 ./manage.py shell_plus
^CTraceback (most recent call last):
  File "/opt/cadasta/env/src/kombu/kombu/utils/functional.py", line 36, in __call__
    return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/cadasta/env/src/kombu/kombu/transport/virtual/base.py", line 921, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/cadasta/env/src/kombu/kombu/utils/functional.py", line 333, in retry_over_time
    return fun(*args, **kwargs)
  File "/opt/cadasta/env/src/kombu/kombu/connection.py", line 261, in connect
    return self.connection
  File "/opt/cadasta/env/src/kombu/kombu/connection.py", line 802, in connection
    self._connection = self._establish_connection()
  File "/opt/cadasta/env/src/kombu/kombu/connection.py", line 757, in _establish_connection
    conn = self.transport.establish_connection()
  File "/opt/cadasta/env/src/kombu/kombu/transport/virtual/base.py", line 941, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/opt/cadasta/env/src/kombu/kombu/transport/virtual/base.py", line 923, in create_channel
    channel = self.Channel(connection)
  File "/opt/cadasta/env/src/kombu/kombu/transport/SQS.py", line 102, in __init__
    self._update_queue_cache(self.queue_name_prefix)
  File "/opt/cadasta/env/src/kombu/kombu/transport/SQS.py", line 107, in _update_queue_cache
    resp = self.sqs.list_queues(QueueNamePrefix=queue_name_prefix)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/client.py", line 251, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/client.py", line 526, in _make_api_call
    operation_model, request_dict)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/endpoint.py", line 141, in make_request
    return self._send_request(request_dict, operation_model)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/endpoint.py", line 170, in _send_request
    success_response, exception):
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/endpoint.py", line 249, in _needs_retry
    caught_exception=caught_exception, request_dict=request_dict)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/hooks.py", line 227, in emit
    return self._emit(event_name, kwargs)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/hooks.py", line 210, in _emit
    response = handler(**kwargs)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/retryhandler.py", line 183, in __call__
    if self._checker(attempts, response, caught_exception):
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/retryhandler.py", line 251, in __call__
    caught_exception)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/retryhandler.py", line 277, in _should_retry
    return self._checker(attempt_number, response, caught_exception)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/retryhandler.py", line 317, in __call__
    caught_exception)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/retryhandler.py", line 223, in __call__
    attempt_number, caught_exception)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/retryhandler.py", line 359, in _check_caught_exception
    raise caught_exception
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/endpoint.py", line 204, in _get_response
    proxies=self.proxies, timeout=self.timeout)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/vendored/requests/sessions.py", line 573, in send
    r = adapter.send(request, **kwargs)
  File "/opt/cadasta/env/lib/python3.5/site-packages/botocore/vendored/requests/adapters.py", line 424, in send
    raise ConnectionError(e, request=request)
botocore.vendored.requests.exceptions.ConnectionError: HTTPSConnectionPool(host='us-west-2.queue.amazonaws.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', OSError('Tunnel connection failed: 503 Service Unavailable',)))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./manage.py", line 10, in <module>
    execute_from_command_line(sys.argv)
  File "/opt/cadasta/env/lib/python3.5/site-packages/django/core/management/__init__.py", line 367, in execute_from_command_line
    utility.execute()
  File "/opt/cadasta/env/lib/python3.5/site-packages/django/core/management/__init__.py", line 341, in execute
    django.setup()
  File "/opt/cadasta/env/lib/python3.5/site-packages/django/__init__.py", line 27, in setup
    apps.populate(settings.INSTALLED_APPS)
  File "/opt/cadasta/env/lib/python3.5/site-packages/django/apps/registry.py", line 115, in populate
    app_config.ready()
  File "/vagrant/cadasta/tasks/apps.py", line 14, in ready
    P.maybe_declare(q)
  File "/opt/cadasta/env/src/kombu/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/opt/cadasta/env/src/kombu/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
  File "/opt/cadasta/env/src/kombu/kombu/utils/functional.py", line 38, in __call__
    value = self.__value__ = self.__contract__()
  File "/opt/cadasta/env/src/kombu/kombu/messaging.py", line 224, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/opt/cadasta/env/src/kombu/kombu/connection.py", line 819, in default_channel
    self.ensure_connection()
  File "/opt/cadasta/env/src/kombu/kombu/connection.py", line 405, in ensure_connection
    callback)
  File "/opt/cadasta/env/src/kombu/kombu/utils/functional.py", line 345, in retry_over_time
    sleep(1.0)
KeyboardInterrupt

To fix this, I think we'd have to make a PR to Kombu to set a max-retry count (either in Boto3 or in Kombu) and then handle the error on our end. We'd probably want to make the initialization of the app lazy (this isn't currently possible, as we use the app's task discovery to validate the BackgroundTask.type field), cache the app instance somewhere, and put the app's usage behind a circuit breaker that throttles usage attempts when it stops working. This feels like a bit of work, maybe a week?
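
A minimal sketch of the circuit-breaker idea (illustrative only; the thresholds, exception type, and guarded call are assumptions):

import time

class CircuitBreaker:
    def __init__(self, max_failures=3, reset_after=60):
        self.max_failures = max_failures
        self.reset_after = reset_after  # seconds to keep the circuit open
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.opened_at and (time.time() - self.opened_at) < self.reset_after:
            raise RuntimeError('Async tooling temporarily unavailable')
        try:
            result = func(*args, **kwargs)
        except ConnectionError:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.time()  # open the circuit
            raise
        self.failures, self.opened_at = 0, None  # success closes the circuit
        return result

breaker = CircuitBreaker()
# breaker.call(app.send_task, 'export.project_data')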


Currently, the dev setup is configured so that the async tooling is disabled. We may want to consider supporting local development by installing RabbitMQ on the dev machines and having the worker codebases deployed on the machines. However, this feels like it could bloat the machine.

What would happen if I tested the export on my local machine? Would it just throw an exception? Usually, we've implemented a way to mimic the behavior of external services in the local VM, but I'm not sure how feasible that is. Running RabbitMQ and the worker processes on the local machines sounds like a lot of overhead indeed. I'm not sure what to do about this.

The current setup has the app writing all tasks to memory on dev machines. This means that you can schedule tasks but they are not retrievable by other worker processes. According to this blog post, it may be possible to use the filesystem as a broker (and result backend) during development. I haven't really tried this, but it may be a nice middle-ground between setting up RabbitMQ and having a non-functional async setup.
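
If we went that route, the dev settings would look something like this (paths are placeholders; untested, per the above):

CELERY_BROKER_URL = 'filesystem://'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'data_folder_in': '/tmp/celery/messages',
    'data_folder_out': '/tmp/celery/messages',
}
CELERY_RESULT_BACKEND = 'file:///tmp/celery/results'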


The manage.py sync_tasks command should be run as a daemon. Not sure what Cadasta's best-practice is to handle daemon processes (upstart/supervisor/?). Additionally, it is currently single-threaded and synchronous. We may want to look at running it in a threaded/async manner (gevent, asyncio, etc.) or running multiple processes.

We don't have a policy for this, as there was no need for this before. I'm open to suggestions...

I'm a fan of Upstart, but have used Supervisor in the past. Looking at the provisioning code, it looks like we may already be using Upstart.


It is not currently possible to send status updates of in-process tasks as described in the original design spec. This was removed as it looked non-trivial with the DB backend (we'd have to listen for DB writes and then copy those values over to another field), so I left this for future additions.

I don't think this is important at this point. We can look into that another time.

Great, leaving it out for now.

@alukach alukach force-pushed the async/task-tooling-db-backend branch 2 times, most recently from 63ffe67 to ba6c637 Compare July 12, 2017 00:18
@alukach (Contributor Author) commented Jul 12, 2017

I've added an Upstart file to be generated by Ansible in our production environment. I'm not really sure how to test this; any tips?

@oliverroick (Member)

Thanks for all this documentation. It's much clearer now.

Based on your discussion of the worker-toolbox, it sounds like keeping things in a separate and reusable repo is the way to go.

To fix this, I think we'd have to make a PR to Kombu to set a max-retry count (either in Boto3 or in Kombu) and then handle the error on our end. We'd probably want to make the initialization of the app lazy (this isn't currently possible, as we use the app's task discovery to validate the BackgroundTask.type field), cache the app instance somewhere, and put the app's usage behind a circuit breaker that throttles usage attempts when it stops working. This feels like a bit of work, maybe a week?

I think we should invest the time to make this solid. So I wouldn't oppose if you gave this a try. @amplifi Do you have thoughts on this?

The current setup has the app writing all tasks to memory on dev machines. This means that you can schedule tasks but they are not retrievable by other worker processes. According to this blog post, it may be possible to use the filesystem as a broker (and result backend) during development. I haven't really tried this, but it may be a nice middle-ground between setting up RabbitMQ and having a non-functional async setup.

I don't know, sounds like something we could do but it's not really a showstopper. Maybe we should worry about it later.

@alukach (Contributor Author) commented Jul 13, 2017

@amplifi Do you have any questions on this PR? If you have concerns regarding build failures, that appears to be entirely due to cadasta-workertoolbox not being on PyPI; I'll have it available and installable within the week. However, I don't think it needs to be a hold-up for the review of this PR, as the logic still applies.

@amplifi (Contributor) commented Jul 14, 2017

Trying to go in mostly chronological order here, addressing comments and questions first :)

For security concerns, it would probably be wise to create a worker user that only has permissions to access the results tables (celery_taskmeta and celery_tasksetmeta). We could use a separate DB to handle results with little change to our current system if that's considered desirable; however, we lose a bit of Django's help regarding relating data (not a lot though, since I disabled a bit of this already to avoid issues if Results come into the system before the actual BackgroundTask has been recorded).

Yes to worker user, no to separate DB (not necessary at this stage). I'll add provisioning for the additional DB user in Ansible.

Ensure SQS-enabled AWS credentials are available in the system environment

For staging/demo/prod, each env has a dedicated IAM role assigned to its platform EC2 instance. See: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2.html

What if SQS goes down?

If SQS is down, it's due to an AWS outage and we'll be alerted via CloudWatch. Should the connection fail to establish, we need to allow the platform to load without this connection, continue retrying the connection in the background, and make features utilizing the async pipeline unavailable with a notification to the user (greyed-out button, an appropriate message that the service is temporarily unavailable, please try again later).

Currently, the dev setup is configured so that the async tooling is disabled. We may want to consider supporting local development by installing RabbitMQ on the dev machines and having the worker codebases deployed on the machines, however this feels like it could bloat the machine.

it may be possible to use the filesystem as a broker (and result backend) during development.

As an optional provisioning flag, RabbitMQ should be fine, and it would make future development/maintenance/testing of the pipeline itself easier. As far as I'm aware, we don't have dev staff running less than 8GB RAM, and that should be more than sufficient to support its inclusion in the VM. However, we're already using the filesystem to emulate django-buckets and local mem to emulate caching, so if devs prefer that route it's fine with me.

Upstart script created. Additionally, it is currently single-threaded and synchronous. We may want to look at running it in a threaded/async manner (gevent, asyncio, etc.) or running multiple processes.

Fine for now, but we should make this a follow-up task.

the reason why I feel we should break out configuration into a separate module is threefold

Strong yes to a separate config module. It will help ensure parity across not only workers but envs, provide ease of teardown/re-provisioning if needed, and without it maintenance will be an absolute pig. Manual provisioning only leads to error, and there are too many potential points of failure.

I think we'd have to make a PR to Kombu to set a max-retry count (either in Boto3 or in Kombu) and then handle the error on our end. We'd probably want to make the initialization of the app lazy (this isn't currently possible, as we use the app's task discovery to validate the BackgroundTask.type field), cache the app instance somewhere, and put the app's usage behind a circuit breaker that throttles usage attempts when it stops working. This feels like a bit of work, maybe a week?

It's essential; there isn't a good way to handle this scenario gracefully without it, so we really have to do it.

I'm a fan of Upstart, but have used Supervisor in the past. Looking at the provisioning code, it looks like we may already be using Upstart.
I've added an Upstart file to be generated by Ansible in our production environment. I'm not really sure how to test this, any tips?

Yes, we should continue using Upstart as our default. I see you've added the task already -- thanks! I can test this on its own in staging.

@@ -131,3 +131,12 @@
},
},
}

# Async Tooling
CELERY_BROKER_TRANSPORT = 'sqs'
Contributor

Tiny: this is uppercase in dev config; intentional?

Contributor Author

I think it's insignificant; I'll lowercase it in the dev config for parity.

CELERY_BROKER_TRANSPORT = 'sqs'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-west-2',
    'queue_name_prefix': '{}-'.format(os.environ.get('QUEUE-PREFIX', 'dev')),
Contributor

Why os.environ.get over os.environ? We wouldn't want prod config to default to the dev prefix here -- environments are isolated, so attempting to connect to the wrong queue would fail. Because of the default option, os.environ.get doesn't throw an exception if the var is missing like os.environ does, and in that case we'd want this to fail noisily.

Contributor Author

Yeah, I think this was copied over from when this was in the dev requirements. For prod it definitely makes sense to have a hard failure.
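
The hard-failing variant for prod would be simply (a sketch of the change):

CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-west-2',
    # os.environ[...] raises KeyError at startup if the var is missing,
    # so a misconfigured prod env fails noisily instead of defaulting to 'dev'
    'queue_name_prefix': '{}-'.format(os.environ['QUEUE-PREFIX']),
}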

log_level = 40 - (options['verbosity'] * 10)
logging.basicConfig(level=log_level, format=fmt)

# TODO: Ensure that failed processing does not requeue task into
Contributor

We'll want to set a follow-up task for handling this condition, deciding how we want to proceed on failure.


def validate_type(_type, val):
    """
    Throw ValidationError if the provided value is of the provided type.
Contributor

Also super-nitpicky: should this read "...is not of the provided type"?

Contributor Author

Right you are, will fix.

setuid {{ app_user }}

exec {{ virtualenv_path }}bin/python \
    {{ application_path }}manage.py \
Contributor

Should be {{ application_path }}cadasta/manage.py

Contributor Author

Good catch.

djangorestframework==3.3.3
djoser==0.5.2
drfdocs-cadasta==0.0.12
git+https://github.com/celery/kombu.git@master
@amplifi (Contributor), Jul 14, 2017

I know it hasn't been updated in ages, but just in case let's still pin:
github.com/celery/[email protected]

@alukach (Contributor Author), Jul 14, 2017

The reason this isn't pinned is that utilizing SQS required that we use the master branch of Kombu. Luckily, 4.0.3 is coming down the pipe (I think it's a matter of preparing the release notes). Once the release is added to PyPI, it will be added to the cadasta-workertoolbox requirements and be managed there. The reason this needs to be in requirements.txt is that pip doesn't seem to respect the dependency_links of a setup.py file (something about security concerns around allowing Python modules to dictate the source for other third-party modules).

So, this is a temporary aspect to get pip install -r requirements.txt to function and I intend for it to be removed before this code makes its way to master.

@@ -0,0 +1,2 @@
cadasta-workertoolbox==0.1.2
git+https://github.com/celery/kombu.git@master
Contributor

Same for pinning here, too

@alukach (Contributor Author) commented Jul 17, 2017

After a chat w/ @oliverroick, it was determined that this PR can be tied off. TODOs like "adding circuit-breaker functionality" or "refactor export tooling" will be managed in #1400 and will be added to feature/async in separate PRs. This is to keep PRs smaller and more manageable.

@alukach alukach mentioned this pull request Jul 17, 2017
14 tasks
@alukach alukach force-pushed the async/task-tooling-db-backend branch from 3eac444 to 7c7457e Compare July 18, 2017 05:25
@alukach alukach merged commit a7ed1b1 into feature/async Jul 18, 2017
@alukach alukach deleted the async/task-tooling-db-backend branch July 18, 2017 19:33
@alukach alukach restored the async/task-tooling-db-backend branch July 18, 2017 19:47
@alukach alukach deleted the async/task-tooling-db-backend branch July 18, 2017 19:47
alukach pushed a commit that referenced this pull request Jul 18, 2017
alukach pushed a commit that referenced this pull request Aug 5, 2017
@alukach (Contributor Author) commented Aug 22, 2017

Dev Notes

Worker processes will need to have access to a user with the following permissions:

GRANT CONNECT ON DATABASE cadasta TO asyncworker ;
GRANT SELECT, UPDATE, INSERT ON celery_taskmeta TO asyncworker ;
GRANT USAGE ON task_id_sequence TO asyncworker ;

Export Worker has made the following concessions:

Changes to download package:

  • locations.xls generated during shp download now contains a geometry.ewkt column (it was simpler to leave this in; it could be removed if need be)
  • resources.xls now contains a filename column to describe each resource's filename in the zip. This can differ from original_name, as a number is appended to a filename if it conflicts with another filename in the zip (i.e. ['foo.jpg', 'foo.jpg'] -> ['foo.jpg', 'foo_2.jpg'])
  • zip filename template changed from {project.id}-{user.id}-{unixtime}-{datatype}.zip to {date.isoformat}_{organization.slug}_{project_slug}_{datatype}.zip where datatype is one of ['res', 'shp', 'xls', 'all'].
  • README.txt no longer supports internationalization. Additionally, it does not include the Project Name.
  • I opted not to create .csv files when exporting Shapefiles. These files contained the same data as the .xlsx files. Instead, Shapefile exports now imply an XLS export process.

alukach pushed a commit that referenced this pull request Aug 24, 2017
alukach pushed a commit that referenced this pull request Sep 12, 2017
alukach pushed a commit that referenced this pull request Sep 12, 2017
alukach pushed a commit that referenced this pull request Sep 27, 2017
alukach pushed a commit that referenced this pull request Sep 28, 2017
alukach pushed a commit that referenced this pull request Sep 28, 2017
alukach pushed a commit that referenced this pull request Sep 28, 2017
alukach pushed a commit that referenced this pull request Nov 8, 2017
alukach pushed a commit that referenced this pull request Nov 13, 2017
alukach pushed a commit that referenced this pull request Nov 19, 2017
alukach pushed a commit that referenced this pull request Dec 6, 2017
alukach pushed a commit that referenced this pull request Dec 19, 2017
alukach pushed a commit that referenced this pull request Jan 3, 2018
alukach pushed a commit that referenced this pull request Jan 24, 2018
alukach added a commit that referenced this pull request Jan 24, 2018
* Add backend-tooling for asynchronous tasks (#1624)

* Async: Configure DB and Cadasta Platform dev VM to support async workers (#1800)

* Refactor download form to schedule task (#1799)

* Async: Display export tasks on project dashboard (#1801)

* Async: Implement circuit breaker for celery tooling (#1830)