Skip to content

Commit

Permalink
Merge pull request craigjmidwinter#237 from austinmroczek/more_typing
Browse files Browse the repository at this point in the history
Improve typing
  • Loading branch information
austinmroczek authored Jan 20, 2025
2 parents deb65a7 + 600f184 commit 2449829
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 140 deletions.
127 changes: 74 additions & 53 deletions total_connect_client/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""TotalConnectClient() in this file is the primary class of this package.
Instantiate it like this:
usercodes = { 'default': '1234' }
Expand All @@ -10,9 +11,10 @@

import logging
import ssl
from ssl import SSLContext
import time
from importlib import resources as impresources

from typing import Dict, Any
import requests
import urllib3.poolmanager
import zeep
Expand Down Expand Up @@ -49,11 +51,13 @@
class _SslContextAdapter(requests.adapters.HTTPAdapter):
"""Makes Zeep use our ssl_context."""

def __init__(self, ssl_context, **kwargs):
def __init__(self, ssl_context: SSLContext, **kwargs) -> None:
self.ssl_context = ssl_context
super().__init__(**kwargs)

def init_poolmanager(self, num_pools, maxsize, block=False):
def init_poolmanager(
self, num_pools: int, maxsize: int, block: bool = False
) -> None:
self.poolmanager = urllib3.poolmanager.PoolManager(
num_pools=num_pools,
maxsize=maxsize,
Expand All @@ -69,36 +73,36 @@ class TotalConnectClient:

def __init__( # pylint: disable=too-many-arguments
self,
username,
password,
usercodes=None,
auto_bypass_battery=False,
retry_delay=6, # seconds between retries
):
username: str,
password: str,
usercodes: Dict[str, str] | None = None,
auto_bypass_battery: bool = False,
retry_delay: int = 6, # seconds between retries
) -> None:
"""Initialize."""
self.times = {}
self.time_start = time.time()
self.soap_client = None

self.username = username
self.password = password
self.username: str = username
self.password: str = password
self.usercodes = usercodes or {}
self.auto_bypass_low_battery = auto_bypass_battery
self.retry_delay = retry_delay

self.token = None
self._invalid_credentials = False
self._module_flags = None
self._user = None
self._locations = {}
self._locations_unfetched = {}
self._module_flags: Dict[str, str] = {}
self._user: TotalConnectUser | None = None
self._locations: Dict[Any, TotalConnectLocation] = {}
self._locations_unfetched: Dict[Any, TotalConnectLocation] = {}

self.authenticate()

self.times["__init__"] = time.time() - self.time_start

@property
def locations(self):
def locations(self) -> Dict[Any, TotalConnectLocation]:
"""Raises an exception if the panel cannot be reached to retrieve
metadata or details. This can be retried later and will succeed
if/when the panel becomes reachable.
Expand All @@ -118,7 +122,7 @@ def locations(self):
assert not self._locations_unfetched
return self._locations

def __str__(self):
def __str__(self) -> str:
"""Return a text string that is printable."""
data = (
f"CLIENT\n\n"
Expand All @@ -141,7 +145,7 @@ def __str__(self):

return data + locations

def times_as_string(self):
def times_as_string(self) -> str:
"""Return a string with times."""
self.times["total running time"] = time.time() - self.time_start
msg = "total-connect-client time info (seconds):\n"
Expand All @@ -150,10 +154,8 @@ def times_as_string(self):

return msg

def _raise_for_retry(self, response):
"""Used internally to determine which responses should be retried in
request().
"""
def _raise_for_retry(self, response: Dict[str, Any]) -> None:
"""Determine which responses should be retried in request()."""
rc = _ResultCode.from_response(response)
if rc == _ResultCode.INVALID_SESSION:
raise InvalidSessionError("invalid session", response)
Expand All @@ -166,8 +168,9 @@ def _raise_for_retry(self, response):
if rc == _ResultCode.BAD_OBJECT_REFERENCE:
raise RetryableTotalConnectError("bad object reference", response)

def raise_for_resultcode(self, response):
def raise_for_resultcode(self, response: Dict[str, Any]) -> None:
"""If response.ResultCode indicates success, return and do nothing.
If it indicates an authentication error, raise AuthenticationError.
"""
rc = _ResultCode.from_response(response)
Expand All @@ -179,7 +182,11 @@ def raise_for_resultcode(self, response):
):
return
self._raise_for_retry(response)
if rc in (_ResultCode.BAD_USER_OR_PASSWORD, _ResultCode.AUTHENTICATION_FAILED, _ResultCode.ACCOUNT_LOCKED):
if rc in (
_ResultCode.BAD_USER_OR_PASSWORD,
_ResultCode.AUTHENTICATION_FAILED,
_ResultCode.ACCOUNT_LOCKED,
):
raise AuthenticationError(rc.name, response)
if rc == _ResultCode.USER_CODE_UNAVAILABLE:
raise UsercodeUnavailable(rc.name, response)
Expand All @@ -191,7 +198,9 @@ def raise_for_resultcode(self, response):
raise FailedToBypassZone(rc.name, response)
raise BadResultCodeError(rc.name, response)

def _send_one_request(self, operation_name, args):
def _send_one_request(
self, operation_name: str, args: list[Any] | tuple[Any, ...]
) -> Dict[str, Any]:
LOGGER.debug(f"sending API request {operation_name}{args}")
operation_proxy = self.soap_client.service[operation_name]
return zeep.helpers.serialize_object(operation_proxy(*args))
Expand All @@ -200,9 +209,15 @@ def _send_one_request(self, operation_name, args):
API_APP_ID = "14588"
API_APP_VERSION = "1.0.34"

def request(self, operation_name, args, attempts_remaining: int=5):
"""Send a SOAP request. args is a list or tuple defining the
parameters to the operation.
def request(
self,
operation_name: str,
args: list[Any] | tuple[Any, ...],
attempts_remaining: int = 5,
) -> Dict[str, Any]:
"""Send a SOAP request.
args is a list or tuple defining the parameters to the operation.
"""
is_first_request = attempts_remaining == 5
attempts_remaining -= 1
Expand Down Expand Up @@ -274,8 +289,10 @@ def request(self, operation_name, args, attempts_remaining: int=5):
args = [self.token if old_token == arg else arg for arg in args]
return self.request(operation_name, args, attempts_remaining)

