Skip to content

Commit

Permalink
feat: add configuration option to change the upstream URL (#62)
Browse files Browse the repository at this point in the history
allows selecting between different Cloud services (or local-only mode) for uploading Wibeee push data.

* disable persistent HTTP connections to Wibeee Cloud to avoid `aiohttp.client_exceptions.ServerDisconnectedError`

* fix: hack to work around invalid JSON posted by Wibeee devices

---------

Co-authored-by: Luis Miranda <[email protected]>
  • Loading branch information
edurenye and luuuis authored Jun 12, 2023
1 parent 7d574d6 commit bb596f6
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 52 deletions.
24 changes: 22 additions & 2 deletions custom_components/wibeee/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant

from .api import WibeeeAPI
from .const import (DOMAIN, DEFAULT_SCAN_INTERVAL, DEFAULT_TIMEOUT)
from .const import DOMAIN, CONF_NEST_UPSTREAM, NEST_PROXY_DISABLED, NEST_DEFAULT_UPSTREAM

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -58,3 +57,24 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_update_options(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Update options."""
await hass.config_entries.async_reload(entry.entry_id)


async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry):
"""Migrate old entry."""
_LOGGER.debug("Migrating from version %s", config_entry.version)

# Migrate from "Use Nest Proxy" checkbox to "Nest Cloud Service" select list
if config_entry.version == 1:
v1_conf_nest_proxy_enable = 'nest_proxy_enable' # v1 config option that is no longer used.

options = config_entry.options
use_nest_proxy = options.get(v1_conf_nest_proxy_enable, False)
nest_upstream = NEST_DEFAULT_UPSTREAM if use_nest_proxy else NEST_PROXY_DISABLED

new_options = {k: v for k, v in options.items() if k != v1_conf_nest_proxy_enable} | {CONF_NEST_UPSTREAM: nest_upstream}

config_entry.version = 2
hass.config_entries.async_update_entry(config_entry, options=new_options)
_LOGGER.info("Migration to version %s successful", config_entry.version)

return True
33 changes: 19 additions & 14 deletions custom_components/wibeee/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
from homeassistant.data_entry_flow import AbortFlow
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.selector import SelectSelectorConfig, SelectSelectorMode, SelectSelector

from .api import WibeeeAPI
from .const import (DOMAIN, DEFAULT_SCAN_INTERVAL, CONF_NEST_PROXY_ENABLE)
from .const import DOMAIN, DEFAULT_SCAN_INTERVAL, CONF_NEST_UPSTREAM, NEST_ALL_UPSTREAMS, NEST_PROXY_DISABLED
from .util import short_mac

_LOGGER = logging.getLogger(__name__)
Expand All @@ -37,7 +38,7 @@ async def validate_input(hass: HomeAssistant, user_input: dict) -> [str, str, di

class WibeeeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Wibeee config flow."""
VERSION = 1
VERSION = 2

async def async_step_user(self, user_input=None):
"""Handle the initial step."""
Expand Down Expand Up @@ -87,24 +88,28 @@ class WibeeeOptionsFlowHandler(config_entries.OptionsFlow):
def __init__(self, config_entry):
"""Initialize options flow."""
self.config_entry = config_entry
self.options = dict(config_entry.options)

async def async_step_init(self, user_input=None):
"""Manage the options."""
"""Main options."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
self.options.update(user_input)
return self.async_create_entry(title="", data=self.options)

data_schema = vol.Schema({
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL.total_seconds())
): int,
vol.Required(
CONF_NEST_UPSTREAM,
default=self.config_entry.options.get(CONF_NEST_UPSTREAM, NEST_PROXY_DISABLED)
): SelectSelector(SelectSelectorConfig(options=NEST_ALL_UPSTREAMS, mode=SelectSelectorMode.DROPDOWN))
})

return self.async_show_form(
step_id="init",
data_schema=vol.Schema({
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL.total_seconds())
): int,
vol.Optional(
CONF_NEST_PROXY_ENABLE,
default=self.config_entry.options.get(CONF_NEST_PROXY_ENABLE, False)
): bool
}),
data_schema=data_schema
)


Expand Down
20 changes: 19 additions & 1 deletion custom_components/wibeee/const.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
from datetime import timedelta

from homeassistant.helpers.selector import SelectOptionDict

DOMAIN = 'wibeee'
NEST_DEFAULT_UPSTREAM = 'http://nest-ingest.wibeee.com'

DEFAULT_SCAN_INTERVAL = timedelta(seconds=15)
DEFAULT_TIMEOUT = timedelta(seconds=10)

CONF_NEST_PROXY_ENABLE = 'nest_proxy_enable'
CONF_NEST_UPSTREAM = 'nest_upstream'


def _format_options(upstreams: dict[str, str]) -> list[SelectOptionDict]:
return [SelectOptionDict(label=f'{cloud} ({url})', value=url) for cloud, url in upstreams.items()]


NEST_PROXY_DISABLED: str = 'proxy_disabled'
NEST_NULL_UPSTREAM: str = 'proxy_null'
NEST_ALL_UPSTREAMS: list[SelectOptionDict] = [SelectOptionDict(label='Disabled (polling only)', value=NEST_PROXY_DISABLED),
SelectOptionDict(label='Local only (no Cloud)', value=NEST_NULL_UPSTREAM)] + \
_format_options({
'Wibeee Nest': NEST_DEFAULT_UPSTREAM,
'Iberdrola': 'http://datosmonitorconsumo.iberdrola.es:8080',
'SolarProfit': 'http://wdata.solarprofit.es:8080',
})
127 changes: 100 additions & 27 deletions custom_components/wibeee/nest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
from typing import Callable, Dict, Tuple
from typing import Callable, Dict, Tuple, NamedTuple, Awaitable, Optional
from urllib.parse import parse_qsl

from homeassistant.components.network import async_get_source_ip
Expand All @@ -8,67 +9,100 @@
from homeassistant.helpers import singleton
from homeassistant.helpers.typing import EventType

from .const import NEST_NULL_UPSTREAM

LOGGER = logging.getLogger(__name__)

import aiohttp
from aiohttp import web
from aiohttp.web_routedef import _HandlerType

from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.core import HomeAssistant
from homeassistant.const import EVENT_HOMEASSISTANT_STOP


class DeviceConfig(NamedTuple):
handle_push_data: Callable[[Dict], None]
"""Callback that will receive push data."""
upstream: str
"""The upstream server to forward data to"""


class NestProxy(object):
_listeners = {}
_listeners: Dict[str, DeviceConfig] = {}

def register_device(self, mac_address: str, listener: Callable[[Dict], None]):
self._listeners[mac_address] = listener
def register_device(self, mac_address: str, push_data_listener: Callable[[Dict], None], upstream: str):
self._listeners[mac_address] = DeviceConfig(
handle_push_data=push_data_listener,
upstream=upstream
)

def unregister_device(self, mac_address: str):
self._listeners.pop(mac_address)

def dispatch(self, mac_addr: str, pushed_data: Dict):
if mac_addr in self._listeners:
self._listeners[mac_addr](pushed_data)
else:
LOGGER.debug("Ignoring pushed data from %s: %s", mac_addr, pushed_data)
def get_device_info(self, mac_addr: str) -> DeviceConfig:
return self._listeners.get(mac_addr, None)


@singleton.singleton("wibeee_nest_proxy")
async def get_nest_proxy(
hass: HomeAssistant,
upstream='http://nest-ingest.wibeee.com',
local_port=8600,
) -> NestProxy:
session = async_get_clientsession(hass)
nest_proxy = NestProxy()

def nest_forward(snoop_data: Callable[[web.Request], Tuple[str, Dict]] = None) -> _HandlerType:
# disable persistent HTTP connections as the Wibeee Cloud will otherwise
# time out our connections, causing a ServerDisconnectedError below.
connector = aiohttp.TCPConnector(force_close=True)
session = aiohttp.ClientSession(connector=connector)

@callback
def close_session(ev: EventType) -> None:
session.detach()
connector.close()

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, close_session)

def nest_forward(decode_data: Callable[[web.Request], Awaitable[Tuple[str, Dict]]]) -> _HandlerType:
async def handler(req: web.Request) -> web.StreamResponse:
url = f'{upstream}{req.path}?{req.query_string}'
req_body = (await req.read()) if req.can_read_body else None
mac_addr, push_data, forward_body = await decode_data(req)
device_info = nest_proxy.get_device_info(mac_addr)

res = await session.request(req.method, url, data=req_body)
res_body = await res.read()
if device_info is None:
LOGGER.debug("Ignoring unexpected push data from %s received as %s %s: %s", mac_addr, req.method, req.path, push_data)
return web.Response(status=404) # Not Found

if snoop_data:
mac_addr, push_data = snoop_data(req)
nest_proxy.dispatch(mac_addr, push_data)
LOGGER.debug("Updating sensors using push data from %s received as %s %s: %s", mac_addr, req.method, req.path, push_data)
device_info.handle_push_data(push_data)

return web.Response(headers=res.headers, body=res_body)
if device_info.upstream == NEST_NULL_UPSTREAM:
# don't send to any upstream.
LOGGER.debug("Accepted local-only push data from %s in %s %s: %s", mac_addr, req.method, req.path, push_data)
return web.Response(status=202) # Accepted

return handler
url = f'{device_info.upstream}{req.path_qs}'
try:
LOGGER.debug("Forwarding push data from %s using %s %s: %s", mac_addr, req.method, url, push_data)
res = await session.request(req.method, url, data=forward_body)
res_body = await res.read()
if res.status < 200 or res.status > 299:
LOGGER.warning('Wibeee Cloud returned %d for forwarded request: %s', res.status, res_body)

return web.Response(status=res.status, headers=res.headers, body=res_body)

def extract_query_params(req: web.Request) -> Tuple[str, Dict]:
"""Extracts Wibeee data from query params."""
query = {k: v for k, v in parse_qsl(req.query_string)}
return query['mac'], query
except aiohttp.ClientError as e:
LOGGER.error('Wibeee Cloud HTTP error during %d %s', req.method, req.path, exc_info=e)
return web.Response(status=500) # Server Error

return handler

app = aiohttp.web.Application()
app.add_routes([
web.get('/Wibeee/receiverAvg', nest_forward(extract_query_params)),
web.get('/Wibeee/receiverLeap', nest_forward(extract_query_params)),
web.route('*', '/{anypath:.*}', nest_forward()),
web.post('/Wibeee/receiverAvgPost', nest_forward(extract_json_body)),
web.post('/Wibeee/receiverJSON', nest_forward(extract_json_body)),
web.route('*', '/{anypath:.*}', unknown_path_handler),
])

# don't listen on public IP
Expand All @@ -88,3 +122,42 @@ def shutdown_proxy(ev: EventType) -> None:

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown_proxy)
return nest_proxy


async def extract_query_params(req: web.Request) -> Tuple[str, Dict, Optional[str]]:
"""Extracts Wibeee data from query params."""
query = {k: v for k, v in parse_qsl(req.query_string)}
return query['mac'], query, await req.text() if req.can_read_body else None


async def extract_json_body(req: web.Request) -> Tuple[Optional[str], Dict, str]:
"""Extracts Wibeee data from JSON request body."""
body = await req.text() if req.can_read_body else None
LOGGER.debug("Parsing JSON in %s %s", req.method, req.path, body)
parsed_body = None
parse_error = None
try:
parsed_body = {} if body is None else json.loads(body)

except json.decoder.JSONDecodeError as e:
# Wibeee will send invalid JSON at times. make a desperate attempt to fix the JSON and try again. (╯°□°)╯︵ ┻━┻
fixed_body = body.replace(',,', ',').replace('""', '","')
if fixed_body != body:
try:
parsed_body = json.loads(fixed_body)
LOGGER.debug("Fixed invalid JSON in %s %s [%s]: %s", req.method, req.path, e, body)
except json.decoder.JSONDecodeError:
parse_error = e
else:
parse_error = e

if parse_error:
LOGGER.debug("Error parsing JSON in %s %s: %s", req.method, req.path, body, exc_info=parse_error)
return None, {}, body

return parsed_body.get('mac', None), parsed_body, json.dumps(parsed_body)


async def unknown_path_handler(req: web.Request) -> web.StreamResponse:
LOGGER.debug("Ignoring unexpected %s %s", req.method, req.path)
return web.Response(status=200)
18 changes: 13 additions & 5 deletions custom_components/wibeee/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@
from homeassistant.util import slugify

from .api import WibeeeAPI
from .const import (DOMAIN, DEFAULT_SCAN_INTERVAL, DEFAULT_TIMEOUT, CONF_NEST_PROXY_ENABLE)
from .const import (
DOMAIN,
DEFAULT_SCAN_INTERVAL,
DEFAULT_TIMEOUT,
CONF_NEST_UPSTREAM,
NEST_DEFAULT_UPSTREAM,
NEST_PROXY_DISABLED,
)
from .nest import get_nest_proxy
from .util import short_mac

Expand Down Expand Up @@ -162,7 +169,7 @@ async def fetching_data(now=None):
return async_track_time_interval(hass, fetching_data, scan_interval)


async def async_setup_local_push(hass: HomeAssistant, device, sensors: list['WibeeeSensor']):
async def async_setup_local_push(hass: HomeAssistant, entry: ConfigEntry, device, sensors: list['WibeeeSensor']):
mac_address = device['macAddr']
nest_proxy = await get_nest_proxy(hass)

Expand All @@ -175,7 +182,8 @@ def on_pushed_data(pushed_data: dict) -> None:
def unregister_listener():
nest_proxy.unregister_device(mac_address)

nest_proxy.register_device(mac_address, on_pushed_data)
upstream = entry.options.get(CONF_NEST_UPSTREAM)
nest_proxy.register_device(mac_address, on_pushed_data, upstream)
return unregister_listener


Expand All @@ -187,7 +195,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_e
host = entry.data[CONF_HOST]
scan_interval = timedelta(seconds=entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL.total_seconds()))
timeout = timedelta(seconds=entry.options.get(CONF_TIMEOUT, DEFAULT_TIMEOUT.total_seconds()))
use_nest_proxy = entry.options.get(CONF_NEST_PROXY_ENABLE)
use_nest_proxy = entry.options.get(CONF_NEST_UPSTREAM, NEST_PROXY_DISABLED) != NEST_PROXY_DISABLED

if use_nest_proxy:
# first set up the Nest proxy. it's important to do this first because the device will not respond to status.xml
Expand All @@ -209,7 +217,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_e
disposers.update(fetch_status=remove_fetch_listener)

if use_nest_proxy:
remove_push_listener = await async_setup_local_push(hass, device, sensors)
remove_push_listener = await async_setup_local_push(hass, entry, device, sensors)
disposers.update(push_listener=remove_push_listener)

_LOGGER.info(f"Setup completed for '{entry.unique_id}' (host={host}, scan_interval={scan_interval}, timeout={timeout})")
Expand Down
6 changes: 3 additions & 3 deletions custom_components/wibeee/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
"step": {
"init": {
"title": "Wibeee integration options",
"description": "Customize the integration.",
"description": "Configure Polling and and Local Push",
"data": {
"scan_interval": "Device polling interval in seconds.",
"nest_proxy_enable": "Enable Nest Proxy on port 8600"
"scan_interval": "Device polling interval in seconds",
"nest_upstream": "Cloud service to upload data to"
}
}
}
Expand Down

0 comments on commit bb596f6

Please sign in to comment.