From 86a49e04fdd94f898ec6117772516ca8a7508d09 Mon Sep 17 00:00:00 2001 From: Elad Bar Date: Sat, 17 Feb 2024 10:27:43 +0200 Subject: [PATCH 1/3] Fix freeze after publishing message with AWS IoT SDK v2 --- CHANGELOG.md | 9 ++++++ .../mydolphin_plus/managers/aws_client.py | 28 ++++++++++--------- .../mydolphin_plus/manifest.json | 4 +-- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a54448c..df932f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## v1.0.10 + +- Update manifest requirements to use only AWS IoT Device SDK v2 +- Fix parameter of callback to the right convention +- Fix AWS IoT MQTT publish message validation and QoS +- Change clean_session parameter to start a new session on every attempt to connect +- Set the AWS Client ID to the entry ID instead of randomize on every attempt to connect +- Set message being sent while publishing empty message to empty JSON instead of empty string + ## v1.0.9 - Upgrade AWS IoT Device SDK to v2 diff --git a/custom_components/mydolphin_plus/managers/aws_client.py b/custom_components/mydolphin_plus/managers/aws_client.py index e07e931..15eed02 100644 --- a/custom_components/mydolphin_plus/managers/aws_client.py +++ b/custom_components/mydolphin_plus/managers/aws_client.py @@ -7,7 +7,6 @@ import sys from time import sleep from typing import Any -import uuid from awscrt import auth, mqtt from awsiot import mqtt_connection_builder @@ -95,6 +94,7 @@ def __init__(self, hass: HomeAssistant | None, config_manager: ConfigManager): try: self._hass = hass self._config_manager = config_manager + self._awsiot_id = config_manager.entry_id self._api_data = {} self._data = {} @@ -165,7 +165,6 @@ async def initialize(self): self._set_status(ConnectivityStatus.Connecting) - awsiot_id = str(uuid.uuid4()) aws_token = self._api_data.get(API_RESPONSE_DATA_TOKEN) aws_key = self._api_data.get(API_RESPONSE_DATA_ACCESS_KEY_ID) aws_secret = self._api_data.get(API_RESPONSE_DATA_SECRET_ACCESS_KEY) @@ -192,10 +191,9 @@ async def initialize(self): region=AWS_REGION, ca_filepath=ca_file_path, credentials_provider=credentials_provider, - client_id=awsiot_id, - clean_session=False, + client_id=self._awsiot_id, + clean_session=True, keep_alive_secs=30, - http_proxy_options=None, on_connection_success=self._connection_callbacks.get( ConnectionCallbacks.SUCCESS ), @@ -288,7 +286,7 @@ def _on_connection_closed(self, connection, callback_data): self._set_status(ConnectivityStatus.Disconnected) - def _on_connection_interrupted(self, _connection, error, **_kwargs): + def _on_connection_interrupted(self, connection, error, **_kwargs): _LOGGER.error(f"AWS IoT connection interrupted, Error: {error}") self._set_status(ConnectivityStatus.Failed) @@ -411,19 +409,23 @@ def _send_dynamic_command(self, description: str, payload: dict | None): self._publish(self._topic_data.dynamic, payload) def _publish(self, topic: str, data: dict | None): - payload = "" if data is None else json.dumps(data) + if data is None: + data = {} + + payload = json.dumps(data) if self._status == ConnectivityStatus.Connected: try: if self._awsiot_client is not None: - published = self._awsiot_client.publish( - topic, payload, mqtt.QoS.AT_LEAST_ONCE + _LOGGER.debug(f"Trying to publish message {payload} to {topic}") + + publish_future, packet_id = self._awsiot_client.publish( + topic, payload, mqtt.QoS.AT_MOST_ONCE ) - if published: - _LOGGER.debug(f"Published message: {data} to {topic}") - else: - _LOGGER.warning(f"Failed to publish message: {data} to {topic}") + publish_future.result() + + _LOGGER.debug(f"Published message: {data} to {topic}") except Exception as ex: _LOGGER.error( diff --git a/custom_components/mydolphin_plus/manifest.json b/custom_components/mydolphin_plus/manifest.json index 38538b7..d195ed7 100644 --- a/custom_components/mydolphin_plus/manifest.json +++ b/custom_components/mydolphin_plus/manifest.json @@ -8,6 +8,6 @@ "documentation": "https://github.com/sh00t2kill/dolphin-robot", "iot_class": "cloud_push", "issue_tracker": "https://github.com/sh00t2kill/dolphin-robot/issues", - "requirements": ["awscrt~=0.20.2", "awsiotsdk~=1.21.0"], - "version": "1.0.9" + "requirements": ["awsiotsdk"], + "version": "1.0.10" } From aace17e8fe3be30f99dfe046b19c16360778dfec Mon Sep 17 00:00:00 2001 From: Elad Bar Date: Sat, 17 Feb 2024 11:01:45 +0200 Subject: [PATCH 2/3] full async implrementation of publish message --- CHANGELOG.md | 1 + .../mydolphin_plus/managers/aws_client.py | 37 +++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df932f7..19dc753 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Change clean_session parameter to start a new session on every attempt to connect - Set the AWS Client ID to the entry ID instead of randomize on every attempt to connect - Set message being sent while publishing empty message to empty JSON instead of empty string +- Switch publish to fully async implementation ## v1.0.9 diff --git a/custom_components/mydolphin_plus/managers/aws_client.py b/custom_components/mydolphin_plus/managers/aws_client.py index 15eed02..519c8f8 100644 --- a/custom_components/mydolphin_plus/managers/aws_client.py +++ b/custom_components/mydolphin_plus/managers/aws_client.py @@ -101,6 +101,7 @@ def __init__(self, hass: HomeAssistant | None, config_manager: ConfigManager): self._topic_data = None self._awsiot_client = None + self._messages_published: dict[int, dict[str, str]] = {} self._status = None @@ -114,6 +115,10 @@ def __init__(self, hass: HomeAssistant | None, config_manager: ConfigManager): ConnectionCallbacks.RESUMED: self._on_connection_resumed, } + self._on_publish_completed_callback = lambda f: self._on_publish_completed( + f + ) + except Exception as ex: exc_type, exc_obj, tb = sys.exc_info() line_number = tb.tb_lineno @@ -417,15 +422,15 @@ def _publish(self, topic: str, data: dict | None): if self._status == ConnectivityStatus.Connected: try: if self._awsiot_client is not None: - _LOGGER.debug(f"Trying to publish message {payload} to {topic}") - publish_future, packet_id = self._awsiot_client.publish( topic, payload, mqtt.QoS.AT_MOST_ONCE ) - publish_future.result() + self._pre_publish_message(packet_id, topic, payload) - _LOGGER.debug(f"Published message: {data} to {topic}") + publish_future.add_done_callback( + self._on_publish_completed_callback + ) except Exception as ex: _LOGGER.error( @@ -437,6 +442,30 @@ def _publish(self, topic: str, data: dict | None): f"Failed to publish message: {data} to {topic}, Broker is not connected" ) + def _pre_publish_message(self, message_id: int, topic: str, payload: str): + _LOGGER.debug(f"Published message to {topic}, Data: {payload}") + + self._messages_published[message_id] = {"topic": topic, "payload": payload} + + def _post_message_published(self, message_id: int): + published_data = self._messages_published.get(message_id, {}) + + topic = published_data.get("topic") + payload = published_data.get("payload") + + _LOGGER.info(f"Published message #{message_id} to {topic}, Data: {payload}") + + del self._messages_published[message_id] + + def _on_publish_completed(self, publish_future): + publish_results = publish_future.result() + _LOGGER.debug(f"Publish results: {publish_results}") + + if publish_results is not None and "packet_id" in publish_results: + packet_id = publish_results.get("packet_id") + + self._post_message_published(packet_id) + def set_cleaning_mode(self, clean_mode: CleanModes): data = {DATA_SCHEDULE_CLEANING_MODE: {CONF_MODE: str(clean_mode)}} From 764fb2c488a5d4a8e4d0c835c4e803c9d70f4f9f Mon Sep 17 00:00:00 2001 From: Elad Bar Date: Sat, 17 Feb 2024 11:08:08 +0200 Subject: [PATCH 3/3] fix lint error --- custom_components/mydolphin_plus/managers/aws_client.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/custom_components/mydolphin_plus/managers/aws_client.py b/custom_components/mydolphin_plus/managers/aws_client.py index 70a96f5..5e06767 100644 --- a/custom_components/mydolphin_plus/managers/aws_client.py +++ b/custom_components/mydolphin_plus/managers/aws_client.py @@ -280,13 +280,17 @@ def _on_connection_success(self, connection, callback_data): self._set_status(ConnectivityStatus.Connected) def _on_connection_failure(self, connection, callback_data): - if connection is not None and isinstance(callback_data, mqtt.OnConnectionFailureData): + if connection is not None and isinstance( + callback_data, mqtt.OnConnectionFailureData + ): _LOGGER.error(f"AWS IoT connection failed, Error: {callback_data.error}") self._set_status(ConnectivityStatus.Failed) def _on_connection_closed(self, connection, callback_data): - if connection is not None and isinstance(callback_data, mqtt.OnConnectionClosedData): + if connection is not None and isinstance( + callback_data, mqtt.OnConnectionClosedData + ): _LOGGER.debug("AWS IoT connection was closed") self._set_status(ConnectivityStatus.Disconnected)