Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(robot-server): maintain correct order of protocol analyses #14762

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/src/opentrons/protocol_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ def from_hw_state(cls, state: HwTipStateType) -> "TipPresenceStatus":
}[state]


# TODO (spp, 2024-04-02): move all RTP types to runner
class RTPBase(BaseModel):
"""Parameters defined in a protocol."""

Expand Down
52 changes: 52 additions & 0 deletions robot-server/robot_server/persistence/_migrations/v3_to_v4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Migrate the persistence directory from schema 3 to 4.

Summary of changes from schema 3:

- Adds a new "run_time_parameter_values_and_defaults" column to analysis table
"""

from pathlib import Path
from contextlib import ExitStack
import shutil
from typing import Any

import sqlalchemy

from ..database import sql_engine_ctx
from ..tables import schema_4
from .._folder_migrator import Migration

_DB_FILE = "robot_server.db"


class Migration3to4(Migration): # noqa: D101
def migrate(self, source_dir: Path, dest_dir: Path) -> None:
"""Migrate the persistence directory from schema 3 to 4."""
# Copy over all existing directories and files to new version
for item in source_dir.iterdir():
if item.is_dir():
shutil.copytree(src=item, dst=dest_dir / item.name)
else:
shutil.copy(src=item, dst=dest_dir / item.name)
dest_db_file = dest_dir / _DB_FILE

# Append the new column to existing analyses in v4 database
with ExitStack() as exit_stack:
dest_engine = exit_stack.enter_context(sql_engine_ctx(dest_db_file))
schema_4.metadata.create_all(dest_engine)

def add_column(
engine: sqlalchemy.engine.Engine,
table_name: str,
column: Any,
) -> None:
column_type = column.type.compile(engine.dialect)
engine.execute(
f"ALTER TABLE {table_name} ADD COLUMN {column.key} {column_type}"
)

add_column(
dest_engine,
schema_4.analysis_table.name,
schema_4.analysis_table.c.run_time_parameter_values_and_defaults,
)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from anyio import Path as AsyncPath, to_thread

from ._folder_migrator import MigrationOrchestrator
from ._migrations import up_to_3
from ._migrations import up_to_3, v3_to_v4


_TEMP_PERSISTENCE_DIR_PREFIX: Final = "opentrons-robot-server-"
Expand Down Expand Up @@ -48,7 +48,10 @@ async def prepare_active_subdirectory(prepared_root: Path) -> Path:
"""Return the active persistence subdirectory after preparing it, if necessary."""
migration_orchestrator = MigrationOrchestrator(
root=prepared_root,
migrations=[up_to_3.MigrationUpTo3(subdirectory="3")],
migrations=[
up_to_3.MigrationUpTo3(subdirectory="3"),
v3_to_v4.Migration3to4(subdirectory="4"),
],
temp_file_prefix="temp-",
)

Expand Down
2 changes: 1 addition & 1 deletion robot-server/robot_server/persistence/tables/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""SQL database schemas."""

