Skip to content

Commit

Permalink
Merge pull request #174 from sh00t2kill/fix-errors-with-aws-sdk-v2
Browse files Browse the repository at this point in the history
Fix errors with aws sdk v2
  • Loading branch information
elad-bar authored Feb 17, 2024
2 parents 5832bf6 + 764fb2c commit 3b9fd5e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Change clean_session parameter to start a new session on every attempt to connect
- Set the AWS Client ID to the entry ID instead of randomize on every attempt to connect
- Set message being sent while publishing empty message to empty JSON instead of empty string
- Switch publish to fully async implementation

## v1.0.9

Expand Down
49 changes: 41 additions & 8 deletions custom_components/mydolphin_plus/managers/aws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def __init__(self, hass: HomeAssistant | None, config_manager: ConfigManager):

self._topic_data = None
self._awsiot_client = None
self._messages_published: dict[int, dict[str, str]] = {}

self._status = None

Expand All @@ -114,6 +115,10 @@ def __init__(self, hass: HomeAssistant | None, config_manager: ConfigManager):
ConnectionCallbacks.RESUMED: self._on_connection_resumed,
}

self._on_publish_completed_callback = lambda f: self._on_publish_completed(
f
)

except Exception as ex:
exc_type, exc_obj, tb = sys.exc_info()
line_number = tb.tb_lineno
Expand Down Expand Up @@ -275,21 +280,26 @@ def _on_connection_success(self, connection, callback_data):
self._set_status(ConnectivityStatus.Connected)

def _on_connection_failure(self, connection, callback_data):
if isinstance(callback_data, mqtt.OnConnectionFailureData):
if connection is not None and isinstance(
callback_data, mqtt.OnConnectionFailureData
):
_LOGGER.error(f"AWS IoT connection failed, Error: {callback_data.error}")

self._set_status(ConnectivityStatus.Failed)

def _on_connection_closed(self, connection, callback_data):
if isinstance(callback_data, mqtt.OnConnectionClosedData):
if connection is not None and isinstance(
callback_data, mqtt.OnConnectionClosedData
):
_LOGGER.debug("AWS IoT connection was closed")

self._set_status(ConnectivityStatus.Disconnected)

def _on_connection_interrupted(self, connection, error, **_kwargs):
_LOGGER.error(f"AWS IoT connection interrupted, Error: {error}")

self._set_status(ConnectivityStatus.Failed)
if connection is not None:
self._set_status(ConnectivityStatus.Failed)

def _on_connection_resumed(
self, connection, return_code, session_present, **_kwargs
Expand Down Expand Up @@ -417,15 +427,14 @@ def _publish(self, topic: str, data: dict | None):
if self._status == ConnectivityStatus.Connected:
try:
if self._awsiot_client is not None:
_LOGGER.debug(f"Trying to publish message {payload} to {topic}")

publish_future, packet_id = self._awsiot_client.publish(
topic, payload, mqtt.QoS.AT_MOST_ONCE
)
self._pre_publish_message(packet_id, topic, payload)

publish_future.result()

_LOGGER.debug(f"Published message: {data} to {topic}")
publish_future.add_done_callback(
self._on_publish_completed_callback
)

except Exception as ex:
_LOGGER.error(
Expand All @@ -437,6 +446,30 @@ def _publish(self, topic: str, data: dict | None):
f"Failed to publish message: {data} to {topic}, Broker is not connected"
)

def _pre_publish_message(self, message_id: int, topic: str, payload: str):
_LOGGER.debug(f"Published message to {topic}, Data: {payload}")

self._messages_published[message_id] = {"topic": topic, "payload": payload}

def _post_message_published(self, message_id: int):
published_data = self._messages_published.get(message_id, {})

topic = published_data.get("topic")
payload = published_data.get("payload")

_LOGGER.info(f"Published message #{message_id} to {topic}, Data: {payload}")

del self._messages_published[message_id]

def _on_publish_completed(self, publish_future):
publish_results = publish_future.result()
_LOGGER.debug(f"Publish results: {publish_results}")

if publish_results is not None and "packet_id" in publish_results:
packet_id = publish_results.get("packet_id")

self._post_message_published(packet_id)

def set_cleaning_mode(self, clean_mode: CleanModes):
data = {DATA_SCHEDULE_CLEANING_MODE: {CONF_MODE: str(clean_mode)}}

Expand Down

0 comments on commit 3b9fd5e

Please sign in to comment.