Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry failed requests with exponential backoff #9

Merged
merged 1 commit into from
Aug 28, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 59 additions & 39 deletions custom_components/wibeee/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

REQUIREMENTS = ["xmltodict"]

import xml
import aiohttp
import asyncio
import logging
import voluptuous as vol
from datetime import timedelta
Expand Down Expand Up @@ -41,6 +42,7 @@
ENERGY_WATT_HOUR,
CONF_HOST,
CONF_SCAN_INTERVAL,
CONF_TIMEOUT,
)
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers.event import async_track_time_interval
Expand All @@ -62,11 +64,13 @@
DEFAULT_HOST = ''
DEFAULT_RESOURCE = 'http://{}/en/status.xml'
DEFAULT_SCAN_INTERVAL = timedelta(seconds=15)
DEFAULT_TIMEOUT = timedelta(seconds=10)
DEFAULT_PHASES = 3

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
vol.Optional(CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL): cv.time_period,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.time_period,
})

SENSOR_TYPES = {
Expand All @@ -93,13 +97,14 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
url_api = BASE_URL.format(host, PORT, API_PATH)

# Create a WIBEEE DATA OBJECT
wibeee_data = WibeeeData(hass, sensor_name_suffix, url_api)
scan_interval = config.get(CONF_SCAN_INTERVAL)
timeout = config.get(CONF_TIMEOUT)
wibeee_data = WibeeeData(hass, sensor_name_suffix, url_api, scan_interval, timeout)

# Then make first call and get sensors
await wibeee_data.set_sensors()

scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
_LOGGER.info(f"Start polling {url_api} with scan_interval: {scan_interval}")
_LOGGER.debug(f"Start polling {url_api} with scan_interval: {scan_interval}")
async_track_time_interval(hass, wibeee_data.fetching_data, scan_interval)

# Add Entities
Expand Down Expand Up @@ -157,36 +162,63 @@ def should_poll(self):
class WibeeeData(object):
"""Gets the latest data from Wibeee sensors."""

def __init__(self, hass, sensor_name_suffix, url_api):
def __init__(self, hass, sensor_name_suffix, url_api, scan_interval, timeout):
"""Initialize the data object."""
_LOGGER.debug("Initializing WibeeeData with url: %s", url_api)

self.hass = hass
self.sensor_name_suffix = sensor_name_suffix
self.url_api = url_api
self.timeout = min(timeout, scan_interval)
self.min_wait = timedelta(milliseconds=100)
self.max_wait = min(timedelta(seconds=5), scan_interval)
_LOGGER.info("Initializing WibeeeData with url: %s, scan_interval: %s, timeout %s, max_wait: %s", url_api, scan_interval,
self.timeout, self.max_wait)

self.timeout = 10
self.session = async_get_clientsession(hass)

self.sensors = None
self.data = None
self.ready = asyncio.Event()

async def async_fetch_status(self, retries=0):
"""Fetches the status XML from Wibeee as a dict, optionally retries"""

async def fetch_with_retries(try_n):
if try_n > 0:
wait = min(pow(2, try_n) * self.min_wait.total_seconds(), self.max_wait.total_seconds())
_LOGGER.debug("Waiting %0.3fs to retry %s...", wait, self.url_api)
await asyncio.sleep(wait)

try:
resp = await self.session.get(self.url_api, timeout=self.timeout.total_seconds())
if resp.status != 200:
raise aiohttp.ClientResponseError(
resp.request_info,
resp.history,
status=resp.status,
message=resp.reason,
headers=resp.headers,
)

async def set_sensors(self):
"""Make first Get call to Initialize sensor names"""
try:
with async_timeout.timeout(10, loop=self.hass.loop):
resp = await self.session.get(self.url_api)
resp.raise_for_status()
if resp.status != 200:
return (None)
else:
xml_data = await resp.text()
_LOGGER.debug("RAW Response from %s: %s)", self.url_api, xml_data)
dict_data = xmltodict.parse(xml_data)
self.data = dict_data["response"]
_LOGGER.debug("Dict Response: %s)", self.data)
except ValueError as error:
raise ValueError("Unable to obtain any response from %s, %s", self.url_api, error)
return dict_data["response"]

except Exception as exc:
if try_n == retries:
retry_info = f' after {try_n} retries' if retries > 0 else ''
_LOGGER.error('Error getting %s%s: %s: %s', self.url_api, retry_info, exc.__class__.__name__, exc, exc_info=True)
raise
else:
_LOGGER.warning('Error getting %s, will retry. %s: %s', self.url_api, exc.__class__.__name__, exc, exc_info=True)
return await fetch_with_retries(try_n=try_n + 1)

return await fetch_with_retries(try_n=0)

async def set_sensors(self):
"""Make first Get call to Initialize sensor names"""
self.data = await self.async_fetch_status(retries=10)

# Create tmp sensor array
tmp_sensors = []
Expand All @@ -208,30 +240,18 @@ async def set_sensors(self):
# Add sensors
self.sensors = tmp_sensors


#@Throttle(MIN_TIME_BETWEEN_UPDATES)
async def fetching_data(self, *_):
async def fetching_data(self, now=None):
""" Function to fetch REST Data and transform XML to data to DICT format """
xml_data = None
try:
with async_timeout.timeout(10, loop=self.hass.loop):
resp = await self.session.get(self.url_api)
resp.raise_for_status()
if resp.status != 200:
return(None)
else:
xml_data = await resp.text()
dict_data = xmltodict.parse(xml_data)
self.data = dict_data["response"]
except Exception as exc:
_LOGGER.error('Error while getting %s: %s: %s', self.url_api, exc.__class__.__name__, exc, exc_info=True)
if isinstance(exc, xml.parsers.expat.ExpatError):
_LOGGER.debug('Received XML:\n%s', xml_data)
return (None)
self.data = await self.async_fetch_status(retries=3)
except Exception as err:
if now is not None:
self.ready.clear()
return
raise PlatformNotReady from err

self.updating_sensors()


def updating_sensors(self, *_):
"""Find the current data from self.data."""
if not self.data:
Expand Down