Skip to content
This repository has been archived by the owner on Jun 9, 2023. It is now read-only.

Commit

Permalink
Get mypy passing (with a few ignores to fix later) (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Jan 13, 2022
1 parent a52ac9b commit 417a0a9
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 73 deletions.
70 changes: 26 additions & 44 deletions aiosenseme/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
Source can be found at https://github.com/mikelawrence/aiosenseme
"""
from __future__ import annotations

import asyncio
import inspect
import ipaddress
import logging
import random
import traceback
from typing import Any, Callable, Tuple
from typing import Any, Callable, Coroutine

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -142,7 +144,7 @@ def __init__(self, name, endpoint: SensemeEndpoint):
self._endpoint = endpoint

# Protocol methods
def connection_made(self, transport: asyncio.Protocol):
def connection_made(self, transport: asyncio.BaseTransport) -> None:
"""Socket connect on SenseME Protocol."""
_LOGGER.debug("%s: Connected", self._name)
self._endpoint.transport = transport
Expand Down Expand Up @@ -207,12 +209,12 @@ def __init__(
self._has_sensor = False
else:
self._has_sensor = None
self._room_name = None
self._room_type = None
self._room_name: str | None = None
self._room_type: int | None = None
self._fw_name = "Unknown"
self._fw_version = None
self._fw_version: str | None = None

self._data = dict()
self._data: dict[str, Any] = dict()
self._is_running = False
self._is_connected = asyncio.Event()
self._connection_lost = False
Expand All @@ -221,8 +223,8 @@ def __init__(
self._updater_task = None
self._error_count = 0
self._leftover = ""
self._callbacks = []
self._coroutine_callbacks = []
self._callbacks: list[Callable] = []
self._coroutine_callbacks: list[Coroutine] = []
self._first_update = asyncio.Event()

def __eq__(self, other: Any) -> bool:
Expand All @@ -247,30 +249,10 @@ def __hash__(self) -> int:
@property
def is_sec_info_complete(self) -> bool:
"""Return if all secondary information is complete."""
if self._name is None:
return False
if self._uuid is None:
return False
if self._mac is None:
return False
if self._base_model is None:
return False
if self._room_name is None:
return False
if self._room_type is None:
return False
if self._has_light is None:
return False
if self._has_sensor is None:
return False
if self._fw_version is None:
return False
if self._base_model is None:
return False
return True
return any(val is None for val in self.get_device_info.values())

@property
def get_device_info(self) -> dict:
def get_device_info(self) -> dict[str, Any]:
"""Get a dict with all key information abouth this device."""
return {
"name": self._name,
Expand Down Expand Up @@ -549,7 +531,7 @@ def light_brightness_max(self) -> int | None:
return None

@property
def light_brightness_limits_room(self) -> Tuple | None:
def light_brightness_limits_room(self) -> tuple | None:
"""Return a tuple of the min and max light brightness for the room.
A room can limit the minimum/maximum light brightness while keeping the same
Expand Down Expand Up @@ -628,9 +610,9 @@ async def async_fill_out_info(self) -> bool:
"Retrieve device information: Status Update from address %s",
self.address,
)
writer.write(f"<{self._address};DEVICE;ID;GET>".encode("utf-8"))
writer.write(f"<{self._address};SNSROCC;STATUS;GET>".encode("utf-8"))
writer.write(f"<{self._address};GETALL;GET>".encode("utf-8"))
writer.write(f"<{self._address};DEVICE;ID;GET>".encode())
writer.write(f"<{self._address};SNSROCC;STATUS;GET>".encode())
writer.write(f"<{self._address};GETALL;GET>".encode())

leftover = ""
while True:
Expand Down Expand Up @@ -668,12 +650,12 @@ def add_callback(self, callback: Callable) -> None:
is_coroutine = inspect.iscoroutinefunction(callback)
if is_coroutine:
if callback not in self._coroutine_callbacks:
self._coroutine_callbacks.append(callback)
self._coroutine_callbacks.append(callback) # type: ignore
_LOGGER.debug("%s: Added coroutine callback", self.name)
return

if callback not in self._callbacks:
self._callbacks.append(callback)
self._callbacks.append(callback) # type: ignore
_LOGGER.debug("%s: Added function callback", self.name)

def remove_callback(self, callback) -> None:
Expand Down Expand Up @@ -714,9 +696,9 @@ def _execute_callbacks(self) -> None:
for callback in self._callbacks:
count += 1
callback()
for callback in self._coroutine_callbacks:
for coro in self._coroutine_callbacks:
count += 1
asyncio.create_task(callback())
asyncio.create_task(coro()) # type: ignore
_LOGGER.debug("%s: %s callback(s) happened.", self.name, count)

def _send_command(self, cmd) -> None:
Expand All @@ -726,7 +708,7 @@ def _send_command(self, cmd) -> None:
msg = f"<{self.mac};{cmd}>"
self._endpoint.send(msg)

def _process_message(self, line) -> Tuple[str, bool]:
def _process_message(self, line) -> tuple[str, bool]: # noqa: C901
"""Process messages from device.
May contain multiple responses in one line string.
Expand Down Expand Up @@ -852,7 +834,7 @@ async def _updater(self):
await asyncio.sleep(10)
raise

async def _listener(self):
async def _listener(self): # noqa: C901
"""Task that listens for device status changes.
Maintains an open socket to the device at all times.
Expand Down Expand Up @@ -999,7 +981,7 @@ def has_light(self) -> bool | None:
return self._has_light

@property
def fan_on(self) -> bool:
def fan_on(self) -> bool | None:
"""Return True when fan is on at any speed."""
state = self._data.get("FAN;PWR", None)
if state:
Expand Down Expand Up @@ -1046,12 +1028,12 @@ def fan_speed_max(self) -> int | None:
return None

@property
def fan_speed_limits(self) -> Tuple[int | None, int | None]:
def fan_speed_limits(self) -> tuple[int | None, int | None]:
"""Return a tuple of the min/max fan speeds."""
return self.fan_speed_min, self.fan_speed_max

@property
def fan_speed_limits_room(self) -> Tuple[int, int] | None:
def fan_speed_limits_room(self) -> tuple[int, int] | None:
"""Return a tuple of the min/max fan speeds the room is configured to support.
A room can limit the minimum/maximum fan speed while keeping the same number of
Expand All @@ -1069,7 +1051,7 @@ def fan_speed_limits_room(self) -> Tuple[int, int] | None:
return min_speed, max_speed

@fan_speed_limits_room.setter
def fan_speed_limits_room(self, speeds: Tuple) -> None:
def fan_speed_limits_room(self, speeds: tuple) -> None:
"""Set a tuple of the min/max fan speeds the room is configured to support."""
if speeds[0] >= speeds[1]:
_LOGGER.error("Min speed cannot exceed max speed")
Expand Down
Loading

0 comments on commit 417a0a9

Please sign in to comment.