perf(robot-server): Parallelize the migration to schema 3 #14465

Merged
merged 19 commits on Feb 14, 2024
Changes from 2 commits
174 changes: 136 additions & 38 deletions robot-server/robot_server/persistence/_migrations/up_to_3.py
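At a high level, the change splits the schema 2 to schema 3 migration into two phases: everything except commands is still copied inside a single pair of source/destination transactions, while the per-run command JSON, which is the bulk of the data, is parsed and re-serialized by a pool of worker processes. A minimal, self-contained sketch of that shape (the helper names and stub bodies are illustrative, not the PR's code):

import concurrent.futures
from pathlib import Path
from typing import List


def migrate_everything_except_commands(source_db_file: Path, dest_db_file: Path) -> None:
    ...  # runs, protocols, analyses: small, handled in one transaction


def migrate_commands_for_run(source_db_file: Path, dest_db_file: Path, run_id: str) -> None:
    ...  # parse one run's command JSON and insert it into the new schema


def migrate(source_db_file: Path, dest_db_file: Path, run_ids: List[str]) -> None:
    migrate_everything_except_commands(source_db_file, dest_db_file)
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        futures = [
            pool.submit(migrate_commands_for_run, source_db_file, dest_db_file, run_id)
            for run_id in run_ids
        ]
        for future in futures:
            future.result()  # propagate any exception raised in a worker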
@@ -17,9 +17,12 @@
"""


from contextlib import ExitStack
import logging
import concurrent.futures
import multiprocessing
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Iterable, List
from typing import Any, Dict, Generator, Iterable, List, Optional

from opentrons.protocol_engine import Command, StateSummary
import pydantic
@@ -36,6 +39,8 @@
from ._util import copy_rows_unmodified, copy_if_exists, copytree_if_exists


_log = logging.getLogger(__name__)

# TODO: Define a single source of truth somewhere for these paths.
_DECK_CONFIGURATION_FILE = "deck_configuration.json"
_PROTOCOLS_DIRECTORY = "protocols"
@@ -52,19 +57,22 @@ def migrate(self, source_dir: Path, dest_dir: Path) -> None:
source_dir / _PROTOCOLS_DIRECTORY, dest_dir / _PROTOCOLS_DIRECTORY
)

with ExitStack() as exit_stack:
# If the source is schema 0 or 1, this will migrate it to 2 in-place.
source_db = create_schema_2_sql_engine(source_dir / _DB_FILE)
exit_stack.callback(source_db.dispose)

dest_db = create_schema_3_sql_engine(dest_dir / _DB_FILE)
exit_stack.callback(dest_db.dispose)
source_db_file = source_dir / _DB_FILE
dest_db_file = dest_dir / _DB_FILE

# If the source is schema 0 or 1, this will migrate it to 2 in-place.
with _schema_2_engine(source_db_file) as source_db, _schema_3_engine(
dest_db_file
) as dest_db:
with source_db.begin() as source_transaction, dest_db.begin() as dest_transaction:
_migrate_db(source_transaction, dest_transaction)
_migrate_everything_except_commands(
source_transaction, dest_transaction
)

_migrate_commands(source_db_file, dest_db_file)

def _migrate_db(

def _migrate_everything_except_commands(
source_transaction: sqlalchemy.engine.Connection,
dest_transaction: sqlalchemy.engine.Connection,
) -> None:
@@ -101,9 +109,11 @@ def _migrate_run_table(
) -> None:
select_old_runs = sqlalchemy.select(schema_2.run_table).order_by(sqlite_rowid)
insert_new_run = sqlalchemy.insert(schema_3.run_table)
insert_new_command = sqlalchemy.insert(schema_3.run_command_table)

for old_run_row in source_transaction.execute(select_old_runs).all():
old_run_rows = source_transaction.execute(select_old_runs).all()

# Migrate scalar run data:
for old_run_row in old_run_rows:
old_state_summary = old_run_row.state_summary
new_state_summary = (
None
@@ -122,31 +132,8 @@
_updated_at=old_run_row._updated_at,
)

old_commands: List[Dict[str, Any]] = old_run_row.commands or []
pydantic_old_commands: Iterable[Command] = (
pydantic.parse_obj_as(
Command, # type: ignore[arg-type]
c,
)
for c in old_commands
)
new_command_rows = [
{
"run_id": old_run_row.id,
"index_in_run": index_in_run,
"command_id": pydantic_command.id,
"command": pydantic_to_json(pydantic_command),
}
for index_in_run, pydantic_command in enumerate(pydantic_old_commands)
]
# Insert all the commands for this run in one go, to avoid the overhead of
# separate statements, and since we had to bring them all into memory at once
# in order to parse them anyway.
if len(new_command_rows) > 0:
# This needs to be guarded by a len>0 check because if the list is empty,
# SQLAlchemy misinterprets this as inserting a single row with all default
# values.
dest_transaction.execute(insert_new_command, new_command_rows)
# Migrate run commands. There are potentially a lot of these, so offload them
# to worker processes.


def _migrate_analysis_table(
@@ -173,3 +160,114 @@
protocol_id=row.protocol_id,
analyzer_version=row.analyzer_version,
)


@contextmanager
def _schema_2_engine(db_file: Path) -> Generator[sqlalchemy.engine.Engine, None, None]:
engine = create_schema_2_sql_engine(db_file)
try:
yield engine
finally:
engine.dispose()


@contextmanager
def _schema_3_engine(db_file: Path) -> Generator[sqlalchemy.engine.Engine, None, None]:
engine = create_schema_3_sql_engine(db_file)
try:
yield engine
finally:
engine.dispose()


def _migrate_commands(source_db_file: Path, dest_db_file: Path) -> None:
engine = create_schema_2_sql_engine(source_db_file)
try:
run_ids = (
engine.execute(sqlalchemy.select(schema_2.run_table.c.id)).scalars().all()
)
finally:
engine.dispose()

# Each process could safely insert without this lock, in the sense that SQLite
# can handle the concurrent writers and still produce the correct result. It's
# likely to be slow, though: if SQLite can't acquire a transaction immediately,
# it falls back to a busy-retry loop.
manager = multiprocessing.Manager()
insertion_lock = manager.Lock()

with concurrent.futures.ProcessPoolExecutor(
# One worker per core of the OT-2's Raspberry Pi.
#
# This should be safe from a memory footprint point of view.
# Suppose a very large protocol has ~10MB of commands (see e.g. RQA-443).
# The maximum number of runs at the time of writing is 20,
# so that's at most ~200MB total.
max_workers=4
) as pool:
futures = [
pool.submit(
_migrate_commands_for_run,
source_db_file,
dest_db_file,
run_id,
insertion_lock,
)
for run_id in run_ids
]
for future in futures:
# TODO: See if there's a better way to await all the results.
future.result()


def _migrate_commands_for_run(
source_db_file: Path,
dest_db_file: Path,
run_id: str,
insertion_lock: Any, # multiprocessing.Lock can't be typed.
) -> None:
with _schema_2_engine(source_db_file) as source_engine, _schema_3_engine(
dest_db_file
) as dest_engine:
_log.error(f"Retrieving commands for {run_id}")
old_commands: Optional[List[Dict[str, Any]]] = source_engine.execute(
sqlalchemy.select(schema_2.run_table.c.commands).where(
schema_2.run_table.c.id == run_id
)
).scalar_one()
if old_commands is None:
old_commands = []

_log.error(f"Parsing commands for {run_id}")
pydantic_old_commands: Iterable[Command] = (
pydantic.parse_obj_as(
Command, # type: ignore[arg-type]
c,
)
for c in old_commands
)
_log.error(f"Reserializing commands for {run_id}")
new_command_rows = [
{
"run_id": run_id,
"index_in_run": index_in_run,
"command_id": pydantic_command.id,
"command": pydantic_to_json(pydantic_command),
}
for index_in_run, pydantic_command in enumerate(pydantic_old_commands)
]
_log.error(f"Inserting commands for {run_id}")
insert_new_command = sqlalchemy.insert(schema_3.run_command_table)
with insertion_lock, dest_engine.begin() as dest_transaction:
# Insert all the commands for this run in one go, to avoid the overhead of
# separate statements, and since we had to bring them all into memory at once
# in order to parse them anyway.
if len(new_command_rows) > 0:
# This needs to be guarded by a len>0 check because if the list is empty,
# SQLAlchemy misinterprets this as inserting a single row with all default
# values.
dest_transaction.execute(insert_new_command, new_command_rows)

_log.error(f"Done with commands for {run_id}")
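On the len > 0 guard around the bulk insert above: the sketch below reproduces that guard against an in-memory SQLite database (the table and column names are made up for the example, not the real schema 3 tables):

import sqlalchemy

metadata = sqlalchemy.MetaData()
command_table = sqlalchemy.Table(
    "run_command",
    metadata,
    sqlalchemy.Column("run_id", sqlalchemy.String, nullable=False),
    sqlalchemy.Column("index_in_run", sqlalchemy.Integer, nullable=False),
    sqlalchemy.Column("command", sqlalchemy.String, nullable=False),
)

engine = sqlalchemy.create_engine("sqlite://")
metadata.create_all(engine)

rows: list = []  # e.g. a run that recorded no commands

with engine.begin() as dest_transaction:
    # Per the comment in the migration, an insert executed with an empty
    # parameter list is not treated as "insert zero rows", so skip the
    # statement entirely when there is nothing to insert.
    if len(rows) > 0:
        dest_transaction.execute(sqlalchemy.insert(command_table), rows)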