# Re-export the latest schema.
from .schema_3 import (
from .schema_4 import (
metadata,
protocol_table,
analysis_table,
Expand Down
130 changes: 130 additions & 0 deletions robot-server/robot_server/persistence/tables/schema_4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""v4 of our SQLite schema."""

import sqlalchemy

from robot_server.persistence._utc_datetime import UTCDateTime

metadata = sqlalchemy.MetaData()

protocol_table = sqlalchemy.Table(
"protocol",
metadata,
sqlalchemy.Column(
"id",
sqlalchemy.String,
primary_key=True,
),
sqlalchemy.Column(
"created_at",
UTCDateTime,
nullable=False,
),
sqlalchemy.Column("protocol_key", sqlalchemy.String, nullable=True),
)

analysis_table = sqlalchemy.Table(
"analysis",
metadata,
sqlalchemy.Column(
"id",
sqlalchemy.String,
primary_key=True,
),
sqlalchemy.Column(
"protocol_id",
sqlalchemy.String,
sqlalchemy.ForeignKey("protocol.id"),
index=True,
nullable=False,
),
sqlalchemy.Column(
"analyzer_version",
sqlalchemy.String,
nullable=False,
),
sqlalchemy.Column(
"completed_analysis",
# Stores a JSON string. See CompletedAnalysisStore.
sqlalchemy.String,
nullable=False,
),
# column added in schema v4
sqlalchemy.Column(
"run_time_parameter_values_and_defaults",
sqlalchemy.String,
nullable=True,
),
)

run_table = sqlalchemy.Table(
"run",
metadata,
sqlalchemy.Column(
"id",
sqlalchemy.String,
primary_key=True,
),
sqlalchemy.Column(
"created_at",
UTCDateTime,
nullable=False,
),
sqlalchemy.Column(
"protocol_id",
sqlalchemy.String,
sqlalchemy.ForeignKey("protocol.id"),
nullable=True,
),
# column added in schema v1
sqlalchemy.Column(
"state_summary",
sqlalchemy.String,
nullable=True,
),
# column added in schema v1
sqlalchemy.Column("engine_status", sqlalchemy.String, nullable=True),
# column added in schema v1
sqlalchemy.Column("_updated_at", UTCDateTime, nullable=True),
)

action_table = sqlalchemy.Table(
"action",
metadata,
sqlalchemy.Column(
"id",
sqlalchemy.String,
primary_key=True,
),
sqlalchemy.Column("created_at", UTCDateTime, nullable=False),
sqlalchemy.Column("action_type", sqlalchemy.String, nullable=False),
sqlalchemy.Column(
"run_id",
sqlalchemy.String,
sqlalchemy.ForeignKey("run.id"),
nullable=False,
),
)

run_command_table = sqlalchemy.Table(
"run_command",
metadata,
sqlalchemy.Column("row_id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column(
"run_id", sqlalchemy.String, sqlalchemy.ForeignKey("run.id"), nullable=False
),
sqlalchemy.Column("index_in_run", sqlalchemy.Integer, nullable=False),
sqlalchemy.Column("command_id", sqlalchemy.String, nullable=False),
sqlalchemy.Column("command", sqlalchemy.String, nullable=False),
sqlalchemy.Index(
"ix_run_run_id_command_id", # An arbitrary name for the index.
"run_id",
"command_id",
unique=True,
),
sqlalchemy.Index(
"ix_run_run_id_index_in_run", # An arbitrary name for the index.
"run_id",
"index_in_run",
unique=True,
),
)
9 changes: 8 additions & 1 deletion robot-server/robot_server/protocols/analysis_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opentrons.protocol_engine.types import RunTimeParameter
from opentrons_shared_data.robot.dev_types import RobotType
from pydantic import BaseModel, Field
from typing import List, Optional, Union
from typing import List, Optional, Union, NamedTuple
from typing_extensions import Literal

from opentrons.protocol_engine import (
Expand Down Expand Up @@ -150,4 +150,11 @@ class CompletedAnalysis(BaseModel):
)


class RunTimeParameterAnalysisData(NamedTuple):
"""Data from analysis of a run-time parameter."""

value: Union[float, bool, str]
default: Union[float, bool, str]


ProtocolAnalysis = Union[PendingAnalysis, CompletedAnalysis]
100 changes: 98 additions & 2 deletions robot-server/robot_server/protocols/analysis_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
LoadedModule,
Liquid,
)
from opentrons.protocol_engine.types import RunTimeParamValuesType

from .analysis_models import (
AnalysisSummary,
Expand All @@ -27,6 +28,7 @@
CompletedAnalysis,
AnalysisResult,
AnalysisStatus,
RunTimeParameterAnalysisData,
)

from .completed_analysis_store import CompletedAnalysisStore, CompletedAnalysisResource
Expand Down Expand Up @@ -71,6 +73,14 @@ def __init__(self, analysis_id: str) -> None:
super().__init__(f'Analysis "{analysis_id}" not found.')


class AnalysisIsPendingError(RuntimeError):
"""Exception raised if a given analysis is still pending."""

def __init__(self, analysis_id: str) -> None:
"""Initialize the error's message."""
super().__init__(f'Analysis "{analysis_id}" is still pending.')


# TODO(sf, 2023-05-05): Like for protocols and runs, there's an in-memory cache for
# elements of this store. Unlike for protocols and runs, it isn't just an lru_cache
# on the top-level store's access methods, because those access methods have to be
Expand All @@ -93,10 +103,14 @@ class AnalysisStore:
so they're only kept in-memory, and lost when the store instance is destroyed.
"""

def __init__(self, sql_engine: sqlalchemy.engine.Engine) -> None:
def __init__(
self,
sql_engine: sqlalchemy.engine.Engine,
completed_store: Optional[CompletedAnalysisStore] = None,
) -> None:
"""Initialize the `AnalysisStore`."""
self._pending_store = _PendingAnalysisStore()
self._completed_store = CompletedAnalysisStore(
self._completed_store = completed_store or CompletedAnalysisStore(
sql_engine=sql_engine,
memory_cache=MemoryCache(_CACHE_MAX_SIZE, str, CompletedAnalysisResource),
current_analyzer_version=_CURRENT_ANALYZER_VERSION,
Expand Down Expand Up @@ -180,6 +194,9 @@ async def update(
protocol_id=protocol_id,
analyzer_version=_CURRENT_ANALYZER_VERSION,
completed_analysis=completed_analysis,
run_time_parameter_values_and_defaults=self._extract_run_time_param_values_and_defaults(
completed_analysis
),
)
await self._completed_store.add(
completed_analysis_resource=completed_analysis_resource
Expand Down Expand Up @@ -258,6 +275,85 @@ async def get_by_protocol(self, protocol_id: str) -> List[ProtocolAnalysis]:
else:
return completed_analyses + [pending_analysis]

@staticmethod
def _extract_run_time_param_values_and_defaults(
completed_analysis: CompletedAnalysis,
) -> Dict[str, RunTimeParameterAnalysisData]:
"""Extract the Run Time Parameters with current value and default value of each.

We do this in order to save the RTP data separately, outside the analysis
in the database. This saves us from having to de-serialize the entire analysis
to read just the RTP values.
"""
rtp_list = completed_analysis.runTimeParameters

rtp_values_and_defaults = {}
for param_spec in rtp_list:
rtp_values_and_defaults.update(
{
param_spec.variableName: RunTimeParameterAnalysisData(
value=param_spec.value, default=param_spec.default
)
}
)
return rtp_values_and_defaults

async def matching_rtp_values_in_analysis(
self, analysis_summary: AnalysisSummary, new_rtp_values: RunTimeParamValuesType
) -> bool:
"""Return whether the last analysis of the given protocol used the mentioned RTP values.

It is not sufficient to just check the values of provided parameters against the
corresponding parameter values in analysis because a previous request could have
composed of some extra parameters that are not in the current list.

Similarly, it is not enough to only compare the current parameter values from
the client with the previous values from the client because a previous param
might have been assigned a default value by the client while the current request
doesn't include that param because it can rely on the API to assign the default
value to that param.

So, we check that the Run Time Parameters in the previous analysis has params
with the values provided in the current request, and also verify that rest of the
parameters in the analysis use default values.
"""
if analysis_summary.status == AnalysisStatus.PENDING:
raise AnalysisIsPendingError(analysis_summary.id)

rtp_values_and_defaults_in_last_analysis = (
await self._completed_store.get_rtp_values_and_defaults_by_analysis_id(
analysis_summary.id
)
)
assert (
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved
rtp_values_and_defaults_in_last_analysis is not None
), "This protocol has no analysis associated with it."

if not set(new_rtp_values.keys()).issubset(
set(rtp_values_and_defaults_in_last_analysis.keys())
):
# Since the RTP keys in analysis represent all params defined in the protocol,
# if the client passes a parameter that's not present in the analysis,
# it means that the client is sending incorrect parameters.
# We will let this request trigger an analysis using the incorrect params
# and have the analysis raise an appropriate error instead of giving an
# error response to the protocols request.
# This makes the behavior of robot server consistent regardless of whether
# the client is sending a protocol for the first time or for the nth time.
return False
for (
parameter,
prev_value_and_default,
) in rtp_values_and_defaults_in_last_analysis.items():
if (
new_rtp_values.get(parameter, prev_value_and_default.default)
== prev_value_and_default.value
):
continue
else:
return False
return True


class _PendingAnalysisStore:
"""An in-memory store of protocol analyses that are pending.
Expand Down
Loading
Loading