def authenticate(self):
"""Login to the system. Upon success, self.token is a valid credential
def authenticate(self) -> None:
"""Login to the system.
Upon success, self.token is a valid credential
for further API calls, and self._user and self.locations are valid.
self.locations will not be refreshed if it was non-empty on entry.
"""
Expand Down Expand Up @@ -313,11 +330,9 @@ def authenticate(self):
LOGGER.info(f"{self.username} authenticated: {len(self._locations)} locations")
self.times["authenticate()"] = time.time() - start_time

def validate_usercode(self, device_id, usercode:str)-> bool:
def validate_usercode(self, device_id: str, usercode: str) -> bool:
"""Return True if the usercode is valid for the device."""
response = self.request(
"ValidateUserCode", (self.token, device_id, str(usercode))
)
response = self.request("ValidateUserCode", (self.token, device_id, usercode))
try:
self.raise_for_resultcode(response)
except UsercodeInvalid:
Expand All @@ -328,29 +343,32 @@ def validate_usercode(self, device_id, usercode:str)-> bool:
return False
return True

def is_logged_in(self)->bool:
"""Return true if the client is logged into the Total Connect service
with valid credentials.
"""
def is_logged_in(self) -> bool:
"""Return true if the client is logged in to Total Connect."""
return self.token is not None

def log_out(self):
"""Upon return, we are logged out. Raises TotalConnectError if we
still might be logged in.
def log_out(self) -> None:
"""Upon return, we are logged out.
Raises TotalConnectError if we still might be logged in.
"""
if self.is_logged_in():
response = self.request("Logout", (self.token,))
self.raise_for_resultcode(response)
LOGGER.info("Logout Successful")
self.token = None

def get_number_locations(self)->int:
"""Return the number of locations. Home Assistant needs a way
to force the locations to load inside a callable function.
def get_number_locations(self) -> int:
"""Return the number of locations.
Home Assistant needs a way to force the locations to load
inside a callable function.
"""
return len(self.locations)

def _make_locations(self, response):
def _make_locations(
self, response: Dict[str, Any]
) -> Dict[Any, TotalConnectLocation]:
"""Return a dict mapping LocationID to TotalConnectLocation."""
start_time = time.time()
new_locations = {}
Expand Down Expand Up @@ -381,34 +399,37 @@ def _make_locations(self, response):
class ArmingHelper:
"""
For a partition or location, you can call its arm() or disarm() method directly.
Example: partition.arm(ArmType.AWAY)
Example: partition.arm(ArmType.AWAY)
Alternatively, you can use ArmingHelper.
Example: ArmingHelper(partition).arm_away()
"""

def __init__(self, partition_or_location):
def __init__(self, partition_or_location) -> None:
"""Initialize ArmingHelper."""
self.armable = partition_or_location

def arm_away(self):
def arm_away(self) -> None:
"""Arm the system (Away)."""
self.armable.arm(ArmType.AWAY)

def arm_stay(self):
def arm_stay(self) -> None:
"""Arm the system (Stay)."""
self.armable.arm(ArmType.STAY)

def arm_stay_instant(self):
def arm_stay_instant(self) -> None:
"""Arm the system (Stay - Instant)."""
self.armable.arm(ArmType.STAY_INSTANT)

def arm_away_instant(self):
def arm_away_instant(self) -> None:
"""Arm the system (Away - Instant)."""
self.armable.arm(ArmType.AWAY_INSTANT)

def arm_stay_night(self):
def arm_stay_night(self) -> None:
"""Arm the system (Stay - Night)."""
self.armable.arm(ArmType.STAY_NIGHT)

def disarm(self):
def disarm(self) -> None:
"""Disarm the system."""
self.armable.disarm()
31 changes: 17 additions & 14 deletions total_connect_client/const.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Total Connect Client constants."""

