diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py
index 0ea16e09df4a49..a6fe7ddb22f9dc 100644
--- a/homeassistant/components/recorder/statistics.py
+++ b/homeassistant/components/recorder/statistics.py
@@ -526,7 +526,7 @@ def _compile_statistics(
         ):
             continue
         compiled: PlatformCompiledStatistics = platform_compile_statistics(
-            instance.hass, start, end
+            instance.hass, session, start, end
         )
         _LOGGER.debug(
             "Statistics for %s during %s-%s: %s",
@@ -1871,7 +1871,7 @@ def get_latest_short_term_statistics_by_ids(
     return list(
         cast(
             Sequence[Row],
-            execute_stmt_lambda_element(session, stmt, orm_rows=False),
+            execute_stmt_lambda_element(session, stmt),
         )
     )
 
@@ -1887,75 +1887,69 @@ def _latest_short_term_statistics_by_ids_stmt(
     )
 
 
-def get_latest_short_term_statistics(
+def get_latest_short_term_statistics_with_session(
     hass: HomeAssistant,
+    session: Session,
     statistic_ids: set[str],
     types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
     metadata: dict[str, tuple[int, StatisticMetaData]] | None = None,
 ) -> dict[str, list[StatisticsRow]]:
-    """Return the latest short term statistics for a list of statistic_ids."""
-    with session_scope(hass=hass, read_only=True) as session:
-        # Fetch metadata for the given statistic_ids
-        if not metadata:
-            metadata = get_instance(hass).statistics_meta_manager.get_many(
-                session, statistic_ids=statistic_ids
-            )
-        if not metadata:
-            return {}
-        metadata_ids = set(
-            _extract_metadata_and_discard_impossible_columns(metadata, types)
+    """Return the latest short term statistics for a list of statistic_ids with a session."""
+    # Fetch metadata for the given statistic_ids
+    if not metadata:
+        metadata = get_instance(hass).statistics_meta_manager.get_many(
+            session, statistic_ids=statistic_ids
         )
-        run_cache = get_short_term_statistics_run_cache(hass)
-        # Try to find the latest short term statistics ids for the metadata_ids
-        # from the run cache first if we have it. If the run cache references
-        # a non-existent id because of a purge, we will detect it missing in the
-        # next step and run a query to re-populate the cache.
-        stats: list[Row] = []
-        if metadata_id_to_id := run_cache.get_latest_ids(metadata_ids):
-            stats = get_latest_short_term_statistics_by_ids(
-                session, metadata_id_to_id.values()
-            )
-        # If we are missing some metadata_ids in the run cache, we need run a query
-        # to populate the cache for each metadata_id, and then run another query
-        # to get the latest short term statistics for the missing metadata_ids.
-        if (missing_metadata_ids := metadata_ids - set(metadata_id_to_id)) and (
-            found_latest_ids := {
-                latest_id
-                for metadata_id in missing_metadata_ids
-                if (
-                    latest_id := cache_latest_short_term_statistic_id_for_metadata_id(
-                        # orm_rows=False is used here because we are in
-                        # a read-only session, and there will never be
-                        # any pending inserts in the session.
-                        run_cache,
-                        session,
-                        metadata_id,
-                        orm_rows=False,
-                    )
+    if not metadata:
+        return {}
+    metadata_ids = set(
+        _extract_metadata_and_discard_impossible_columns(metadata, types)
+    )
+    run_cache = get_short_term_statistics_run_cache(hass)
+    # Try to find the latest short term statistics ids for the metadata_ids
+    # from the run cache first if we have it. If the run cache references
+    # a non-existent id because of a purge, we will detect it missing in the
+    # next step and run a query to re-populate the cache.
+    stats: list[Row] = []
+    if metadata_id_to_id := run_cache.get_latest_ids(metadata_ids):
+        stats = get_latest_short_term_statistics_by_ids(
+            session, metadata_id_to_id.values()
+        )
+    # If we are missing some metadata_ids in the run cache, we need run a query
+    # to populate the cache for each metadata_id, and then run another query
+    # to get the latest short term statistics for the missing metadata_ids.
+    if (missing_metadata_ids := metadata_ids - set(metadata_id_to_id)) and (
+        found_latest_ids := {
+            latest_id
+            for metadata_id in missing_metadata_ids
+            if (
+                latest_id := cache_latest_short_term_statistic_id_for_metadata_id(
+                    run_cache,
+                    session,
+                    metadata_id,
                 )
-                is not None
-            }
-        ):
-            stats.extend(
-                get_latest_short_term_statistics_by_ids(session, found_latest_ids)
             )
+            is not None
+        }
+    ):
+        stats.extend(get_latest_short_term_statistics_by_ids(session, found_latest_ids))
 
-        if not stats:
-            return {}
+    if not stats:
+        return {}
 
-        # Return statistics combined with metadata
-        return _sorted_statistics_to_dict(
-            hass,
-            session,
-            stats,
-            statistic_ids,
-            metadata,
-            False,
-            StatisticsShortTerm,
-            None,
-            None,
-            types,
-        )
+    # Return statistics combined with metadata
+    return _sorted_statistics_to_dict(
+        hass,
+        session,
+        stats,
+        statistic_ids,
+        metadata,
+        False,
+        StatisticsShortTerm,
+        None,
+        None,
+        types,
+    )
 
 
 def _generate_statistics_at_time_stmt(
@@ -2316,14 +2310,8 @@ def _import_statistics_with_session(
         # We just inserted new short term statistics, so we need to update the
        # ShortTermStatisticsRunCache with the latest id for the metadata_id
         run_cache = get_short_term_statistics_run_cache(instance.hass)
-        #
-        # Because we are in the same session and we want to read rows
-        # that have not been flushed yet, we need to pass orm_rows=True
-        # to cache_latest_short_term_statistic_id_for_metadata_id
-        # to ensure that it gets the rows that were just inserted
-        #
         cache_latest_short_term_statistic_id_for_metadata_id(
-            run_cache, session, metadata_id, orm_rows=True
+            run_cache, session, metadata_id
         )
 
     return True
@@ -2341,7 +2329,6 @@ def cache_latest_short_term_statistic_id_for_metadata_id(
     run_cache: ShortTermStatisticsRunCache,
     session: Session,
     metadata_id: int,
-    orm_rows: bool,
 ) -> int | None:
     """Cache the latest short term statistic for a given metadata_id.
 
@@ -2352,13 +2339,7 @@ def cache_latest_short_term_statistic_id_for_metadata_id(
     if latest := cast(
         Sequence[Row],
         execute_stmt_lambda_element(
-            session,
-            _find_latest_short_term_statistic_for_metadata_id_stmt(metadata_id),
-            orm_rows=orm_rows
-            # _import_statistics_with_session needs to be able
-            # to read back the rows it just inserted without
-            # a flush so we have to pass orm_rows so we get
-            # back the latest data.
+            session, _find_latest_short_term_statistic_for_metadata_id_stmt(metadata_id)
         ),
     ):
         id_: int = latest[0].id
diff --git a/homeassistant/components/sensor/recorder.py b/homeassistant/components/sensor/recorder.py
index cb5a81d6b84dcb..3cf1dc975ec7cb 100644
--- a/homeassistant/components/sensor/recorder.py
+++ b/homeassistant/components/sensor/recorder.py
@@ -16,7 +16,6 @@
     get_instance,
     history,
     statistics,
-    util as recorder_util,
 )
 from homeassistant.components.recorder.models import (
     StatisticData,
@@ -374,27 +373,7 @@ def _timestamp_to_isoformat_or_none(timestamp: float | None) -> str | None:
     return dt_util.utc_from_timestamp(timestamp).isoformat()
 
 
-def compile_statistics(
-    hass: HomeAssistant, start: datetime.datetime, end: datetime.datetime
-) -> statistics.PlatformCompiledStatistics:
-    """Compile statistics for all entities during start-end.
-
-    Note: This will query the database and must not be run in the event loop
-    """
-    # There is already an active session when this code is called since
-    # it is called from the recorder statistics. We need to make sure
-    # this session never gets committed since it would be out of sync
-    # with the recorder statistics session so we mark it as read only.
-    #
-    # If we ever need to write to the database from this function we
-    # will need to refactor the recorder statistics to use a single
-    # session.
-    with recorder_util.session_scope(hass=hass, read_only=True) as session:
-        compiled = _compile_statistics(hass, session, start, end)
-    return compiled
-
-
-def _compile_statistics(  # noqa: C901
+def compile_statistics(  # noqa: C901
     hass: HomeAssistant,
     session: Session,
     start: datetime.datetime,
@@ -471,8 +450,8 @@
         if "sum" in wanted_statistics[entity_id]:
             to_query.add(entity_id)
 
-    last_stats = statistics.get_latest_short_term_statistics(
-        hass, to_query, {"last_reset", "state", "sum"}, metadata=old_metadatas
+    last_stats = statistics.get_latest_short_term_statistics_with_session(
+        hass, session, to_query, {"last_reset", "state", "sum"}, metadata=old_metadatas
     )
     for (  # pylint: disable=too-many-nested-blocks
         entity_id,
diff --git a/tests/components/energy/test_sensor.py b/tests/components/energy/test_sensor.py
index f5fea153380e73..bf1513507f882d 100644
--- a/tests/components/energy/test_sensor.py
+++ b/tests/components/energy/test_sensor.py
@@ -6,6 +6,7 @@
 import pytest
 
 from homeassistant.components.energy import data
+from homeassistant.components.recorder.util import session_scope
 from homeassistant.components.sensor import (
     ATTR_LAST_RESET,
     ATTR_STATE_CLASS,
@@ -155,7 +156,10 @@ async def test_cost_sensor_price_entity_total_increasing(
     """Test energy cost price from total_increasing type sensor entity."""
 
     def _compile_statistics(_):
-        return compile_statistics(hass, now, now + timedelta(seconds=1)).platform_stats
+        with session_scope(hass=hass) as session:
+            return compile_statistics(
+                hass, session, now, now + timedelta(seconds=1)
+            ).platform_stats
 
     energy_attributes = {
         ATTR_UNIT_OF_MEASUREMENT: UnitOfEnergy.KILO_WATT_HOUR,
@@ -365,9 +369,10 @@ async def test_cost_sensor_price_entity_total(
     """Test energy cost price from total type sensor entity."""
 
     def _compile_statistics(_):
-        return compile_statistics(
-            hass, now, now + timedelta(seconds=0.17)
-        ).platform_stats
+        with session_scope(hass=hass) as session:
+            return compile_statistics(
+                hass, session, now, now + timedelta(seconds=0.17)
+            ).platform_stats
 
     energy_attributes = {
         ATTR_UNIT_OF_MEASUREMENT: UnitOfEnergy.KILO_WATT_HOUR,
@@ -579,7 +584,10 @@ async def test_cost_sensor_price_entity_total_no_reset(
     """Test energy cost price from total type sensor entity with no last_reset."""
 
     def _compile_statistics(_):
-        return compile_statistics(hass, now, now + timedelta(seconds=1)).platform_stats
+        with session_scope(hass=hass) as session:
+            return compile_statistics(
+                hass, session, now, now + timedelta(seconds=1)
+            ).platform_stats
 
     energy_attributes = {
         ATTR_UNIT_OF_MEASUREMENT: UnitOfEnergy.KILO_WATT_HOUR,
diff --git a/tests/components/recorder/test_statistics.py b/tests/components/recorder/test_statistics.py
index e56b2b83274391..03dc7b84caa570 100644
--- a/tests/components/recorder/test_statistics.py
+++ b/tests/components/recorder/test_statistics.py
@@ -22,7 +22,7 @@
     async_import_statistics,
     get_last_short_term_statistics,
     get_last_statistics,
-    get_latest_short_term_statistics,
+    get_latest_short_term_statistics_with_session,
     get_metadata,
     get_short_term_statistics_run_cache,
     list_statistic_ids,
@@ -71,9 +71,13 @@ def test_compile_hourly_statistics(hass_recorder: Callable[..., HomeAssistant])
     assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
 
     # Should not fail if there is nothing there yet
-    stats = get_latest_short_term_statistics(
-        hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
-    )
+    with session_scope(hass=hass, read_only=True) as session:
+        stats = get_latest_short_term_statistics_with_session(
+            hass,
+            session,
+            {"sensor.test1"},
+            {"last_reset", "max", "mean", "min", "state", "sum"},
+        )
     assert stats == {}
 
     for kwargs in ({}, {"statistic_ids": ["sensor.test1"]}):
@@ -172,28 +176,38 @@ def test_compile_hourly_statistics(hass_recorder: Callable[..., HomeAssistant])
     )
     assert stats == {"sensor.test1": [expected_2]}
 
-    stats = get_latest_short_term_statistics(
-        hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
-    )
+    with session_scope(hass=hass, read_only=True) as session:
+        stats = get_latest_short_term_statistics_with_session(
+            hass,
+            session,
+            {"sensor.test1"},
+            {"last_reset", "max", "mean", "min", "state", "sum"},
+        )
     assert stats == {"sensor.test1": [expected_2]}
 
     # Now wipe the latest_short_term_statistics_ids table and test again
     # to make sure we can rebuild the missing data
     run_cache = get_short_term_statistics_run_cache(instance.hass)
     run_cache._latest_id_by_metadata_id = {}
-    stats = get_latest_short_term_statistics(
-        hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
-    )
+    with session_scope(hass=hass, read_only=True) as session:
+        stats = get_latest_short_term_statistics_with_session(
+            hass,
+            session,
+            {"sensor.test1"},
+            {"last_reset", "max", "mean", "min", "state", "sum"},
+        )
     assert stats == {"sensor.test1": [expected_2]}
 
     metadata = get_metadata(hass, statistic_ids={"sensor.test1"})
 
-    stats = get_latest_short_term_statistics(
-        hass,
-        {"sensor.test1"},
-        {"last_reset", "max", "mean", "min", "state", "sum"},
-        metadata=metadata,
-    )
+    with session_scope(hass=hass, read_only=True) as session:
+        stats = get_latest_short_term_statistics_with_session(
+            hass,
+            session,
+            {"sensor.test1"},
+            {"last_reset", "max", "mean", "min", "state", "sum"},
+            metadata=metadata,
+        )
     assert stats == {"sensor.test1": [expected_2]}
 
     stats = get_last_short_term_statistics(
@@ -225,10 +239,14 @@ def test_compile_hourly_statistics(hass_recorder: Callable[..., HomeAssistant])
     instance.get_session().query(StatisticsShortTerm).delete()
 
     # Should not fail there is nothing in the table
-    stats = get_latest_short_term_statistics(
-        hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
-    )
-    assert stats == {}
+    with session_scope(hass=hass, read_only=True) as session:
+        stats = get_latest_short_term_statistics_with_session(
+            hass,
+            session,
+            {"sensor.test1"},
+            {"last_reset", "max", "mean", "min", "state", "sum"},
+        )
+        assert stats == {}
 
     # Delete again, and manually wipe the cache since we deleted all the data
     instance.get_session().query(StatisticsShortTerm).delete()
@@ -236,9 +254,13 @@ def test_compile_hourly_statistics(hass_recorder: Callable[..., HomeAssistant])
     run_cache._latest_id_by_metadata_id = {}
 
     # And test again to make sure there is no data
-    stats = get_latest_short_term_statistics(
-        hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
-    )
+    with session_scope(hass=hass, read_only=True) as session:
+        stats = get_latest_short_term_statistics_with_session(
+            hass,
+            session,
+            {"sensor.test1"},
+            {"last_reset", "max", "mean", "min", "state", "sum"},
+        )
     assert stats == {}
 
 
@@ -259,7 +281,7 @@ def sensor_stats(entity_id, start):
             "stat": {"start": start},
         }
 
-    def get_fake_stats(_hass, start, _end):
+    def get_fake_stats(_hass, session, start, _end):
         return statistics.PlatformCompiledStatistics(
             [
                 sensor_stats("sensor.test1", start),
diff --git a/tests/components/recorder/test_websocket_api.py b/tests/components/recorder/test_websocket_api.py
index 969fdd63ae58f6..b371d69fe5f158 100644
--- a/tests/components/recorder/test_websocket_api.py
+++ b/tests/components/recorder/test_websocket_api.py
@@ -14,11 +14,12 @@
 from homeassistant.components.recorder.statistics import (
     async_add_external_statistics,
     get_last_statistics,
-    get_latest_short_term_statistics,
+    get_latest_short_term_statistics_with_session,
     get_metadata,
     get_short_term_statistics_run_cache,
     list_statistic_ids,
 )
+from homeassistant.components.recorder.util import session_scope
 from homeassistant.components.recorder.websocket_api import UNIT_SCHEMA
 from homeassistant.components.sensor import UNIT_CONVERTERS
 from homeassistant.core import HomeAssistant
@@ -636,9 +637,13 @@ def next_id():
         "change": (imported_stats_5min[-1]["sum"] - imported_stats_5min[0]["sum"])
         * 1000,
     }
-    stats = get_latest_short_term_statistics(
-        hass, {"sensor.test"}, {"last_reset", "max", "mean", "min", "state", "sum"}
-    )
+    with session_scope(hass=hass, read_only=True) as session:
+        stats = get_latest_short_term_statistics_with_session(
+            hass,
+            session,
+            {"sensor.test"},
+            {"last_reset", "max", "mean", "min", "state", "sum"},
+        )
     start = imported_stats_5min[-1]["start"].timestamp()
     end = start + (5 * 60)
     assert stats == {
diff --git a/tests/components/sensor/test_recorder_missing_stats.py b/tests/components/sensor/test_recorder_missing_stats.py
new file mode 100644
index 00000000000000..f6f6445a0fb023
--- /dev/null
+++ b/tests/components/sensor/test_recorder_missing_stats.py
@@ -0,0 +1,124 @@
+"""The tests for sensor recorder platform can catch up."""
+from datetime import datetime, timedelta
+from pathlib import Path
+from unittest.mock import patch
+
+from freezegun.api import FrozenDateTimeFactory
+import pytest
+
+from homeassistant.components.recorder.history import get_significant_states
+from homeassistant.components.recorder.statistics import (
+    get_latest_short_term_statistics_with_session,
+    statistics_during_period,
+)
+from homeassistant.components.recorder.util import session_scope
+from homeassistant.core import CoreState, HomeAssistant
+from homeassistant.helpers import recorder as recorder_helper
+from homeassistant.setup import setup_component
+import homeassistant.util.dt as dt_util
+
+from tests.common import get_test_home_assistant
+from tests.components.recorder.common import do_adhoc_statistics, wait_recording_done
+
+POWER_SENSOR_ATTRIBUTES = {
+    "device_class": "energy",
+    "state_class": "measurement",
+    "unit_of_measurement": "kWh",
+}
+
+
+@pytest.fixture(autouse=True)
+def disable_db_issue_creation():
+    """Disable the creation of the database issue."""
+    with patch(
+        "homeassistant.components.recorder.util._async_create_mariadb_range_index_regression_issue"
+    ):
+        yield
+
+
+@pytest.mark.timeout(25)
+def test_compile_missing_statistics(
+    freezer: FrozenDateTimeFactory, recorder_db_url: str, tmp_path: Path
+) -> None:
+    """Test compile missing statistics."""
+    if recorder_db_url == "sqlite://":
+        # On-disk database because we need to stop and start hass
+        # and have it persist.
+        recorder_db_url = "sqlite:///" + str(tmp_path / "pytest.db")
+    config = {
+        "db_url": recorder_db_url,
+    }
+    three_days_ago = datetime(2021, 1, 1, 0, 0, 0, tzinfo=dt_util.UTC)
+    start_time = three_days_ago + timedelta(days=3)
+    freezer.move_to(three_days_ago)
+    hass: HomeAssistant = get_test_home_assistant()
+    hass.state = CoreState.not_running
+    recorder_helper.async_initialize_recorder(hass)
+    setup_component(hass, "sensor", {})
+    setup_component(hass, "recorder", {"recorder": config})
+    hass.start()
+    wait_recording_done(hass)
+    wait_recording_done(hass)
+
+    hass.states.set("sensor.test1", "0", POWER_SENSOR_ATTRIBUTES)
+    wait_recording_done(hass)
+
+    two_days_ago = three_days_ago + timedelta(days=1)
+    freezer.move_to(two_days_ago)
+    do_adhoc_statistics(hass, start=two_days_ago)
+    wait_recording_done(hass)
+    with session_scope(hass=hass, read_only=True) as session:
+        latest = get_latest_short_term_statistics_with_session(
+            hass, session, {"sensor.test1"}, {"state", "sum"}
+        )
+        latest_stat = latest["sensor.test1"][0]
+        assert latest_stat["start"] == 1609545600.0
+        assert latest_stat["end"] == 1609545600.0 + 300
+    count = 1
+    past_time = two_days_ago
+    while past_time <= start_time:
+        freezer.move_to(past_time)
+        hass.states.set("sensor.test1", str(count), POWER_SENSOR_ATTRIBUTES)
+        past_time += timedelta(minutes=5)
+        count += 1
+
+    wait_recording_done(hass)
+
+    states = get_significant_states(hass, three_days_ago, past_time, ["sensor.test1"])
+    assert len(states["sensor.test1"]) == 577
+
+    hass.stop()
+    freezer.move_to(start_time)
+    hass: HomeAssistant = get_test_home_assistant()
+    hass.state = CoreState.not_running
+    recorder_helper.async_initialize_recorder(hass)
+    setup_component(hass, "sensor", {})
+    hass.states.set("sensor.test1", "0", POWER_SENSOR_ATTRIBUTES)
+    setup_component(hass, "recorder", {"recorder": config})
+    hass.start()
+    wait_recording_done(hass)
+    wait_recording_done(hass)
+    with session_scope(hass=hass, read_only=True) as session:
+        latest = get_latest_short_term_statistics_with_session(
+            hass, session, {"sensor.test1"}, {"state", "sum", "max", "mean", "min"}
+        )
+        latest_stat = latest["sensor.test1"][0]
+        assert latest_stat["start"] == 1609718100.0
+        assert latest_stat["end"] == 1609718100.0 + 300
+        assert latest_stat["mean"] == 576.0
+        assert latest_stat["min"] == 575.0
+        assert latest_stat["max"] == 576.0
+    stats = statistics_during_period(
+        hass,
+        two_days_ago,
+        start_time,
+        units={"energy": "kWh"},
+        statistic_ids={"sensor.test1"},
+        period="hour",
+        types={"mean"},
+    )
+    # Make sure we have 48 hours of statistics
+    assert len(stats["sensor.test1"]) == 48
+    # Make sure the last mean is 570.5
+    assert stats["sensor.test1"][-1]["mean"] == 570.5
+    hass.stop()
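
The patch above renames get_latest_short_term_statistics to get_latest_short_term_statistics_with_session and moves session ownership to the caller. A minimal sketch of the resulting call pattern, based on the test changes in this diff; the wrapper function name and the statistic id are illustrative only and are not part of the patch:

    from homeassistant.components.recorder.statistics import (
        get_latest_short_term_statistics_with_session,
    )
    from homeassistant.components.recorder.util import session_scope
    from homeassistant.core import HomeAssistant


    def _latest_sensor_stats(hass: HomeAssistant):
        """Fetch the latest short term statistics with a caller-owned session."""
        # The caller now opens (and owns) the database session and passes it in,
        # instead of the statistics helper creating its own session_scope.
        with session_scope(hass=hass, read_only=True) as session:
            return get_latest_short_term_statistics_with_session(
                hass,
                session,
                {"sensor.test1"},  # illustrative statistic_id
                {"state", "sum"},  # statistic types to return
            )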