-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathflask_producer.py
executable file
·67 lines (53 loc) · 1.79 KB
/
flask_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python
import pika
import sys
import socket
import json
import rabbit_config as rcfg
# Two positional arguments are required: TAG (the routing key the request is
# published with) and USERNAME (placed in the payload and used as the name of
# the queue this script consumes from).
if len(sys.argv) < 3:
    # End the usage line with a newline so the shell prompt is not glued to it.
    sys.stderr.write("Usage: {} TAG USERNAME\n".format(sys.argv[0]))
    # sys.exit() instead of the site-provided exit(): the latter is meant for
    # the interactive shell and is absent when site is not loaded (python -S).
    sys.exit(1)

node = sys.argv[1]
user_name = sys.argv[2]
# Request payload: only "username" comes from the command line; the other
# fields are fixed placeholder values.
message = {
    "username": user_name,
    "fullname": "Full Name",
    "reason": "Reason1, Reason2.",
}

# Connect to "localhost" when this machine's short hostname equals the
# configured server name, otherwise connect to the configured server.
hostname = socket.gethostname().split(".", 1)[0]
connect_host = rcfg.Server if hostname != rcfg.Server else "localhost"

# Set up credentials to connect to RabbitMQ server
credentials = pika.PlainCredentials(rcfg.User, rcfg.Password)
parameters = pika.ConnectionParameters(
    connect_host, rcfg.Port, rcfg.VHost, credentials
)

# Establish connection to RabbitMQ server
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.exchange_declare(exchange=rcfg.Exchange, exchange_type="direct")

# Declare and bind the named queue BEFORE publishing the request. A direct
# exchange discards messages whose routing key matches no binding, so a reply
# sent to routing key `user_name` before the binding exists would be lost.
# NOTE(review): assumes the reply is routed by `user_name` -- confirm against
# the consumer side. If TAG equals USERNAME, the request itself is now routed
# to this queue as well.
result = channel.queue_declare(queue=user_name, exclusive=False, durable=True)
channel.queue_bind(
    exchange=rcfg.Exchange, queue=user_name, routing_key=user_name
)

# Serialize once and reuse for both the publish and the log line.
payload = json.dumps(message)
channel.basic_publish(
    exchange=rcfg.Exchange, routing_key=node, body=payload
)
print(" [x] Sent {}: {}".format(node, payload))
def work(ch, method, properties, body):
    """Print one received message, then delete the queue named by its key.

    Used as the ``on_message_callback`` passed to ``basic_consume``.

    Args:
        ch: Channel the message was delivered on; used to delete the queue.
        method: Delivery metadata; only ``routing_key`` is read.
        properties: Message properties (unused).
        body: JSON-encoded message payload.

    Raises:
        json.JSONDecodeError: If ``body`` is not valid JSON.
    """
    msg = json.loads(body)
    print("Received message from {}: \n\t{}".format(method.routing_key, msg))

    # Use the channel handed to the callback instead of the module-level
    # global, so the callback works with whichever channel delivered the
    # message. The script binds its queue with a routing key equal to the
    # queue name, so the routing key identifies the queue to remove.
    ch.queue_delete(queue=method.routing_key)
# ingest messages, and assume delivered via auto_ack
# (`user_name` is sys.argv[2]; use the named variable for consistency with
# the queue declaration and binding.)
channel.basic_consume(
    queue=user_name, on_message_callback=work, auto_ack=True
)
print("Subscribing to queue: {}".format(user_name))

# initiate message ingestion
try:
    channel.start_consuming()
except KeyboardInterrupt:
    print("Disconnecting from broker.")
    channel.stop_consuming()
finally:
    # Close the connection on every exit path, including unexpected errors,
    # so the broker connection is not left dangling. Skip the call when the
    # connection is already closed to avoid masking the original exception.
    if connection.is_open:
        connection.close()