-
Notifications
You must be signed in to change notification settings - Fork 9
/
start-kombu-message-processor-rabbitmq.py
executable file
·56 lines (45 loc) · 1.98 KB
/
start-kombu-message-processor-rabbitmq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python
"""Kombu message processor: consume from RabbitMQ, relay to Redis.

Subscribes to a RabbitMQ queue and, through the MessageProcessor's
publish hook, re-publishes handled messages to a Redis exchange and
routing key. All broker endpoints and names are overridable through
environment variables.
"""
from spylunking.log.setup_logging import build_colorized_logger
from celery_connectors.utils import ev
from celery_connectors.message_processor import MessageProcessor

name = "msg-proc"
log = build_colorized_logger(name=name)
log.info("Start - {}".format(name))

# Broker endpoints - override these env vars to change where the
# processor subscribes (consumes) vs where it publishes relayed messages.
broker_sub_url = ev("SUB_BROKER_URL", "pyamqp://rabbitmq:rabbitmq@localhost:5672//")
broker_pub_url = ev("PUB_BROKER_URL", "redis://localhost:6379/0")
ssl_sub_opts = {}
ssl_pub_opts = {}

# Build the processor that bridges the subscribe and publish brokers.
msg_proc = MessageProcessor(
    name=name,
    sub_auth_url=broker_sub_url,
    sub_ssl_options=ssl_sub_opts,
    pub_auth_url=broker_pub_url,
    pub_ssl_options=ssl_pub_opts)

# Source queue to consume from, and the relay publish-hook destination
# (exchange + routing key) for handled messages.
consume_queue_name = ev("CONSUME_QUEUE", "user.events.conversions")
relay_exchange = ev("PUBLISH_EXCHANGE", "reporting.accounts")
relay_routing_key = ev("PUBLISH_ROUTING_KEY", "reporting.accounts")

# Controls and long-term connection attributes for the consumer loop.
consume_window_seconds = 10.0
heartbeat_seconds = 60
sub_content_type = "application/json"
relay_serializer = "json"
msg_expiration = None
run_forever = True

# Blocks here consuming; the exchange/routing_key kwargs are the
# optional publish hook for chaining messages into another system.
msg_proc.consume_queue(
    queue=consume_queue_name,
    heartbeat=heartbeat_seconds,
    expiration=msg_expiration,
    sub_serializer=sub_content_type,
    pub_serializer=relay_serializer,
    seconds_to_consume=consume_window_seconds,
    forever=run_forever,
    exchange=relay_exchange,
    routing_key=relay_routing_key)

log.info("End - {}".format(name))