Skip to content

Commit

Permalink
perf(robot-server): Parallelize the migration to schema 3 (#14465)
Browse files Browse the repository at this point in the history
  • Loading branch information
SyntaxColoring authored and TamarZanzouri committed Feb 16, 2024
1 parent 0cdbd4c commit 1a55549
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 114 deletions.
7 changes: 5 additions & 2 deletions robot-server/robot_server/persistence/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Support for persisting data across device reboots."""


from ._database import create_schema_3_sql_engine, sqlite_rowid
from ._database import create_sql_engine, sql_engine_ctx, sqlite_rowid
from ._fastapi_dependencies import (
start_initializing_persistence,
clean_up_persistence,
Expand All @@ -12,6 +12,7 @@
)
from ._persistence_directory import PersistenceResetter
from ._tables import (
metadata,
protocol_table,
analysis_table,
run_table,
Expand All @@ -22,9 +23,11 @@

__all__ = [
# database utilities and helpers
"create_schema_3_sql_engine",
"create_sql_engine",
"sql_engine_ctx",
"sqlite_rowid",
# database tables
"metadata",
"protocol_table",
"analysis_table",
"run_table",
Expand Down
43 changes: 13 additions & 30 deletions robot-server/robot_server/persistence/_database.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""SQLite database initialization and utilities."""
from contextlib import contextmanager
from pathlib import Path
from typing import Generator

import sqlalchemy

from server_utils import sql_utils

from ._tables import schema_2, schema_3
from ._migrations.up_to_2 import migrate


# A reference to SQLite's built-in ROWID column.
#
Expand All @@ -24,23 +23,17 @@
sqlite_rowid = sqlalchemy.column("_ROWID_")


def create_sql_engine(path: Path) -> sqlalchemy.engine.Engine:
    """Return an engine for accessing the given SQLite database file.

    If the file does not already exist, it will be created, empty.
    You must separately set up any tables you're expecting.

    Args:
        path: The SQLite database file to open (or create).

    Returns:
        A SQLAlchemy engine connected to the file. The caller owns it and must
        call ``.dispose()`` when done; see ``sql_engine_ctx()`` for a
        context-manager wrapper that handles that automatically.
    """
    sql_engine = sqlalchemy.create_engine(sql_utils.get_connection_url(path))

    try:
        # Configure connection-level behavior before handing the engine out.
        sql_utils.enable_foreign_key_constraints(sql_engine)
        sql_utils.fix_transactions(sql_engine)
    except Exception:
        # Don't leak the engine's connection pool if setup fails partway.
        sql_engine.dispose()
        raise

    return sql_engine


@contextmanager
def sql_engine_ctx(path: Path) -> Generator[sqlalchemy.engine.Engine, None, None]:
    """Like `create_sql_engine()`, but clean up when done.

    Yields an engine for the SQLite database at ``path`` and disposes of it
    (closing all pooled connections) when the ``with`` block exits, even if
    the block raises.
    """
    engine = create_sql_engine(path)
    try:
        yield engine
    finally:
        engine.dispose()
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from robot_server.errors import ErrorDetails

from ._database import create_schema_3_sql_engine
from ._database import create_sql_engine
from ._persistence_directory import (
PersistenceResetter,
prepare_active_subdirectory,
Expand Down Expand Up @@ -102,7 +102,7 @@ async def init_sql_engine() -> SQLEngine:
prepared_subdirectory = await subdirectory_prep_task

sql_engine = await to_thread.run_sync(
create_schema_3_sql_engine, prepared_subdirectory / _DATABASE_FILE
create_sql_engine, prepared_subdirectory / _DATABASE_FILE
)
return sql_engine

Expand Down
111 changes: 111 additions & 0 deletions robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Code that runs in a worker subprocess for the `up_to_3` migration."""


# fmt: off

# We keep a list of all the modules that this file imports
# so we can preload them when launching the subprocesses.
from types import ModuleType
_imports: "list[ModuleType]" = []

import contextlib # noqa: E402
import pathlib # noqa: E402
import typing # noqa: E402
_imports.extend([contextlib, pathlib, typing])

import pydantic # noqa: E402
import sqlalchemy # noqa: E402
_imports.extend([pydantic, sqlalchemy])

from opentrons.protocol_engine import commands # noqa: E402
from server_utils import sql_utils # noqa: E402
_imports.extend([commands, sql_utils])

from robot_server.persistence._tables import schema_2, schema_3 # noqa: E402
from robot_server.persistence import ( # noqa: E402
_database,
legacy_pickle,
pydantic as pydantic_helpers
)
_imports.extend([schema_2, schema_3, _database, legacy_pickle, pydantic_helpers])

# fmt: on


imports: typing.List[str] = [m.__name__ for m in _imports]
"""The names of all modules imported by this module, e.g. "foo.bar.baz"."""


def migrate_commands_for_run(
    source_db_file: pathlib.Path,
    dest_db_file: pathlib.Path,
    run_id: str,
    # This is a multiprocessing.Lock, which can't be a type annotation for some reason.
    lock: typing.ContextManager[object],
) -> None:
    """Perform the schema 2->3 migration for a single run's commands.

    See the `up_to_3` migration for background.

    Args:
        source_db_file: The SQLite database file to migrate from.
        dest_db_file: The SQLite database file to migrate into. Assumed to have all the
            proper tables set up already.
        run_id: Which run's commands to migrate.
        lock: A lock to hold while accessing the database. Concurrent access would be
            safe in the sense that SQLite would always provide isolation. But, when
            there are conflicts, we'd have to deal with SQLite retrying transactions or
            raising SQLITE_BUSY. A Python-level lock is simpler and more reliable.
    """
    with contextlib.suppress(
        # The format that we're migrating from is prone to bugs where our latest
        # code can't read records created by older code. (See RSS-98).
        # If that happens, it's better to silently drop the run than to fail the
        # whole migration.
        #
        # TODO(mm, 2024-02-14): Log these somehow. Logging is a little tricky from
        # subprocesses.
        Exception
    ), _database.sql_engine_ctx(
        source_db_file
    ) as source_engine, _database.sql_engine_ctx(
        dest_db_file
    ) as dest_engine:
        # Schema 2 stored all of a run's commands as one pickled blob in a
        # single column of the run row; schema 3 gives each command its own row.
        select_old_commands = sqlalchemy.select(schema_2.run_table.c.commands).where(
            schema_2.run_table.c.id == run_id
        )
        insert_new_command = sqlalchemy.insert(schema_3.run_command_table)

        # Hold the lock only around the actual database access; the
        # unpickle/parse work below runs lock-free so workers can overlap.
        with lock, source_engine.begin() as source_transaction:
            old_commands_bytes: typing.Optional[bytes] = source_transaction.execute(
                select_old_commands
            ).scalar_one()

        # A NULL/empty blob means the run has no commands to migrate.
        old_commands: typing.List[typing.Dict[str, object]] = (
            legacy_pickle.loads(old_commands_bytes) if old_commands_bytes else []
        )

        # Lazily re-validate each pickled dict into a typed Command model.
        parsed_commands: typing.Iterable[commands.Command] = (
            pydantic.parse_obj_as(
                commands.Command,  # type: ignore[arg-type]
                c,
            )
            for c in old_commands
        )

        # index_in_run preserves the original command ordering in the new table.
        new_command_rows = [
            {
                "run_id": run_id,
                "index_in_run": index_in_run,
                "command_id": parsed_command.id,
                "command": pydantic_helpers.pydantic_to_json(parsed_command),
            }
            for index_in_run, parsed_command in enumerate(parsed_commands)
        ]

        with lock, dest_engine.begin() as dest_transaction:
            if len(new_command_rows) > 0:
                # This needs to be guarded by a len>0 check because if the list is empty,
                # SQLAlchemy misinterprets this as inserting a single row with all default
                # values.
                dest_transaction.execute(insert_new_command, new_command_rows)
Loading

0 comments on commit 1a55549

Please sign in to comment.