Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): improve analysis cli #15027

Merged
merged 5 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 194 additions & 49 deletions api/src/opentrons/cli/analyze.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
"""Opentrons analyze CLI."""
import click

from anyio import run, Path as AsyncPath
from anyio import run
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from pydantic import BaseModel
from typing import Any, Dict, List, Optional, Sequence, Union
from typing_extensions import Literal
from typing import (
Any,
Dict,
List,
Optional,
Sequence,
Union,
Literal,
Callable,
IO,
TypeVar,
Iterator,
)
import logging
import sys

from opentrons.protocol_engine.types import RunTimeParameter
from opentrons.protocols.api_support.types import APIVersion
Expand All @@ -16,8 +31,9 @@
ProtocolType,
JsonProtocolConfig,
ProtocolFilesInvalidError,
ProtocolSource,
)
from opentrons.protocol_runner import create_simulating_runner
from opentrons.protocol_runner import create_simulating_runner, RunResult
from opentrons.protocol_engine import (
Command,
ErrorOccurrence,
Expand All @@ -30,6 +46,14 @@
from opentrons_shared_data.robot.dev_types import RobotType
from opentrons.util.performance_helpers import track_analysis

OutputKind = Literal["json", "human-json"]


@dataclass(frozen=True)
class _Output:
to_file: IO[bytes]
kind: OutputKind


@click.command()
@click.argument(
Expand All @@ -41,15 +65,107 @@
@click.option(
"--json-output",
help="Return analysis results as machine-readable JSON.",
type=click.Path(path_type=AsyncPath),
type=click.File(mode="wb"),
Copy link
Contributor

@SyntaxColoring SyntaxColoring Apr 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, using stdout for machine-readable output is unreliable because if we're analyzing a Python protocol, it can also print to stdout, and that will corrupt the JSON stream.

Should we print() a warning if - is specified, so it will fail early and obviously if somebody tries to rely on this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO no, but we should put a warning in the help text. Somebody will only know to use that flag by actually reading the help text, so that should be good enough.

That's a good secondary improvement too - there's no reason we can't capture stdout and save it somewhere separate.

)
@click.option(
"--human-json-output",
help="Return analysis results as JSON, formatted for human eyes.",
type=click.File(mode="wb"),
)
@click.option(
"--check",
help="Fail (via exit code) if the protocol had an error. If not specified, always succeed.",
is_flag=True,
default=False,
)
@click.option(
"--log-output",
help="Where to send logs. Can be a path, - for stdout, or stderr for stderr.",
default="stderr",
type=str,
)
@click.option(
"--log-level",
help="Level of logs to capture.",
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"], case_sensitive=False),
default="WARNING",
)
def analyze(files: Sequence[Path], json_output: Optional[Path]) -> None:
def analyze(
files: Sequence[Path],
json_output: Optional[IO[bytes]],
human_json_output: Optional[IO[bytes]],
log_output: str,
log_level: str,
check: bool,
) -> int:
"""Analyze a protocol.

You can use `opentrons analyze` to get a protocol's expected
equipment and commands.
"""
run(_analyze, files, json_output)
outputs = _get_outputs(json=json_output, human_json=human_json_output)
if not outputs and not check:
raise click.UsageError(
message="Please specify at least --check or one of the output options."
)

try:
with _capture_logs(log_output, log_level):
sys.exit(run(_analyze, files, outputs, check))
except click.ClickException:
raise
except Exception as e:
raise click.ClickException(str(e))


@contextmanager
def _capture_logs_to_stream(stream: IO[str]) -> Iterator[None]:
handler = logging.StreamHandler(stream)
logging.getLogger().addHandler(handler)
try:
yield
finally:
logging.getLogger().removeHandler(handler)


@contextmanager
def _capture_logs_to_file(filepath: Path) -> Iterator[None]:
handler = logging.FileHandler(filepath, mode="w")
logging.getLogger().addHandler(handler)
try:
yield
finally:
logging.getLogger().removeHandler(handler)


@contextmanager
def _capture_logs(write_to: str, log_level: str) -> Iterator[None]:
try:
level = getattr(logging, log_level)
except AttributeError:
raise click.ClickException(f"No such log level {log_level}")
logging.getLogger().setLevel(level)
if write_to in ("-", "stdout"):
with _capture_logs_to_stream(sys.stdout):
yield
elif write_to == "stderr":
with _capture_logs_to_stream(sys.stderr):
yield
else:
with _capture_logs_to_file(Path(write_to)):
yield


def _get_outputs(
json: Optional[IO[bytes]],
human_json: Optional[IO[bytes]],
) -> List[_Output]:
outputs: List[_Output] = []
if json:
outputs.append(_Output(to_file=json, kind="json"))
if human_json:
outputs.append(_Output(to_file=human_json, kind="human-json"))
return outputs


def _get_input_files(files_and_dirs: Sequence[Path]) -> List[Path]:
Expand All @@ -64,13 +180,37 @@ def _get_input_files(files_and_dirs: Sequence[Path]) -> List[Path]:
return results


R = TypeVar("R")


def _call_for_output_of_kind(
kind: OutputKind, outputs: Sequence[_Output], fn: Callable[[IO[bytes]], R]
) -> Optional[R]:
for output in outputs:
if output.kind == kind:
return fn(output.to_file)
return None


def _get_return_code(analysis: RunResult) -> int:
if analysis.state_summary.errors:
return -1
return 0


@track_analysis
async def _do_analyze(protocol_source: ProtocolSource) -> RunResult:

runner = await create_simulating_runner(
robot_type=protocol_source.robot_type, protocol_config=protocol_source.config
)
return await runner.run(deck_configuration=[], protocol_source=protocol_source)


async def _analyze(
files_and_dirs: Sequence[Path],
json_output: Optional[AsyncPath],
) -> None:
files_and_dirs: Sequence[Path], outputs: Sequence[_Output], check: bool
) -> int:
input_files = _get_input_files(files_and_dirs)

try:
protocol_source = await ProtocolReader().read_saved(
files=input_files,
Expand All @@ -79,47 +219,52 @@ async def _analyze(
except ProtocolFilesInvalidError as error:
raise click.ClickException(str(error))

runner = await create_simulating_runner(
robot_type=protocol_source.robot_type, protocol_config=protocol_source.config
analysis = await _do_analyze(protocol_source)
return_code = _get_return_code(analysis)

if not outputs:
return return_code

results = AnalyzeResults.construct(
createdAt=datetime.now(tz=timezone.utc),
files=[
ProtocolFile.construct(name=f.path.name, role=f.role)
for f in protocol_source.files
],
config=(
JsonConfig.construct(schemaVersion=protocol_source.config.schema_version)
if isinstance(protocol_source.config, JsonProtocolConfig)
else PythonConfig.construct(apiVersion=protocol_source.config.api_version)
),
metadata=protocol_source.metadata,
robotType=protocol_source.robot_type,
runTimeParameters=analysis.parameters,
commands=analysis.commands,
errors=analysis.state_summary.errors,
labware=analysis.state_summary.labware,
pipettes=analysis.state_summary.pipettes,
modules=analysis.state_summary.modules,
liquids=analysis.state_summary.liquids,
)
analysis = await runner.run(deck_configuration=[], protocol_source=protocol_source)

if json_output:
results = AnalyzeResults.construct(
createdAt=datetime.now(tz=timezone.utc),
files=[
ProtocolFile.construct(name=f.path.name, role=f.role)
for f in protocol_source.files
],
config=(
JsonConfig.construct(
schemaVersion=protocol_source.config.schema_version
)
if isinstance(protocol_source.config, JsonProtocolConfig)
else PythonConfig.construct(
apiVersion=protocol_source.config.api_version
)
),
metadata=protocol_source.metadata,
robotType=protocol_source.robot_type,
runTimeParameters=analysis.parameters,
commands=analysis.commands,
errors=analysis.state_summary.errors,
labware=analysis.state_summary.labware,
pipettes=analysis.state_summary.pipettes,
modules=analysis.state_summary.modules,
liquids=analysis.state_summary.liquids,
)

await json_output.write_text(
results.json(exclude_none=True),
encoding="utf-8",
)

_call_for_output_of_kind(
"json",
outputs,
lambda to_file: to_file.write(
results.json(exclude_none=True).encode("utf-8"),
),
)
_call_for_output_of_kind(
"human-json",
outputs,
lambda to_file: to_file.write(
results.json(exclude_none=True, indent=2).encode("utf-8")
),
)
if check:
return return_code
else:
raise click.UsageError(
"Currently, this tool only supports JSON mode. Use `--json-output`."
)
return 0


class ProtocolFile(BaseModel):
Expand Down
Loading
Loading