Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicit tasks cancel #383

Merged
merged 23 commits into from
Nov 19, 2024
Merged
Changes from 11 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
308a1e0
Eliminated _call_on_close, which was growing without a control
Lurker00 Nov 4, 2024
1cbf4bc
Different cancellation order
Lurker00 Nov 5, 2024
c0332aa
Clear the task on return
Lurker00 Nov 5, 2024
59f4f8e
Typo in the comment
Lurker00 Nov 5, 2024
a54664e
Clear after use
Lurker00 Nov 5, 2024
b3563b0
Useless parameter
Lurker00 Nov 5, 2024
742b0bb
Interruptable sleep
Lurker00 Nov 5, 2024
24a4b15
Stop the previous _shutdown_entities and start a new one.
Lurker00 Nov 6, 2024
845b82f
Force debug logging of sleep cancel event
Lurker00 Nov 6, 2024
018f372
Comment
Lurker00 Nov 6, 2024
5092a9d
Fix mistaken edit
Lurker00 Nov 6, 2024
3122839
Calling _shutdown_entities() in close() is not required.
Lurker00 Nov 12, 2024
260eaee
Early interrupt of _connect_subdevices
Lurker00 Nov 13, 2024
9f5e2ec
Group cancel of async tasks
Lurker00 Nov 13, 2024
a317328
Close detached sub-devices and faster close of sub-devices
Lurker00 Nov 13, 2024
fcecc02
_sleep() actually is not required (anymore?)
Lurker00 Nov 18, 2024
709b105
Don't remove sub-devices from its gateway's container
Lurker00 Nov 19, 2024
141047c
Don't rely on disconnect(): _unsub_refresh in close()
Lurker00 Nov 19, 2024
cd5098a
Don't count "absent" events as "offline" events
Lurker00 Nov 19, 2024
08ec9e1
try block around _async_reconnect loop body
Lurker00 Nov 19, 2024
1f4bb08
try block in _shutdown_entities
Lurker00 Nov 19, 2024
107ec1a
More on cancel handling in _make_connection
Lurker00 Nov 19, 2024
5e3f5ba
cleans up and missed expection
xZetsubou Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 82 additions & 54 deletions custom_components/localtuya/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,15 @@ def __init__(
# last_update_time: Sleep timer, a device that reports the status every x seconds then goes into sleep.
self._last_update_time = time.time() - 5
self._pending_status: dict[str, dict[str, Any]] = {}
self._connect_task: asyncio.Task | None = None

self._task_connect: asyncio.Task | None = None
self._task_reconnect: asyncio.Task | None = None
self._task_shutdown_entities: asyncio.Task | None = None
self._unsub_refresh: CALLBACK_TYPE | None = None
self._call_on_close: list[CALLBACK_TYPE] = []
self._unsub_new_entity: CALLBACK_TYPE | None = None

self._entities = []
self._is_closing = False
self._reconnect_task = False

self._default_reset_dpids: list | None = None
dev = self._device_config
Expand Down Expand Up @@ -118,7 +121,7 @@ def connected(self):
@property
def is_connecting(self):
"""Return whether device is currently connecting."""
return self._connect_task is not None
return self._task_connect is not None

@property
def is_subdevice(self):
Expand Down Expand Up @@ -148,9 +151,9 @@ async def async_connect(self, _now=None) -> None:
if self.connected:
return self._dispatch_status()

self._connect_task = asyncio.create_task(self._make_connection())
self._task_connect = asyncio.create_task(self._make_connection())
if not self.is_sleep:
await self._connect_task
await self._task_connect

async def _connect_subdevices(self):
"""Gateway: connect to sub-devices one by one."""
Expand Down Expand Up @@ -178,7 +181,7 @@ async def _make_connection(self):

self.debug(f"Trying to connect to: {host}...", force=True)
# Connect to the device, interface should be connected for next steps.
while retry < max_retries:
while retry < max_retries and not self._is_closing:
retry += 1
try:
if self.is_subdevice:
Expand Down Expand Up @@ -219,7 +222,7 @@ async def _make_connection(self):
break

# Get device status and configure DPS.
if self.connected:
if self.connected and not self._is_closing:
try:
# If reset dpids set - then assume reset is needed before status.
reset_dpids = self._default_reset_dpids
Expand Down Expand Up @@ -256,27 +259,28 @@ async def _make_connection(self):
update_localkey = True

# Connect and configure the entities, at this point the device should be ready to get commands.
if self.connected:
if self.connected and not self._is_closing:
# Attempt to restore status for all entities that need to first set
# the DPS value before the device will respond with status.
for entity in self._entities:
await entity.restore_state_when_connected()

def _new_entity_handler(entity_id):
self.debug(f"New entity {entity_id} was added to {host}")
self._dispatch_status()
if self._unsub_new_entity is None:
def _new_entity_handler(entity_id):
self.debug(f"New entity {entity_id} was added to {host}")
self._dispatch_status()

signal = f"localtuya_entity_{self._device_config.id}"
self._call_on_close.append(
async_dispatcher_connect(self._hass, signal, _new_entity_handler)
)
signal = f"localtuya_entity_{self._device_config.id}"
self._unsub_new_entity = async_dispatcher_connect(
self._hass, signal, _new_entity_handler
)

if (scan_inv := int(self._device_config.scan_interval)) > 0:
self._unsub_refresh = async_track_time_interval(
self._hass, self._async_refresh, timedelta(seconds=scan_inv)
)

self._connect_task = None
self._task_connect = None
# Ensure the connected sub-device is in its gateway's sub_devices
# and reset offline/absent counters
if self.gateway:
Expand All @@ -299,33 +303,31 @@ def _new_entity_handler(entity_id):
self._interface.keep_alive(len(self.sub_devices) > 0)

# If not connected try to handle the errors.
if not self.connected:
if not self._reconnect_task:
self._call_on_close.append(
asyncio.create_task(self._async_reconnect()).cancel
)
if not self.connected and not self._is_closing:
if self._task_reconnect is None:
self._task_reconnect = asyncio.create_task(self._async_reconnect())
if update_localkey:
# Check if the cloud device info has changed!.
await self.update_local_key()

self._connect_task = None
self._task_connect = None

async def abort_connect(self):
"""Abort the connect process to the interface[device]"""
if self.is_subdevice:
self._interface = None
self._connect_task = None
self._task_connect = None

if self._interface is not None:
await self._interface.close()
self._interface = None

async def check_connection(self):
"""Ensure that the device is not still connecting; if it is, wait for it."""
if not self.connected and self._connect_task:
await self._connect_task
if not self.connected and self.gateway and self.gateway._connect_task:
await self.gateway._connect_task
if not self.connected and self._task_connect:
await self._task_connect
if not self.connected and self.gateway and self.gateway._task_connect:
await self.gateway._task_connect
if not self.connected:
self.error(f"Not connected to device {self._device_config.name}")

Expand All @@ -337,19 +339,30 @@ async def close(self):
self._is_closing = True
await self._shutdown_entities()

if self._connect_task is not None:
self._connect_task.cancel()
await self._connect_task
self._connect_task = None
if self._task_connect is not None:
self._task_connect.cancel()
await self._task_connect
self._task_connect = None

# Close subdevices first, to prevent them try to reconnect
# after gateway disconnected.
subdevices = list(self.sub_devices.values())
for subdevice in subdevices:
await subdevice.close()

for cb in self._call_on_close:
cb()
if self._task_reconnect is not None:
self._task_reconnect.cancel()
await self._task_reconnect
self._task_reconnect = None

if self._task_shutdown_entities is not None:
self._task_shutdown_entities.cancel()
await self._task_shutdown_entities
self._task_shutdown_entities = None

if self._unsub_new_entity is not None:
self._unsub_new_entity()
self._unsub_new_entity = None

await self.abort_connect()
self.debug(f"Closed connection", force=True)
Expand Down Expand Up @@ -428,40 +441,47 @@ async def set_dps(self, states):
async def _async_refresh(self, _now):
if self.connected:
self.debug("Refreshing dps for device")
# This a workdaround for >= 3.4 devices, since there is an issue on waiting for the correct seqno
# This a workaround for >= 3.4 devices, since there is an issue on waiting for the correct seqno
try:
await self._interface.update_dps(cid=self._node_id)
except TimeoutError:
pass

async def _sleep(self, seconds) -> bool:
xZetsubou marked this conversation as resolved.
Show resolved Hide resolved
"""Interruptable sleep. Returns True if cancelled."""
try:
await asyncio.sleep(seconds)
except asyncio.CancelledError:
self.debug(f"Sleep({seconds}) interrupted", force=True)
return True
return False

async def _async_reconnect(self):
"""Task: continuously attempt to reconnect to the device."""
if self._reconnect_task:
return

self._reconnect_task = True
attempts = 0
while True:
# for sub-devices, if it is reported as offline then no need for reconnect.
if self.is_subdevice and self._subdevice_off_count >= MIN_OFFLINE_EVENTS:
await asyncio.sleep(1)
if await self._sleep(1):
Lurker00 marked this conversation as resolved.
Show resolved Hide resolved
break
continue

# for sub-devices, if the gateway isn't connected then no need for reconnect.
if self.gateway and (
not self.gateway.connected or self.gateway.is_connecting
):
await asyncio.sleep(3)
if await self._sleep(3):
break
continue

if self._is_closing:
break

try:
if not self._connect_task:
if not self._task_connect:
await self.async_connect()
if self._connect_task:
await self._connect_task
if self._task_connect:
await self._task_connect
except asyncio.CancelledError as e:
self.debug(f"Reconnect task has been canceled: {e}", force=True)
break
Expand All @@ -475,9 +495,10 @@ async def _async_reconnect(self):
scale = (
2 if (self._subdevice_absent or attempts > MIN_OFFLINE_EVENTS) else 1
)
await asyncio.sleep(scale * RECONNECT_INTERVAL.total_seconds())
if await self._sleep(scale * RECONNECT_INTERVAL.total_seconds()):
break

self._reconnect_task = False
self._task_reconnect = None

def _dispatch_status(self):
signal = f"localtuya_{self._device_config.id}"
Expand Down Expand Up @@ -521,9 +542,11 @@ async def _shutdown_entities(self, exc=""):
"""Shutdown device entities"""
# Delay shutdown.
if not self._is_closing:
await asyncio.sleep(3 + self._device_config.sleep_time)
if await self._sleep(3 + self._device_config.sleep_time):
return

if self.connected or self.is_sleep:
self._task_shutdown_entities = None
return

signal = f"localtuya_{self._device_config.id}"
Expand All @@ -541,6 +564,8 @@ async def _shutdown_entities(self, exc=""):
else:
self.info(f"Disconnected due to: {exc}")

self._task_shutdown_entities = None

@callback
def status_updated(self, status: dict):
"""Device updated status."""
Expand All @@ -560,23 +585,26 @@ def disconnected(self, exc=""):

if self._unsub_refresh:
self._unsub_refresh()
self._unsub_refresh = None

if self.sub_devices:
for sub_dev in self.sub_devices.values():
sub_dev.disconnected("Gateway disconnected")

if self._connect_task is not None:
self._connect_task.cancel("Device disconnected")
self._connect_task = None
if self._task_connect is not None:
self._task_connect.cancel()
self._task_connect = None

# If it disconnects unexpectedly.
if self._is_closing:
return

self._call_on_close.append(asyncio.create_task(self._async_reconnect()).cancel)
self._call_on_close.append(
asyncio.create_task(self._shutdown_entities(exc=exc)).cancel
)
if self._task_reconnect is None:
self._task_reconnect = asyncio.create_task(self._async_reconnect())

if self._task_shutdown_entities is not None:
self._task_shutdown_entities.cancel()
self._task_shutdown_entities = asyncio.create_task(self._shutdown_entities(exc=exc))

@callback
def subdevice_state(self, state: SubdeviceState):
Expand Down
Loading