from enum import Enum
from typing import Dict, Any

from .exceptions import BadResultCodeError, ServiceUnavailable

Expand Down Expand Up @@ -51,27 +52,29 @@ class ArmingState(Enum):
ARMING = 10307
DISARMING = 10308

def is_arming(self):
UNKNOWN = 0

def is_arming(self) -> bool:
"""Return true if the system is in the process of arming."""
return self == ArmingState.ARMING

def is_disarming(self):
def is_disarming(self) -> bool:
"""Return true if the system is in the process of disarming."""
return self == ArmingState.DISARMING

def is_pending(self):
def is_pending(self) -> bool:
"""Return true if the system is pending an action."""
return self.is_disarming() or self.is_arming()

def is_disarmed(self):
def is_disarmed(self) -> bool:
"""Return True if the system is disarmed."""
return self in (
ArmingState.DISARMED,
ArmingState.DISARMED_BYPASS,
ArmingState.DISARMED_ZONE_FAULTED,
)

def is_armed_away(self):
def is_armed_away(self) -> bool:
"""Return True if the system is armed away in any way."""
return self in (
ArmingState.ARMED_AWAY,
Expand All @@ -80,11 +83,11 @@ def is_armed_away(self):
ArmingState.ARMED_AWAY_INSTANT_BYPASS,
)

def is_armed_custom_bypass(self):
def is_armed_custom_bypass(self) -> bool:
"""Return True if the system is armed custom bypass in any way."""
return self == ArmingState.ARMED_CUSTOM_BYPASS

def is_armed_home(self):
def is_armed_home(self) -> bool:
"""Return True if the system is armed home/stay in any way."""
return self in (
ArmingState.ARMED_STAY,
Expand All @@ -102,7 +105,7 @@ def is_armed_home(self):
ArmingState.ARMED_STAY_OTHER,
)

def is_armed_night(self):
def is_armed_night(self) -> bool:
"""Return True if the system is armed night in any way."""
return self in (
ArmingState.ARMED_STAY_NIGHT,
Expand All @@ -111,7 +114,7 @@ def is_armed_night(self):
ArmingState.ARMED_STAY_NIGHT_INSTANT_BYPASS_PROA7,
)

def is_armed(self):
def is_armed(self) -> bool:
"""Return True if the system is armed in any way."""
return (
self.is_armed_away()
Expand All @@ -120,22 +123,22 @@ def is_armed(self):
or self.is_armed_custom_bypass()
)

def is_triggered_police(self):
def is_triggered_police(self) -> bool:
"""Return True if the system is triggered for police or medical."""
return self == ArmingState.ALARMING

def is_triggered_fire(self):
def is_triggered_fire(self) -> bool:
"""Return True if the system is triggered for fire or smoke."""
return self == ArmingState.ALARMING_FIRE_SMOKE

def is_triggered_gas(self):
def is_triggered_gas(self) -> bool:
"""Return True if the system is triggered for carbon monoxide."""
return self in (
ArmingState.ALARMING_CARBON_MONOXIDE,
ArmingState.ALARMING_CARBON_MONOXIDE_PROA7,
)

def is_triggered(self):
def is_triggered(self) -> bool:
"""Return True if the system is triggered in any way."""
return (
self.is_triggered_fire()
Expand All @@ -152,7 +155,7 @@ class _ResultCode(Enum):
"""

@staticmethod
def from_response(response_dict):
def from_response(response_dict:Dict[str, Any]):
try:
return _ResultCode(response_dict["ResultCode"])
except TypeError:
Expand Down
Loading

0 comments on commit 2449829

Please sign in to comment.