Revert "Optimize start time state queries for PostgreSQL" (#133555)
bdraco authored Dec 19, 2024
1 parent a3fb6e8 commit 69a8d3f
Showing 4 changed files with 34 additions and 400 deletions.
76 changes: 17 additions & 59 deletions homeassistant/components/recorder/history/modern.py
@@ -27,13 +27,8 @@
 from homeassistant.helpers.recorder import get_instance
 import homeassistant.util.dt as dt_util
 
-from ..const import LAST_REPORTED_SCHEMA_VERSION, SupportedDialect
-from ..db_schema import (
-    SHARED_ATTR_OR_LEGACY_ATTRIBUTES,
-    StateAttributes,
-    States,
-    StatesMeta,
-)
+from ..const import LAST_REPORTED_SCHEMA_VERSION
+from ..db_schema import SHARED_ATTR_OR_LEGACY_ATTRIBUTES, StateAttributes, States
 from ..filters import Filters
 from ..models import (
     LazyState,
@@ -150,7 +145,6 @@ def _significant_states_stmt(
     no_attributes: bool,
     include_start_time_state: bool,
     run_start_ts: float | None,
-    lateral_join_for_start_time: bool,
 ) -> Select | CompoundSelect:
     """Query the database for significant state changes."""
     include_last_changed = not significant_changes_only
@@ -190,7 +184,6 @@
             metadata_ids,
             no_attributes,
             include_last_changed,
-            lateral_join_for_start_time,
         ).subquery(),
         no_attributes,
         include_last_changed,
@@ -261,7 +254,6 @@ def get_significant_states_with_session(
     start_time_ts = start_time.timestamp()
     end_time_ts = datetime_to_timestamp_or_none(end_time)
     single_metadata_id = metadata_ids[0] if len(metadata_ids) == 1 else None
-    lateral_join_for_start_time = instance.dialect_name == SupportedDialect.POSTGRESQL
     stmt = lambda_stmt(
         lambda: _significant_states_stmt(
             start_time_ts,
@@ -273,7 +265,6 @@
             no_attributes,
             include_start_time_state,
             run_start_ts,
-            lateral_join_for_start_time,
         ),
         track_on=[
             bool(single_metadata_id),
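
Aside: the track_on list visible at the end of this hunk is what keys SQLAlchemy's lambda statement cache. lambda_stmt() compiles the SQL once per lambda, so any flag that changes the shape of the generated statement has to contribute to the cache key, which is also why the now-removed dialect flag had to be threaded down through every helper. A minimal, self-contained sketch of the pattern (the table, column names, and flag below are illustrative, not Home Assistant's real schema):

import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
md = sa.MetaData()
states = sa.Table(
    "states",
    md,
    sa.Column("metadata_id", sa.Integer),
    sa.Column("state", sa.String(255)),
)
md.create_all(engine)


def _stmt(no_attributes: bool):
    # The flag changes the shape of the SQL itself, not just a bound value.
    if no_attributes:
        return sa.select(states.c.metadata_id)
    return sa.select(states.c.metadata_id, states.c.state)


def build(no_attributes: bool):
    # Without track_on, both variants would collide on one cache entry.
    return sa.lambda_stmt(lambda: _stmt(no_attributes), track_on=[no_attributes])


with engine.connect() as conn:
    conn.execute(build(True))
    conn.execute(build(False))

If the flag were left out of track_on, the second call could be served the cached SQL compiled for the first.
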
@@ -565,61 +556,30 @@ def _get_start_time_state_for_entities_stmt(
     metadata_ids: list[int],
     no_attributes: bool,
     include_last_changed: bool,
-    lateral_join_for_start_time: bool,
 ) -> Select:
     """Baked query to get states for specific entities."""
     # We got an include-list of entities, accelerate the query by filtering already
     # in the inner and the outer query.
-    if lateral_join_for_start_time:
-        # PostgreSQL does not support index skip scan/loose index scan
-        # https://wiki.postgresql.org/wiki/Loose_indexscan
-        # so we need to do a lateral join to get the max last_updated_ts
-        # for each metadata_id as a group-by is too slow.
-        # https://github.com/home-assistant/core/issues/132865
-        max_metadata_id = StatesMeta.metadata_id.label("max_metadata_id")
-        max_last_updated = (
-            select(func.max(States.last_updated_ts))
-            .where(
-                (States.metadata_id == max_metadata_id)
-                & (States.last_updated_ts >= run_start_ts)
-                & (States.last_updated_ts < epoch_time)
-            )
-            .subquery()
-            .lateral()
-        )
-        most_recent_states_for_entities_by_date = (
-            select(max_metadata_id, max_last_updated.c[0].label("max_last_updated"))
-            .select_from(StatesMeta)
-            .join(
-                max_last_updated,
-                StatesMeta.metadata_id == max_metadata_id,
-            )
-            .where(StatesMeta.metadata_id.in_(metadata_ids))
-        ).subquery()
-    else:
-        # Simple group-by for MySQL and SQLite, must use less
-        # than 1000 metadata_ids in the IN clause for MySQL
-        # or it will optimize poorly.
-        most_recent_states_for_entities_by_date = (
-            select(
-                States.metadata_id.label("max_metadata_id"),
-                func.max(States.last_updated_ts).label("max_last_updated"),
-            )
-            .filter(
-                (States.last_updated_ts >= run_start_ts)
-                & (States.last_updated_ts < epoch_time)
-                & States.metadata_id.in_(metadata_ids)
-            )
-            .group_by(States.metadata_id)
-            .subquery()
-        )
-
     stmt = (
         _stmt_and_join_attributes_for_start_state(
             no_attributes, include_last_changed, False
         )
         .join(
-            most_recent_states_for_entities_by_date,
+            (
+                most_recent_states_for_entities_by_date := (
+                    select(
+                        States.metadata_id.label("max_metadata_id"),
+                        func.max(States.last_updated_ts).label("max_last_updated"),
+                    )
+                    .filter(
+                        (States.last_updated_ts >= run_start_ts)
+                        & (States.last_updated_ts < epoch_time)
+                        & States.metadata_id.in_(metadata_ids)
+                    )
+                    .group_by(States.metadata_id)
+                    .subquery()
+                )
+            ),
             and_(
                 States.metadata_id
                 == most_recent_states_for_entities_by_date.c.max_metadata_id,
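
For context, the two query shapes this hunk swaps differ only in how the newest row per metadata_id within the window is located. A rough, self-contained sketch of both, with simplified tables whose column names mirror the diff (the ids and window bounds are made up):

import sqlalchemy as sa

md = sa.MetaData()
states = sa.Table(
    "states",
    md,
    sa.Column("metadata_id", sa.Integer),
    sa.Column("last_updated_ts", sa.Float),
)
states_meta = sa.Table("states_meta", md, sa.Column("metadata_id", sa.Integer))

metadata_ids = [1, 2, 3]  # illustrative
run_start_ts, epoch_time = 0.0, 1.0e9  # illustrative window bounds

# Restored shape: one GROUP BY over the window. MySQL and SQLite plan this
# well (MySQL only with fewer than ~1000 ids in the IN list, per the comment
# being deleted above).
newest_by_group_by = (
    sa.select(
        states.c.metadata_id.label("max_metadata_id"),
        sa.func.max(states.c.last_updated_ts).label("max_last_updated"),
    )
    .where(
        (states.c.last_updated_ts >= run_start_ts)
        & (states.c.last_updated_ts < epoch_time)
        & states.c.metadata_id.in_(metadata_ids)
    )
    .group_by(states.c.metadata_id)
    .subquery()
)

# Removed shape: a correlated LATERAL subquery, one max() lookup per
# metadata_id, which the reverted change used on PostgreSQL.
per_id_max = (
    sa.select(sa.func.max(states.c.last_updated_ts))
    .where(
        (states.c.metadata_id == states_meta.c.metadata_id)
        & (states.c.last_updated_ts >= run_start_ts)
        & (states.c.last_updated_ts < epoch_time)
    )
    .lateral()
)
newest_by_lateral = (
    sa.select(states_meta.c.metadata_id, per_id_max.c[0].label("max_last_updated"))
    .select_from(states_meta)
    .join(per_id_max, sa.true())
    .where(states_meta.c.metadata_id.in_(metadata_ids))
    .subquery()
)

The GROUP BY form scans the window once and aggregates; the LATERAL form runs one small index-backed lookup per metadata_id, the behavior the reverted change had enabled on PostgreSQL (which has no loose index scan), per the comments being deleted above.
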
@@ -661,7 +621,6 @@ def _get_start_time_state_stmt(
     metadata_ids: list[int],
     no_attributes: bool,
     include_last_changed: bool,
-    lateral_join_for_start_time: bool,
 ) -> Select:
     """Return the states at a specific point in time."""
     if single_metadata_id:
@@ -682,7 +641,6 @@
         metadata_ids,
         no_attributes,
         include_last_changed,
-        lateral_join_for_start_time,
     )
 
 
61 changes: 13 additions & 48 deletions homeassistant/components/recorder/statistics.py
@@ -63,7 +63,6 @@
     STATISTICS_TABLES,
     Statistics,
     StatisticsBase,
-    StatisticsMeta,
     StatisticsRuns,
     StatisticsShortTerm,
 )
@@ -1670,7 +1669,6 @@ def _augment_result_with_change(
     drop_sum = "sum" not in _types
     prev_sums = {}
     if tmp := _statistics_at_time(
-        hass,
         session,
         {metadata[statistic_id][0] for statistic_id in result},
         table,
@@ -2034,50 +2032,22 @@ def _generate_statistics_at_time_stmt(
     metadata_ids: set[int],
     start_time_ts: float,
     types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
-    lateral_join_for_start_time: bool,
 ) -> StatementLambdaElement:
     """Create the statement for finding the statistics for a given time."""
     stmt = _generate_select_columns_for_types_stmt(table, types)
-    if lateral_join_for_start_time:
-        # PostgreSQL does not support index skip scan/loose index scan
-        # https://wiki.postgresql.org/wiki/Loose_indexscan
-        # so we need to do a lateral join to get the max max_start_ts
-        # for each metadata_id as a group-by is too slow.
-        # https://github.com/home-assistant/core/issues/132865
-        max_metadata_id = StatisticsMeta.id.label("max_metadata_id")
-        max_start = (
-            select(func.max(table.start_ts))
-            .filter(table.metadata_id == max_metadata_id)
-            .filter(table.start_ts < start_time_ts)
-            .filter(table.metadata_id.in_(metadata_ids))
-            .subquery()
-            .lateral()
-        )
-        most_recent_statistic_ids = (
-            select(max_metadata_id, max_start.c[0].label("max_start_ts"))
-            .select_from(StatisticsMeta)
-            .join(
-                max_start,
-                StatisticsMeta.id == max_metadata_id,
-            )
-            .where(StatisticsMeta.id.in_(metadata_ids))
-        ).subquery()
-    else:
-        # Simple group-by for MySQL and SQLite, must use less
-        # than 1000 metadata_ids in the IN clause for MySQL
-        # or it will optimize poorly.
-        most_recent_statistic_ids = (
-            select(
-                func.max(table.start_ts).label("max_start_ts"),
-                table.metadata_id.label("max_metadata_id"),
-            )
-            .filter(table.start_ts < start_time_ts)
-            .filter(table.metadata_id.in_(metadata_ids))
-            .group_by(table.metadata_id)
-            .subquery()
-        )
     stmt += lambda q: q.join(
-        most_recent_statistic_ids,
+        (
+            most_recent_statistic_ids := (
+                select(
+                    func.max(table.start_ts).label("max_start_ts"),
+                    table.metadata_id.label("max_metadata_id"),
+                )
+                .filter(table.start_ts < start_time_ts)
+                .filter(table.metadata_id.in_(metadata_ids))
+                .group_by(table.metadata_id)
+                .subquery()
+            )
+        ),
         and_(
             table.start_ts == most_recent_statistic_ids.c.max_start_ts,
             table.metadata_id == most_recent_statistic_ids.c.max_metadata_id,
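
A note on the := pattern this hunk restores: the join is appended inside a lambda (again for statement caching), and a Python lambda body cannot contain an assignment statement, so the grouped subquery has to be created, named, and referenced for the ON clause within a single expression. The walrus operator does exactly that; call arguments evaluate left to right, so the name is bound before and_() needs it. A condensed sketch with an assumed, simplified statistics table:

import sqlalchemy as sa

md = sa.MetaData()
stats = sa.Table(
    "statistics",
    md,
    sa.Column("metadata_id", sa.Integer),
    sa.Column("start_ts", sa.Float),
)


def statistics_at_time(metadata_ids, start_time_ts):
    # The subquery is created and named inline; most_recent is already
    # bound when the and_() argument is evaluated.
    return sa.select(stats).join(
        (
            most_recent := (
                sa.select(
                    sa.func.max(stats.c.start_ts).label("max_start_ts"),
                    stats.c.metadata_id.label("max_metadata_id"),
                )
                .where(stats.c.start_ts < start_time_ts)
                .where(stats.c.metadata_id.in_(metadata_ids))
                .group_by(stats.c.metadata_id)
                .subquery()
            )
        ),
        sa.and_(
            stats.c.start_ts == most_recent.c.max_start_ts,
            stats.c.metadata_id == most_recent.c.max_metadata_id,
        ),
    )


print(statistics_at_time({1, 2}, 1.0e9))  # renders the SELECT ... JOIN ... ON SQL
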
@@ -2087,7 +2057,6 @@
 
 
 def _statistics_at_time(
-    hass: HomeAssistant,
     session: Session,
     metadata_ids: set[int],
     table: type[StatisticsBase],
@@ -2096,11 +2065,7 @@
 ) -> Sequence[Row] | None:
     """Return last known statistics, earlier than start_time, for the metadata_ids."""
     start_time_ts = start_time.timestamp()
-    dialect_name = get_instance(hass).dialect_name
-    lateral_join_for_start_time = dialect_name == SupportedDialect.POSTGRESQL
-    stmt = _generate_statistics_at_time_stmt(
-        table, metadata_ids, start_time_ts, types, lateral_join_for_start_time
-    )
+    stmt = _generate_statistics_at_time_stmt(table, metadata_ids, start_time_ts, types)
     return cast(Sequence[Row], execute_stmt_lambda_element(session, stmt))
 
 
124 changes: 0 additions & 124 deletions tests/components/recorder/test_history.py
@@ -1014,127 +1014,3 @@ async def test_get_last_state_changes_with_non_existent_entity_ids_returns_empty
 ) -> None:
     """Test get_last_state_changes returns an empty dict when entities not in the db."""
     assert history.get_last_state_changes(hass, 1, "nonexistent.entity") == {}
-
-
-@pytest.mark.skip_on_db_engine(["sqlite", "mysql"])
-@pytest.mark.usefixtures("skip_by_db_engine")
-@pytest.mark.usefixtures("recorder_db_url")
-async def test_get_significant_states_with_session_uses_lateral_with_postgresql(
-    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
-) -> None:
-    """Test get_significant_states_with_session uses the lateral path with PostgreSQL."""
-    entity_id = "media_player.test"
-    hass.states.async_set("any.other", "on")
-    await async_wait_recording_done(hass)
-    hass.states.async_set(entity_id, "off")
-
-    def set_state(state):
-        """Set the state."""
-        hass.states.async_set(entity_id, state, {"any": 1})
-        return hass.states.get(entity_id)
-
-    start = dt_util.utcnow().replace(microsecond=0)
-    point = start + timedelta(seconds=1)
-    point2 = start + timedelta(seconds=1, microseconds=100)
-    point3 = start + timedelta(seconds=1, microseconds=200)
-    end = point + timedelta(seconds=1, microseconds=400)
-
-    with freeze_time(start) as freezer:
-        set_state("idle")
-        set_state("YouTube")
-
-        freezer.move_to(point)
-        states = [set_state("idle")]
-
-        freezer.move_to(point2)
-        states.append(set_state("Netflix"))
-
-        freezer.move_to(point3)
-        states.append(set_state("Plex"))
-
-        freezer.move_to(end)
-        set_state("Netflix")
-        set_state("Plex")
-    await async_wait_recording_done(hass)
-
-    start_time = point2 + timedelta(microseconds=10)
-    hist = history.get_significant_states(
-        hass=hass,
-        start_time=start_time,  # Pick a point where we will generate a start time state
-        end_time=end,
-        entity_ids=[entity_id, "any.other"],
-        include_start_time_state=True,
-    )
-    assert len(hist[entity_id]) == 2
-
-    sqlalchemy_logs = "".join(
-        [
-            record.getMessage()
-            for record in caplog.records
-            if record.name.startswith("sqlalchemy.engine")
-        ]
-    )
-    # We can't patch inside the lambda so we have to check the logs
-    assert "JOIN LATERAL" in sqlalchemy_logs
-
-
-@pytest.mark.skip_on_db_engine(["postgresql"])
-@pytest.mark.usefixtures("skip_by_db_engine")
-@pytest.mark.usefixtures("recorder_db_url")
-async def test_get_significant_states_with_session_uses_non_lateral_without_postgresql(
-    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
-) -> None:
"""Test get_significant_states_with_session does not use a the lateral path without PostgreSQL."""
-    entity_id = "media_player.test"
-    hass.states.async_set("any.other", "on")
-    await async_wait_recording_done(hass)
-    hass.states.async_set(entity_id, "off")
-
-    def set_state(state):
-        """Set the state."""
-        hass.states.async_set(entity_id, state, {"any": 1})
-        return hass.states.get(entity_id)
-
-    start = dt_util.utcnow().replace(microsecond=0)
-    point = start + timedelta(seconds=1)
-    point2 = start + timedelta(seconds=1, microseconds=100)
-    point3 = start + timedelta(seconds=1, microseconds=200)
-    end = point + timedelta(seconds=1, microseconds=400)
-
-    with freeze_time(start) as freezer:
-        set_state("idle")
-        set_state("YouTube")
-
-        freezer.move_to(point)
-        states = [set_state("idle")]
-
-        freezer.move_to(point2)
-        states.append(set_state("Netflix"))
-
-        freezer.move_to(point3)
-        states.append(set_state("Plex"))
-
-        freezer.move_to(end)
-        set_state("Netflix")
-        set_state("Plex")
-    await async_wait_recording_done(hass)
-
-    start_time = point2 + timedelta(microseconds=10)
-    hist = history.get_significant_states(
-        hass=hass,
-        start_time=start_time,  # Pick a point where we will generate a start time state
-        end_time=end,
-        entity_ids=[entity_id, "any.other"],
-        include_start_time_state=True,
-    )
-    assert len(hist[entity_id]) == 2
-
-    sqlalchemy_logs = "".join(
-        [
-            record.getMessage()
-            for record in caplog.records
-            if record.name.startswith("sqlalchemy.engine")
-        ]
-    )
-    # We can't patch inside the lambda so we have to check the logs
-    assert "JOIN LATERAL" not in sqlalchemy_logs