diff --git a/custom_components/ltss/__init__.py b/custom_components/ltss/__init__.py
deleted file mode 100644
index 6870570..0000000
--- a/custom_components/ltss/__init__.py
+++ /dev/null
@@ -1,314 +0,0 @@
-"""Support for long time state storage (LTSS) in TimescaleDB."""
-import asyncio
-import concurrent.futures
-from contextlib import contextmanager
-from datetime import datetime, timedelta
-import logging
-import queue
-import threading
-import time
-import json
-from typing import Any, Dict, Optional, Callable
-
-import voluptuous as vol
-from sqlalchemy import exc, create_engine
-from sqlalchemy.engine import Engine
-from sqlalchemy.orm import scoped_session, sessionmaker
-
-from homeassistant.const import (
-    ATTR_ENTITY_ID,
-    CONF_DOMAINS,
-    CONF_ENTITIES,
-    CONF_EXCLUDE,
-    CONF_INCLUDE,
-    EVENT_HOMEASSISTANT_START,
-    EVENT_HOMEASSISTANT_STOP,
-    EVENT_STATE_CHANGED,
-    STATE_UNKNOWN,
-)
-from homeassistant.components import persistent_notification
-from homeassistant.core import CoreState, HomeAssistant, callback
-import homeassistant.helpers.config_validation as cv
-from homeassistant.helpers.entityfilter import (
-    convert_include_exclude_filter,
-    INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA,
-)
-from homeassistant.helpers.typing import ConfigType
-import homeassistant.util.dt as dt_util
-from homeassistant.helpers.json import JSONEncoder
-
-from sqlalchemy import text
-
-from .models import Base, LTSS
-from .migrations import check_and_migrate
-
-_LOGGER = logging.getLogger(__name__)
-
-DOMAIN = "ltss"
-
-CONF_DB_URL = "db_url"
-CONF_CHUNK_TIME_INTERVAL = "chunk_time_interval"
-
-CONNECT_RETRY_WAIT = 3
-
-CONFIG_SCHEMA = vol.Schema(
-    {
-        DOMAIN: INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend(
-            {
-                vol.Required(CONF_DB_URL): cv.string,
-                vol.Optional(CONF_CHUNK_TIME_INTERVAL, default=2592000000000): cv.positive_int,  # 30 days in microseconds
-            }
-        )
-    },
-    extra=vol.ALLOW_EXTRA,
-)
-
-
-async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
-    """Set up LTSS."""
-    conf = config[DOMAIN]
-
-    db_url = conf.get(CONF_DB_URL)
-    chunk_time_interval = conf.get(CONF_CHUNK_TIME_INTERVAL)
-    entity_filter = convert_include_exclude_filter(conf)
-
-    instance = LTSS_DB(
-        hass=hass,
-        uri=db_url,
-        chunk_time_interval=chunk_time_interval,
-        entity_filter=entity_filter,
-    )
-    instance.async_initialize()
-    instance.start()
-
-    return await instance.async_db_ready
-
-
-@contextmanager
-def session_scope(*, session=None):
-    """Provide a transactional scope around a series of operations."""
-
-    if session is None:
-        raise RuntimeError("Session required")
-
-    need_rollback = False
-    try:
-        yield session
-        if session.transaction:
-            need_rollback = True
-            session.commit()
-    except Exception as err:  # pylint: disable=broad-except
-        _LOGGER.error("Error executing query: %s", err)
-        if need_rollback:
-            session.rollback()
-        raise
-    finally:
-        session.close()
-
-
-class LTSS_DB(threading.Thread):
-    """A threaded LTSS class."""
-
-    def __init__(
-        self,
-        hass: HomeAssistant,
-        uri: str,
-        chunk_time_interval: int,
-        entity_filter: Callable[[str], bool],
-    ) -> None:
-        """Initialize the ltss."""
-        threading.Thread.__init__(self, name="LTSS")
-
-        self.hass = hass
-        self.queue: Any = queue.Queue()
-        self.recording_start = dt_util.utcnow()
-        self.db_url = uri
-        self.chunk_time_interval = chunk_time_interval
-        self.async_db_ready = asyncio.Future()
-        self.engine: Any = None
-        self.run_info: Any = None
-
-        self.entity_filter = entity_filter
-
-        self.get_session = None
-
-    @callback
-    def async_initialize(self):
-        """Initialize the ltss."""
-        self.hass.bus.async_listen(EVENT_STATE_CHANGED, self.event_listener)
-
-    def run(self):
-        """Start processing events to save."""
-        tries = 1
-        connected = False
-
-        while not connected and tries <= 10:
-            if tries != 1:
-                time.sleep(CONNECT_RETRY_WAIT)
-            try:
-                self._setup_connection()
-                connected = True
-                _LOGGER.debug("Connected to ltss database")
-            except Exception as err:  # pylint: disable=broad-except
-                _LOGGER.error(
-                    "Error during connection setup: %s (retrying " "in %s seconds)",
-                    err,
-                    CONNECT_RETRY_WAIT,
-                )
-                tries += 1
-
-        if not connected:
-
-            @callback
-            def connection_failed():
-                """Notify the user that connection setup failed."""
-                self.async_db_ready.set_result(False)
-                persistent_notification.async_create(
-                    self.hass,
-                    "LTSS could not start, please check the log",
-                    "LTSS",
-                )
-
-            self.hass.add_job(connection_failed)
-            return
-
-        shutdown_task = object()
-        hass_started = concurrent.futures.Future()
-
-        @callback
-        def register():
-            """Post connection initialize."""
-            self.async_db_ready.set_result(True)
-
-            def shutdown(event):
-                """Shut down the ltss."""
-                if not hass_started.done():
-                    hass_started.set_result(shutdown_task)
-                self.queue.put(None)
-                self.join()
-
-            self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
-
-        if self.hass.state == CoreState.running:
-            hass_started.set_result(None)
-        else:
-
-            @callback
-            def notify_hass_started(event):
-                """Notify that hass has started."""
-                hass_started.set_result(None)
-
-            self.hass.bus.async_listen_once(
-                EVENT_HOMEASSISTANT_START, notify_hass_started
-            )
-
-        self.hass.add_job(register)
-        result = hass_started.result()
-
-        # If shutdown happened before Home Assistant finished starting
-        if result is shutdown_task:
-            return
-
-        while True:
-            event = self.queue.get()
-
-            if event is None:
-                self._close_connection()
-                self.queue.task_done()
-                return
-
-            tries = 1
-            updated = False
-            while not updated and tries <= 10:
-                if tries != 1:
-                    time.sleep(CONNECT_RETRY_WAIT)
-                try:
-                    with session_scope(session=self.get_session()) as session:
-                        try:
-                            row = LTSS.from_event(event)
-                            session.add(row)
-                        except (TypeError, ValueError):
-                            _LOGGER.warning(
-                                "State is not JSON serializable: %s",
-                                event.data.get("new_state"),
-                            )
-
-                        updated = True
-
-                except exc.OperationalError as err:
-                    _LOGGER.error(
-                        "Error in database connectivity: %s. "
-                        "(retrying in %s seconds)",
-                        err,
-                        CONNECT_RETRY_WAIT,
-                    )
-                    tries += 1
-
-                except exc.SQLAlchemyError:
-                    updated = True
-                    _LOGGER.exception("Error saving event: %s", event)
-
-            if not updated:
-                _LOGGER.error(
-                    "Error in database update. Could not save "
-                    "after %d tries. Giving up",
Giving up", - tries, - ) - - self.queue.task_done() - - @callback - def event_listener(self, event): - """Listen for new events and put them in the process queue.""" - # Filer on entity_id - entity_id = event.data.get(ATTR_ENTITY_ID) - state = event.data.get("new_state") - - if entity_id is not None and state is not None and state.state != STATE_UNKNOWN: - if self.entity_filter(entity_id): - self.queue.put(event) - - def _setup_connection(self): - """Ensure database is ready to fly.""" - - if self.engine is not None: - self.engine.dispose() - - self.engine = create_engine(self.db_url, echo=False, - json_serializer=lambda obj: json.dumps(obj, cls=JSONEncoder)) - - # Make sure TimescaleDB and PostGIS extensions are loaded - with self.engine.connect() as con: - con.execute( - text("CREATE EXTENSION IF NOT EXISTS postgis CASCADE" - ).execution_options(autocommit=True)) - con.execute( - text("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE" - ).execution_options(autocommit=True)) - - # Create all tables if not exists - Base.metadata.create_all(self.engine) - - # Create hypertable and set chunk_time_interval - with self.engine.connect() as con: - con.execute(text(f"""SELECT create_hypertable( - '{LTSS.__tablename__}', - 'time', - if_not_exists => TRUE);""").execution_options(autocommit=True)) - - con.execute( - text( - f"SELECT set_chunk_time_interval('{LTSS.__tablename__}'," - f" {self.chunk_time_interval});" - ).execution_options(autocommit=True) - ) - - # Migrate to newest schema if required - check_and_migrate(self.engine) - - self.get_session = scoped_session(sessionmaker(bind=self.engine)) - - def _close_connection(self): - """Close the connection.""" - self.engine.dispose() - self.engine = None - self.get_session = None diff --git a/custom_components/ltss/manifest.json b/custom_components/ltss/manifest.json deleted file mode 100644 index 832bded..0000000 --- a/custom_components/ltss/manifest.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "domain": "ltss", - "name": "Long Time State Storage (LTSS)", - "documentation": "https://github.com/freol35241/ltss", - "requirements": [ - "sqlalchemy>=1.0,<2.0", - "psycopg2>=2.8,<3.0", - "geoalchemy2>=0.8,<0.9" - ], - "dependencies": [], - "codeowners": [ - "@freol35241" - ] - } diff --git a/custom_components/ltss/migrations.py b/custom_components/ltss/migrations.py deleted file mode 100644 index 2e1e00a..0000000 --- a/custom_components/ltss/migrations.py +++ /dev/null @@ -1,69 +0,0 @@ -import logging - -from sqlalchemy import inspect, text, Text - -from .models import LTSS, LTSS_attributes_index, LTSS_entityid_time_composite_index - -_LOGGER = logging.getLogger(__name__) - -def check_and_migrate(engine): - - #Inspect the DB - iengine = inspect(engine) - columns = iengine.get_columns(LTSS.__tablename__) - indexes = iengine.get_indexes(LTSS.__tablename__) - - def index_exists(index_name): - matches = [idx for idx in indexes if idx["name"] == index_name] - return True if matches else False - - # Attributes column Text -> JSONB - attributes_column = next(col for col in columns if col["name"] == 'attributes') - if isinstance(attributes_column['type'], Text): - _LOGGER.warning('Migrating you LTSS table to the latest schema, this might take a couple of minutes!') - migrate_attributes_text_to_jsonb(engine) - _LOGGER.info('Migration completed successfully!') - - # Attributes Index? 
-    if not index_exists('ltss_attributes_idx'):
-        _LOGGER.warning('Creating an index for the attributes column, this might take a couple of minutes!')
-        create_attributes_index(engine)
-        _LOGGER.info('Index created successfully!')
-
-    # entity_id and time composite Index?
-    if not index_exists('ltss_entityid_time_composite_idx'):
-        _LOGGER.warning('Creating a composite index over entity_id and time columns, this might take a couple of minutes!')
-        create_entityid_time_index(engine)
-        _LOGGER.info('Index created successfully!')
-
-    if index_exists('ix_ltss_entity_id'):
-        _LOGGER.warning('Index on entity_id no longer needed, dropping...')
-        drop_entityid_index(engine)
-
-def migrate_attributes_text_to_jsonb(engine):
-
-    with engine.connect() as con:
-
-        _LOGGER.info("Migrating attributes column from type text to type JSONB")
-        con.execute(text(
-            f"""ALTER TABLE {LTSS.__tablename__}
-                ALTER COLUMN attributes TYPE JSONB USING attributes::JSONB;"""
-        ).execution_options(autocommit=True))
-
-def create_attributes_index(engine):
-
-    _LOGGER.info("Creating GIN index on the attributes column")
-    LTSS_attributes_index.create(bind=engine)
-
-def create_entityid_time_index(engine):
-
-    _LOGGER.info("Creating composite index over entity_id and time columns")
-    LTSS_entityid_time_composite_index.create(bind=engine)
-
-def drop_entityid_index(engine):
-
-    with engine.connect() as con:
-        con.execute(text(
-            """DROP INDEX ix_ltss_entity_id;"""
-        ).execution_options(autocommit=True))
-
diff --git a/custom_components/ltss/models.py b/custom_components/ltss/models.py
deleted file mode 100644
index 5820a1c..0000000
--- a/custom_components/ltss/models.py
+++ /dev/null
@@ -1,60 +0,0 @@
-"""Models for SQLAlchemy."""
-import json
-from datetime import datetime
-import logging
-
-from sqlalchemy import (
-    Column,
-    BigInteger,
-    DateTime,
-    String,
-    Text,
-)
-
-from sqlalchemy.schema import Index
-from sqlalchemy.dialects.postgresql import JSONB
-from geoalchemy2 import Geometry
-from sqlalchemy.ext.declarative import declarative_base
-
-# SQLAlchemy Schema
-# pylint: disable=invalid-name
-Base = declarative_base()
-
-_LOGGER = logging.getLogger(__name__)
-
-
-class LTSS(Base):  # type: ignore
-    """State change history."""
-
-    __tablename__ = "ltss"
-    id = Column(BigInteger, primary_key=True, autoincrement=True)
-    time = Column(DateTime(timezone=True), default=datetime.utcnow, primary_key=True)
-    entity_id = Column(String(255))
-    state = Column(String(255), index=True)
-    attributes = Column(JSONB)
-    location = Column(Geometry('POINT', srid=4326))
-
-    @staticmethod
-    def from_event(event):
-        """Create object from a state_changed event."""
-        entity_id = event.data["entity_id"]
-        state = event.data.get("new_state")
-
-        attrs = dict(state.attributes)
-        lat = attrs.pop('latitude', None)
-        lon = attrs.pop('longitude', None)
-
-        row = LTSS(
-            entity_id=entity_id,
-            time=event.time_fired,
-            state=state.state,
-            attributes=attrs,
-            location=f'SRID=4326;POINT({lon} {lat})' if lon is not None and lat is not None else None
-        )
-
-        return row
-
-LTSS_attributes_index = Index('ltss_attributes_idx', LTSS.attributes, postgresql_using='gin')
-LTSS_entityid_time_composite_index = Index(
-    'ltss_entityid_time_composite_idx', LTSS.entity_id, LTSS.time.desc()
-)
\ No newline at end of file
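
Note: for anyone inspecting or cleaning up a database left behind by this component, the TimescaleDB/PostGIS setup performed by the deleted _setup_connection() boils down to roughly the following SQL (a sketch assembled from the statements above; 2592000000000 is the default chunk_time_interval, i.e. 30 days expressed in microseconds):

    -- Extensions the ltss table relied on (PostGIS backs the location column).
    CREATE EXTENSION IF NOT EXISTS postgis CASCADE;
    CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

    -- Turn the plain "ltss" table into a hypertable partitioned on "time",
    -- then set the chunk size (here the 30-day default).
    SELECT create_hypertable('ltss', 'time', if_not_exists => TRUE);
    SELECT set_chunk_time_interval('ltss', 2592000000000);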
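Similarly, the schema migrations in the deleted migrations.py amount to approximately this DDL (a sketch of what the ALTER TABLE statement and the two SQLAlchemy Index objects emit; the index names are the ones hard-coded above):

    -- Text -> JSONB conversion of the attributes column.
    ALTER TABLE ltss ALTER COLUMN attributes TYPE JSONB USING attributes::JSONB;

    -- GIN index on attributes plus the (entity_id, time DESC) composite index;
    -- the old single-column entity_id index was dropped as redundant.
    CREATE INDEX ltss_attributes_idx ON ltss USING gin (attributes);
    CREATE INDEX ltss_entityid_time_composite_idx ON ltss (entity_id, time DESC);
    DROP INDEX ix_ltss_entity_id;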