Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lock encryption key retrieval #236

Merged
merged 6 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
aiohttp>=3.9.5
bleak>=0.17.0
bleak-retry-connector>=2.9.0
cryptography>=38.0.3
boto3>=1.20.24
requests>=2.28.1
3 changes: 1 addition & 2 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pytest-asyncio
pytest-cov
aiohttp>=3.9.5
bleak>=0.17.0
bleak-retry-connector>=3.4.0
cryptography>=38.0.3
boto3>=1.20.24
requests>=2.28.1
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
name="PySwitchbot",
packages=["switchbot", "switchbot.devices", "switchbot.adv_parsers"],
install_requires=[
"aiohttp>=3.9.5",
"bleak>=0.19.0",
"bleak-retry-connector>=3.4.0",
"cryptography>=39.0.0",
"pyOpenSSL>=23.0.0",
"boto3>=1.20.24",
"requests>=2.28.1",
],
version="0.44.1",
description="A library to communicate with Switchbot",
Expand Down
11 changes: 2 additions & 9 deletions switchbot/api_config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
# Those values have been obtained from the following files in SwitchBot Android app
# That's how you can verify them yourself
# /assets/switchbot_config.json
# /res/raw/amplifyconfiguration.json
# /res/raw/awsconfiguration.json

SWITCHBOT_APP_API_BASE_URL = "https://l9ren7efdj.execute-api.us-east-1.amazonaws.com"
SWITCHBOT_APP_COGNITO_POOL = {
"PoolId": "us-east-1_x1fixo5LC",
"AppClientId": "66r90hdllaj4nnlne4qna0muls",
"AppClientSecret": "1v3v7vfjsiggiupkeuqvsovg084e3msbefpj9rgh611u30uug6t8",
"Region": "us-east-1",
}
SWITCHBOT_APP_API_BASE_URL = "api.switchbot.net"
SWITCHBOT_APP_CLIENT_ID = "5nnwmhmsa9xxskm14hd85lm9bm"
8 changes: 8 additions & 0 deletions switchbot/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
DEFAULT_SCAN_TIMEOUT = 5


class SwitchbotApiError(RuntimeError):
dsypniewski marked this conversation as resolved.
Show resolved Hide resolved
"""Raised when API call fails.

This exception inherits from RuntimeError to avoid breaking existing code
but will be changed to Exception in a future release.
"""


