Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve recorder history queries #131702

Merged
merged 8 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions homeassistant/components/history/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from . import websocket_api
from .const import DOMAIN
from .helpers import entities_may_have_state_changes_after, has_recorder_run_after
from .helpers import entities_may_have_state_changes_after, has_states_before

CONF_ORDER = "use_include_order"

Expand Down Expand Up @@ -107,7 +107,10 @@ async def get(
no_attributes = "no_attributes" in request.query

if (
(end_time and not has_recorder_run_after(hass, end_time))
# has_states_before returns True if there may be states as old as or
# older than end_time. If it returns False, we know there are no states
# in the database up until end_time.
(end_time and not has_states_before(hass, end_time))
or not include_start_time_state
and entity_ids
and not entities_may_have_state_changes_after(
Expand Down
13 changes: 7 additions & 6 deletions homeassistant/components/history/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from datetime import datetime as dt

from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.core import HomeAssistant


Expand All @@ -26,8 +25,10 @@ def entities_may_have_state_changes_after(
return False


def has_recorder_run_after(hass: HomeAssistant, run_time: dt) -> bool:
    """Report whether run_time is at or after the first recorder run's start.

    The first run is read from the recorder instance's recorder_runs_manager,
    and its start value is passed through process_timestamp before comparing.
    """
    first_run = get_instance(hass).recorder_runs_manager.first
    first_run_start = process_timestamp(first_run.start)
    return run_time >= first_run_start
def has_states_before(hass: HomeAssistant, run_time: dt) -> bool:
    """Tell whether the recorder may hold states at or before run_time.

    Compares run_time against the oldest state timestamp exposed by the
    recorder's states manager. Returns False when no oldest timestamp is
    known, and True when the oldest timestamp is not later than run_time.
    """
    oldest_ts = get_instance(hass).states_manager.oldest_ts
    if oldest_ts is None:
        # Nothing known about any recorded state, so nothing can precede run_time.
        return False
    return run_time.timestamp() >= oldest_ts
emontnemery marked this conversation as resolved.
Show resolved Hide resolved
7 changes: 5 additions & 2 deletions homeassistant/components/history/websocket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import homeassistant.util.dt as dt_util

from .const import EVENT_COALESCE_TIME, MAX_PENDING_HISTORY_STATES
from .helpers import entities_may_have_state_changes_after, has_recorder_run_after
from .helpers import entities_may_have_state_changes_after, has_states_before

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -142,7 +142,10 @@ async def ws_get_history_during_period(
no_attributes = msg["no_attributes"]

if (
(end_time and not has_recorder_run_after(hass, end_time))
# has_states_before returns True if there may be states as old as or
# older than end_time. If it returns False, we know there are no states
# in the database up until end_time.
(end_time and not has_states_before(hass, end_time))
or not include_start_time_state
and entity_ids
and not entities_may_have_state_changes_after(
Expand Down
1 change: 1 addition & 0 deletions homeassistant/components/recorder/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,7 @@ def _setup_run(self) -> None:
with session_scope(session=self.get_session()) as session:
end_incomplete_runs(session, self.recorder_runs_manager.recording_start)
self.recorder_runs_manager.start(session)
self.states_manager.load_from_db(session)

self._open_event_session()

Expand Down
18 changes: 8 additions & 10 deletions homeassistant/components/recorder/history/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from homeassistant.helpers.recorder import get_instance
import homeassistant.util.dt as dt_util

from ..db_schema import RecorderRuns, StateAttributes, States
from ..db_schema import StateAttributes, States
from ..filters import Filters
from ..models import process_timestamp, process_timestamp_to_utc_isoformat
from ..models import process_timestamp_to_utc_isoformat
from ..models.legacy import LegacyLazyState, legacy_row_to_compressed_state
from ..util import execute_stmt_lambda_element, session_scope
from .const import (
Expand Down Expand Up @@ -436,7 +436,7 @@ def get_last_state_changes(


def _get_states_for_entities_stmt(
run_start: datetime,
run_start_ts: float,
utc_point_in_time: datetime,
entity_ids: list[str],
no_attributes: bool,
Expand All @@ -447,7 +447,6 @@ def _get_states_for_entities_stmt(
)
# We got an include-list of entities, accelerate the query by filtering already
# in the inner query.
run_start_ts = process_timestamp(run_start).timestamp()
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
stmt += lambda q: q.join(
(
Expand Down Expand Up @@ -483,7 +482,7 @@ def _get_rows_with_session(
session: Session,
utc_point_in_time: datetime,
entity_ids: list[str],
run: RecorderRuns | None = None,
*,
no_attributes: bool = False,
) -> Iterable[Row]:
"""Return the states at a specific point in time."""
Expand All @@ -495,17 +494,16 @@ def _get_rows_with_session(
),
)

if run is None:
run = get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
oldest_ts = get_instance(hass).states_manager.oldest_ts

if run is None or process_timestamp(run.start) > utc_point_in_time:
# History did not run before utc_point_in_time
if oldest_ts is None or oldest_ts > utc_point_in_time.timestamp():
emontnemery marked this conversation as resolved.
Show resolved Hide resolved
# We don't have any states for the requested time
return []

# We have more than one entity to look at so we need to do a query on states
# since the oldest recorded state.
stmt = _get_states_for_entities_stmt(
run.start, utc_point_in_time, entity_ids, no_attributes
oldest_ts, utc_point_in_time, entity_ids, no_attributes
)
return execute_stmt_lambda_element(session, stmt)

Expand Down
31 changes: 15 additions & 16 deletions homeassistant/components/recorder/history/modern.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
LazyState,
datetime_to_timestamp_or_none,
extract_metadata_ids,
process_timestamp,
row_to_compressed_state,
)
from ..util import execute_stmt_lambda_element, session_scope
Expand Down Expand Up @@ -246,9 +245,9 @@ def get_significant_states_with_session(
if metadata_id is not None
and split_entity_id(entity_id)[0] in SIGNIFICANT_DOMAINS
]
run_start_ts: float | None = None
oldest_ts: float | None = None
if include_start_time_state and not (
run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
oldest_ts := _get_oldest_possible_ts(hass, start_time)
):
include_start_time_state = False
start_time_ts = dt_util.utc_to_timestamp(start_time)
Expand All @@ -264,7 +263,7 @@ def get_significant_states_with_session(
significant_changes_only,
no_attributes,
include_start_time_state,
run_start_ts,
oldest_ts,
),
track_on=[
bool(single_metadata_id),
Expand Down Expand Up @@ -411,9 +410,9 @@ def state_changes_during_period(
entity_id_to_metadata_id: dict[str, int | None] = {
entity_id: single_metadata_id
}
run_start_ts: float | None = None
oldest_ts: float | None = None
if include_start_time_state and not (
run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
oldest_ts := _get_oldest_possible_ts(hass, start_time)
):
include_start_time_state = False
start_time_ts = dt_util.utc_to_timestamp(start_time)
Expand All @@ -426,7 +425,7 @@ def state_changes_during_period(
no_attributes,
limit,
include_start_time_state,
run_start_ts,
oldest_ts,
has_last_reported,
),
track_on=[
Expand Down Expand Up @@ -600,17 +599,17 @@ def _get_start_time_state_for_entities_stmt(
)


def _get_run_start_ts_for_utc_point_in_time(
def _get_oldest_possible_ts(
hass: HomeAssistant, utc_point_in_time: datetime
) -> float | None:
"""Return the start time of a run."""
run = get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
if (
run is not None
and (run_start := process_timestamp(run.start)) < utc_point_in_time
):
return run_start.timestamp()
# History did not run before utc_point_in_time but we still
"""Return the oldest possible timestamp.

Returns None if there are no states older than utc_point_in_time.
"""

oldest_ts = get_instance(hass).states_manager.oldest_ts
if oldest_ts is not None and oldest_ts < utc_point_in_time.timestamp():
return oldest_ts
return None


Expand Down
3 changes: 3 additions & 0 deletions homeassistant/components/recorder/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ def purge_old_data(
_purge_old_entity_ids(instance, session)

_purge_old_recorder_runs(instance, session, purge_before)
with session_scope(session=instance.get_session(), read_only=True) as session:
instance.recorder_runs_manager.load_from_db(session)
instance.states_manager.load_from_db(session)
if repack:
repack_database(instance)
return True
Expand Down
9 changes: 9 additions & 0 deletions homeassistant/components/recorder/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,15 @@ def find_states_to_purge(
)


def find_oldest_state() -> StatementLambdaElement:
"""Find the last_updated_ts of the oldest state.

The oldest state is taken to be the row with the smallest state_id;
only that row's last_updated_ts column is selected.
"""
return lambda_stmt(
lambda: select(States.last_updated_ts).where(
States.state_id.in_(select(func.min(States.state_id)))
)
)


def find_short_term_statistics_to_purge(
purge_before: datetime, max_bind_vars: int
) -> StatementLambdaElement:
Expand Down
32 changes: 32 additions & 0 deletions homeassistant/components/recorder/table_managers/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, cast

from sqlalchemy.engine.row import Row
from sqlalchemy.orm.session import Session

from ..db_schema import States
from ..queries import find_oldest_state
from ..util import execute_stmt_lambda_element


class StatesManager:
Expand All @@ -13,6 +21,12 @@ def __init__(self) -> None:
self._pending: dict[str, States] = {}
self._last_committed_id: dict[str, int] = {}
self._last_reported: dict[int, float] = {}
self._oldest_ts: float | None = None

@property
def oldest_ts(self) -> float | None:
"""Return the cached timestamp of the oldest known state, or None if unset."""
return self._oldest_ts

def pop_pending(self, entity_id: str) -> States | None:
"""Pop a pending state.
Expand Down Expand Up @@ -44,6 +58,8 @@ def add_pending(self, entity_id: str, state: States) -> None:
recorder thread.
"""
self._pending[entity_id] = state
if self._oldest_ts is None:
self._oldest_ts = state.last_updated_ts

def update_pending_last_reported(
self, state_id: int, last_reported_timestamp: float
Expand Down Expand Up @@ -74,6 +90,22 @@ def reset(self) -> None:
"""
self._last_committed_id.clear()
self._pending.clear()
self._oldest_ts = None

def load_from_db(self, session: Session) -> None:
"""Update the cache.

Must run in the recorder thread.
"""
result = cast(
Sequence[Row[Any]],
execute_stmt_lambda_element(session, find_oldest_state()),
)
if not result:
ts = None
else:
ts = result[0].last_updated_ts
Comment on lines +104 to +107
Copy link
Contributor Author

@emontnemery emontnemery Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a scalar option to, or a scalar version of, execute_stmt_lambda_element to avoid these extra checks?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are other places it could be used, it sure would be nice to avoid the boilerplate checks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there are. For example here:

if stats := cast(Sequence[Row], execute_stmt_lambda_element(session, stmt)):
return dt_util.utc_from_timestamp(stats[0].start_ts)
return None
and several other places in the same module

I think we should do this in a separate PR though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate PR for sure 👍

self._oldest_ts = ts
bdraco marked this conversation as resolved.
Show resolved Hide resolved

def evict_purged_state_ids(self, purged_state_ids: set[int]) -> None:
"""Evict purged states from the committed states.
Expand Down
2 changes: 0 additions & 2 deletions homeassistant/components/recorder/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ def run(self, instance: Recorder) -> None:
if purge.purge_old_data(
instance, self.purge_before, self.repack, self.apply_filter
):
with instance.get_session() as session:
instance.recorder_runs_manager.load_from_db(session)
# We always need to do the db cleanups after a purge
# is finished to ensure the WAL checkpoint and other
# tasks happen after a vacuum.
Expand Down
17 changes: 17 additions & 0 deletions tests/components/recorder/test_purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ async def test_purge_big_database(hass: HomeAssistant, recorder_mock: Recorder)

async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> None:
"""Test deleting old states."""
assert recorder_mock.states_manager.oldest_ts is None
oldest_ts = recorder_mock.states_manager.oldest_ts

await _add_test_states(hass)

# make sure we start with 6 states
Expand All @@ -127,6 +130,10 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
events = session.query(Events).filter(Events.event_type == "state_changed")
assert events.count() == 0

assert recorder_mock.states_manager.oldest_ts != oldest_ts
assert recorder_mock.states_manager.oldest_ts == states[0].last_updated_ts
oldest_ts = recorder_mock.states_manager.oldest_ts

assert "test.recorder2" in recorder_mock.states_manager._last_committed_id

purge_before = dt_util.utcnow() - timedelta(days=4)
Expand All @@ -140,6 +147,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
repack=False,
)
assert not finished
# states_manager.oldest_ts is not updated until after the purge is complete
assert recorder_mock.states_manager.oldest_ts == oldest_ts

with session_scope(hass=hass) as session:
states = session.query(States)
Expand All @@ -162,13 +171,19 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->

finished = purge_old_data(recorder_mock, purge_before, repack=False)
assert finished
# states_manager.oldest_ts should now be updated
assert recorder_mock.states_manager.oldest_ts != oldest_ts

with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 2
assert state_attributes.count() == 1

assert recorder_mock.states_manager.oldest_ts != oldest_ts
assert recorder_mock.states_manager.oldest_ts == states[0].last_updated_ts
oldest_ts = recorder_mock.states_manager.oldest_ts

assert "test.recorder2" in recorder_mock.states_manager._last_committed_id

# run purge_old_data again
Expand All @@ -181,6 +196,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
repack=False,
)
assert not finished
# states_manager.oldest_ts is not updated until after the purge is complete
assert recorder_mock.states_manager.oldest_ts == oldest_ts

with session_scope(hass=hass) as session:
assert states.count() == 0
Expand Down