Skip to content

Commit

Permalink
Async: Implement circuit breaker for celery tooling (#1830)
Browse files Browse the repository at this point in the history
* Fixup docstring, ensure no duplicate namespace

* Defer queue setup to workertoolbox

* Add circuit breaker for celery tooling

* Cleanup Celery queue prefix settings

* Ensure all tasks are found

* Add sync task to vagrant machine

* Fixup

* Fixup

* Ensure logging helper is disabled

* Fix factory to add real task

* Update field type
  • Loading branch information
alukach authored and Anthony Lukach committed Nov 13, 2017
1 parent 85da069 commit 6274225
Show file tree
Hide file tree
Showing 28 changed files with 689 additions and 82 deletions.
4 changes: 4 additions & 0 deletions cadasta/config/settings/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,3 +573,7 @@
ES_HOST = 'localhost'
ES_PORT = '9200'
ES_MAX_RESULTS = 10000

# Async Tooling
CELERY_BROKER_TRANSPORT = 'sqs' if os.environ.get('SQS') else 'memory'
CELERY_QUEUE_PREFIX = os.environ.get('QUEUE_PREFIX', 'dev')
10 changes: 0 additions & 10 deletions cadasta/config/settings/dev.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from .default import * # NOQA

DEBUG = True
Expand Down Expand Up @@ -106,12 +105,3 @@
}

ES_PORT = '8000'

# Async Tooling
CELERY_BROKER_TRANSPORT = 'sqs' if os.environ.get('SQS') else 'memory'
CELERY_BROKER_TRANSPORT_OPTIONS = {
'region': 'us-west-2',
'queue_name_prefix': '{}-'.format(os.environ.get('QUEUE-PREFIX', 'dev')),
'wait_time_seconds': 20,
'visibility_timeout': 20,
} if CELERY_BROKER_TRANSPORT.lower() == 'sqs' else {}
7 changes: 1 addition & 6 deletions cadasta/config/settings/production.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,4 @@

# Async Tooling
CELERY_BROKER_TRANSPORT = 'sqs'
CELERY_BROKER_TRANSPORT_OPTIONS = {
'region': 'us-west-2',
'queue_name_prefix': '{}-'.format(os.environ['QUEUE-PREFIX']),
'wait_time_seconds': 20,
'visibility_timeout': 20,
}
CELERY_QUEUE_PREFIX = os.environ['QUEUE_PREFIX']
8 changes: 8 additions & 0 deletions cadasta/core/breakers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from kombu.exceptions import OperationalError

from .breaker import CircuitBreaker


celery = CircuitBreaker(
'celery', fail_max=1,
expected_errors=(OperationalError,))
29 changes: 29 additions & 0 deletions cadasta/core/breakers/breaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pybreaker

from .storages import CircuitBreakerCacheStorage as CacheStorage
from .listeners import LogListener


class CircuitBreaker(pybreaker.CircuitBreaker):
def __init__(self, name, expected_errors=(), **kwargs):
# Set defaults
if 'state_storage' not in kwargs:
kwargs['state_storage'] = CacheStorage(name)
if 'listeners' not in kwargs:
kwargs['listeners'] = [LogListener()]
if 'exclude' not in kwargs:
kwargs['exclude'] = [KeyboardInterrupt]

self.name = name
# Convenience attribute to allow codebase to easily catch and
# handle expected errors
self.expected_errors = tuple(
(pybreaker.CircuitBreakerError,) + tuple(expected_errors))
super(CircuitBreaker, self).__init__(**kwargs)

@property
def is_open(self):
return self.current_state == pybreaker.STATE_OPEN

def __repr__(self):
return "<{0.__class__.__name__}: {0.name}>".format(self)
26 changes: 26 additions & 0 deletions cadasta/core/breakers/listeners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging

import pybreaker


logger = logging.getLogger(__name__)


class LogListener(pybreaker.CircuitBreakerListener):
""" Listener used to log circuit breaker events. """

def state_change(self, cb, old_state, new_state):
was_closed = old_state.name == pybreaker.STATE_CLOSED
now_open = new_state.name == pybreaker.STATE_OPEN
level = "error" if was_closed and now_open else "info"
getattr(logger, level)(
"Changed %r breaker state from %r to %r",
cb._state_storage.name, old_state.name, new_state.name)

def failure(self, cb, exc):
logger.exception("Failed call on circuit breaker %r",
cb._state_storage.name)

