-
Notifications
You must be signed in to change notification settings - Fork 213
/
Copy pathmqtt5_pubsub.py
147 lines (120 loc) · 5.69 KB
/
mqtt5_pubsub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.
from awsiot import mqtt5_client_builder
from awscrt import mqtt5, http
import threading
from concurrent.futures import Future
import time
import json
from utils.command_line_utils import CommandLineUtils
# Timeout, in seconds, handed to every blocking Future.result() and
# Event.wait() call made by this sample.
TIMEOUT = 100
# NOTE(review): not referenced anywhere in the visible part of this file; the
# sample subscribes and publishes on cmdData.input_topic instead.
topic_filter = "test/topic"
# cmdData holds the arguments/input from the command line, gathered into a
# single struct for use in this sample. Command line parsing and validation
# are handled by CommandLineUtils (imported from utils.command_line_utils).
cmdData = CommandLineUtils.parse_sample_input_mqtt5_pubsub()
# Number of publishes received so far; incremented by on_publish_received.
received_count = 0
# Set by on_publish_received once received_count equals cmdData.input_count.
received_all_event = threading.Event()
# Completed by on_lifecycle_stopped; the main script waits on it after
# calling client.stop().
future_stopped = Future()
# Completed by on_lifecycle_connection_success; the main script waits on it
# after calling client.start().
future_connection_success = Future()
# Callback when any publish is received
def on_publish_received(publish_packet_data):
    """Log an incoming publish and track how many have arrived.

    Prints the topic and payload of the received packet, bumps the
    module-level ``received_count`` and, once that count equals
    ``cmdData.input_count``, sets ``received_all_event`` so the main
    script can stop waiting.

    Args:
        publish_packet_data: Object whose ``publish_packet`` attribute is
            the received ``mqtt5.PublishPacket``.
    """
    global received_count

    packet = publish_packet_data.publish_packet
    assert isinstance(packet, mqtt5.PublishPacket)
    print("Received message from topic'{}':{}".format(packet.topic, packet.payload))

    received_count += 1
    if received_count != cmdData.input_count:
        return
    # The expected number of messages has been reached: wake the main script.
    received_all_event.set()
# Callback for the lifecycle event Stopped
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
    """Handle the Stopped lifecycle event.

    Prints a notice and completes the module-level ``future_stopped`` with
    the event data, which unblocks the main script's wait after
    ``client.stop()``.

    Args:
        lifecycle_stopped_data: Event data passed to the callback.
    """
    print("Lifecycle Stopped")
    # No `global` statement needed: the Future is only mutated, never rebound.
    future_stopped.set_result(lifecycle_stopped_data)
# Callback for the lifecycle event Connection Success
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
    """Handle the Connection Success lifecycle event.

    Prints a notice and completes the module-level
    ``future_connection_success`` with the event data, which unblocks the
    main script's wait after ``client.start()``.

    Args:
        lifecycle_connect_success_data: Event data passed to the callback;
            the main script reads its ``connack_packet`` and
            ``negotiated_settings`` attributes.
    """
    print("Lifecycle Connection Success")
    # A Future can only be completed once; a second set_result() raises
    # InvalidStateError. Only the first connection success is recorded, so
    # that this callback firing again (for example after a reconnect) does
    # not raise inside the callback.
    if not future_connection_success.done():
        future_connection_success.set_result(lifecycle_connect_success_data)
# Callback for the lifecycle event Connection Failure
def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
    """Handle the Connection Failure lifecycle event.

    Only reports the failure on stdout; no shared state is touched.

    Args:
        lifecycle_connection_failure: Event data passed to the callback;
            its ``exception`` attribute is printed.
    """
    print("Lifecycle Connection Failure")
    failure_reason = lifecycle_connection_failure.exception
    print("Connection failed with exception:{}".format(failure_reason))
