diff --git a/abr-testing/protocol_simulation/__init__.py b/abr-testing/protocol_simulation/__init__.py new file mode 100644 index 00000000000..157c21fd93e --- /dev/null +++ b/abr-testing/protocol_simulation/__init__.py @@ -0,0 +1 @@ +"""The package holding code for simulating protocols.""" \ No newline at end of file diff --git a/abr-testing/protocol_simulation/simulation_metrics.py b/abr-testing/protocol_simulation/simulation_metrics.py new file mode 100644 index 00000000000..544bc3fb4bc --- /dev/null +++ b/abr-testing/protocol_simulation/simulation_metrics.py @@ -0,0 +1,353 @@ +import re +import sys +import os +from pathlib import Path +from click import Context +from opentrons.cli import analyze +import json +import argparse +from datetime import datetime +from abr_testing.automation import google_sheets_tool +from abr_testing.data_collection import read_robot_logs +from typing import Set, Dict, Any, Tuple, List, Union +from abr_testing.tools import plate_reader + +def look_for_air_gaps(protocol_file_path: str) -> int: + instances = 0 + try: + with open(protocol_file_path, "r") as open_file: + protocol_lines = open_file.readlines() + for line in protocol_lines: + if "air_gap" in line: + print(line) + instances += 1 + print(f'Found {instances} instance(s) of the air gap function') + open_file.close() + except Exception as error: + print("Error reading protocol:", error.with_traceback()) + return instances + +def set_api_level(protocol_file_path) -> None: + with open(protocol_file_path, "r") as file: + file_contents = file.readlines() + # Look for current'apiLevel:' + for i, line in enumerate(file_contents): + print(line) + if 'apiLevel' in line: + print(f"The current API level of this protocol is: {line}") + change = input("Would you like to simulate with a different API level? (Y/N) ").strip().upper() + + if change == "Y": + api_level = input("Protocol API Level to Simulate with: ") + # Update new API level + file_contents[i] = f'apiLevel: {api_level}\n' + print(f"Updated line: {file_contents[i]}") + break + with open(protocol_file_path, "w") as file: + file.writelines(file_contents) + print("File updated successfully.") + +original_exit = sys.exit + +def mock_exit(code=None) -> None: + print(f"sys.exit() called with code: {code}") + raise SystemExit(code) + +def get_labware_name(id: str, object_dict: dict, json_data: dict) -> str: + slot = "" + for obj in object_dict: + if obj['id'] == id: + try: + # Try to get the slotName from the location + slot = obj['location']['slotName'] + return " SLOT: " + slot + except KeyError: + location = obj.get('location', {}) + + # Check if location contains 'moduleId' + if 'moduleId' in location: + return get_labware_name(location['moduleId'], json_data['modules'], json_data) + + # Check if location contains 'labwareId' + elif 'labwareId' in location: + return get_labware_name(location['labwareId'], json_data['labware'], json_data) + + return " Labware not found" + +def parse_results_volume(json_data_file: str) -> Tuple[ + List[str], List[str], List[str], List[str], + List[str], List[str], List[str], List[str], + List[str], List[str], List[str] + ]: + json_data = [] + with open(json_data_file, "r") as json_file: + json_data = json.load(json_file) + commands = json_data.get("commands", []) + start_time = datetime.fromisoformat(commands[0]["createdAt"]) + end_time = datetime.fromisoformat(commands[len(commands)-1]["completedAt"]) + header = ["", "Protocol Name", "Date", "Time"] + header_fill_row = ["", protocol_name, str(file_date.date()), str(file_date.time())] + labware_names_row =["Labware Name"] + volume_dispensed_row =["Total Volume Dispensed uL"] + volume_aspirated_row =["Total Volume Aspirated uL"] + change_in_volume_row = ["Total Change in Volume uL"] + start_time_row = ["Start Time"] + end_time_row = ["End Time"] + total_time_row = ["Total Time of Execution"] + metrics_row = [ + "Metric", + "Heatershaker # of Latch Open/Close", + "Heatershaker # of Homes", + "Heatershaker # of Rotations", + "Heatershaker Temp On Time (sec)", + "Temp Module # of Temp Changes", + "Temp Module Temp On Time (sec)", + "Temp Mod Time to 4C (sec)", + "Thermocycler # of Lid Open/Close", + "Thermocycler Block # of Temp Changes", + "Thermocycler Block Temp On Time (sec)", + "Thermocycler Block Time to 4C (sec)", + "Thermocycler Lid # of Temp Changes", + "Thermocycler Lid Temp On Time (sec)", + "Thermocycler Lid Time to 105C (sec)", + "Plate Reader # of Reads", + "Plate Reader Avg Read Time (sec)", + "Plate Reader # of Initializations", + "Plate Reader Avg Initialize Time (sec)", + "Plate Reader # of Lid Movements", + "Plate Reader Result", + "Left Pipette Total Tip Pick Up(s)", + "Left Pipette Total Aspirates", + "Left Pipette Total Dispenses", + "Right Pipette Total Tip Pick Up(s)", + "Right Pipette Total Aspirates", + "Right Pipette Total Dispenses", + "Gripper Pick Ups", + "Total Liquid Probes", + "Average Liquid Probe Time (sec)", + ] + values_row = ["Value"] + labware_well_dict = {} + hs_dict, temp_module_dict, thermo_cycler_dict, plate_reader_dict, instrument_dict = {}, {}, {}, {}, {} + try: + hs_dict = read_robot_logs.hs_commands(json_data) + temp_module_dict = read_robot_logs.temperature_module_commands(json_data) + thermo_cycler_dict = read_robot_logs.thermocycler_commands(json_data) + plate_reader_dict = read_robot_logs.plate_reader_commands(json_data, hellma_plate_standards) + instrument_dict = read_robot_logs.instrument_commands(json_data) + except: + pass + + metrics = [hs_dict, temp_module_dict, thermo_cycler_dict, plate_reader_dict, instrument_dict] + + # Iterate through all the commands executed in the protocol run log + for x, command in enumerate(commands): + if x != 0: + prev_command = commands[x-1] + if command["commandType"] == "aspirate": + if not (prev_command["commandType"] == "comment" and (prev_command['params']['message'] == "AIR GAP" or prev_command['params']['message'] == "MIXING")): + labware_id = command["params"]["labwareId"] + labware_name = "" + for labware in json_data.get("labware"): + if labware["id"] == labware_id: + labware_name = (labware["loadName"]) + get_labware_name(labware["id"], json_data["labware"], json_data) + well_name = command["params"]["wellName"] + + if labware_id not in labware_well_dict: + labware_well_dict[labware_id] = {} + + if well_name not in labware_well_dict[labware_id]: + labware_well_dict[labware_id][well_name] = (labware_name, 0, 0, "") + + vol = int(command["params"]["volume"]) + + labware_name, added_volumes, subtracted_volumes, log = labware_well_dict[labware_id][well_name] + + subtracted_volumes += vol + log+=(f"aspirated {vol} ") + labware_well_dict[labware_id][well_name] = (labware_name, added_volumes, subtracted_volumes, log) + elif command["commandType"] == "dispense": + if not (prev_command["commandType"] == "comment" and (prev_command['params']['message'] == "MIXING")): + labware_id = command["params"]["labwareId"] + labware_name = "" + for labware in json_data.get("labware"): + if labware["id"] == labware_id: + labware_name = (labware["loadName"]) + get_labware_name(labware["id"], json_data["labware"], json_data) + well_name = command["params"]["wellName"] + + if labware_id not in labware_well_dict: + labware_well_dict[labware_id] = {} + + if well_name not in labware_well_dict[labware_id]: + labware_well_dict[labware_id][well_name] = (labware_name, 0, 0, "") + + vol = int(command["params"]["volume"]) + labware_name, added_volumes, subtracted_volumes, log = labware_well_dict[labware_id][well_name] + added_volumes += vol + log+=(f"dispensed {vol} ") + labware_well_dict[labware_id][well_name] = (labware_name, added_volumes, subtracted_volumes, log) + # file_date_formatted = file_date.strftime("%Y-%m-%d_%H-%M-%S") + with open(f"{os.path.dirname(json_data_file)}\\{protocol_name}_well_volumes_{file_date_formatted}.json", "w") as output_file: + json.dump(labware_well_dict, output_file) + output_file.close() + + # populate row lists + for labware_id in labware_well_dict.keys(): + volume_added = 0 + volume_subtracted = 0 + labware_name ="" + for well in labware_well_dict[labware_id].keys(): + labware_name, added_volumes, subtracted_volumes, log = labware_well_dict[labware_id][well] + volume_added += added_volumes + volume_subtracted += subtracted_volumes + labware_names_row.append(labware_name) + volume_dispensed_row.append(str(volume_added)) + volume_aspirated_row.append(str(volume_subtracted)) + change_in_volume_row.append(str(volume_added - volume_subtracted)) + start_time_row.append(str(start_time.time())) + end_time_row.append(str(end_time.time())) + total_time_row.append(str(end_time - start_time)) + + for metric in metrics: + for cmd in metric.keys(): + values_row.append(str(metric[cmd])) + return( + header, + header_fill_row, + labware_names_row, + volume_dispensed_row, + volume_aspirated_row, + change_in_volume_row, + start_time_row, + end_time_row, + total_time_row, + metrics_row, + values_row) + +def main(storage_directory, google_sheet_name, protocol_file_path): + sys.exit = mock_exit + + # Read file path from arguments + protocol_file_path = Path(protocol_file_path) + global protocol_name + protocol_name = protocol_file_path.stem + print("Simulating", protocol_name) + global file_date + file_date = datetime.now() + global file_date_formatted + file_date_formatted = file_date.strftime("%Y-%m-%d_%H-%M-%S") + # Prepare output file + json_file_path = f"{storage_directory}\\{protocol_name}_{file_date_formatted}.json" + json_file_output = open(json_file_path, "wb+") + error_output = f"{storage_directory}\\error_log" + # Run protocol simulation + try: + with Context(analyze) as ctx: + ctx.invoke( + analyze, + files=[protocol_file_path], + json_output=json_file_output, + human_json_output=None, + log_output=error_output, + log_level="ERROR", + check=False + ) + except SystemExit as e: + print(f"SystemExit caught with code: {e}") + finally: + sys.exit = original_exit + json_file_output.close() + with open(error_output, "r") as open_file: + try: + errors = open_file.readlines() + if not errors: pass + else: + print(errors) + sys.exit(1) + except: + print("error simulating ...") + sys.exit() + + try: + credentials_path = os.path.join(storage_directory, "credentials.json") + print(credentials_path) + except FileNotFoundError: + print(f"Add credentials.json file to: {storage_directory}.") + sys.exit() + + global hellma_plate_standards + + try: + hellma_plate_standards = plate_reader.read_hellma_plate_files(storage_directory, 101934) + except: + print(f"Add helma plate standard files to {storage_directory}.") + sys.exit() + + google_sheet = google_sheets_tool.google_sheet( + credentials_path, google_sheet_name, 0 + ) + + google_sheet.write_to_row([]) + + for row in parse_results_volume(json_file_path): + print("Writing results to", google_sheet_name) + print(str(row)) + google_sheet.write_to_row(row) + +if __name__ == "__main__": + CLEAN_PROTOCOL = True + parser = argparse.ArgumentParser(description="Read run logs on google drive.") + parser.add_argument( + "storage_directory", + metavar="STORAGE_DIRECTORY", + type=str, + nargs=1, + help="Path to long term storage directory for run logs.", + ) + parser.add_argument( + "sheet_name", + metavar="SHEETNAME", + type=str, + nargs=1, + help="Name of sheet to upload results to", + ) + parser.add_argument( + "protocol_file_path", + metavar="PROTOCOL_FILE_PATH", + type=str, + nargs=1, + help="Path to protocol file" + + ) + args = parser.parse_args() + storage_directory = args.storage_directory[0] + sheet_name = args.sheet_name[0] + protocol_file_path = args.protocol_file_path[0] + + SETUP = True + while(SETUP): + print("This current version cannot properly handle air gap calls.\nThese may cause simulation results to be inaccurate") + air_gaps = look_for_air_gaps(protocol_file_path) + if air_gaps > 0: + choice = "" + while not choice: + choice = input("This protocol contains air gaps, results may be innacurate, would you like to continue? (Y/N): ") + if choice.upper() == "Y": + SETUP = False + CLEAN_PROTOCOL = True + elif choice.upper() == "N": + CLEAN_PROTOCOL = False + SETUP = False + print("Please remove air gaps then re-run") + else: + choice = "" + print("Please enter a valid response.") + SETUP = False + + if CLEAN_PROTOCOL: + main( + storage_directory, + sheet_name, + protocol_file_path, + ) + else: sys.exit(0) \ No newline at end of file diff --git a/hardware-testing/hardware_testing/drivers/__init__.py b/hardware-testing/hardware_testing/drivers/__init__.py index fde7e228d9b..f1b4c991e2c 100644 --- a/hardware-testing/hardware_testing/drivers/__init__.py +++ b/hardware-testing/hardware_testing/drivers/__init__.py @@ -15,28 +15,23 @@ def list_ports_and_select(device_name: str = "", port_substr: str = None) -> str idx_str = "" for i, p in enumerate(ports): print(f"\t{i + 1}) {p.device}") - if port_substr: - for i, p in enumerate(ports): - if port_substr in p.device: - idx = i + 1 - break - else: - idx_str = input( - f"\nenter number next to {device_name} port (or ENTER to re-scan): " - ) - if not idx_str: - return list_ports_and_select(device_name) - if not device_name: - device_name = "desired" - - try: + if port_substr: + for i, p in enumerate(ports): + if port_substr in p.device: + return p.device + + while True: + idx_str = input( + f"\nEnter number next to {device_name} port (or ENTER to re-scan): " + ) + if not idx_str: + return list_ports_and_select(device_name, port_substr) + try: idx = int(idx_str.strip()) - except TypeError: - pass - return ports[idx - 1].device - except (ValueError, IndexError): - return list_ports_and_select() + return ports[idx - 1].device + except (ValueError, IndexError): + print("Invalid selection. Please try again.") def find_port(vid: int, pid: int) -> str: diff --git a/hardware-testing/hardware_testing/scripts/abr_asair_sensor.py b/hardware-testing/hardware_testing/scripts/abr_asair_sensor.py index f2c00e015d3..1e8fca0358c 100644 --- a/hardware-testing/hardware_testing/scripts/abr_asair_sensor.py +++ b/hardware-testing/hardware_testing/scripts/abr_asair_sensor.py @@ -26,7 +26,7 @@ def __init__(self, robot: str, duration: int, frequency: int) -> None: test_name = "ABR-Environment-Monitoring" run_id = data.create_run_id() file_name = data.create_file_name(test_name, run_id, robot) - sensor = asair_sensor.BuildAsairSensor(False, False, "USB") + sensor = asair_sensor.BuildAsairSensor(False, False, "USB0") print(sensor) env_data = sensor.get_reading() header = [