Async: Implement circuit breaker for celery tooling #1830
Nice work! I have two comments re your docstrings. They don't seem to be fully up to date.
cadasta/core/breakers/storages.py (outdated)

    class CircuitBreakerCacheStorage(pybreaker.CircuitBreakerStorage):
        """
        Defines the underlying storage for a circuit breaker - the underlying
The docstring is not 100% clear. You say "the underlying implementation should be in a subclass that overrides the method this class defines", but there's no subclass of `CircuitBreakerCacheStorage` anywhere. So I'm assuming that you mean `pybreaker.CircuitBreakerStorage` needs a subclass, which you're implementing here. Is that correct?
cadasta/core/breakers/storages.py (outdated)

    def __init__(self, namespace, fallback_state=pybreaker.STATE_CLOSED):
        """
        Creates a new instance with the given `state` and `redis` object. The
The docstring here doesn't seem to be correct. It looks like the docstring for `pybreaker.CircuitRedisStorage`; can you update that?
You're completely correct, they should be resolved now. Also, when looking at this I realized that there was no logic to ensure that circuit breakers would not have overlapping namespaces. I've added this in the last commit.
Sorry for the really long delay. I will admit that I did not fully understand everything because I don't have much familiarity with the packages used. But for the things that I could understand, there were no code smells or defects that I can see.
Note also that I wasn't able to actually test the circuit breaking function because I don't have AWS credentials.
* Fixup docstring, ensure no duplicate namespace
* Defer queue setup to workertoolbox
* Add circuit breaker for celery tooling
* Cleanup Celery queue prefix settings
* Ensure all tasks are found
* Add sync task to vagrant machine
* Fixup
* Fixup
* Ensure logging helper is disabled
* Fix factory to add real task
* Update field type
* Add backend-tooling for asynchronous tasks (#1624)
* Async: Configure DB and Cadasta Platform dev VM to support async workers (#1800)
* Refactor download form to schedule task (#1799)
* Async: Display export tasks on project dashboard (#1801)
* Async: Implement circuit breaker for celery tooling (#1830)
Proposed changes in this pull request
This PR applies the circuit breaker pattern to the Export tooling. If calls to an external service fail consistently (in this case, if SQS is unreachable) more than a specified threshold number of times, the circuit breaker enters an 'open' state in which subsequent calls are blocked for a specified "cool-off" period. After the cool-off period, another call is permitted, and its outcome dictates whether the circuit breaker should remain 'open' or should 'close' again (allowing further failures before it reopens). This way the service fails fast rather than tying up a number of threads/processes waiting for a non-responsive external service to time out. For more information, see Martin Fowler's blog post.
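As a rough illustration of the pattern described above, here is a tiny breaker in plain Python. It is not the code in this PR, which builds on `pybreaker`. The names `SimpleBreaker`, `CircuitOpenError`, `fail_max`, `reset_timeout`, and `clock` are assumptions made for the example.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the breaker is open."""


class SimpleBreaker:
    """Illustrative circuit breaker; every name here is hypothetical."""

    def __init__(self, fail_max=3, reset_timeout=30.0, clock=time.monotonic):
        self.fail_max = fail_max            # failures tolerated before opening
        self.reset_timeout = reset_timeout  # cool-off period in seconds
        self.clock = clock                  # injectable so tests can fake time
        self.failures = 0
        self.opened_at = None               # None means the breaker is closed

    @property
    def is_open(self):
        return self.opened_at is not None

    def call(self, func, *args, **kwargs):
        if self.is_open:
            if self.clock() - self.opened_at < self.reset_timeout:
                # Still cooling off: fail fast without touching the service.
                raise CircuitOpenError("breaker is open")
            # Cool-off elapsed: fall through and permit one trial call.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.is_open or self.failures >= self.fail_max:
                # Trip (or re-trip) the breaker and restart the cool-off.
                self.opened_at = self.clock()
            raise
        # A success closes the breaker and clears the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

Unlike this sketch, which keeps its state on the instance, the breaker in this PR stores its state in the shared cache so that every process sees the same value.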
To achieve this pattern, we need to:
* Update the `kombu` requirement to a commit that includes this not-yet-released PR. This allows us to set a max number of retries for connecting to SQS, preventing the default behaviour of retrying indefinitely. This additionally requires us to add a `max_retries` configuration to our `CELERY_BROKER_TRANSPORT_OPTIONS`.
* Add a `CircuitBreaker` instance to manage connections to SQS. This is based on the `pybreaker` library. The state of the breaker is stored in our cache, ensuring that it is shared between many instances of the Cadasta Platform (such as the webserver instances spun up by uwsgi, the `manage.py sync_tasks` process, and multiple servers if we ever go down that route). If our cache is inaccessible (i.e. if memcached goes down or is unreachable for any reason), all circuit breakers are assumed to be 'closed' (i.e. enabling communication to the service). When a circuit breaker changes state, it is logged at level `error` when opened and at level `info` when closed.
* Handle the `CircuitBreakerError` that is thrown when a breaker is open. To simplify the management of these errors, we've added an `expected_errors` attribute to the circuit breaker. This provides an object that can be used in a `try`/`except` statement.

Additionally, I've set up the system to:
* Report the availability of the export service to the end user. With the `manage.py sync_tasks` command running in the background, we have a service that is constantly attempting to connect to SQS. This means that the state of the circuit breaker should represent (in near real time) the accessibility of SQS. I've added an `is_open` property to our circuit breakers to return this value. The export button on the project dashboard is disabled when `is_open` returns `True`, showing end users the status of the export service so that they avoid receiving an "export service down" error message.

When should this PR be merged
ASAP, as this is targeted to `feature/async`.
How to test
To test this PR, I recommend developers:
* Install memcached: `sudo apt-get install memcached`
* `SQS=1` environment flag). The following helpers make it easy to disable the internet connection on your VMs:
Risks
Follow-up actions
See #1400
Checklist (for reviewing)
General
`migration` label if a new migration is added.
Functionality
Code
Tests
Security
Documentation