Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix: status from a sub-device goes to its gateway #315

Merged
merged 8 commits into from
Jul 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions custom_components/localtuya/core/pytuya/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ def error_json(self, number=None, payload=None):
try:
spayload = json.dumps(payload)
# spayload = payload.replace('\"','').replace('\'','')
except Exception:
except Exception: # pylint: disable=broad-except
spayload = '""'

vals = (error_codes[number], str(number), spayload)
Expand Down Expand Up @@ -867,10 +867,10 @@ async def _action():
updated_states["online"] = list(set(cached_on_devs + on_devs))
updated_states["offline"] = list(set(cached_off_devs + off_devs))

self.sub_devices_states = updated_states

if self._sub_devs_query_task is not None:
self._sub_devs_query_task.cancel()

self.sub_devices_states = updated_states
self._sub_devs_query_task = self.loop.create_task(_action())

def _setup_dispatcher(self) -> MessageDispatcher:
Expand Down Expand Up @@ -902,12 +902,16 @@ def _status_update(msg, ack=False):
listener = self.listener and self.listener()
if listener is not None:
if cid:
listener = listener.sub_devices.get(cid, listener)
device = self.dps_cache.get(cid, {})
# Don't pass sub-device's payload to the (fake)gateway!
if not (listener := listener.sub_devices.get(cid, None)):
return self.debug(
f'Payload for missing sub-device discarded: "{decoded_message}"'
)
status = self.dps_cache.get(cid, {})
else:
device = self.dps_cache.get("parent", {})
status = self.dps_cache.get("parent", {})

listener.status_updated(device)
listener.status_updated(status)

return MessageDispatcher(self.id, _status_update, self.version, self.local_key)

Expand Down Expand Up @@ -1031,7 +1035,7 @@ async def exchange_quick(self, payload, recv_retries):

try:
await self.transport_write(enc_payload)
except Exception:
except Exception: # pylint: disable=broad-except
await self.close()
return None
while recv_retries:
Expand All @@ -1040,7 +1044,7 @@ async def exchange_quick(self, payload, recv_retries):
msg = await self.dispatcher.wait_for(seqno, payload.cmd)
# for 3.4 devices, we get the starting seqno with the SESS_KEY_NEG_RESP message
self.seqno = msg.seqno
except Exception:
except Exception: # pylint: disable=broad-except
msg = None
if msg and len(msg.payload) != 0:
return msg
Expand Down Expand Up @@ -1086,7 +1090,10 @@ async def exchange(self, command, dps=None, nodeID=None, payload=None):

enc_payload = self._encode_message(payload)

await self.transport_write(enc_payload)
try:
await self.transport_write(enc_payload)
except Exception: # pylint: disable=broad-except
return self.clean_up_session()
msg = await self.dispatcher.wait_for(seqno, payload.cmd)
if msg is None:
self.debug("Wait was aborted for seqno %d", seqno)
Expand Down
Loading