From 7470ff780714a8949c18cd8f3f9f5768f0da826a Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Fri, 26 Jan 2024 16:02:21 -0500 Subject: [PATCH 01/18] WIP: Parallelize --- .../persistence/_migrations/up_to_3.py | 166 ++++++++++++++---- 1 file changed, 128 insertions(+), 38 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index a6fb8afacdb..b04d887cd49 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -17,9 +17,11 @@ """ -from contextlib import ExitStack +import concurrent.futures +import multiprocessing +from contextlib import contextmanager from pathlib import Path -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Generator, Iterable, List, Optional from opentrons.protocol_engine import Command, StateSummary import pydantic @@ -52,19 +54,22 @@ def migrate(self, source_dir: Path, dest_dir: Path) -> None: source_dir / _PROTOCOLS_DIRECTORY, dest_dir / _PROTOCOLS_DIRECTORY ) - with ExitStack() as exit_stack: - # If the source is schema 0 or 1, this will migrate it to 2 in-place. - source_db = create_schema_2_sql_engine(source_dir / _DB_FILE) - exit_stack.callback(source_db.dispose) - - dest_db = create_schema_3_sql_engine(dest_dir / _DB_FILE) - exit_stack.callback(dest_db.dispose) + source_db_file = source_dir / _DB_FILE + dest_db_file = dest_dir / _DB_FILE + # If the source is schema 0 or 1, this will migrate it to 2 in-place. + with _schema_2_engine(source_db_file) as source_db, _schema_3_engine( + dest_db_file + ) as dest_db: with source_db.begin() as source_transaction, dest_db.begin() as dest_transaction: - _migrate_db(source_transaction, dest_transaction) + _migrate_everything_except_commands( + source_transaction, dest_transaction + ) + + _migrate_commands(source_db_file, dest_db_file) -def _migrate_db( +def _migrate_everything_except_commands( source_transaction: sqlalchemy.engine.Connection, dest_transaction: sqlalchemy.engine.Connection, ) -> None: @@ -101,9 +106,11 @@ def _migrate_run_table( ) -> None: select_old_runs = sqlalchemy.select(schema_2.run_table).order_by(sqlite_rowid) insert_new_run = sqlalchemy.insert(schema_3.run_table) - insert_new_command = sqlalchemy.insert(schema_3.run_command_table) - for old_run_row in source_transaction.execute(select_old_runs).all(): + old_run_rows = source_transaction.execute(select_old_runs).all() + + # Migrate scalar run data: + for old_run_row in old_run_rows: old_state_summary = old_run_row.state_summary new_state_summary = ( None @@ -122,31 +129,8 @@ def _migrate_run_table( _updated_at=old_run_row._updated_at, ) - old_commands: List[Dict[str, Any]] = old_run_row.commands or [] - pydantic_old_commands: Iterable[Command] = ( - pydantic.parse_obj_as( - Command, # type: ignore[arg-type] - c, - ) - for c in old_commands - ) - new_command_rows = [ - { - "run_id": old_run_row.id, - "index_in_run": index_in_run, - "command_id": pydantic_command.id, - "command": pydantic_to_json(pydantic_command), - } - for index_in_run, pydantic_command in enumerate(pydantic_old_commands) - ] - # Insert all the commands for this run in one go, to avoid the overhead of - # separate statements, and since we had to bring them all into memory at once - # in order to parse them anyway. 
- if len(new_command_rows) > 0: - # This needs to be guarded by a len>0 check because if the list is empty, - # SQLAlchemy misinterprets this as inserting a single row with all default - # values. - dest_transaction.execute(insert_new_command, new_command_rows) + # Migrate run commands. There are potentially a lot of these, so offload them + # to worker threads. def _migrate_analysis_table( @@ -173,3 +157,109 @@ def _migrate_analysis_table( protocol_id=row.protocol_id, analyzer_version=row.analyzer_version, ) + + +@contextmanager +def _schema_2_engine(db_file: Path) -> Generator[sqlalchemy.engine.Engine, None, None]: + engine = create_schema_2_sql_engine(db_file) + try: + yield engine + finally: + engine.dispose() + + +@contextmanager +def _schema_3_engine(db_file: Path) -> Generator[sqlalchemy.engine.Engine, None, None]: + engine = create_schema_3_sql_engine(db_file) + try: + yield engine + finally: + engine.dispose() + + +def _migrate_commands(source_db_file: Path, dest_db_file: Path) -> None: + engine = create_schema_2_sql_engine(source_db_file) + try: + run_ids = ( + engine.execute(sqlalchemy.select(schema_2.run_table.c.id)).scalars().all() + ) + finally: + engine.dispose() + + # Each process could safely insert without this lock in the sense that SQLite + # can handle the concurrency and produce the correct result. However, I suspect + # it's slow. The World Wide Web has mentions of busy-retry loops if SQLite can't + # immediately acquire a transaction. + # + # Straight up copy-paste from Stack Overflow: + manager = multiprocessing.Manager() + insertion_lock = manager.Lock() + + with concurrent.futures.ProcessPoolExecutor( + # One worker per core of the OT-2's Raspberry Pi. + # + # This should be safe from a memory footprint point of view. + # Suppose a very large protocol has ~10MB of commands (see e.g. RQA-443). + # The maximum number of runs at the time of writing is 20, + # so that's at most ~200MB total. + max_workers=4 + ) as pool: + futures = [ + pool.submit( + _migrate_commands_for_run, + source_db_file, + dest_db_file, + run_id, + insertion_lock, + ) + for run_id in run_ids + ] + for future in futures: + # TODO: See if there's a better way to await all the results. + future.result() + + +def _migrate_commands_for_run( + source_db_file: Path, + dest_db_file: Path, + run_id: str, + insertion_lock: Any, # multiprocessing.Lock can't be typed. +) -> None: + with _schema_2_engine(source_db_file) as source_engine, _schema_3_engine( + dest_db_file + ) as dest_engine: + old_commands: Optional[List[Dict[str, Any]]] = source_engine.execute( + sqlalchemy.select(schema_2.run_table.c.commands).where( + schema_2.run_table.c.id == run_id + ) + ).scalar_one() + if old_commands is None: + old_commands = [] + + pydantic_old_commands: Iterable[Command] = ( + pydantic.parse_obj_as( + Command, # type: ignore[arg-type] + c, + ) + for c in old_commands + ) + new_command_rows = [ + { + "run_id": run_id, + "index_in_run": index_in_run, + "command_id": pydantic_command.id, + "command": pydantic_to_json(pydantic_command), + } + for index_in_run, pydantic_command in enumerate(pydantic_old_commands) + ] + + insert_new_command = sqlalchemy.insert(schema_3.run_command_table) + with insertion_lock, dest_engine.begin() as dest_transaction: + # Insert all the commands for this run in one go, to avoid the overhead of + # separate statements, and since we had to bring them all into memory at once + # in order to parse them anyway. 
+ if len(new_command_rows) > 0: + # This needs to be guarded by a len>0 check because if the list is empty, + # SQLAlchemy misinterprets this as inserting a single row with all default + # values. + dest_transaction.execute(insert_new_command, new_command_rows) From f486d313e6dca2d1ab7be7703798742b2b8f6850 Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Fri, 26 Jan 2024 16:36:02 -0500 Subject: [PATCH 02/18] WIP: Logs --- .../robot_server/persistence/_migrations/up_to_3.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index b04d887cd49..9d7094c3fcd 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -17,6 +17,7 @@ """ +import logging import concurrent.futures import multiprocessing from contextlib import contextmanager @@ -38,6 +39,8 @@ from ._util import copy_rows_unmodified, copy_if_exists, copytree_if_exists +_log = logging.getLogger(__name__) + # TODO: Define a single source of truth somewhere for these paths. _DECK_CONFIGURATION_FILE = "deck_configuration.json" _PROTOCOLS_DIRECTORY = "protocols" @@ -228,6 +231,7 @@ def _migrate_commands_for_run( with _schema_2_engine(source_db_file) as source_engine, _schema_3_engine( dest_db_file ) as dest_engine: + _log.error(f"Retrieving commands for {run_id}") old_commands: Optional[List[Dict[str, Any]]] = source_engine.execute( sqlalchemy.select(schema_2.run_table.c.commands).where( schema_2.run_table.c.id == run_id @@ -236,6 +240,7 @@ def _migrate_commands_for_run( if old_commands is None: old_commands = [] + _log.error(f"Parsing commands for {run_id}") pydantic_old_commands: Iterable[Command] = ( pydantic.parse_obj_as( Command, # type: ignore[arg-type] @@ -243,6 +248,7 @@ def _migrate_commands_for_run( ) for c in old_commands ) + _log.error(f"Reserializing commands for {run_id}") new_command_rows = [ { "run_id": run_id, @@ -252,7 +258,7 @@ def _migrate_commands_for_run( } for index_in_run, pydantic_command in enumerate(pydantic_old_commands) ] - + _log.error(f"Inserting commands for {run_id}") insert_new_command = sqlalchemy.insert(schema_3.run_command_table) with insertion_lock, dest_engine.begin() as dest_transaction: # Insert all the commands for this run in one go, to avoid the overhead of @@ -263,3 +269,5 @@ def _migrate_commands_for_run( # SQLAlchemy misinterprets this as inserting a single row with all default # values. dest_transaction.execute(insert_new_command, new_command_rows) + + _log.error(f"Done with commands for {run_id}") From 56af6fcc6612dffc1cd4d9ee76e54b9b3f240840 Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Fri, 9 Feb 2024 16:26:23 -0500 Subject: [PATCH 03/18] General cleanup. --- .../persistence/_migrations/up_to_3.py | 181 ++++++++++-------- 1 file changed, 96 insertions(+), 85 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index 9d7094c3fcd..9392e759e38 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -16,13 +16,11 @@ since the updated `analysis.completed_analysis` (see above) replaces it. 
""" - -import logging import concurrent.futures import multiprocessing -from contextlib import contextmanager +from contextlib import ExitStack, contextmanager from pathlib import Path -from typing import Any, Dict, Generator, Iterable, List, Optional +from typing import ContextManager, Dict, Generator, Iterable, List from opentrons.protocol_engine import Command, StateSummary import pydantic @@ -39,8 +37,6 @@ from ._util import copy_rows_unmodified, copy_if_exists, copytree_if_exists -_log = logging.getLogger(__name__) - # TODO: Define a single source of truth somewhere for these paths. _DECK_CONFIGURATION_FILE = "deck_configuration.json" _PROTOCOLS_DIRECTORY = "protocols" @@ -60,19 +56,51 @@ def migrate(self, source_dir: Path, dest_dir: Path) -> None: source_db_file = source_dir / _DB_FILE dest_db_file = dest_dir / _DB_FILE - # If the source is schema 0 or 1, this will migrate it to 2 in-place. - with _schema_2_engine(source_db_file) as source_db, _schema_3_engine( - dest_db_file - ) as dest_db: - with source_db.begin() as source_transaction, dest_db.begin() as dest_transaction: - _migrate_everything_except_commands( - source_transaction, dest_transaction - ) + with ExitStack() as exit_stack: + source_engine = exit_stack.enter_context( + # If the source is schema 0 or 1, this will migrate it to 2 in-place. + _schema_2_sql_engine(source_db_file) + ) + dest_engine = exit_stack.enter_context(_schema_3_sql_engine(dest_db_file)) + + with source_engine.begin() as source_transaction, dest_engine.begin() as dest_transaction: + run_ids = _get_run_ids(schema_2_transaction=source_transaction) + _migrate_db_excluding_commands(source_transaction, dest_transaction) - _migrate_commands(source_db_file, dest_db_file) + _migrate_db_commands(source_db_file, dest_db_file, run_ids) + + +@contextmanager +def _schema_2_sql_engine( + db_file: Path, +) -> Generator[sqlalchemy.engine.Engine, None, None]: + engine = create_schema_2_sql_engine(db_file) + try: + yield engine + finally: + engine.dispose() -def _migrate_everything_except_commands( +@contextmanager +def _schema_3_sql_engine( + db_file: Path, +) -> Generator[sqlalchemy.engine.Engine, None, None]: + engine = create_schema_3_sql_engine(db_file) + try: + yield engine + finally: + engine.dispose() + + +def _get_run_ids(*, schema_2_transaction: sqlalchemy.engine.Connection) -> List[str]: + return ( + schema_2_transaction.execute(sqlalchemy.select(schema_2.run_table.c.id)) + .scalars() + .all() + ) + + +def _migrate_db_excluding_commands( source_transaction: sqlalchemy.engine.Connection, dest_transaction: sqlalchemy.engine.Connection, ) -> None: @@ -89,7 +117,7 @@ def _migrate_everything_except_commands( dest_transaction, ) - _migrate_run_table( + _migrate_run_table_excluding_commands( source_transaction, dest_transaction, ) @@ -103,17 +131,22 @@ def _migrate_everything_except_commands( ) -def _migrate_run_table( +def _migrate_run_table_excluding_commands( source_transaction: sqlalchemy.engine.Connection, dest_transaction: sqlalchemy.engine.Connection, ) -> None: - select_old_runs = sqlalchemy.select(schema_2.run_table).order_by(sqlite_rowid) + select_old_runs = sqlalchemy.select( + schema_2.run_table.c.id, + schema_2.run_table.c.created_at, + schema_2.run_table.c.protocol_id, + schema_2.run_table.c.state_summary, + # schema_2.run_table.c.commands deliberately omitted + schema_2.run_table.c.engine_status, + schema_2.run_table.c._updated_at, + ).order_by(sqlite_rowid) insert_new_run = sqlalchemy.insert(schema_3.run_table) - old_run_rows = 
source_transaction.execute(select_old_runs).all() - - # Migrate scalar run data: - for old_run_row in old_run_rows: + for old_run_row in source_transaction.execute(select_old_runs).all(): old_state_summary = old_run_row.state_summary new_state_summary = ( None @@ -132,9 +165,6 @@ def _migrate_run_table( _updated_at=old_run_row._updated_at, ) - # Migrate run commands. There are potentially a lot of these, so offload them - # to worker threads. - def _migrate_analysis_table( source_connection: sqlalchemy.engine.Connection, @@ -162,41 +192,24 @@ def _migrate_analysis_table( ) -@contextmanager -def _schema_2_engine(db_file: Path) -> Generator[sqlalchemy.engine.Engine, None, None]: - engine = create_schema_2_sql_engine(db_file) - try: - yield engine - finally: - engine.dispose() - - -@contextmanager -def _schema_3_engine(db_file: Path) -> Generator[sqlalchemy.engine.Engine, None, None]: - engine = create_schema_3_sql_engine(db_file) - try: - yield engine - finally: - engine.dispose() - - -def _migrate_commands(source_db_file: Path, dest_db_file: Path) -> None: - engine = create_schema_2_sql_engine(source_db_file) - try: - run_ids = ( - engine.execute(sqlalchemy.select(schema_2.run_table.c.id)).scalars().all() - ) - finally: - engine.dispose() - - # Each process could safely insert without this lock in the sense that SQLite - # can handle the concurrency and produce the correct result. However, I suspect - # it's slow. The World Wide Web has mentions of busy-retry loops if SQLite can't - # immediately acquire a transaction. +def _migrate_db_commands( + source_db_file: Path, dest_db_file: Path, run_ids: List[str] +) -> None: + """Migrate the run commands stored in the database. + + Because there are potentially tens or hundreds of thousands of commands in total, + this is the most computationally expensive part of the migration. We distribute + the work across subprocesses. Each subprocess extracts, migrates, and inserts + all of the commands for a single run. + """ + # We'll use a lock to make sure only one process is accessing the database at once. # - # Straight up copy-paste from Stack Overflow: + # Concurrent access would be safe in the sense that SQLite would always provide + # isolation. But, when there are conflicts, we'd have to deal with SQLite retrying + # transactions or raising SQLITE_BUSY. A Python-level lock is simpler and more + # reliable. manager = multiprocessing.Manager() - insertion_lock = manager.Lock() + lock = manager.Lock() with concurrent.futures.ProcessPoolExecutor( # One worker per core of the OT-2's Raspberry Pi. @@ -209,11 +222,11 @@ def _migrate_commands(source_db_file: Path, dest_db_file: Path) -> None: ) as pool: futures = [ pool.submit( - _migrate_commands_for_run, + _migrate_db_commands_for_run, source_db_file, dest_db_file, run_id, - insertion_lock, + lock, ) for run_id in run_ids ] @@ -222,52 +235,50 @@ def _migrate_commands(source_db_file: Path, dest_db_file: Path) -> None: future.result() -def _migrate_commands_for_run( +def _migrate_db_commands_for_run( source_db_file: Path, dest_db_file: Path, run_id: str, - insertion_lock: Any, # multiprocessing.Lock can't be typed. + # This is a multiprocessing.Lock, which can't be a type annotation for some reason. 
+ lock: ContextManager[object], ) -> None: - with _schema_2_engine(source_db_file) as source_engine, _schema_3_engine( + with _schema_2_sql_engine(source_db_file) as source_engine, _schema_3_sql_engine( dest_db_file ) as dest_engine: - _log.error(f"Retrieving commands for {run_id}") - old_commands: Optional[List[Dict[str, Any]]] = source_engine.execute( - sqlalchemy.select(schema_2.run_table.c.commands).where( - schema_2.run_table.c.id == run_id + select_old_commands = sqlalchemy.select(schema_2.run_table.c.commands).where( + schema_2.run_table.c.id == run_id + ) + insert_new_command = sqlalchemy.insert(schema_3.run_command_table) + + with lock, source_engine.begin() as source_transaction: + old_commands: List[Dict[str, object]] = ( + source_transaction.execute(select_old_commands).scalar_one() or [] ) - ).scalar_one() - if old_commands is None: - old_commands = [] - _log.error(f"Parsing commands for {run_id}") - pydantic_old_commands: Iterable[Command] = ( + parsed_commands: Iterable[Command] = ( pydantic.parse_obj_as( Command, # type: ignore[arg-type] c, ) for c in old_commands ) - _log.error(f"Reserializing commands for {run_id}") + new_command_rows = [ { "run_id": run_id, "index_in_run": index_in_run, - "command_id": pydantic_command.id, - "command": pydantic_to_json(pydantic_command), + "command_id": parsed_command.id, + "command": pydantic_to_json(parsed_command), } - for index_in_run, pydantic_command in enumerate(pydantic_old_commands) + for index_in_run, parsed_command in enumerate(parsed_commands) ] - _log.error(f"Inserting commands for {run_id}") - insert_new_command = sqlalchemy.insert(schema_3.run_command_table) - with insertion_lock, dest_engine.begin() as dest_transaction: - # Insert all the commands for this run in one go, to avoid the overhead of - # separate statements, and since we had to bring them all into memory at once - # in order to parse them anyway. + + # Insert all the commands for this run in one go, to avoid the overhead of + # separate statements, and since we had to bring them all into memory at once + # in order to parse them anyway. + with lock, dest_engine.begin() as dest_transaction: if len(new_command_rows) > 0: # This needs to be guarded by a len>0 check because if the list is empty, # SQLAlchemy misinterprets this as inserting a single row with all default # values. dest_transaction.execute(insert_new_command, new_command_rows) - - _log.error(f"Done with commands for {run_id}") From 14113f76a414f2a62da64e553841c6c004fdedff Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Fri, 9 Feb 2024 18:30:27 -0500 Subject: [PATCH 04/18] Bring unpickling outside the lock. --- .../robot_server/persistence/_migrations/up_to_3.py | 13 +++++++++---- .../robot_server/persistence/_tables/schema_2.py | 3 ++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index 9392e759e38..fb3de89ffca 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -20,12 +20,13 @@ import multiprocessing from contextlib import ExitStack, contextmanager from pathlib import Path -from typing import ContextManager, Dict, Generator, Iterable, List +from typing import ContextManager, Dict, Generator, Iterable, List, Optional from opentrons.protocol_engine import Command, StateSummary import pydantic import sqlalchemy +from .. 
import legacy_pickle from ..pydantic import pydantic_to_json from .._database import ( create_schema_2_sql_engine, @@ -251,9 +252,13 @@ def _migrate_db_commands_for_run( insert_new_command = sqlalchemy.insert(schema_3.run_command_table) with lock, source_engine.begin() as source_transaction: - old_commands: List[Dict[str, object]] = ( - source_transaction.execute(select_old_commands).scalar_one() or [] - ) + old_commands_bytes: Optional[bytes] = source_transaction.execute( + select_old_commands + ).scalar_one() + + old_commands: List[Dict[str, object]] = ( + legacy_pickle.loads(old_commands_bytes) if old_commands_bytes else [] + ) parsed_commands: Iterable[Command] = ( pydantic.parse_obj_as( diff --git a/robot-server/robot_server/persistence/_tables/schema_2.py b/robot-server/robot_server/persistence/_tables/schema_2.py index fc23e96b5d7..baa95ca80dc 100644 --- a/robot-server/robot_server/persistence/_tables/schema_2.py +++ b/robot-server/robot_server/persistence/_tables/schema_2.py @@ -105,7 +105,8 @@ # column added in schema v1 sqlalchemy.Column( "commands", - sqlalchemy.PickleType(pickler=legacy_pickle, protocol=PICKLE_PROTOCOL_VERSION), + sqlalchemy.LargeBinary, + # sqlalchemy.PickleType(pickler=legacy_pickle, protocol=PICKLE_PROTOCOL_VERSION), nullable=True, ), # column added in schema v1 From bea752a3fd5c4dcf556330aeb6785e4f809c4a33 Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Sat, 10 Feb 2024 01:22:12 -0500 Subject: [PATCH 05/18] Use a multiprocessing.Pool instead of concurrent.futures.ProcessPoolExecutor. We already need a multiprocessing.Lock, and we're not doing anything that needs concurrent.Futures, so this seems more direct. There also appears to be very slightly less overhead (~0-5%). --- .../persistence/_migrations/up_to_3.py | 43 ++++++++----------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index fb3de89ffca..7fe7713e964 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -16,11 +16,10 @@ since the updated `analysis.completed_analysis` (see above) replaces it. """ -import concurrent.futures import multiprocessing from contextlib import ExitStack, contextmanager from pathlib import Path -from typing import ContextManager, Dict, Generator, Iterable, List, Optional +from typing import ContextManager, Dict, Generator, Iterable, List, Optional, Tuple from opentrons.protocol_engine import Command, StateSummary import pydantic @@ -212,37 +211,33 @@ def _migrate_db_commands( manager = multiprocessing.Manager() lock = manager.Lock() - with concurrent.futures.ProcessPoolExecutor( + with multiprocessing.Pool( # One worker per core of the OT-2's Raspberry Pi. + # We're compute-bound, so more workers would just thrash. # - # This should be safe from a memory footprint point of view. + # Napkin math for the memory footprint: # Suppose a very large protocol has ~10MB of commands (see e.g. RQA-443). # The maximum number of runs at the time of writing is 20, - # so that's at most ~200MB total. - max_workers=4 + # so that's at most ~200MB total, which should be fine. + processes=4 ) as pool: - futures = [ - pool.submit( - _migrate_db_commands_for_run, - source_db_file, - dest_db_file, - run_id, - lock, - ) - for run_id in run_ids - ] - for future in futures: - # TODO: See if there's a better way to await all the results. 
- future.result() + pool.map( + _migrate_db_commands_for_run, + ((source_db_file, dest_db_file, run_id, lock) for run_id in run_ids), + ) def _migrate_db_commands_for_run( - source_db_file: Path, - dest_db_file: Path, - run_id: str, - # This is a multiprocessing.Lock, which can't be a type annotation for some reason. - lock: ContextManager[object], + args: Tuple[ + Path, + Path, + str, + # This is a multiprocessing.Lock, which can't be a type annotation for some reason. + ContextManager[object], + ] ) -> None: + source_db_file, dest_db_file, run_id, lock = args + with _schema_2_sql_engine(source_db_file) as source_engine, _schema_3_sql_engine( dest_db_file ) as dest_engine: From 3a4b4c785c905336412acb497c3e159a718efa5c Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Mon, 12 Feb 2024 20:59:14 -0500 Subject: [PATCH 06/18] Refactor: Split table creation from engine creation. --- .../robot_server/persistence/__init__.py | 7 ++- .../robot_server/persistence/_database.py | 43 ++++++------------- .../persistence/_fastapi_dependencies.py | 4 +- .../persistence/_migrations/up_to_3.py | 39 ++++------------- robot-server/tests/conftest.py | 8 ++-- 5 files changed, 33 insertions(+), 68 deletions(-) diff --git a/robot-server/robot_server/persistence/__init__.py b/robot-server/robot_server/persistence/__init__.py index 604c331f1c5..ad521aca62f 100644 --- a/robot-server/robot_server/persistence/__init__.py +++ b/robot-server/robot_server/persistence/__init__.py @@ -1,7 +1,7 @@ """Support for persisting data across device reboots.""" -from ._database import create_schema_3_sql_engine, sqlite_rowid +from ._database import create_sql_engine, sql_engine_ctx, sqlite_rowid from ._fastapi_dependencies import ( start_initializing_persistence, clean_up_persistence, @@ -12,6 +12,7 @@ ) from ._persistence_directory import PersistenceResetter from ._tables import ( + metadata, protocol_table, analysis_table, run_table, @@ -22,9 +23,11 @@ __all__ = [ # database utilities and helpers - "create_schema_3_sql_engine", + "create_sql_engine", + "sql_engine_ctx", "sqlite_rowid", # database tables + "metadata", "protocol_table", "analysis_table", "run_table", diff --git a/robot-server/robot_server/persistence/_database.py b/robot-server/robot_server/persistence/_database.py index 3110994bb5b..7204e47517f 100644 --- a/robot-server/robot_server/persistence/_database.py +++ b/robot-server/robot_server/persistence/_database.py @@ -1,13 +1,12 @@ """SQLite database initialization and utilities.""" +from contextlib import contextmanager from pathlib import Path +from typing import Generator import sqlalchemy from server_utils import sql_utils -from ._tables import schema_2, schema_3 -from ._migrations.up_to_2 import migrate - # A reference to SQLite's built-in ROWID column. # @@ -24,23 +23,17 @@ sqlite_rowid = sqlalchemy.column("_ROWID_") -def create_schema_2_sql_engine(path: Path) -> sqlalchemy.engine.Engine: - """Create a SQL engine for a schema 2 database. - - If provided a schema 0 or 1 database, this will migrate it in-place to schema 2. +def create_sql_engine(path: Path) -> sqlalchemy.engine.Engine: + """Return an engine for accessing the given SQLite database file. - Warning: - Migrations can take several minutes. If calling this from an async function, - offload this to a thread to avoid blocking the event loop. + If the file does not already exist, it will be created, empty. + You must separately set up any tables you're expecting. 
""" sql_engine = sqlalchemy.create_engine(sql_utils.get_connection_url(path)) try: sql_utils.enable_foreign_key_constraints(sql_engine) sql_utils.fix_transactions(sql_engine) - schema_2.metadata.create_all(sql_engine) - - migrate(sql_engine) except Exception: sql_engine.dispose() @@ -49,21 +42,11 @@ def create_schema_2_sql_engine(path: Path) -> sqlalchemy.engine.Engine: return sql_engine -def create_schema_3_sql_engine(path: Path) -> sqlalchemy.engine.Engine: - """Create a SQL engine for a schema 3 database. - - Unlike `create_schema_2_sql_engine()`, this assumes the database is already - at schema 3. Migration is done through other mechanisms. - """ - sql_engine = sqlalchemy.create_engine(sql_utils.get_connection_url(path)) - +@contextmanager +def sql_engine_ctx(path: Path) -> Generator[sqlalchemy.engine.Engine, None, None]: + """Like `create_sql_engine()`, but clean up when done.""" + engine = create_sql_engine(path) try: - sql_utils.enable_foreign_key_constraints(sql_engine) - sql_utils.fix_transactions(sql_engine) - schema_3.metadata.create_all(sql_engine) - - except Exception: - sql_engine.dispose() - raise - - return sql_engine + yield engine + finally: + engine.dispose() diff --git a/robot-server/robot_server/persistence/_fastapi_dependencies.py b/robot-server/robot_server/persistence/_fastapi_dependencies.py index 7a2e32a1575..ebdafb70e87 100644 --- a/robot-server/robot_server/persistence/_fastapi_dependencies.py +++ b/robot-server/robot_server/persistence/_fastapi_dependencies.py @@ -16,7 +16,7 @@ ) from robot_server.errors import ErrorDetails -from ._database import create_schema_3_sql_engine +from ._database import create_sql_engine from ._persistence_directory import ( PersistenceResetter, prepare_active_subdirectory, @@ -102,7 +102,7 @@ async def init_sql_engine() -> SQLEngine: prepared_subdirectory = await subdirectory_prep_task sql_engine = await to_thread.run_sync( - create_schema_3_sql_engine, prepared_subdirectory / _DATABASE_FILE + create_sql_engine, prepared_subdirectory / _DATABASE_FILE ) return sql_engine diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index 7fe7713e964..3227058e179 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -28,13 +28,13 @@ from .. import legacy_pickle from ..pydantic import pydantic_to_json from .._database import ( - create_schema_2_sql_engine, - create_schema_3_sql_engine, + sql_engine_ctx, sqlite_rowid, ) from .._folder_migrator import Migration from .._tables import schema_2, schema_3 from ._util import copy_rows_unmodified, copy_if_exists, copytree_if_exists +from . import up_to_2 # TODO: Define a single source of truth somewhere for these paths. @@ -57,11 +57,12 @@ def migrate(self, source_dir: Path, dest_dir: Path) -> None: dest_db_file = dest_dir / _DB_FILE with ExitStack() as exit_stack: - source_engine = exit_stack.enter_context( - # If the source is schema 0 or 1, this will migrate it to 2 in-place. 
- _schema_2_sql_engine(source_db_file) - ) - dest_engine = exit_stack.enter_context(_schema_3_sql_engine(dest_db_file)) + source_engine = exit_stack.enter_context(sql_engine_ctx(source_db_file)) + schema_2.metadata.create_all(source_engine) + up_to_2.migrate(source_engine) + + dest_engine = exit_stack.enter_context(sql_engine_ctx(dest_db_file)) + schema_3.metadata.create_all(dest_engine) with source_engine.begin() as source_transaction, dest_engine.begin() as dest_transaction: run_ids = _get_run_ids(schema_2_transaction=source_transaction) @@ -70,28 +71,6 @@ def migrate(self, source_dir: Path, dest_dir: Path) -> None: _migrate_db_commands(source_db_file, dest_db_file, run_ids) -@contextmanager -def _schema_2_sql_engine( - db_file: Path, -) -> Generator[sqlalchemy.engine.Engine, None, None]: - engine = create_schema_2_sql_engine(db_file) - try: - yield engine - finally: - engine.dispose() - - -@contextmanager -def _schema_3_sql_engine( - db_file: Path, -) -> Generator[sqlalchemy.engine.Engine, None, None]: - engine = create_schema_3_sql_engine(db_file) - try: - yield engine - finally: - engine.dispose() - - def _get_run_ids(*, schema_2_transaction: sqlalchemy.engine.Connection) -> List[str]: return ( schema_2_transaction.execute(sqlalchemy.select(schema_2.run_table.c.id)) @@ -238,7 +217,7 @@ def _migrate_db_commands_for_run( ) -> None: source_db_file, dest_db_file, run_id, lock = args - with _schema_2_sql_engine(source_db_file) as source_engine, _schema_3_sql_engine( + with sql_engine_ctx(source_db_file) as source_engine, sql_engine_ctx( dest_db_file ) as dest_engine: select_old_commands = sqlalchemy.select(schema_2.run_table.c.commands).where( diff --git a/robot-server/tests/conftest.py b/robot-server/tests/conftest.py index c3a225d7571..3014c922faa 100644 --- a/robot-server/tests/conftest.py +++ b/robot-server/tests/conftest.py @@ -40,7 +40,7 @@ from robot_server.hardware import get_hardware, get_ot2_hardware from robot_server.versioning import API_VERSION_HEADER, LATEST_API_VERSION_HEADER_VALUE from robot_server.service.session.manager import SessionManager -from robot_server.persistence import get_sql_engine, create_schema_3_sql_engine +from robot_server.persistence import get_sql_engine, metadata, sql_engine_ctx from robot_server.health.router import ComponentVersions, get_versions test_router = routing.APIRouter() @@ -393,6 +393,6 @@ def clear_custom_tiprack_def_dir() -> Iterator[None]: def sql_engine(tmp_path: Path) -> Generator[SQLEngine, None, None]: """Return a set-up database to back the store.""" db_file_path = tmp_path / "test.db" - sql_engine = create_schema_3_sql_engine(db_file_path) - yield sql_engine - sql_engine.dispose() + with sql_engine_ctx(db_file_path) as engine: + metadata.create_all(engine) + yield engine From 25077319ca66b8f90a5a07054e48f97a467c980d Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Fri, 9 Feb 2024 20:38:51 -0500 Subject: [PATCH 07/18] forkserver + preload. 
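
Start the worker pool with the "forkserver" start method and preload the
worker module's imports. The worker module keeps a list of every module it
imports (pydantic, sqlalchemy, the opentrons command models, ...), and we pass
that list to the forkserver via set_forkserver_preload(), so the heavy imports
happen once in the forkserver parent instead of once per forked worker.

Roughly, the pattern is the following. This is a self-contained sketch with
stand-in work and a hard-coded preload list; the real code preloads
_up_to_3_worker.imports and maps over run IDs instead:

    import multiprocessing


    def work(n: int) -> None:
        # Stand-in for migrating one run's commands. The point is that the
        # heavy modules are already imported in the forkserver parent, so a
        # freshly forked worker doesn't pay the import cost again.
        print(n * n)


    def main() -> None:
        mp = multiprocessing.get_context("forkserver")
        # Ask the forkserver process to import these up front. Workers are
        # forked from it with the modules already loaded. Import failures
        # during preload are ignored, so this is safe even if a module is
        # missing.
        mp.set_forkserver_preload(["pydantic", "sqlalchemy"])
        with mp.Pool(processes=4) as pool:
            pool.map(work, range(8))


    if __name__ == "__main__":
        main()

The forkserver start method is POSIX-only, which is fine here: this migration
only runs on the robot's Linux system.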
--- .../_migrations/_up_to_3_worker.py | 89 +++++++++++++++++++ .../persistence/_migrations/up_to_3.py | 68 ++------------ 2 files changed, 97 insertions(+), 60 deletions(-) create mode 100644 robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py diff --git a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py new file mode 100644 index 00000000000..4b306d5ced6 --- /dev/null +++ b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py @@ -0,0 +1,89 @@ +# fmt: off + +# We keep a list of all the modules that this file imports +# so we can preload them when launching the subprocesses. +from types import ModuleType +_imports: "list[ModuleType]" = [] + +import contextlib +import pathlib +import typing +_imports.extend([contextlib, pathlib, typing]) + +import pydantic +import sqlalchemy +_imports.extend([pydantic, sqlalchemy]) + +from opentrons.protocol_engine import commands +from server_utils import sql_utils +_imports.extend([commands, sql_utils]) + +from robot_server.persistence._tables import schema_2, schema_3 +from robot_server.persistence import ( + _database, + legacy_pickle, + pydantic as pydantic_helpers +) +_imports.extend([schema_2, schema_3, _database, legacy_pickle, pydantic_helpers]) + +# fmt: on + + +imports: typing.List[str] = [m.__name__ for m in _imports] + + +def migrate_db_commands_for_run( + args: typing.Tuple[ + pathlib.Path, + pathlib.Path, + str, + # This is a multiprocessing.Lock, which can't be a type annotation for some reason. + typing.ContextManager[object], + ] +) -> None: + source_db_file, dest_db_file, run_id, lock = args + + with _database.sql_engine_ctx( + source_db_file + ) as source_engine, _database.sql_engine_ctx(dest_db_file) as dest_engine: + select_old_commands = sqlalchemy.select(schema_2.run_table.c.commands).where( + schema_2.run_table.c.id == run_id + ) + insert_new_command = sqlalchemy.insert(schema_3.run_command_table) + + with lock, source_engine.begin() as source_transaction: + old_commands_bytes: typing.Optional[bytes] = source_transaction.execute( + select_old_commands + ).scalar_one() + + old_commands: typing.List[typing.Dict[str, object]] = ( + legacy_pickle.loads(old_commands_bytes) if old_commands_bytes else [] + ) + + parsed_commands: typing.Iterable[commands.Command] = ( + pydantic.parse_obj_as( + commands.Command, # type: ignore[arg-type] + c, + ) + for c in old_commands + ) + + new_command_rows = [ + { + "run_id": run_id, + "index_in_run": index_in_run, + "command_id": parsed_command.id, + "command": pydantic_helpers.pydantic_to_json(parsed_command), + } + for index_in_run, parsed_command in enumerate(parsed_commands) + ] + + # Insert all the commands for this run in one go, to avoid the overhead of + # separate statements, and since we had to bring them all into memory at once + # in order to parse them anyway. + with lock, dest_engine.begin() as dest_transaction: + if len(new_command_rows) > 0: + # This needs to be guarded by a len>0 check because if the list is empty, + # SQLAlchemy misinterprets this as inserting a single row with all default + # values. 
+ dest_transaction.execute(insert_new_command, new_command_rows) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index 3227058e179..eb9749fb47a 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -36,6 +36,8 @@ from ._util import copy_rows_unmodified, copy_if_exists, copytree_if_exists from . import up_to_2 +from . import _up_to_3_worker + # TODO: Define a single source of truth somewhere for these paths. _DECK_CONFIGURATION_FILE = "deck_configuration.json" @@ -181,16 +183,19 @@ def _migrate_db_commands( the work across subprocesses. Each subprocess extracts, migrates, and inserts all of the commands for a single run. """ + mp = multiprocessing.get_context("forkserver") + mp.set_forkserver_preload(_up_to_3_worker.imports) + # We'll use a lock to make sure only one process is accessing the database at once. # # Concurrent access would be safe in the sense that SQLite would always provide # isolation. But, when there are conflicts, we'd have to deal with SQLite retrying # transactions or raising SQLITE_BUSY. A Python-level lock is simpler and more # reliable. - manager = multiprocessing.Manager() + manager = mp.Manager() lock = manager.Lock() - with multiprocessing.Pool( + with mp.Pool( # One worker per core of the OT-2's Raspberry Pi. # We're compute-bound, so more workers would just thrash. # @@ -201,63 +206,6 @@ def _migrate_db_commands( processes=4 ) as pool: pool.map( - _migrate_db_commands_for_run, + _up_to_3_worker.migrate_db_commands_for_run, ((source_db_file, dest_db_file, run_id, lock) for run_id in run_ids), ) - - -def _migrate_db_commands_for_run( - args: Tuple[ - Path, - Path, - str, - # This is a multiprocessing.Lock, which can't be a type annotation for some reason. - ContextManager[object], - ] -) -> None: - source_db_file, dest_db_file, run_id, lock = args - - with sql_engine_ctx(source_db_file) as source_engine, sql_engine_ctx( - dest_db_file - ) as dest_engine: - select_old_commands = sqlalchemy.select(schema_2.run_table.c.commands).where( - schema_2.run_table.c.id == run_id - ) - insert_new_command = sqlalchemy.insert(schema_3.run_command_table) - - with lock, source_engine.begin() as source_transaction: - old_commands_bytes: Optional[bytes] = source_transaction.execute( - select_old_commands - ).scalar_one() - - old_commands: List[Dict[str, object]] = ( - legacy_pickle.loads(old_commands_bytes) if old_commands_bytes else [] - ) - - parsed_commands: Iterable[Command] = ( - pydantic.parse_obj_as( - Command, # type: ignore[arg-type] - c, - ) - for c in old_commands - ) - - new_command_rows = [ - { - "run_id": run_id, - "index_in_run": index_in_run, - "command_id": parsed_command.id, - "command": pydantic_to_json(parsed_command), - } - for index_in_run, parsed_command in enumerate(parsed_commands) - ] - - # Insert all the commands for this run in one go, to avoid the overhead of - # separate statements, and since we had to bring them all into memory at once - # in order to parse them anyway. - with lock, dest_engine.begin() as dest_transaction: - if len(new_command_rows) > 0: - # This needs to be guarded by a len>0 check because if the list is empty, - # SQLAlchemy misinterprets this as inserting a single row with all default - # values. 
- dest_transaction.execute(insert_new_command, new_command_rows) From d1874728173484c9f44fb4b875daa1831936dc0e Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Mon, 12 Feb 2024 21:38:50 -0500 Subject: [PATCH 08/18] Use starmap(), not map(), to avoid manually packing the args tuple. --- .../persistence/_migrations/_up_to_3_worker.py | 14 +++++--------- .../persistence/_migrations/up_to_3.py | 2 +- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py index 4b306d5ced6..6eaa84d7640 100644 --- a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py +++ b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py @@ -33,16 +33,12 @@ def migrate_db_commands_for_run( - args: typing.Tuple[ - pathlib.Path, - pathlib.Path, - str, - # This is a multiprocessing.Lock, which can't be a type annotation for some reason. - typing.ContextManager[object], - ] + source_db_file: pathlib.Path, + dest_db_file: pathlib.Path, + run_id: str, + # This is a multiprocessing.Lock, which can't be a type annotation for some reason. + lock: typing.ContextManager[object], ) -> None: - source_db_file, dest_db_file, run_id, lock = args - with _database.sql_engine_ctx( source_db_file ) as source_engine, _database.sql_engine_ctx(dest_db_file) as dest_engine: diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index eb9749fb47a..ac465670297 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -205,7 +205,7 @@ def _migrate_db_commands( # so that's at most ~200MB total, which should be fine. processes=4 ) as pool: - pool.map( + pool.starmap( _up_to_3_worker.migrate_db_commands_for_run, ((source_db_file, dest_db_file, run_id, lock) for run_id in run_ids), ) From f1decaf65447c5f91131d157f7fbbb44e24e47b6 Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Mon, 12 Feb 2024 21:48:41 -0500 Subject: [PATCH 09/18] Improve docstrings. --- .../_migrations/_up_to_3_worker.py | 20 ++++++++++++++++++- .../persistence/_migrations/up_to_3.py | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py index 6eaa84d7640..54dcbddaeb8 100644 --- a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py +++ b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py @@ -1,3 +1,6 @@ +"""Code that runs in a worker subprocess for the `up_to_3` migration.""" + + # fmt: off # We keep a list of all the modules that this file imports @@ -30,15 +33,30 @@ imports: typing.List[str] = [m.__name__ for m in _imports] +"""The names of all modules imported by this module, e.g. "foo.bar.baz".""" -def migrate_db_commands_for_run( +def migrate_commands_for_run( source_db_file: pathlib.Path, dest_db_file: pathlib.Path, run_id: str, # This is a multiprocessing.Lock, which can't be a type annotation for some reason. lock: typing.ContextManager[object], ) -> None: + """Perform the schema 2->3 migration for a single run's commands. + + See the `up_to_3` migration for background. + + Args: + source_db_file: The SQLite database file to migrate from. + dest_db_file: The SQLite database file to migrate into. Assumed to have all the + proper tables set up already. 
+ run_id: Which run's commands to migrate. + lock: A lock to hold while accessing the database. Concurrent access would be + safe in the sense that SQLite would always provide isolation. But, when + there are conflicts, we'd have to deal with SQLite retrying transactions or + raising SQLITE_BUSY. A Python-level lock is simpler and more reliable. + """ with _database.sql_engine_ctx( source_db_file ) as source_engine, _database.sql_engine_ctx(dest_db_file) as dest_engine: diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index ac465670297..8641943a1d7 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -206,6 +206,6 @@ def _migrate_db_commands( processes=4 ) as pool: pool.starmap( - _up_to_3_worker.migrate_db_commands_for_run, + _up_to_3_worker.migrate_commands_for_run, ((source_db_file, dest_db_file, run_id, lock) for run_id in run_ids), ) From 49158a57bb30101f8e1eefe82ab55c63c2bc9691 Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Mon, 12 Feb 2024 21:49:41 -0500 Subject: [PATCH 10/18] Lint. --- .../persistence/_migrations/_up_to_3_worker.py | 18 +++++++++--------- .../persistence/_migrations/up_to_3.py | 7 +++---- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py index 54dcbddaeb8..34eccfc6a56 100644 --- a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py +++ b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py @@ -8,21 +8,21 @@ from types import ModuleType _imports: "list[ModuleType]" = [] -import contextlib -import pathlib -import typing +import contextlib # noqa: E402 +import pathlib # noqa: E402 +import typing # noqa: E402 _imports.extend([contextlib, pathlib, typing]) -import pydantic -import sqlalchemy +import pydantic # noqa: E402 +import sqlalchemy # noqa: E402 _imports.extend([pydantic, sqlalchemy]) -from opentrons.protocol_engine import commands -from server_utils import sql_utils +from opentrons.protocol_engine import commands # noqa: E402 +from server_utils import sql_utils # noqa: E402 _imports.extend([commands, sql_utils]) -from robot_server.persistence._tables import schema_2, schema_3 -from robot_server.persistence import ( +from robot_server.persistence._tables import schema_2, schema_3 # noqa: E402 +from robot_server.persistence import ( # noqa: E402 _database, legacy_pickle, pydantic as pydantic_helpers diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index 8641943a1d7..946eee881fb 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -17,15 +17,14 @@ """ import multiprocessing -from contextlib import ExitStack, contextmanager +from contextlib import ExitStack from pathlib import Path -from typing import ContextManager, Dict, Generator, Iterable, List, Optional, Tuple +from typing import List -from opentrons.protocol_engine import Command, StateSummary +from opentrons.protocol_engine import StateSummary import pydantic import sqlalchemy -from .. 
import legacy_pickle from ..pydantic import pydantic_to_json from .._database import ( sql_engine_ctx, From e6eadaabe77a1bdd54c38eb1a0109e324a5dcfca Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Mon, 12 Feb 2024 22:01:51 -0500 Subject: [PATCH 11/18] Type legacy_pickle.loads() as returning Any. --- robot-server/robot_server/persistence/legacy_pickle.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/robot-server/robot_server/persistence/legacy_pickle.py b/robot-server/robot_server/persistence/legacy_pickle.py index 0ad36054cbc..36d68a1968a 100644 --- a/robot-server/robot_server/persistence/legacy_pickle.py +++ b/robot-server/robot_server/persistence/legacy_pickle.py @@ -15,7 +15,7 @@ # unknown types. dumps as dumps, ) -from typing import Dict, List +from typing import Any, Dict, List _log = getLogger(__name__) @@ -69,7 +69,7 @@ def find_class(self, module: str, name: str) -> object: # noqa: D102 return super().find_class(module, name) -def loads(data: bytes) -> object: +def loads(data: bytes) -> Any: """Drop-in replacement for `pickle.loads` that uses our custom unpickler.""" return LegacyUnpickler(BytesIO(data)).load() From 0bb3a6f9af4acb608316c9bc40034543691d726f Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Tue, 13 Feb 2024 00:33:41 -0500 Subject: [PATCH 12/18] Remove unneeded comment. --- .../robot_server/persistence/_migrations/_up_to_3_worker.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py index 34eccfc6a56..4dfe3c8c054 100644 --- a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py +++ b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py @@ -92,9 +92,6 @@ def migrate_commands_for_run( for index_in_run, parsed_command in enumerate(parsed_commands) ] - # Insert all the commands for this run in one go, to avoid the overhead of - # separate statements, and since we had to bring them all into memory at once - # in order to parse them anyway. with lock, dest_engine.begin() as dest_transaction: if len(new_command_rows) > 0: # This needs to be guarded by a len>0 check because if the list is empty, From e0823a9840c75eaca86abb3b4a9574270008d623 Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Tue, 13 Feb 2024 00:53:03 -0500 Subject: [PATCH 13/18] Small fixups. --- .../persistence/_migrations/up_to_3.py | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index 946eee881fb..c251e2d0efb 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -2,7 +2,7 @@ Summary of changes from schema 2: -- Run commands were formerly stored as monolithic blobs in the `run` table, +- Run commands were formerly stored as monolithic blobs in the `run.commands` column, with each row storing an entire list. This has been split out into a new `run_command` table, where each individual command gets its own row. 
@@ -65,9 +65,11 @@ def migrate(self, source_dir: Path, dest_dir: Path) -> None: dest_engine = exit_stack.enter_context(sql_engine_ctx(dest_db_file)) schema_3.metadata.create_all(dest_engine) - with source_engine.begin() as source_transaction, dest_engine.begin() as dest_transaction: - run_ids = _get_run_ids(schema_2_transaction=source_transaction) - _migrate_db_excluding_commands(source_transaction, dest_transaction) + source_transaction = exit_stack.enter_context(source_engine.begin()) + dest_transaction = exit_stack.enter_context(dest_engine.begin()) + + run_ids = _get_run_ids(schema_2_transaction=source_transaction) + _migrate_db_excluding_commands(source_transaction, dest_transaction) _migrate_db_commands(source_db_file, dest_db_file, run_ids) @@ -126,49 +128,45 @@ def _migrate_run_table_excluding_commands( ).order_by(sqlite_rowid) insert_new_run = sqlalchemy.insert(schema_3.run_table) - for old_run_row in source_transaction.execute(select_old_runs).all(): - old_state_summary = old_run_row.state_summary + for old_row in source_transaction.execute(select_old_runs).all(): + old_state_summary = old_row.state_summary new_state_summary = ( None - if old_run_row.state_summary is None + if old_row.state_summary is None else pydantic_to_json( pydantic.parse_obj_as(StateSummary, old_state_summary) ) ) dest_transaction.execute( insert_new_run, - id=old_run_row.id, - created_at=old_run_row.created_at, - protocol_id=old_run_row.protocol_id, + id=old_row.id, + created_at=old_row.created_at, + protocol_id=old_row.protocol_id, state_summary=new_state_summary, - engine_status=old_run_row.engine_status, - _updated_at=old_run_row._updated_at, + engine_status=old_row.engine_status, + _updated_at=old_row._updated_at, ) def _migrate_analysis_table( - source_connection: sqlalchemy.engine.Connection, - dest_connection: sqlalchemy.engine.Connection, + source_transaction: sqlalchemy.engine.Connection, + dest_transaction: sqlalchemy.engine.Connection, ) -> None: select_old_analyses = sqlalchemy.select(schema_2.analysis_table).order_by( sqlite_rowid ) insert_new_analysis = sqlalchemy.insert(schema_3.analysis_table) - for row in ( - # The table is missing an explicit sequence number column, so we need - # sqlite_rowid to retain order across this copy. - source_connection.execute(select_old_analyses).all() - ): - dest_connection.execute( + for old_row in source_transaction.execute(select_old_analyses).all(): + dest_transaction.execute( insert_new_analysis, # The new `completed_analysis` column has the data that used to be in # `completed_analysis_as_document`. The separate # `completed_analysis_as_document` column is dropped. - completed_analysis=row.completed_analysis_as_document, + completed_analysis=old_row.completed_analysis_as_document, # The remaining columns are unchanged: - id=row.id, - protocol_id=row.protocol_id, - analyzer_version=row.analyzer_version, + id=old_row.id, + protocol_id=old_row.protocol_id, + analyzer_version=old_row.analyzer_version, ) From e20f49446bbc55cce9433f4b3ac541fd81841cb3 Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Tue, 13 Feb 2024 00:57:20 -0500 Subject: [PATCH 14/18] Remove commented code. 
--- robot-server/robot_server/persistence/_tables/schema_2.py | 1 - 1 file changed, 1 deletion(-) diff --git a/robot-server/robot_server/persistence/_tables/schema_2.py b/robot-server/robot_server/persistence/_tables/schema_2.py index baa95ca80dc..3537757845e 100644 --- a/robot-server/robot_server/persistence/_tables/schema_2.py +++ b/robot-server/robot_server/persistence/_tables/schema_2.py @@ -106,7 +106,6 @@ sqlalchemy.Column( "commands", sqlalchemy.LargeBinary, - # sqlalchemy.PickleType(pickler=legacy_pickle, protocol=PICKLE_PROTOCOL_VERSION), nullable=True, ), # column added in schema v1 From 2afaf8fade5f49a6b8ea496fbd3ce7e83a0d76ce Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Wed, 14 Feb 2024 11:06:24 -0500 Subject: [PATCH 15/18] Clarify the napkin math. --- robot-server/robot_server/persistence/_migrations/up_to_3.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index c251e2d0efb..0e5850a5e17 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -197,9 +197,8 @@ def _migrate_db_commands( # We're compute-bound, so more workers would just thrash. # # Napkin math for the memory footprint: - # Suppose a very large protocol has ~10MB of commands (see e.g. RQA-443). - # The maximum number of runs at the time of writing is 20, - # so that's at most ~200MB total, which should be fine. + # Suppose a very large run has ~10 MB of commands (see e.g. RQA-443). + # We're limiting this to 4 runs at a time, so 40 MB, which should be fine. processes=4 ) as pool: pool.starmap( From ad9be135451f00ada19aa40cb47118890de32d99 Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Wed, 14 Feb 2024 12:02:48 -0500 Subject: [PATCH 16/18] Suppress exceptions while migrating commands. --- .../persistence/_migrations/_up_to_3_worker.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py index 4dfe3c8c054..11adae2018b 100644 --- a/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py +++ b/robot-server/robot_server/persistence/_migrations/_up_to_3_worker.py @@ -57,9 +57,20 @@ def migrate_commands_for_run( there are conflicts, we'd have to deal with SQLite retrying transactions or raising SQLITE_BUSY. A Python-level lock is simpler and more reliable. """ - with _database.sql_engine_ctx( + with contextlib.suppress( + # The format that we're migrating from is prone to bugs where our latest + # code can't read records created by older code. (See RSS-98). + # If that happens, it's better to silently drop the run than to fail the + # whole migration. + # + # TODO(mm, 2024-02-14): Log these somehow. Logging is a little tricky from + # subprocesses. + Exception + ), _database.sql_engine_ctx( source_db_file - ) as source_engine, _database.sql_engine_ctx(dest_db_file) as dest_engine: + ) as source_engine, _database.sql_engine_ctx( + dest_db_file + ) as dest_engine: select_old_commands = sqlalchemy.select(schema_2.run_table.c.commands).where( schema_2.run_table.c.id == run_id ) From f7be0af0d48882134be7ee750dcd2aa8d8d2fdcf Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Wed, 14 Feb 2024 12:04:46 -0500 Subject: [PATCH 17/18] Suppress and log exceptions while migrating run summaries. 
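
Mirror the previous commit's protection on the parent-process side: if one
run's stored state summary can't be parsed by current code (RSS-98-style
records), log a warning and drop that run instead of failing the whole
migration. Unlike the subprocess workers, this code runs in the main process,
so it can log normally.

The shape of the change, as a self-contained toy sketch (generic names and
data, not the real tables or models):

    import logging

    _log = logging.getLogger(__name__)


    def migrate_rows(old_rows, convert, insert) -> None:
        # Migrate what we can; warn about and drop anything unreadable.
        for row in old_rows:
            try:
                insert(convert(row))
            except Exception:
                _log.warning(
                    f"Exception while migrating run {row['id']}. Dropping it.",
                    exc_info=True,
                )


    # The second row fails to convert, gets logged, and is skipped.
    migrate_rows(
        [{"id": "run-a", "n": "1"}, {"id": "run-b", "n": "not a number"}],
        convert=lambda row: {"id": row["id"], "n": int(row["n"])},
        insert=print,
    )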
--- .../persistence/_migrations/up_to_3.py | 54 ++++++++++++------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index 0e5850a5e17..60a9078c72f 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -19,6 +19,7 @@ import multiprocessing from contextlib import ExitStack from pathlib import Path +from logging import getLogger from typing import List from opentrons.protocol_engine import StateSummary @@ -44,6 +45,9 @@ _DB_FILE = "robot_server.db" +_log = getLogger(__name__) + + class MigrationUpTo3(Migration): # noqa: D101 def migrate(self, source_dir: Path, dest_dir: Path) -> None: """Migrate the persistence directory from schema 2 to 3.""" @@ -68,15 +72,17 @@ def migrate(self, source_dir: Path, dest_dir: Path) -> None: source_transaction = exit_stack.enter_context(source_engine.begin()) dest_transaction = exit_stack.enter_context(dest_engine.begin()) - run_ids = _get_run_ids(schema_2_transaction=source_transaction) _migrate_db_excluding_commands(source_transaction, dest_transaction) + # Get the run IDs *after* migrating runs, in case any runs got dropped. + run_ids = _get_run_ids(schema_3_transaction=dest_transaction) + _migrate_db_commands(source_db_file, dest_db_file, run_ids) -def _get_run_ids(*, schema_2_transaction: sqlalchemy.engine.Connection) -> List[str]: +def _get_run_ids(*, schema_3_transaction: sqlalchemy.engine.Connection) -> List[str]: return ( - schema_2_transaction.execute(sqlalchemy.select(schema_2.run_table.c.id)) + schema_3_transaction.execute(sqlalchemy.select(schema_3.run_table.c.id)) .scalars() .all() ) @@ -129,23 +135,33 @@ def _migrate_run_table_excluding_commands( insert_new_run = sqlalchemy.insert(schema_3.run_table) for old_row in source_transaction.execute(select_old_runs).all(): - old_state_summary = old_row.state_summary - new_state_summary = ( - None - if old_row.state_summary is None - else pydantic_to_json( - pydantic.parse_obj_as(StateSummary, old_state_summary) + try: + old_state_summary = old_row.state_summary + new_state_summary = ( + None + if old_row.state_summary is None + else pydantic_to_json( + pydantic.parse_obj_as(StateSummary, old_state_summary) + ) + ) + dest_transaction.execute( + insert_new_run, + id=old_row.id, + created_at=old_row.created_at, + protocol_id=old_row.protocol_id, + state_summary=new_state_summary, + engine_status=old_row.engine_status, + _updated_at=old_row._updated_at, + ) + except Exception: + # The format that we're migrating from is prone to bugs where our latest + # code can't read records created by older code. (See RSS-98). + # If that happens, it's better to silently drop the run than to fail the + # whole migration. + _log.warning( + f"Exception while migrating run {old_row.id}. Dropping it.", + exc_info=True, ) - ) - dest_transaction.execute( - insert_new_run, - id=old_row.id, - created_at=old_row.created_at, - protocol_id=old_row.protocol_id, - state_summary=new_state_summary, - engine_status=old_row.engine_status, - _updated_at=old_row._updated_at, - ) def _migrate_analysis_table( From 39ccd94000ddba9237e938035d4e1abdce2220da Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Wed, 14 Feb 2024 12:17:50 -0500 Subject: [PATCH 18/18] Delete redundant comment. This is already in the worker process's function's docstring. 
--- .../robot_server/persistence/_migrations/up_to_3.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index 60a9078c72f..906cdf70dd5 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -199,12 +199,6 @@ def _migrate_db_commands( mp = multiprocessing.get_context("forkserver") mp.set_forkserver_preload(_up_to_3_worker.imports) - # We'll use a lock to make sure only one process is accessing the database at once. - # - # Concurrent access would be safe in the sense that SQLite would always provide - # isolation. But, when there are conflicts, we'd have to deal with SQLite retrying - # transactions or raising SQLITE_BUSY. A Python-level lock is simpler and more - # reliable. manager = mp.Manager() lock = manager.Lock()