class SwitchbotAuthenticationError(RuntimeError):
"""Raised when authentication fails.

Expand Down
137 changes: 77 additions & 60 deletions switchbot/devices/lock.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
"""Library to handle connection with Switchbot Lock."""
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import asyncio
import logging
import time
from typing import Any

import boto3
import requests
import aiohttp
from bleak.backends.device import BLEDevice
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_COGNITO_POOL
from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_CLIENT_ID
from ..const import (
LockStatus,
SwitchbotAccountConnectionError,
SwitchbotApiError,
SwitchbotAuthenticationError,
)
from .device import SwitchbotDevice, SwitchbotOperationError
Expand Down Expand Up @@ -86,77 +83,97 @@ async def verify_encryption_key(

return lock_info is not None

@staticmethod
async def api_request(
session: aiohttp.ClientSession,
subdomain: str,
path: str,
data: dict = None,
headers: dict = None,
) -> dict:
url = f"https://{subdomain}.{SWITCHBOT_APP_API_BASE_URL}/{path}"
async with session.post(url, json=data, headers=headers) as result:
dsypniewski marked this conversation as resolved.
Show resolved Hide resolved
if result.status > 299:
raise SwitchbotApiError(
f"Unexpected status code returned by SwitchBot API: {result.status}"
)

response = await result.json()
if response["statusCode"] != 100:
raise SwitchbotApiError(
f"{response['message']}, status code: {response['statusCode']}"
)

return response["body"]

# Old non-async method preserved for backwards compatibility
@staticmethod
def retrieve_encryption_key(device_mac: str, username: str, password: str):
async def async_fn():
async with aiohttp.ClientSession() as session:
return await SwitchbotLock.async_retrieve_encryption_key(
session, device_mac, username, password
)

return asyncio.run(async_fn())

@staticmethod
async def async_retrieve_encryption_key(
session: aiohttp.ClientSession, device_mac: str, username: str, password: str
) -> dict:
"""Retrieve lock key from internal SwitchBot API."""
device_mac = device_mac.replace(":", "").replace("-", "").upper()
msg = bytes(username + SWITCHBOT_APP_COGNITO_POOL["AppClientId"], "utf-8")
secret_hash = base64.b64encode(
hmac.new(
SWITCHBOT_APP_COGNITO_POOL["AppClientSecret"].encode(),
msg,
digestmod=hashlib.sha256,
).digest()
).decode()

cognito_idp_client = boto3.client(
"cognito-idp", region_name=SWITCHBOT_APP_COGNITO_POOL["Region"]
)

try:
auth_response = cognito_idp_client.initiate_auth(
ClientId=SWITCHBOT_APP_COGNITO_POOL["AppClientId"],
AuthFlow="USER_PASSWORD_AUTH",
AuthParameters={
"USERNAME": username,
"PASSWORD": password,
"SECRET_HASH": secret_hash,
auth_result = await SwitchbotLock.api_request(
session,
"account",
"account/api/v1/user/login",
{
"clientId": SWITCHBOT_APP_CLIENT_ID,
"username": username,
"password": password,
"grantType": "password",
"verifyCode": "",
},
)
except cognito_idp_client.exceptions.NotAuthorizedException as err:
raise SwitchbotAuthenticationError(
f"Failed to authenticate: {err}"
) from err
auth_headers = {"authorization": auth_result["access_token"]}
except Exception as err:
raise SwitchbotAuthenticationError(
f"Unexpected error during authentication: {err}"
) from err
raise SwitchbotAuthenticationError(f"Authentication failed: {err}") from err

if (
auth_response is None
or "AuthenticationResult" not in auth_response
or "AccessToken" not in auth_response["AuthenticationResult"]
):
raise SwitchbotAuthenticationError("Unexpected authentication response")
try:
userinfo = await SwitchbotLock.api_request(
session, "account", "account/api/v1/user/userinfo", {}, auth_headers
)
if "botRegion" in userinfo and userinfo["botRegion"] != "":
region = userinfo["botRegion"]
else:
region = "us"
except Exception as err:
raise SwitchbotAccountConnectionError(
f"Failed to retrieve SwitchBot Account user details: {err}"
) from err

access_token = auth_response["AuthenticationResult"]["AccessToken"]
try:
key_response = requests.post(
url=SWITCHBOT_APP_API_BASE_URL + "/developStage/keys/v1/communicate",
headers={"authorization": access_token},
json={
device_info = await SwitchbotLock.api_request(
session,
f"wonderlabs.{region}",
"wonder/keys/v1/communicate",
{
"device_mac": device_mac,
"keyType": "user",
},
timeout=10,
auth_headers,
)
except requests.exceptions.RequestException as err:

return {
"key_id": device_info["communicationKey"]["keyId"],
"encryption_key": device_info["communicationKey"]["key"],
}
except Exception as err:
raise SwitchbotAccountConnectionError(
f"Failed to retrieve encryption key from SwitchBot Account: {err}"
) from err
if key_response.status_code > 299:
raise SwitchbotAuthenticationError(
f"Unexpected status code returned by SwitchBot Account API: {key_response.status_code}"
)
key_response_content = json.loads(key_response.content)
if key_response_content["statusCode"] != 100:
raise SwitchbotAuthenticationError(
f"Unexpected status code returned by SwitchBot API: {key_response_content['statusCode']}"
)

return {
"key_id": key_response_content["body"]["communicationKey"]["keyId"],
"encryption_key": key_response_content["body"]["communicationKey"]["key"],
}

async def lock(self) -> bool:
"""Send lock command."""
Expand Down
Loading