Skip to content

Commit

Permalink
Improve recorder history queries (#131702)
Browse files Browse the repository at this point in the history
* Improve recorder history queries

* Remove some comments

* Update StatesManager._oldest_ts when adding pending state

* Update after review

* Improve tests

* Improve post-purge logic

* Avoid calling dt_util.utc_to_timestamp in new code

---------

Co-authored-by: J. Nick Koston <[email protected]>
  • Loading branch information
emontnemery and bdraco authored Nov 27, 2024
1 parent e04b6f0 commit 381d545
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 38 deletions.
7 changes: 5 additions & 2 deletions homeassistant/components/history/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from . import websocket_api
from .const import DOMAIN
from .helpers import entities_may_have_state_changes_after, has_recorder_run_after
from .helpers import entities_may_have_state_changes_after, has_states_before

CONF_ORDER = "use_include_order"

Expand Down Expand Up @@ -107,7 +107,10 @@ async def get(
no_attributes = "no_attributes" in request.query

if (
(end_time and not has_recorder_run_after(hass, end_time))
# has_states_before will return True if there are states older than
# end_time. If it's false, we know there are no states in the
# database up until end_time.
(end_time and not has_states_before(hass, end_time))
or not include_start_time_state
and entity_ids
and not entities_may_have_state_changes_after(
Expand Down
13 changes: 7 additions & 6 deletions homeassistant/components/history/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from datetime import datetime as dt

from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.core import HomeAssistant


Expand All @@ -26,8 +25,10 @@ def entities_may_have_state_changes_after(
return False


def has_recorder_run_after(hass: HomeAssistant, run_time: dt) -> bool:
"""Check if the recorder has any runs after a specific time."""
return run_time >= process_timestamp(
get_instance(hass).recorder_runs_manager.first.start
)
def has_states_before(hass: HomeAssistant, run_time: dt) -> bool:
"""Check if the recorder has states as old or older than run_time.
Returns True if there may be such states.
"""
oldest_ts = get_instance(hass).states_manager.oldest_ts
return oldest_ts is not None and run_time.timestamp() >= oldest_ts
7 changes: 5 additions & 2 deletions homeassistant/components/history/websocket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import homeassistant.util.dt as dt_util

from .const import EVENT_COALESCE_TIME, MAX_PENDING_HISTORY_STATES
from .helpers import entities_may_have_state_changes_after, has_recorder_run_after
from .helpers import entities_may_have_state_changes_after, has_states_before

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -142,7 +142,10 @@ async def ws_get_history_during_period(
no_attributes = msg["no_attributes"]

if (
(end_time and not has_recorder_run_after(hass, end_time))
# has_states_before will return True if there are states older than
# end_time. If it's false, we know there are no states in the
# database up until end_time.
(end_time and not has_states_before(hass, end_time))
or not include_start_time_state
and entity_ids
and not entities_may_have_state_changes_after(
Expand Down
1 change: 1 addition & 0 deletions homeassistant/components/recorder/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,7 @@ def _setup_run(self) -> None:
with session_scope(session=self.get_session()) as session:
end_incomplete_runs(session, self.recorder_runs_manager.recording_start)
self.recorder_runs_manager.start(session)
self.states_manager.load_from_db(session)

self._open_event_session()

Expand Down
18 changes: 8 additions & 10 deletions homeassistant/components/recorder/history/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from homeassistant.helpers.recorder import get_instance
import homeassistant.util.dt as dt_util

from ..db_schema import RecorderRuns, StateAttributes, States
from ..db_schema import StateAttributes, States
from ..filters import Filters
from ..models import process_timestamp, process_timestamp_to_utc_isoformat
from ..models import process_timestamp_to_utc_isoformat
from ..models.legacy import LegacyLazyState, legacy_row_to_compressed_state
from ..util import execute_stmt_lambda_element, session_scope
from .const import (
Expand Down Expand Up @@ -436,7 +436,7 @@ def get_last_state_changes(


def _get_states_for_entities_stmt(
run_start: datetime,
run_start_ts: float,
utc_point_in_time: datetime,
entity_ids: list[str],
no_attributes: bool,
Expand All @@ -447,7 +447,6 @@ def _get_states_for_entities_stmt(
)
# We got an include-list of entities, accelerate the query by filtering already
# in the inner query.
run_start_ts = process_timestamp(run_start).timestamp()
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
stmt += lambda q: q.join(
(
Expand Down Expand Up @@ -483,7 +482,7 @@ def _get_rows_with_session(
session: Session,
utc_point_in_time: datetime,
entity_ids: list[str],
run: RecorderRuns | None = None,
*,
no_attributes: bool = False,
) -> Iterable[Row]:
"""Return the states at a specific point in time."""
Expand All @@ -495,17 +494,16 @@ def _get_rows_with_session(
),
)

if run is None:
run = get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
oldest_ts = get_instance(hass).states_manager.oldest_ts

if run is None or process_timestamp(run.start) > utc_point_in_time:
# History did not run before utc_point_in_time
if oldest_ts is None or oldest_ts > utc_point_in_time.timestamp():
# We don't have any states for the requested time
return []

# We have more than one entity to look at so we need to do a query on states
# since the last recorder run started.
stmt = _get_states_for_entities_stmt(
run.start, utc_point_in_time, entity_ids, no_attributes
oldest_ts, utc_point_in_time, entity_ids, no_attributes
)
return execute_stmt_lambda_element(session, stmt)

Expand Down
31 changes: 15 additions & 16 deletions homeassistant/components/recorder/history/modern.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
LazyState,
datetime_to_timestamp_or_none,
extract_metadata_ids,
process_timestamp,
row_to_compressed_state,
)
from ..util import execute_stmt_lambda_element, session_scope
Expand Down Expand Up @@ -246,9 +245,9 @@ def get_significant_states_with_session(
if metadata_id is not None
and split_entity_id(entity_id)[0] in SIGNIFICANT_DOMAINS
]
run_start_ts: float | None = None
oldest_ts: float | None = None
if include_start_time_state and not (
run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
oldest_ts := _get_oldest_possible_ts(hass, start_time)
):
include_start_time_state = False
start_time_ts = dt_util.utc_to_timestamp(start_time)
Expand All @@ -264,7 +263,7 @@ def get_significant_states_with_session(
significant_changes_only,
no_attributes,
include_start_time_state,
run_start_ts,
oldest_ts,
),
track_on=[
bool(single_metadata_id),
Expand Down Expand Up @@ -411,9 +410,9 @@ def state_changes_during_period(
entity_id_to_metadata_id: dict[str, int | None] = {
entity_id: single_metadata_id
}
run_start_ts: float | None = None
oldest_ts: float | None = None
if include_start_time_state and not (
run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
oldest_ts := _get_oldest_possible_ts(hass, start_time)
):
include_start_time_state = False
start_time_ts = dt_util.utc_to_timestamp(start_time)
Expand All @@ -426,7 +425,7 @@ def state_changes_during_period(
no_attributes,
limit,
include_start_time_state,
run_start_ts,
oldest_ts,
has_last_reported,
),
track_on=[
Expand Down Expand Up @@ -600,17 +599,17 @@ def _get_start_time_state_for_entities_stmt(
)


def _get_run_start_ts_for_utc_point_in_time(
def _get_oldest_possible_ts(
hass: HomeAssistant, utc_point_in_time: datetime
) -> float | None:
"""Return the start time of a run."""
run = get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
if (
run is not None
and (run_start := process_timestamp(run.start)) < utc_point_in_time
):
return run_start.timestamp()
# History did not run before utc_point_in_time but we still
"""Return the oldest possible timestamp.
Returns None if there are no states as old as utc_point_in_time.
"""

oldest_ts = get_instance(hass).states_manager.oldest_ts
if oldest_ts is not None and oldest_ts < utc_point_in_time.timestamp():
return oldest_ts
return None


Expand Down
3 changes: 3 additions & 0 deletions homeassistant/components/recorder/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ def purge_old_data(
_purge_old_entity_ids(instance, session)

_purge_old_recorder_runs(instance, session, purge_before)
with session_scope(session=instance.get_session(), read_only=True) as session:
instance.recorder_runs_manager.load_from_db(session)
instance.states_manager.load_from_db(session)
if repack:
repack_database(instance)
return True
Expand Down
9 changes: 9 additions & 0 deletions homeassistant/components/recorder/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,15 @@ def find_states_to_purge(
)


def find_oldest_state() -> StatementLambdaElement:
"""Find the last_updated_ts of the oldest state."""
return lambda_stmt(
lambda: select(States.last_updated_ts).where(
States.state_id.in_(select(func.min(States.state_id)))
)
)


def find_short_term_statistics_to_purge(
purge_before: datetime, max_bind_vars: int
) -> StatementLambdaElement:
Expand Down
32 changes: 32 additions & 0 deletions homeassistant/components/recorder/table_managers/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, cast

from sqlalchemy.engine.row import Row
from sqlalchemy.orm.session import Session

from ..db_schema import States
from ..queries import find_oldest_state
from ..util import execute_stmt_lambda_element


class StatesManager:
Expand All @@ -13,6 +21,12 @@ def __init__(self) -> None:
self._pending: dict[str, States] = {}
self._last_committed_id: dict[str, int] = {}
self._last_reported: dict[int, float] = {}
self._oldest_ts: float | None = None

@property
def oldest_ts(self) -> float | None:
"""Return the oldest timestamp."""
return self._oldest_ts

def pop_pending(self, entity_id: str) -> States | None:
"""Pop a pending state.
Expand Down Expand Up @@ -44,6 +58,8 @@ def add_pending(self, entity_id: str, state: States) -> None:
recorder thread.
"""
self._pending[entity_id] = state
if self._oldest_ts is None:
self._oldest_ts = state.last_updated_ts

def update_pending_last_reported(
self, state_id: int, last_reported_timestamp: float
Expand Down Expand Up @@ -74,6 +90,22 @@ def reset(self) -> None:
"""
self._last_committed_id.clear()
self._pending.clear()
self._oldest_ts = None

def load_from_db(self, session: Session) -> None:
"""Update the cache.
Must run in the recorder thread.
"""
result = cast(
Sequence[Row[Any]],
execute_stmt_lambda_element(session, find_oldest_state()),
)
if not result:
ts = None
else:
ts = result[0].last_updated_ts
self._oldest_ts = ts

def evict_purged_state_ids(self, purged_state_ids: set[int]) -> None:
"""Evict purged states from the committed states.
Expand Down
2 changes: 0 additions & 2 deletions homeassistant/components/recorder/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ def run(self, instance: Recorder) -> None:
if purge.purge_old_data(
instance, self.purge_before, self.repack, self.apply_filter
):
with instance.get_session() as session:
instance.recorder_runs_manager.load_from_db(session)
# We always need to do the db cleanups after a purge
# is finished to ensure the WAL checkpoint and other
# tasks happen after a vacuum.
Expand Down
17 changes: 17 additions & 0 deletions tests/components/recorder/test_purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ async def test_purge_big_database(hass: HomeAssistant, recorder_mock: Recorder)

async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> None:
"""Test deleting old states."""
assert recorder_mock.states_manager.oldest_ts is None
oldest_ts = recorder_mock.states_manager.oldest_ts

await _add_test_states(hass)

# make sure we start with 6 states
Expand All @@ -127,6 +130,10 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
events = session.query(Events).filter(Events.event_type == "state_changed")
assert events.count() == 0

assert recorder_mock.states_manager.oldest_ts != oldest_ts
assert recorder_mock.states_manager.oldest_ts == states[0].last_updated_ts
oldest_ts = recorder_mock.states_manager.oldest_ts

assert "test.recorder2" in recorder_mock.states_manager._last_committed_id

purge_before = dt_util.utcnow() - timedelta(days=4)
Expand All @@ -140,6 +147,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
repack=False,
)
assert not finished
# states_manager.oldest_ts is not updated until after the purge is complete
assert recorder_mock.states_manager.oldest_ts == oldest_ts

with session_scope(hass=hass) as session:
states = session.query(States)
Expand All @@ -162,13 +171,19 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->

finished = purge_old_data(recorder_mock, purge_before, repack=False)
assert finished
# states_manager.oldest_ts should now be updated
assert recorder_mock.states_manager.oldest_ts != oldest_ts

with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 2
assert state_attributes.count() == 1

assert recorder_mock.states_manager.oldest_ts != oldest_ts
assert recorder_mock.states_manager.oldest_ts == states[0].last_updated_ts
oldest_ts = recorder_mock.states_manager.oldest_ts

assert "test.recorder2" in recorder_mock.states_manager._last_committed_id

# run purge_old_data again
Expand All @@ -181,6 +196,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
repack=False,
)
assert not finished
# states_manager.oldest_ts is not updated until after the purge is complete
assert recorder_mock.states_manager.oldest_ts == oldest_ts

with session_scope(hass=hass) as session:
assert states.count() == 0
Expand Down

0 comments on commit 381d545

Please sign in to comment.