
Fix compiling missing statistics losing rows #101616

Merged · 17 commits · Oct 8, 2023
133 changes: 57 additions & 76 deletions homeassistant/components/recorder/statistics.py
@@ -526,7 +526,7 @@ def _compile_statistics(
         ):
             continue
         compiled: PlatformCompiledStatistics = platform_compile_statistics(
-            instance.hass, start, end
+            instance.hass, session, start, end
         )
         _LOGGER.debug(
             "Statistics for %s during %s-%s: %s",
@@ -1871,7 +1871,7 @@ def get_latest_short_term_statistics_by_ids(
     return list(
         cast(
             Sequence[Row],
-            execute_stmt_lambda_element(session, stmt, orm_rows=False),
+            execute_stmt_lambda_element(session, stmt),
         )
     )

@@ -1887,75 +1887,69 @@ def _latest_short_term_statistics_by_ids_stmt(
     )


-def get_latest_short_term_statistics(
+def get_latest_short_term_statistics_with_session(
     hass: HomeAssistant,
+    session: Session,
     statistic_ids: set[str],
     types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
     metadata: dict[str, tuple[int, StatisticMetaData]] | None = None,
 ) -> dict[str, list[StatisticsRow]]:
-    """Return the latest short term statistics for a list of statistic_ids."""
-    with session_scope(hass=hass, read_only=True) as session:
-        # Fetch metadata for the given statistic_ids
-        if not metadata:
-            metadata = get_instance(hass).statistics_meta_manager.get_many(
-                session, statistic_ids=statistic_ids
-            )
-        if not metadata:
-            return {}
-        metadata_ids = set(
-            _extract_metadata_and_discard_impossible_columns(metadata, types)
-        )
-        run_cache = get_short_term_statistics_run_cache(hass)
-        # Try to find the latest short term statistics ids for the metadata_ids
-        # from the run cache first if we have it. If the run cache references
-        # a non-existent id because of a purge, we will detect it missing in the
-        # next step and run a query to re-populate the cache.
-        stats: list[Row] = []
-        if metadata_id_to_id := run_cache.get_latest_ids(metadata_ids):
-            stats = get_latest_short_term_statistics_by_ids(
-                session, metadata_id_to_id.values()
-            )
-        # If we are missing some metadata_ids in the run cache, we need to run a
-        # query to populate the cache for each metadata_id, and then run another
-        # query to get the latest short term statistics for the missing metadata_ids.
-        if (missing_metadata_ids := metadata_ids - set(metadata_id_to_id)) and (
-            found_latest_ids := {
-                latest_id
-                for metadata_id in missing_metadata_ids
-                if (
-                    latest_id := cache_latest_short_term_statistic_id_for_metadata_id(
-                        # orm_rows=False is used here because we are in
-                        # a read-only session, and there will never be
-                        # any pending inserts in the session.
-                        run_cache,
-                        session,
-                        metadata_id,
-                        orm_rows=False,
-                    )
-                )
-                is not None
-            }
-        ):
-            stats.extend(
-                get_latest_short_term_statistics_by_ids(session, found_latest_ids)
-            )
-
-        if not stats:
-            return {}
-
-        # Return statistics combined with metadata
-        return _sorted_statistics_to_dict(
-            hass,
-            session,
-            stats,
-            statistic_ids,
-            metadata,
-            False,
-            StatisticsShortTerm,
-            None,
-            None,
-            types,
-        )
+    """Return the latest short term statistics for a list of statistic_ids with a session."""
+    # Fetch metadata for the given statistic_ids
+    if not metadata:
+        metadata = get_instance(hass).statistics_meta_manager.get_many(
+            session, statistic_ids=statistic_ids
+        )
+    if not metadata:
+        return {}
+    metadata_ids = set(
+        _extract_metadata_and_discard_impossible_columns(metadata, types)
+    )
+    run_cache = get_short_term_statistics_run_cache(hass)
+    # Try to find the latest short term statistics ids for the metadata_ids
+    # from the run cache first if we have it. If the run cache references
+    # a non-existent id because of a purge, we will detect it missing in the
+    # next step and run a query to re-populate the cache.
+    stats: list[Row] = []
+    if metadata_id_to_id := run_cache.get_latest_ids(metadata_ids):
+        stats = get_latest_short_term_statistics_by_ids(
+            session, metadata_id_to_id.values()
+        )
+    # If we are missing some metadata_ids in the run cache, we need to run a
+    # query to populate the cache for each metadata_id, and then run another
+    # query to get the latest short term statistics for the missing metadata_ids.
+    if (missing_metadata_ids := metadata_ids - set(metadata_id_to_id)) and (
+        found_latest_ids := {
+            latest_id
+            for metadata_id in missing_metadata_ids
+            if (
+                latest_id := cache_latest_short_term_statistic_id_for_metadata_id(
+                    run_cache,
+                    session,
+                    metadata_id,
+                )
+            )
+            is not None
+        }
+    ):
+        stats.extend(get_latest_short_term_statistics_by_ids(session, found_latest_ids))
+
+    if not stats:
+        return {}
+
+    # Return statistics combined with metadata
+    return _sorted_statistics_to_dict(
+        hass,
+        session,
+        stats,
+        statistic_ids,
+        metadata,
+        False,
+        StatisticsShortTerm,
+        None,
+        None,
+        types,
+    )


 def _generate_statistics_at_time_stmt(
@@ -2316,14 +2310,8 @@ def _import_statistics_with_session(
         # We just inserted new short term statistics, so we need to update the
         # ShortTermStatisticsRunCache with the latest id for the metadata_id
         run_cache = get_short_term_statistics_run_cache(instance.hass)
-        #
-        # Because we are in the same session and we want to read rows
-        # that have not been flushed yet, we need to pass orm_rows=True
-        # to cache_latest_short_term_statistic_id_for_metadata_id
-        # to ensure that it gets the rows that were just inserted
-        #
         cache_latest_short_term_statistic_id_for_metadata_id(
-            run_cache, session, metadata_id, orm_rows=True
+            run_cache, session, metadata_id
        )

    return True
@@ -2341,7 +2329,6 @@ def cache_latest_short_term_statistic_id_for_metadata_id(
     run_cache: ShortTermStatisticsRunCache,
     session: Session,
     metadata_id: int,
-    orm_rows: bool,
 ) -> int | None:
     """Cache the latest short term statistic for a given metadata_id.

@@ -2352,13 +2339,7 @@ def cache_latest_short_term_statistic_id_for_metadata_id(
     if latest := cast(
         Sequence[Row],
         execute_stmt_lambda_element(
-            session,
-            _find_latest_short_term_statistic_for_metadata_id_stmt(metadata_id),
-            orm_rows=orm_rows
-            # _import_statistics_with_session needs to be able
-            # to read back the rows it just inserted without
-            # a flush so we have to pass orm_rows so we get
-            # back the latest data.
+            session, _find_latest_short_term_statistic_for_metadata_id_stmt(metadata_id)
         ),
     ):
         id_: int = latest[0].id
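
The orm_rows plumbing removed above existed so that _import_statistics_with_session could read back rows it had inserted but not yet flushed. With every read now issued through the recorder's own session, the explicit flag could be dropped: an ORM-level query in the same session autoflushes pending objects before it executes. A minimal sketch of that autoflush behavior, using a toy table rather than the real recorder schema (StatRow is a hypothetical stand-in for StatisticsShortTerm):

from sqlalchemy import Integer, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class StatRow(Base):
    """Hypothetical stand-in for StatisticsShortTerm."""

    __tablename__ = "stats"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    metadata_id: Mapped[int] = mapped_column(Integer)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(StatRow(metadata_id=1))  # pending: not flushed, not committed

    # An ORM SELECT in the same session autoflushes the pending INSERT first,
    # so the just-inserted row comes back without an explicit flush or commit.
    # This is the behavior the removed orm_rows=True comment relied on.
    latest = session.scalars(select(StatRow).order_by(StatRow.id.desc())).first()
    assert latest is not None and latest.metadata_id == 1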
27 changes: 3 additions & 24 deletions homeassistant/components/sensor/recorder.py
@@ -16,7 +16,6 @@
     get_instance,
     history,
     statistics,
-    util as recorder_util,
 )
 from homeassistant.components.recorder.models import (
     StatisticData,
@@ -374,27 +373,7 @@ def _timestamp_to_isoformat_or_none(timestamp: float | None) -> str | None:
     return dt_util.utc_from_timestamp(timestamp).isoformat()


-def compile_statistics(
-    hass: HomeAssistant, start: datetime.datetime, end: datetime.datetime
-) -> statistics.PlatformCompiledStatistics:
-    """Compile statistics for all entities during start-end.
-
-    Note: This will query the database and must not be run in the event loop
-    """
-    # There is already an active session when this code is called since
-    # it is called from the recorder statistics. We need to make sure
-    # this session never gets committed since it would be out of sync
-    # with the recorder statistics session so we mark it as read only.
-    #
-    # If we ever need to write to the database from this function we
-    # will need to refactor the recorder statistics to use a single
-    # session.
Comment on lines -384 to -391, by @bdraco (Member, Author), Oct 8, 2023:

I should have refactored this as soon as I noticed this was happening when I added this comment in f6f3565. This wasn't the source of the problem, but it's when I noticed this pattern.

I added the comment because I was worried this was a bit brittle, but I didn't realize at the time that it was actually a problem as well, because I didn't understand the full impact of the nested sessions.

The irony is, I was so concerned about refactoring risk that I underestimated the impact here, even though I thought it was enough of a problem to add this comment 🤦
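
The hazard described here is easy to reproduce. Reusing the toy StatRow model from the sketch earlier on this page (again an illustration, not recorder code): rows pending in one session are invisible to a second session, because the second session reads in its own transaction.

# Continues the toy StatRow/Base sketch above; a file-backed database is
# assumed so the two sessions get separate connections.
engine = create_engine("sqlite:///demo.db")
Base.metadata.create_all(engine)

with Session(engine) as outer, Session(engine) as nested:
    outer.add(StatRow(metadata_id=1))  # pending insert, never committed

    # The owning session autoflushes and sees its own pending row...
    assert outer.scalars(select(StatRow)).first() is not None

    # ...but a nested session reads in a separate transaction and sees
    # nothing, which is how the just-inserted statistics rows were lost.
    assert nested.scalars(select(StatRow)).first() is None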

-    with recorder_util.session_scope(hass=hass, read_only=True) as session:
-        compiled = _compile_statistics(hass, session, start, end)
-    return compiled
-
-
-def _compile_statistics(  # noqa: C901
+def compile_statistics(  # noqa: C901
     hass: HomeAssistant,
     session: Session,
     start: datetime.datetime,
@@ -471,8 +450,8 @@ def _compile_statistics(  # noqa: C901
         if "sum" in wanted_statistics[entity_id]:
             to_query.add(entity_id)

-    last_stats = statistics.get_latest_short_term_statistics(
-        hass, to_query, {"last_reset", "state", "sum"}, metadata=old_metadatas
+    last_stats = statistics.get_latest_short_term_statistics_with_session(
+        hass, session, to_query, {"last_reset", "state", "sum"}, metadata=old_metadatas
     )
Comment on lines +453 to 455, by @bdraco (Member, Author), Oct 8, 2023:

This was the only call that created a new session here, which was the source of the issue.
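
In outline, the fix moves session ownership to the caller. A rough sketch with abbreviated signatures (_query_latest is a hypothetical stand-in for the shared query logic, not a real helper in the module):

# before: the helper opened its own read-only session, which could not see
# short-term rows still pending in the recorder's active session
def get_latest_short_term_statistics(hass, statistic_ids, types, metadata=None):
    with session_scope(hass=hass, read_only=True) as session:
        return _query_latest(session, statistic_ids, types, metadata)


# after: the helper runs inside the session the recorder already holds, so
# pending rows are visible to its queries
def get_latest_short_term_statistics_with_session(
    hass, session, statistic_ids, types, metadata=None
):
    return _query_latest(session, statistic_ids, types, metadata)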

     for (  # pylint: disable=too-many-nested-blocks
         entity_id,
18 changes: 13 additions & 5 deletions tests/components/energy/test_sensor.py
@@ -6,6 +6,7 @@
 import pytest

 from homeassistant.components.energy import data
+from homeassistant.components.recorder.util import session_scope
 from homeassistant.components.sensor import (
     ATTR_LAST_RESET,
     ATTR_STATE_CLASS,
@@ -155,7 +156,10 @@ async def test_cost_sensor_price_entity_total_increasing(
     """Test energy cost price from total_increasing type sensor entity."""

     def _compile_statistics(_):
-        return compile_statistics(hass, now, now + timedelta(seconds=1)).platform_stats
+        with session_scope(hass=hass) as session:
+            return compile_statistics(
+                hass, session, now, now + timedelta(seconds=1)
+            ).platform_stats

     energy_attributes = {
         ATTR_UNIT_OF_MEASUREMENT: UnitOfEnergy.KILO_WATT_HOUR,
@@ -365,9 +369,10 @@ async def test_cost_sensor_price_entity_total(
     """Test energy cost price from total type sensor entity."""

     def _compile_statistics(_):
-        return compile_statistics(
-            hass, now, now + timedelta(seconds=0.17)
-        ).platform_stats
+        with session_scope(hass=hass) as session:
+            return compile_statistics(
+                hass, session, now, now + timedelta(seconds=0.17)
+            ).platform_stats

     energy_attributes = {
         ATTR_UNIT_OF_MEASUREMENT: UnitOfEnergy.KILO_WATT_HOUR,
@@ -579,7 +584,10 @@ async def test_cost_sensor_price_entity_total_no_reset(
     """Test energy cost price from total type sensor entity with no last_reset."""

     def _compile_statistics(_):
-        return compile_statistics(hass, now, now + timedelta(seconds=1)).platform_stats
+        with session_scope(hass=hass) as session:
+            return compile_statistics(
+                hass, session, now, now + timedelta(seconds=1)
+            ).platform_stats

     energy_attributes = {
         ATTR_UNIT_OF_MEASUREMENT: UnitOfEnergy.KILO_WATT_HOUR,