Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

perf(robot-server): Parallelize the migration to schema 3 #14465

Merged
merged 19 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions robot-server/robot_server/persistence/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Support for persisting data across device reboots."""


from ._database import create_schema_3_sql_engine, sqlite_rowid
from ._database import create_sql_engine, sql_engine_ctx, sqlite_rowid
from ._fastapi_dependencies import (
start_initializing_persistence,
clean_up_persistence,
Expand All @@ -12,6 +12,7 @@
)
from ._persistence_directory import PersistenceResetter
from ._tables import (
metadata,
protocol_table,
analysis_table,
run_table,
Expand All @@ -22,9 +23,11 @@

__all__ = [
# database utilities and helpers
"create_schema_3_sql_engine",
"create_sql_engine",
"sql_engine_ctx",
"sqlite_rowid",
# database tables
"metadata",
"protocol_table",
"analysis_table",
"run_table",
Expand Down
43 changes: 13 additions & 30 deletions robot-server/robot_server/persistence/_database.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this file are to separate connecting to the database (sqlalchemy.create_engine()) from adding all the tables to the database (metadata.create_all()).

The new migration code has to close and reopen the database several times, and we don't want to re-add all the tables every time we do that. I think it would technically be harmless, but it felt confusing, and it spammed the logs.

Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""SQLite database initialization and utilities."""
from contextlib import contextmanager
from pathlib import Path
from typing import Generator

import sqlalchemy

from server_utils import sql_utils

from ._tables import schema_2, schema_3
from ._migrations.up_to_2 import migrate


# A reference to SQLite's built-in ROWID column.
#
Expand All @@ -24,23 +23,17 @@
sqlite_rowid = sqlalchemy.column("_ROWID_")


def create_schema_2_sql_engine(path: Path) -> sqlalchemy.engine.Engine:
"""Create a SQL engine for a schema 2 database.

If provided a schema 0 or 1 database, this will migrate it in-place to schema 2.
def create_sql_engine(path: Path) -> sqlalchemy.engine.Engine:
"""Return an engine for accessing the given SQLite database file.

Warning:
Migrations can take several minutes. If calling this from an async function,
offload this to a thread to avoid blocking the event loop.
If the file does not already exist, it will be created, empty.
You must separately set up any tables you're expecting.
"""
sql_engine = sqlalchemy.create_engine(sql_utils.get_connection_url(path))

try:
sql_utils.enable_foreign_key_constraints(sql_engine)
sql_utils.fix_transactions(sql_engine)
schema_2.metadata.create_all(sql_engine)

migrate(sql_engine)

except Exception:
sql_engine.dispose()
Expand All @@ -49,21 +42,11 @@ def create_schema_2_sql_engine(path: Path) -> sqlalchemy.engine.Engine:
return sql_engine


def create_schema_3_sql_engine(path: Path) -> sqlalchemy.engine.Engine:
"""Create a SQL engine for a schema 3 database.

Unlike `create_schema_2_sql_engine()`, this assumes the database is already
at schema 3. Migration is done through other mechanisms.
"""
sql_engine = sqlalchemy.create_engine(sql_utils.get_connection_url(path))

@contextmanager
def sql_engine_ctx(path: Path) -> Generator[sqlalchemy.engine.Engine, None, None]:
"""Like `create_sql_engine()`, but clean up when done."""
Comment on lines +46 to +47
Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only reason we haven't been defining this as a context manager this whole time is that it was difficult to integrate with FastAPI.

In recent versions of FastAPI, that's fixed, yay! So at some point, we can switch everything to this and delete the non-context-manager version.

engine = create_sql_engine(path)
try:
sql_utils.enable_foreign_key_constraints(sql_engine)
sql_utils.fix_transactions(sql_engine)
schema_3.metadata.create_all(sql_engine)

except Exception:
sql_engine.dispose()
raise

return sql_engine
yield engine
finally:
engine.dispose()
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from robot_server.errors import ErrorDetails

from ._database import create_schema_3_sql_engine
from ._database import create_sql_engine
from ._persistence_directory import (
PersistenceResetter,
prepare_active_subdirectory,
Expand Down Expand Up @@ -102,7 +102,7 @@ async def init_sql_engine() -> SQLEngine:
prepared_subdirectory = await subdirectory_prep_task

sql_engine = await to_thread.run_sync(
create_schema_3_sql_engine, prepared_subdirectory / _DATABASE_FILE
create_sql_engine, prepared_subdirectory / _DATABASE_FILE
)
return sql_engine

Expand Down
111 changes: 111 additions & 0 deletions robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Code that runs in a worker subprocess for the `up_to_3` migration."""