if __name__ == '__main__':
    # Sample flow: build client -> connect -> subscribe -> publish N messages
    # -> wait for them to be received -> unsubscribe -> stop.
    # Every blocking wait uses TIMEOUT, so a Future.result() call raises if
    # the corresponding step does not complete in time.
    print("\nStarting MQTT5 PubSub Sample\n")
    message_count = cmdData.input_count
    message_topic = cmdData.input_topic
    message_string = cmdData.input_message

    # Create the proxy options if the data is present in cmdData
    proxy_options = None
    if cmdData.input_proxy_host is not None and cmdData.input_proxy_port != 0:
        proxy_options = http.HttpProxyOptions(
            host_name=cmdData.input_proxy_host,
            port=cmdData.input_proxy_port)

    # Create MQTT5 client, wiring in the module-level callbacks
    client = mqtt5_client_builder.mtls_from_path(
        endpoint=cmdData.input_endpoint,
        port=cmdData.input_port,
        cert_filepath=cmdData.input_cert,
        pri_key_filepath=cmdData.input_key,
        ca_filepath=cmdData.input_ca,
        http_proxy_options=proxy_options,
        on_publish_received=on_publish_received,
        on_lifecycle_stopped=on_lifecycle_stopped,
        on_lifecycle_connection_success=on_lifecycle_connection_success,
        on_lifecycle_connection_failure=on_lifecycle_connection_failure,
        client_id=cmdData.input_clientId)
    print("MQTT5 Client Created")

    # The endpoint and client ID are not printed when the CI flag is set.
    if not cmdData.input_is_ci:
        print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...")
    else:
        print("Connecting to endpoint with client ID")

    client.start()
    # Block until on_lifecycle_connection_success completes the future.
    lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
    connack_packet = lifecycle_connect_success_data.connack_packet
    # Not used further in the visible part of this sample.
    negotiated_settings = lifecycle_connect_success_data.negotiated_settings
    if not cmdData.input_is_ci:
        print(
            f"Connected to endpoint:'{cmdData.input_endpoint}' with Client ID:'{cmdData.input_clientId}' with reason_code:{repr(connack_packet.reason_code)}")

    # Subscribe
    print("Subscribing to topic '{}'...".format(message_topic))
    subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
        subscriptions=[mqtt5.Subscription(
            topic_filter=message_topic,
            qos=mqtt5.QoS.AT_LEAST_ONCE)]
    ))
    suback = subscribe_future.result(TIMEOUT)
    print("Subscribed with {}".format(suback.reason_codes))

    # Publish message to server desired number of times.
    # This step is skipped if message is blank.
    # This step loops forever if count was set to 0.
    if message_string:
        if message_count == 0:
            print("Sending messages until program killed")
        else:
            print("Sending {} message(s)".format(message_count))

        publish_count = 1
        while (publish_count <= message_count) or (message_count == 0):
            message = "{} [{}]".format(message_string, publish_count)
            print("Publishing message to topic '{}': {}".format(message_topic, message))
            # Publish the numbered message that was just logged, so the
            # payload on the wire matches the printed text and each publish
            # can be told apart by its sequence number.
            publish_future = client.publish(mqtt5.PublishPacket(
                topic=message_topic,
                payload=json.dumps(message),
                qos=mqtt5.QoS.AT_LEAST_ONCE
            ))
            publish_completion_data = publish_future.result(TIMEOUT)
            print("PubAck received with {}".format(repr(publish_completion_data.puback.reason_code)))
            # Pace the publishes one second apart.
            time.sleep(1)
            publish_count += 1

    # Wait (up to TIMEOUT) for on_publish_received to signal that the
    # expected number of messages has arrived; the count is reported either way.
    received_all_event.wait(TIMEOUT)
    print("{} message(s) received.".format(received_count))

    # Unsubscribe
    print("Unsubscribing from topic '{}'".format(message_topic))
    unsubscribe_future = client.unsubscribe(unsubscribe_packet=mqtt5.UnsubscribePacket(
        topic_filters=[message_topic]))
    unsuback = unsubscribe_future.result(TIMEOUT)
    print("Unsubscribed with {}".format(unsuback.reason_codes))

    print("Stopping Client")
    client.stop()

    # Block until on_lifecycle_stopped completes the future.
    future_stopped.result(TIMEOUT)
    print("Client Stopped!")