def success(self, cb):
logger.debug("Successful call on circuit breaker %r",
cb._state_storage.name)
115 changes: 115 additions & 0 deletions cadasta/core/breakers/storages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from functools import partial
import weakref

from django.core.cache import cache
import pybreaker

from core.decorators import handle_exc


def get_offline_cache_errors():
if getattr(cache, '_lib', None) and cache._lib.__name__ == 'pylibmc':
import pylibmc # pragma: no cover
return ( # pragma: no cover
pylibmc.ConnectionError, pylibmc.UnixSocketError,
pylibmc.ServerDown, pylibmc.ServerDead, pylibmc.NoServers,
)
return ()


class CircuitBreakerCacheStorage(pybreaker.CircuitBreakerStorage):
"""
A cache-based storage for pybreaker. If cache retrieval fails (in the event
of an infrastructure failure) the storage defaults to its provided
fallback_state.
"""
BASE_NAMESPACE = 'pybreaker'
__NAMESPACES = weakref.WeakValueDictionary()

def __init__(self, namespace, fallback_state=pybreaker.STATE_CLOSED):
"""
Creates a new instance with the given `namespace` and an optional
`fallback_state` object. The namespace is used to identify the circuit
breaker within the cache and therefore must be different from any other
circuit breaker namespaces. If there are any connection issues with
cache, the `fallback_circuit_state` is used to determine the state of
the circuit.
"""
assert namespace not in self.__NAMESPACES, (
"Attempt to create circuit breaker for already-used namespace "
"{!r}".format(namespace))
self.__NAMESPACES[namespace] = self

super(CircuitBreakerCacheStorage, self).__init__(namespace)
self._namespace_name = namespace
self._fallback_state = fallback_state

# Helpers to handle down cache-backend
self._handled_cache_errs = get_offline_cache_errors()
self._down_cache_handler_partial = partial(
handle_exc, *self._handled_cache_errs)

@property
def _safe_cache_set(self):
""" cache.set that silently handles down cache """
return self._down_cache_handler_partial()(cache.set)

@property
def _safe_cache_get(self):
""" cache.get that silently handles down cache """
return self._down_cache_handler_partial()(cache.get)

@property
def _safe_cache_incr(self):
""" cache.incr that silently handles down cache """
return self._down_cache_handler_partial()(cache.incr)

@property
def namespace(self):
return '{}:{}'.format(self.BASE_NAMESPACE, self._namespace_name)

@property
def _state_namespace(self):
return '{}:state'.format(self.namespace)

@property
def _counter_namespace(self):
return '{}:counter'.format(self.namespace)

@property
def _opened_at_namespace(self):
return '{}:opened_at'.format(self.namespace)

@property
def state(self):
down_cache_handler = self._down_cache_handler_partial(
fallback=self._fallback_state)
safe_cache_get = down_cache_handler(cache.get)
return safe_cache_get(self._state_namespace, self._fallback_state)

@state.setter
def state(self, state):
self._safe_cache_set(self._state_namespace, state)

def increment_counter(self):
namespace = self._counter_namespace
try:
return self._safe_cache_incr(namespace)
except ValueError:
# No counter at specified namespace, make counter starting at 1
return self._safe_cache_set(namespace, 1)

def reset_counter(self):
return self._safe_cache_set(self._counter_namespace, 0)

@property
def counter(self):
return self._safe_cache_get(self._counter_namespace, 0)

@property
def opened_at(self):
return self._safe_cache_get(self._opened_at_namespace)

@opened_at.setter
def opened_at(self, datetime):
return self._safe_cache_set(self._opened_at_namespace, datetime)
31 changes: 31 additions & 0 deletions cadasta/core/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from functools import wraps
import inspect
import logging


logger = logging.getLogger(__name__)


def handle_exc(*exceptions, fallback=None):
"""
Decorator to catch, log, and suppress whitelisted exceptions. Optionally,
will return fallback value if exception is caught.
"""
msg = "{!r} not an instance of BaseException"
for e in exceptions:
try:
assert inspect.isclass(e)
assert issubclass(e, BaseException)
except AssertionError:
raise TypeError(msg.format(e))

def wrapper(f):
@wraps(f)
def func(*args, **kwargs):
try:
return f(*args, **kwargs)
except exceptions:
logger.exception("Handling failure for %r", f.__name__)
return fallback
return func
return wrapper
Loading

0 comments on commit 6274225

Please sign in to comment.