Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent endless loop in recorder when using a filter and there are no more states to purge #126149

Merged
merged 6 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions homeassistant/components/recorder/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def purge_old_data(
_LOGGER.debug("Purging hasn't fully completed yet")
return False

if apply_filter and _purge_filtered_data(instance, session) is False:
if apply_filter and not _purge_filtered_data(instance, session):
_LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
return False

Expand Down Expand Up @@ -631,15 +631,18 @@ def _purge_old_entity_ids(instance: Recorder, session: Session) -> None:


def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
"""Remove filtered states and events that shouldn't be in the database."""
"""Remove filtered states and events that shouldn't be in the database.

Returns True if all filtered states and events have been purged.
"""
_LOGGER.debug("Cleanup filtered data")
database_engine = instance.database_engine
assert database_engine is not None
now_timestamp = time.time()

# Check if excluded entity_ids are in database
entity_filter = instance.entity_filter
has_more_states_to_purge = False
has_more_to_purge = False
excluded_metadata_ids: list[str] = [
metadata_id
for (metadata_id, entity_id) in session.query(
Expand All @@ -648,12 +651,11 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
if entity_filter and not entity_filter(entity_id)
]
if excluded_metadata_ids:
has_more_states_to_purge = _purge_filtered_states(
has_more_to_purge |= not _purge_filtered_states(
instance, session, excluded_metadata_ids, database_engine, now_timestamp
)

# Check if excluded event_types are in database
has_more_events_to_purge = False
if (
event_type_to_event_type_ids := instance.event_type_manager.get_many(
instance.exclude_event_types, session
Expand All @@ -665,12 +667,12 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
if event_type_id is not None
]
):
has_more_events_to_purge = _purge_filtered_events(
has_more_to_purge |= not _purge_filtered_events(
instance, session, excluded_event_type_ids, now_timestamp
)

# Purge has completed if there are no more states or events to purge
return not (has_more_states_to_purge or has_more_events_to_purge)
return not has_more_to_purge


def _purge_filtered_states(
Expand Down
165 changes: 165 additions & 0 deletions tests/components/recorder/test_purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,171 @@ def _add_db_entries(hass: HomeAssistant) -> None:
assert session.query(StateAttributes).count() == 0


@pytest.mark.parametrize(
    "recorder_config", [{"exclude": {"entities": ["sensor.excluded"]}}]
)
async def test_purge_filtered_states_multiple_rounds(
    hass: HomeAssistant,
    recorder_mock: Recorder,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test filtered states are purged when there are multiple rounds to purge.

    Purging with apply_filter=True removes filtered rows in batches, so the
    purge task must repeat until nothing filtered remains.  The caplog checks
    verify the "hasn't fully completed" message appears while rounds remain
    and stops appearing once everything filtered has been purged.
    """
    # Sanity check: the exclude filter from recorder_config is in effect.
    assert recorder_mock.entity_filter("sensor.excluded") is False

    def _add_db_entries(hass: HomeAssistant) -> None:
        # Seed the database with a mix of rows that must be purged and
        # rows that must survive the filtered purge.
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            # (3 days x 20 states for the excluded entity).
            for days in range(1, 4):
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(1000, 1020):
                    _add_state_with_state_attributes(
                        session,
                        "sensor.excluded",
                        "purgeme",
                        timestamp,
                        event_id * days,
                    )
            # Add state **without** state_changed event that should be purged
            timestamp = dt_util.utcnow() - timedelta(days=1)
            session.add(
                States(
                    entity_id="sensor.excluded",
                    state="purgeme",
                    attributes="{}",
                    last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                    last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            # Add states and state_changed events that should be kept
            timestamp = dt_util.utcnow() - timedelta(days=2)
            for event_id in range(200, 210):
                _add_state_with_state_attributes(
                    session,
                    "sensor.keep",
                    "keep",
                    timestamp,
                    event_id,
                )
            # Add states with linked old_state_ids that need to be handled:
            # old_state_id values 1 and 2 reference rows that get purged and
            # must be nulled out; 62 references a surviving row and is kept.
            timestamp = dt_util.utcnow() - timedelta(days=0)
            state_attrs = StateAttributes(
                hash=0,
                shared_attrs=json.dumps(
                    {"sensor.linked_old_state_id": "sensor.linked_old_state_id"}
                ),
            )
            state_1 = States(
                entity_id="sensor.linked_old_state_id",
                state="keep",
                attributes="{}",
                last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                old_state_id=1,
                state_attributes=state_attrs,
            )
            timestamp = dt_util.utcnow() - timedelta(days=4)
            state_2 = States(
                entity_id="sensor.linked_old_state_id",
                state="keep",
                attributes="{}",
                last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                old_state_id=2,
                state_attributes=state_attrs,
            )
            state_3 = States(
                entity_id="sensor.linked_old_state_id",
                state="keep",
                attributes="{}",
                last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                old_state_id=62,  # keep
                state_attributes=state_attrs,
            )
            session.add_all((state_attrs, state_1, state_2, state_3))
            # Add event that should be kept
            session.add(
                Events(
                    event_id=100,
                    event_type="EVENT_KEEP",
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            convert_pending_states_to_meta(recorder_mock, session)
            convert_pending_events_to_event_types(recorder_mock, session)

    service_data = {"keep_days": 10, "apply_filter": True}
    _add_db_entries(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        # 60 excluded + 1 excluded without event + 10 kept + 3 linked = 74
        assert states.count() == 74
        events_keep = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
        )
        assert events_keep.count() == 1

    await hass.services.async_call(
        RECORDER_DOMAIN, SERVICE_PURGE, service_data, blocking=True
    )

    for _ in range(2):
        # Make sure the second round of purging runs
        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    # More than one round was needed, so the "not finished yet" log must show.
    assert "Cleanup filtered data hasn't fully completed yet" in caplog.text
    caplog.clear()

    with session_scope(hass=hass) as session:
        states = session.query(States)
        # Only the 10 kept states and the 3 linked states remain.
        assert states.count() == 13
        events_keep = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
        )
        assert events_keep.count() == 1

        states_sensor_excluded = (
            session.query(States)
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
            .filter(StatesMeta.entity_id == "sensor.excluded")
        )
        assert states_sensor_excluded.count() == 0
        query = session.query(States)

        # old_state_id references to purged rows were nulled out.
        assert query.filter(States.state_id == 72).first().old_state_id is None
        assert query.filter(States.state_id == 72).first().attributes_id == 71
        assert query.filter(States.state_id == 73).first().old_state_id is None
        assert query.filter(States.state_id == 73).first().attributes_id == 71

        final_keep_state = session.query(States).filter(States.state_id == 74).first()
        assert final_keep_state.old_state_id == 62  # should have been kept
        assert final_keep_state.attributes_id == 71

        # presumably 10 attribute rows from sensor.keep plus the shared
        # linked-state attributes row — TODO confirm the helper adds one each
        assert session.query(StateAttributes).count() == 11

    # Do it again to make sure nothing changes
    await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        final_keep_state = session.query(States).filter(States.state_id == 74).first()
        assert final_keep_state.old_state_id == 62  # should have been kept
        assert final_keep_state.attributes_id == 71

        assert session.query(StateAttributes).count() == 11

    for _ in range(2):
        # Make sure the second round of purging runs
        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    # Nothing filtered is left, so purging must not report unfinished work
    # (i.e. it must not loop endlessly).
    assert "Cleanup filtered data hasn't fully completed yet" not in caplog.text
bdraco marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
@pytest.mark.parametrize(
"recorder_config", [{"exclude": {"entities": ["sensor.excluded"]}}]
Expand Down