diff --git a/robot-server/robot_server/persistence/_migrations/up_to_3.py b/robot-server/robot_server/persistence/_migrations/up_to_3.py index 8a8294bfe03..a6fb8afacdb 100644 --- a/robot-server/robot_server/persistence/_migrations/up_to_3.py +++ b/robot-server/robot_server/persistence/_migrations/up_to_3.py @@ -2,18 +2,30 @@ Summary of changes from schema 2: -- Run commands were formerly stored as monolithic blobs in the run table, +- Run commands were formerly stored as monolithic blobs in the `run` table, with each row storing an entire list. This has been split out into a new - run_command table, where each individual command gets its own row. + `run_command` table, where each individual command gets its own row. + +- All columns that were storing binary pickles have been converted to storing + JSON strings: + - `analysis.completed_analysis` + - `run.state_summary` + - `run_commands.command` (formerly `run.commands`; see above) + +- `analysis.completed_analysis_as_document` has been removed, + since the updated `analysis.completed_analysis` (see above) replaces it. """ from contextlib import ExitStack from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List +from opentrons.protocol_engine import Command, StateSummary +import pydantic import sqlalchemy +from ..pydantic import pydantic_to_json from .._database import ( create_schema_2_sql_engine, create_schema_3_sql_engine, @@ -64,12 +76,9 @@ def _migrate_db( order_by_rowid=True, ) - copy_rows_unmodified( - schema_2.analysis_table, - schema_3.analysis_table, + _migrate_analysis_table( source_transaction, dest_transaction, - order_by_rowid=True, ) _migrate_run_table( @@ -95,25 +104,40 @@ def _migrate_run_table( insert_new_command = sqlalchemy.insert(schema_3.run_command_table) for old_run_row in source_transaction.execute(select_old_runs).all(): + old_state_summary = old_run_row.state_summary + new_state_summary = ( + None + if old_run_row.state_summary is None + else pydantic_to_json( + pydantic.parse_obj_as(StateSummary, old_state_summary) + ) + ) dest_transaction.execute( insert_new_run, id=old_run_row.id, created_at=old_run_row.created_at, protocol_id=old_run_row.protocol_id, - state_summary=old_run_row.state_summary, + state_summary=new_state_summary, engine_status=old_run_row.engine_status, _updated_at=old_run_row._updated_at, ) - commands: List[Dict[str, Any]] = old_run_row.commands or [] + old_commands: List[Dict[str, Any]] = old_run_row.commands or [] + pydantic_old_commands: Iterable[Command] = ( + pydantic.parse_obj_as( + Command, # type: ignore[arg-type] + c, + ) + for c in old_commands + ) new_command_rows = [ { "run_id": old_run_row.id, "index_in_run": index_in_run, - "command_id": command["id"], - "command": command, + "command_id": pydantic_command.id, + "command": pydantic_to_json(pydantic_command), } - for index_in_run, command in enumerate(commands) + for index_in_run, pydantic_command in enumerate(pydantic_old_commands) ] # Insert all the commands for this run in one go, to avoid the overhead of # separate statements, and since we had to bring them all into memory at once @@ -123,3 +147,29 @@ def _migrate_run_table( # SQLAlchemy misinterprets this as inserting a single row with all default # values. dest_transaction.execute(insert_new_command, new_command_rows) + + +def _migrate_analysis_table( + source_connection: sqlalchemy.engine.Connection, + dest_connection: sqlalchemy.engine.Connection, +) -> None: + select_old_analyses = sqlalchemy.select(schema_2.analysis_table).order_by( + sqlite_rowid + ) + insert_new_analysis = sqlalchemy.insert(schema_3.analysis_table) + for row in ( + # The table is missing an explicit sequence number column, so we need + # sqlite_rowid to retain order across this copy. + source_connection.execute(select_old_analyses).all() + ): + dest_connection.execute( + insert_new_analysis, + # The new `completed_analysis` column has the data that used to be in + # `completed_analysis_as_document`. The separate + # `completed_analysis_as_document` column is dropped. + completed_analysis=row.completed_analysis_as_document, + # The remaining columns are unchanged: + id=row.id, + protocol_id=row.protocol_id, + analyzer_version=row.analyzer_version, + ) diff --git a/robot-server/robot_server/persistence/_tables/schema_3.py b/robot-server/robot_server/persistence/_tables/schema_3.py index b20b443b754..c7e2499b617 100644 --- a/robot-server/robot_server/persistence/_tables/schema_3.py +++ b/robot-server/robot_server/persistence/_tables/schema_3.py @@ -2,8 +2,6 @@ import sqlalchemy -from robot_server.persistence import legacy_pickle -from robot_server.persistence.pickle_protocol_version import PICKLE_PROTOCOL_VERSION from robot_server.persistence._utc_datetime import UTCDateTime metadata = sqlalchemy.MetaData() @@ -46,18 +44,9 @@ ), sqlalchemy.Column( "completed_analysis", - # Stores a pickled dict. See CompletedAnalysisStore. - # TODO(mm, 2023-08-30): Remove this. See https://opentrons.atlassian.net/browse/RSS-98. - sqlalchemy.LargeBinary, - nullable=False, - ), - sqlalchemy.Column( - "completed_analysis_as_document", - # Stores the same data as completed_analysis, but serialized as a JSON string. + # Stores a JSON string. See CompletedAnalysisStore. sqlalchemy.String, - # This column should never be NULL in practice. - # It needs to be nullable=True because of limitations in SQLite and our migration code. - nullable=True, + nullable=False, ), ) @@ -83,7 +72,7 @@ # column added in schema v1 sqlalchemy.Column( "state_summary", - sqlalchemy.PickleType(pickler=legacy_pickle, protocol=PICKLE_PROTOCOL_VERSION), + sqlalchemy.String, nullable=True, ), # column added in schema v1 @@ -119,13 +108,7 @@ ), sqlalchemy.Column("index_in_run", sqlalchemy.Integer, nullable=False), sqlalchemy.Column("command_id", sqlalchemy.String, nullable=False), - sqlalchemy.Column( - "command", - # TODO(mm, 2024-01-25): This should be JSON instead of a pickle. See: - # https://opentrons.atlassian.net/browse/RSS-98. - sqlalchemy.PickleType(pickler=legacy_pickle, protocol=PICKLE_PROTOCOL_VERSION), - nullable=False, - ), + sqlalchemy.Column("command", sqlalchemy.String, nullable=False), sqlalchemy.Index( "ix_run_run_id_command_id", # An arbitrary name for the index. "run_id", diff --git a/robot-server/robot_server/persistence/pydantic.py b/robot-server/robot_server/persistence/pydantic.py new file mode 100644 index 00000000000..c3486394ad4 --- /dev/null +++ b/robot-server/robot_server/persistence/pydantic.py @@ -0,0 +1,22 @@ +"""Store Pydantic objects in the SQL database.""" + +from typing import Type, TypeVar +from pydantic import BaseModel, parse_raw_as + + +_BaseModelT = TypeVar("_BaseModelT", bound=BaseModel) + + +def pydantic_to_json(obj: BaseModel) -> str: + """Serialize a Pydantic object for storing in the SQL database.""" + return obj.json( + # by_alias and exclude_none should match how + # FastAPI + Pydantic + our customizations serialize these objects + by_alias=True, + exclude_none=True, + ) + + +def json_to_pydantic(model: Type[_BaseModelT], json: str) -> _BaseModelT: + """Parse a Pydantic object stored in the SQL database.""" + return parse_raw_as(model, json) diff --git a/robot-server/robot_server/protocols/completed_analysis_store.py b/robot-server/robot_server/protocols/completed_analysis_store.py index 29ee571d08c..7bbee59ef97 100644 --- a/robot-server/robot_server/protocols/completed_analysis_store.py +++ b/robot-server/robot_server/protocols/completed_analysis_store.py @@ -2,7 +2,7 @@ from __future__ import annotations import asyncio -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional from logging import getLogger from dataclasses import dataclass @@ -10,8 +10,7 @@ import anyio from robot_server.persistence import analysis_table, sqlite_rowid -from robot_server.persistence import legacy_pickle -from robot_server.persistence.pickle_protocol_version import PICKLE_PROTOCOL_VERSION +from robot_server.persistence.pydantic import json_to_pydantic, pydantic_to_json from .analysis_models import CompletedAnalysis from .analysis_memcache import MemoryCache @@ -43,16 +42,10 @@ async def to_sql_values(self) -> Dict[str, object]: Avoid calling this from inside a SQL transaction, since it might be slow. """ - def serialize_completed_analysis() -> Tuple[bytes, str]: - serialized_pickle = _serialize_completed_analysis_to_pickle( - self.completed_analysis - ) - serialized_json = _serialize_completed_analysis_to_json( - self.completed_analysis - ) - return serialized_pickle, serialized_json + def serialize_completed_analysis() -> str: + return pydantic_to_json(self.completed_analysis) - serialized_pickle, serialized_json = await anyio.to_thread.run_sync( + serialized_json = await anyio.to_thread.run_sync( serialize_completed_analysis, # Cancellation may orphan the worker thread, # but that should be harmless in this case. @@ -63,8 +56,7 @@ def serialize_completed_analysis() -> Tuple[bytes, str]: "id": self.id, "protocol_id": self.protocol_id, "analyzer_version": self.analyzer_version, - "completed_analysis": serialized_pickle, - "completed_analysis_as_document": serialized_json, + "completed_analysis": serialized_json, } @classmethod @@ -93,9 +85,7 @@ async def from_sql_row( assert isinstance(protocol_id, str) def parse_completed_analysis() -> CompletedAnalysis: - return CompletedAnalysis.parse_obj( - legacy_pickle.loads(sql_row.completed_analysis) - ) + return json_to_pydantic(CompletedAnalysis, sql_row.completed_analysis) completed_analysis = await anyio.to_thread.run_sync( parse_completed_analysis, @@ -181,21 +171,17 @@ async def get_by_id_as_document(self, analysis_id: str) -> Optional[str]: This is like `get_by_id()`, except it returns the analysis as a pre-serialized JSON document. """ - statement = sqlalchemy.select( - analysis_table.c.completed_analysis_as_document - ).where(analysis_table.c.id == analysis_id) + statement = sqlalchemy.select(analysis_table.c.completed_analysis).where( + analysis_table.c.id == analysis_id + ) with self._sql_engine.begin() as transaction: try: - document: Optional[str] = transaction.execute(statement).scalar_one() + document: str = transaction.execute(statement).scalar_one() except sqlalchemy.exc.NoResultFound: # No analysis with this ID. return None - # Although the completed_analysis_as_document column is nullable, - # our migration code is supposed to ensure that it's never NULL in practice. - assert document is not None - return document async def get_by_protocol( @@ -285,21 +271,3 @@ async def add(self, completed_analysis_resource: CompletedAnalysisResource) -> N self._memcache.insert( completed_analysis_resource.id, completed_analysis_resource ) - - -def _serialize_completed_analysis_to_pickle( - completed_analysis: CompletedAnalysis, -) -> bytes: - return legacy_pickle.dumps( - completed_analysis.dict(), protocol=PICKLE_PROTOCOL_VERSION - ) - - -def _serialize_completed_analysis_to_json(completed_analysis: CompletedAnalysis) -> str: - return completed_analysis.json( - # by_alias and exclude_none should match how - # FastAPI + Pydantic + our customizations serialize these objects - # over the `GET /protocols/:id/analyses/:id` endpoint. - by_alias=True, - exclude_none=True, - ) diff --git a/robot-server/robot_server/runs/run_store.py b/robot-server/robot_server/runs/run_store.py index 494ea9096f4..d5ad6866254 100644 --- a/robot-server/robot_server/runs/run_store.py +++ b/robot-server/robot_server/runs/run_store.py @@ -7,7 +7,7 @@ from typing import Dict, List, Optional import sqlalchemy -from pydantic import parse_obj_as, ValidationError +from pydantic import ValidationError from opentrons.util.helpers import utc_now from opentrons.protocol_engine import StateSummary, CommandSlice @@ -19,6 +19,7 @@ action_table, sqlite_rowid, ) +from robot_server.persistence.pydantic import json_to_pydantic, pydantic_to_json from robot_server.protocols import ProtocolNotFoundError from .action_models import RunAction, RunActionType @@ -114,7 +115,7 @@ def update_run_state( "run_id": run_id, "index_in_run": command_index, "command_id": command.id, - "command": command.dict(), + "command": pydantic_to_json(command), }, ) @@ -278,7 +279,7 @@ def get_state_summary(self, run_id: str) -> Optional[StateSummary]: try: return ( - StateSummary.parse_obj(row.state_summary) + json_to_pydantic(StateSummary, row.state_summary) if row.state_summary is not None else None ) @@ -336,7 +337,7 @@ def get_commands_slice( slice_result = transaction.execute(select_slice).all() sliced_commands: List[Command] = [ - parse_obj_as(Command, row.command) # type: ignore[arg-type] + json_to_pydantic(Command, row.command) # type: ignore[arg-type] for row in slice_result ] @@ -374,7 +375,7 @@ def get_command(self, run_id: str, command_id: str) -> Command: if command is None: raise CommandNotFoundError(command_id=command_id) - return parse_obj_as(Command, command) # type: ignore[arg-type] + return json_to_pydantic(Command, command) # type: ignore[arg-type] def remove(self, run_id: str) -> None: """Remove a run by its unique identifier. @@ -473,7 +474,7 @@ def _convert_state_to_sql_values( engine_status: str, ) -> Dict[str, object]: return { - "state_summary": state_summary.dict(), + "state_summary": pydantic_to_json(state_summary), "engine_status": engine_status, "_updated_at": utc_now(), } diff --git a/robot-server/tests/persistence/test_tables.py b/robot-server/tests/persistence/test_tables.py index c957da0e62b..bb860b29a6d 100644 --- a/robot-server/tests/persistence/test_tables.py +++ b/robot-server/tests/persistence/test_tables.py @@ -38,8 +38,7 @@ id VARCHAR NOT NULL, protocol_id VARCHAR NOT NULL, analyzer_version VARCHAR NOT NULL, - completed_analysis BLOB NOT NULL, - completed_analysis_as_document VARCHAR, + completed_analysis VARCHAR NOT NULL, PRIMARY KEY (id), FOREIGN KEY(protocol_id) REFERENCES protocol (id) ) @@ -52,7 +51,7 @@ id VARCHAR NOT NULL, created_at DATETIME NOT NULL, protocol_id VARCHAR, - state_summary BLOB, + state_summary VARCHAR, engine_status VARCHAR, _updated_at DATETIME, PRIMARY KEY (id), @@ -75,7 +74,7 @@ run_id VARCHAR NOT NULL, index_in_run INTEGER NOT NULL, command_id VARCHAR NOT NULL, - command BLOB NOT NULL, + command VARCHAR NOT NULL, PRIMARY KEY (row_id), FOREIGN KEY(run_id) REFERENCES run (id) )