Skip to content

Commit

Permalink
Improve recorder data migrator tests (#133628)
Browse files Browse the repository at this point in the history
  • Loading branch information
emontnemery authored Dec 20, 2024
1 parent 17f0c24 commit a23b371
Showing 1 changed file with 80 additions and 13 deletions.
93 changes: 80 additions & 13 deletions tests/components/recorder/test_migration_from_schema_32.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from sqlalchemy import create_engine, inspect
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from sqlalchemy.schema import Index

from homeassistant.components import recorder
from homeassistant.components.recorder import (
Expand Down Expand Up @@ -120,9 +121,11 @@ def db_schema_32():

@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_migrate_event_context_ids", [True])
@pytest.mark.parametrize("indices_to_drop", [[], [("events", "ix_events_context_id")]])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_migrate_events_context_ids(
async_test_recorder: RecorderInstanceGenerator,
indices_to_drop: list[tuple[str, str]],
) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format."""
importlib.import_module(SCHEMA_MODULE_32)
Expand Down Expand Up @@ -237,6 +240,13 @@ def _insert_events():
]
await _async_wait_migration_done(hass)

# Remove index
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
for table, index in indices_to_drop:
with session_scope(hass=hass) as session:
assert get_index_by_name(session, table, index) is not None
migration._drop_index(instance.get_session, table, index)

await hass.async_stop()
await hass.async_block_till_done()

Expand Down Expand Up @@ -266,7 +276,13 @@ def _fetch_migrated_events():

# Run again with new schema, let migration run
async with async_test_home_assistant() as hass:
with freeze_time(now), instrument_migration(hass) as instrumented_migration:
with (
freeze_time(now),
instrument_migration(hass) as instrumented_migration,
patch(
"sqlalchemy.schema.Index.create", autospec=True, wraps=Index.create
) as wrapped_idx_create,
):
async with async_test_recorder(
hass, wait_recorder=False, wait_recorder_setup=False
) as instance:
Expand Down Expand Up @@ -297,6 +313,10 @@ def _fetch_migrated_events():
await hass.async_stop()
await hass.async_block_till_done()

# Check the index we removed was recreated
index_names = [call[1][0].name for call in wrapped_idx_create.mock_calls]
assert index_names == [index for _, index in indices_to_drop]

old_uuid_context_id_event = events_by_type["old_uuid_context_id_event"]
assert old_uuid_context_id_event["context_id"] is None
assert old_uuid_context_id_event["context_user_id"] is None
Expand Down Expand Up @@ -482,9 +502,11 @@ def _insert_migration():

@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_migrate_state_context_ids", [True])
@pytest.mark.parametrize("indices_to_drop", [[], [("states", "ix_states_context_id")]])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_migrate_states_context_ids(
async_test_recorder: RecorderInstanceGenerator,
indices_to_drop: list[tuple[str, str]],
) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format."""
importlib.import_module(SCHEMA_MODULE_32)
Expand Down Expand Up @@ -577,6 +599,13 @@ def _insert_states():
await async_wait_recording_done(hass)
await _async_wait_migration_done(hass)

# Remove index
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
for table, index in indices_to_drop:
with session_scope(hass=hass) as session:
assert get_index_by_name(session, table, index) is not None
migration._drop_index(instance.get_session, table, index)

await hass.async_stop()
await hass.async_block_till_done()

Expand Down Expand Up @@ -606,7 +635,12 @@ def _fetch_migrated_states():

# Run again with new schema, let migration run
async with async_test_home_assistant() as hass:
with instrument_migration(hass) as instrumented_migration:
with (
instrument_migration(hass) as instrumented_migration,
patch(
"sqlalchemy.schema.Index.create", autospec=True, wraps=Index.create
) as wrapped_idx_create,
):
async with async_test_recorder(
hass, wait_recorder=False, wait_recorder_setup=False
) as instance:
Expand Down Expand Up @@ -637,6 +671,10 @@ def _fetch_migrated_states():
await hass.async_stop()
await hass.async_block_till_done()

# Check the index we removed was recreated
index_names = [call[1][0].name for call in wrapped_idx_create.mock_calls]
assert index_names == [index for _, index in indices_to_drop]

old_uuid_context_id = states_by_entity_id["state.old_uuid_context_id"]
assert old_uuid_context_id["context_id"] is None
assert old_uuid_context_id["context_user_id"] is None
Expand Down Expand Up @@ -1049,9 +1087,13 @@ def _fetch_migrated_states():

@pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
@pytest.mark.parametrize(
"indices_to_drop", [[], [("states", "ix_states_entity_id_last_updated_ts")]]
)
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_post_migrate_entity_ids(
async_test_recorder: RecorderInstanceGenerator,
indices_to_drop: list[tuple[str, str]],
) -> None:
"""Test we can migrate entity_ids to the StatesMeta table."""
importlib.import_module(SCHEMA_MODULE_32)
Expand Down Expand Up @@ -1096,6 +1138,13 @@ def _insert_events():
await async_wait_recording_done(hass)
await _async_wait_migration_done(hass)

# Remove index
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
for table, index in indices_to_drop:
with session_scope(hass=hass) as session:
assert get_index_by_name(session, table, index) is not None
migration._drop_index(instance.get_session, table, index)

await hass.async_stop()
await hass.async_block_till_done()

Expand All @@ -1109,20 +1158,38 @@ def _fetch_migrated_states():
return {state.state: state.entity_id for state in states}

# Run again with new schema, let migration run
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
with patch(
"sqlalchemy.schema.Index.create", autospec=True, wraps=Index.create
) as wrapped_idx_create:
async with (
async_test_home_assistant() as hass,
async_test_recorder(hass) as instance,
):
instance.recorder_and_worker_thread_ids.add(threading.get_ident())

await hass.async_block_till_done()
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)

states_by_state = await instance.async_add_executor_job(_fetch_migrated_states)
states_by_state = await instance.async_add_executor_job(
_fetch_migrated_states
)

await hass.async_stop()
await hass.async_block_till_done()
# Check the index which will be removed by the migrator no longer exists
with session_scope(hass=hass) as session:
assert (
get_index_by_name(
session, "states", "ix_states_entity_id_last_updated_ts"
)
is None
)

await hass.async_stop()
await hass.async_block_till_done()

# Check the index we removed was recreated
index_names = [call[1][0].name for call in wrapped_idx_create.mock_calls]
assert index_names == [index for _, index in indices_to_drop]

assert states_by_state["one_1"] is None
assert states_by_state["two_2"] is None
Expand Down

0 comments on commit a23b371

Please sign in to comment.