# fmt: off

# We keep a list of all the modules that this file imports
# so we can preload them when launching the subprocesses.
from types import ModuleType
_imports: "list[ModuleType]" = []

import contextlib # noqa: E402
import pathlib # noqa: E402
import typing # noqa: E402
_imports.extend([contextlib, pathlib, typing])

import pydantic # noqa: E402
import sqlalchemy # noqa: E402
_imports.extend([pydantic, sqlalchemy])

from opentrons.protocol_engine import commands # noqa: E402
from server_utils import sql_utils # noqa: E402
_imports.extend([commands, sql_utils])

from robot_server.persistence._tables import schema_2, schema_3 # noqa: E402
from robot_server.persistence import ( # noqa: E402
_database,
legacy_pickle,
pydantic as pydantic_helpers
)
_imports.extend([schema_2, schema_3, _database, legacy_pickle, pydantic_helpers])

# fmt: on


imports: typing.List[str] = [m.__name__ for m in _imports]
"""The names of all modules imported by this module, e.g. "foo.bar.baz"."""


def migrate_commands_for_run(
    source_db_file: pathlib.Path,
    dest_db_file: pathlib.Path,
    run_id: str,
    # This is a multiprocessing.Lock, which can't be a type annotation for some reason.
    lock: typing.ContextManager[object],
) -> None:
    """Perform the schema 2->3 migration for a single run's commands.

    See the `up_to_3` migration for background.

    Args:
        source_db_file: The SQLite database file to migrate from.
        dest_db_file: The SQLite database file to migrate into. Assumed to have all the
            proper tables set up already.
        run_id: Which run's commands to migrate.
        lock: A lock to hold while accessing the database. Concurrent access would be
            safe in the sense that SQLite would always provide isolation. But, when
            there are conflicts, we'd have to deal with SQLite retrying transactions or
            raising SQLITE_BUSY. A Python-level lock is simpler and more reliable.
    """
    with contextlib.suppress(
        # The format that we're migrating from is prone to bugs where our latest
        # code can't read records created by older code. (See RSS-98).
        # If that happens, it's better to silently drop the run than to fail the
        # whole migration.
        #
        # TODO(mm, 2024-02-14): Log these somehow. Logging is a little tricky from
        # subprocesses.
        Exception
    ), _database.sql_engine_ctx(
        source_db_file
    ) as source_engine, _database.sql_engine_ctx(
        dest_db_file
    ) as dest_engine:
        # Fetch the single schema-2 row holding this run's pickled command list.
        select_old_commands = sqlalchemy.select(schema_2.run_table.c.commands).where(
            schema_2.run_table.c.id == run_id
        )
        insert_new_command = sqlalchemy.insert(schema_3.run_command_table)

        # Hold the shared lock only for the actual database read; the
        # expensive unpickling/parsing below runs unlocked so other worker
        # processes can use the databases in the meantime.
        with lock, source_engine.begin() as source_transaction:
            # scalar_one() raises if the run_id row is missing; that (like any
            # other failure here) is swallowed by the suppress() above.
            old_commands_bytes: typing.Optional[bytes] = source_transaction.execute(
                select_old_commands
            ).scalar_one()

        # The commands column may be NULL/empty for runs with no commands.
        old_commands: typing.List[typing.Dict[str, object]] = (
            legacy_pickle.loads(old_commands_bytes) if old_commands_bytes else []
        )

        # Lazily re-parse each legacy dict into a typed Command model
        # (consumed once by the list comprehension below).
        parsed_commands: typing.Iterable[commands.Command] = (
            pydantic.parse_obj_as(
                commands.Command,  # type: ignore[arg-type]
                c,
            )
            for c in old_commands
        )

        # One schema-3 row per command, preserving the original ordering
        # via index_in_run.
        new_command_rows = [
            {
                "run_id": run_id,
                "index_in_run": index_in_run,
                "command_id": parsed_command.id,
                "command": pydantic_helpers.pydantic_to_json(parsed_command),
            }
            for index_in_run, parsed_command in enumerate(parsed_commands)
        ]

        # Re-acquire the lock only for the write into the destination database.
        with lock, dest_engine.begin() as dest_transaction:
            if len(new_command_rows) > 0:
                # This needs to be guarded by a len>0 check because if the list is empty,
                # SQLAlchemy misinterprets this as inserting a single row with all default
                # values.
                dest_transaction.execute(insert_new_command, new_command_rows)
Loading
Loading