diff --git a/abr-testing/Makefile b/abr-testing/Makefile index f711579ff57..b9f92229177 100644 --- a/abr-testing/Makefile +++ b/abr-testing/Makefile @@ -88,3 +88,14 @@ push-no-restart-ot3: sdist Pipfile.lock .PHONY: push-ot3 push-ot3: push-no-restart-ot3 + +.PHONY: abr-setup +abr-setup: + $(python) abr_testing/tools/abr_setup.py + +.PHONY: simulate +PROTOCOL_DIR := abr_testing/protocols +SIMULATION_TOOL := protocol_simulation/abr_sim_check.py +EXTENSION := .py +simulate: + $(python) $(SIMULATION_TOOL) \ No newline at end of file diff --git a/abr-testing/abr_testing/automation/google_sheets_tool.py b/abr-testing/abr_testing/automation/google_sheets_tool.py index 3ca3bd38f9b..c33d3198e6f 100644 --- a/abr-testing/abr_testing/automation/google_sheets_tool.py +++ b/abr-testing/abr_testing/automation/google_sheets_tool.py @@ -167,20 +167,13 @@ def column_letter_to_index(column_letter: str) -> int: self.spread_sheet.batch_update(body=body) except gspread.exceptions.APIError as e: print(f"ERROR MESSAGE: {e}") + raise def update_cell( self, sheet_title: str, row: int, column: int, single_data: Any ) -> Tuple[int, int, Any]: """Update ONE individual cell according to a row and column.""" - try: - self.spread_sheet.worksheet(sheet_title).update_cell( - row, column, single_data - ) - except gspread.exceptions.APIError: - t.sleep(30) - self.spread_sheet.worksheet(sheet_title).update_cell( - row, column, single_data - ) + self.spread_sheet.worksheet(sheet_title).update_cell(row, column, single_data) return row, column, single_data def get_all_data( diff --git a/abr-testing/abr_testing/data_collection/abr_calibration_logs.py b/abr-testing/abr_testing/data_collection/abr_calibration_logs.py index 82d9d9c45bc..f25c89d8435 100644 --- a/abr-testing/abr_testing/data_collection/abr_calibration_logs.py +++ b/abr-testing/abr_testing/data_collection/abr_calibration_logs.py @@ -1,129 +1,327 @@ """Get Calibration logs from robots.""" -from typing import Dict, Any, List, Union +from typing import Dict, Any, List, Set import argparse import os import json import sys -import time as t +import traceback from abr_testing.data_collection import read_robot_logs from abr_testing.automation import google_drive_tool, google_sheets_tool -def check_for_duplicates( - sheet_location: str, - google_sheet: Any, - col_1: int, - col_2: int, - row: List[str], - headers: List[str], -) -> Union[List[str], None]: - """Check google sheet for duplicates.""" - t.sleep(5) - serials = google_sheet.get_column(col_1) - modify_dates = google_sheet.get_column(col_2) - # Check for calibration time stamp. - if row[-1] is not None: - if len(row[-1]) > 0: - for serial, modify_date in zip(serials, modify_dates): - if row[col_1 - 1] == serial and row[col_2 - 1] == modify_date: - print( - f"Skipped row for instrument {serial}. Already on Google Sheet." - ) - return None - read_robot_logs.write_to_sheets(sheet_location, google_sheet, row, headers) - print(f"Writing calibration for: {row[7]}") - return row - - -def upload_calibration_offsets( - calibration: Dict[str, Any], storage_directory: str -) -> None: - """Upload calibration data to google_sheet.""" - # Common Headers - headers_beg = list(calibration.keys())[:4] - headers_end = list(["X", "Y", "Z", "lastModified"]) +def instrument_helper( + headers_beg: List[str], + headers_end: List[str], + calibration_log: Dict[Any, Any], + google_sheet_name: str, + inst_sheet_serials: Set[str], + inst_sheet_modify_dates: Set[str], + storage_directory: str, +) -> List[Any]: + """Helper for parsing instrument calibration data.""" + # Populate Instruments # INSTRUMENT SHEET + instruments_upload_rows: List[Any] = [] instrument_headers = ( - headers_beg + list(calibration["Instruments"][0].keys())[:7] + headers_end + headers_beg + list(calibration_log["Instruments"][0].keys())[:7] + headers_end ) local_instrument_file = google_sheet_name + "-Instruments" - instrument_sheet_location = read_robot_logs.create_abr_data_sheet( + read_robot_logs.create_abr_data_sheet( storage_directory, local_instrument_file, instrument_headers ) # INSTRUMENTS DATA - instruments = calibration["Instruments"] + instruments = calibration_log["Instruments"] for instrument in range(len(instruments)): one_instrument = instruments[instrument] + inst_serial = one_instrument["serialNumber"] + modified = one_instrument["data"]["calibratedOffset"].get("last_modified", "") + if inst_serial in inst_sheet_serials and modified in inst_sheet_modify_dates: + continue x = one_instrument["data"]["calibratedOffset"]["offset"].get("x", "") y = one_instrument["data"]["calibratedOffset"]["offset"].get("y", "") z = one_instrument["data"]["calibratedOffset"]["offset"].get("z", "") - modified = one_instrument["data"]["calibratedOffset"].get("last_modified", "") instrument_row = ( - list(calibration.values())[:4] + list(calibration_log.values())[:4] + list(one_instrument.values())[:7] + list([x, y, z, modified]) ) - check_for_duplicates( - instrument_sheet_location, - google_sheet_instruments, - 8, - 15, - instrument_row, - instrument_headers, - ) + instruments_upload_rows.append(instrument_row) + return instruments_upload_rows + +def module_helper( + headers_beg: List[str], + headers_end: List[str], + calibration_log: Dict[Any, Any], + google_sheet_name: str, + module_sheet_serials: Set[str], + module_modify_dates: Set[str], + storage_directory: str, +) -> List[Any]: + """Helper for parsing module calibration data.""" + # Populate Modules # MODULE SHEET - if len(calibration.get("Modules", "")) > 0: + modules_upload_rows: List[Any] = [] + if len(calibration_log.get("Modules", "")) > 0: module_headers = ( - headers_beg + list(calibration["Modules"][0].keys())[:7] + headers_end + headers_beg + list(calibration_log["Modules"][0].keys())[:7] + headers_end ) local_modules_file = google_sheet_name + "-Modules" - modules_sheet_location = read_robot_logs.create_abr_data_sheet( + read_robot_logs.create_abr_data_sheet( storage_directory, local_modules_file, module_headers ) # MODULES DATA - modules = calibration["Modules"] + modules = calibration_log["Modules"] for module in range(len(modules)): one_module = modules[module] - x = one_module["moduleOffset"]["offset"].get("x", "") - y = one_module["moduleOffset"]["offset"].get("y", "") - z = one_module["moduleOffset"]["offset"].get("z", "") - modified = one_module["moduleOffset"].get("last_modified", "") + mod_serial = one_module["serialNumber"] + modified = "No data" + x = "" + y = "" + z = "" + try: + modified = one_module["moduleOffset"].get("last_modified", "") + x = one_module["moduleOffset"]["offset"].get("x", "") + y = one_module["moduleOffset"]["offset"].get("y", "") + z = one_module["moduleOffset"]["offset"].get("z", "") + except KeyError: + pass + if mod_serial in module_sheet_serials and modified in module_modify_dates: + continue module_row = ( - list(calibration.values())[:4] + list(calibration_log.values())[:4] + list(one_module.values())[:7] + list([x, y, z, modified]) ) - check_for_duplicates( - modules_sheet_location, - google_sheet_modules, - 8, - 15, - module_row, - module_headers, - ) + modules_upload_rows.append(module_row) + return modules_upload_rows + + +def deck_helper( + headers_beg: List[str], + headers_end: List[str], + calibration_log: Dict[Any, Any], + google_sheet_name: str, + deck_sheet_serials: Set[str], + deck_sheet_modify_dates: Set[str], + storage_directory: str, +) -> List[Any]: + """Helper for parsing deck calibration data.""" + deck_upload_rows: List[Any] = [] + # Populate Deck # DECK SHEET local_deck_file = google_sheet_name + "-Deck" deck_headers = headers_beg + list(["pipetteCalibratedWith", "Slot"]) + headers_end - deck_sheet_location = read_robot_logs.create_abr_data_sheet( + read_robot_logs.create_abr_data_sheet( storage_directory, local_deck_file, deck_headers ) # DECK DATA - deck = calibration["Deck"] - slots = ["D3", "D1", "A1"] + deck = calibration_log["Deck"] deck_modified = deck["data"].get("lastModified", "") + slots = ["D3", "D1", "A1"] pipette_calibrated_with = deck["data"].get("pipetteCalibratedWith", "") for i in range(len(deck["data"]["matrix"])): + if slots[i] in deck_sheet_serials and deck_modified in deck_sheet_modify_dates: + continue coords = deck["data"]["matrix"][i] x = coords[0] y = coords[1] z = coords[2] - deck_row = list(calibration.values())[:4] + list( + deck_row = list(calibration_log.values())[:4] + list( [pipette_calibrated_with, slots[i], x, y, z, deck_modified] ) - check_for_duplicates( - deck_sheet_location, google_sheet_deck, 6, 10, deck_row, deck_headers + deck_upload_rows.append(deck_row) + return deck_upload_rows + + +def send_batch_update( + instruments_upload_rows: List[str], + google_sheet_instruments: google_sheets_tool.google_sheet, + modules_upload_rows: List[str], + google_sheet_modules: google_sheets_tool.google_sheet, + deck_upload_rows: List[str], + google_sheet_deck: google_sheets_tool.google_sheet, +) -> None: + """Executes batch updates.""" + # Prepare for batch updates + try: + transposed_instruments_upload_rows = list( + map(list, zip(*instruments_upload_rows)) + ) + google_sheet_instruments.batch_update_cells( + transposed_instruments_upload_rows, + "A", + google_sheet_instruments.get_index_row() + 1, + "0", + ) + except Exception: + print("No new instrument data") + try: + transposed_module_upload_rows = list(map(list, zip(*modules_upload_rows))) + google_sheet_modules.batch_update_cells( + transposed_module_upload_rows, + "A", + google_sheet_modules.get_index_row() + 1, + "1020695883", + ) + except Exception: + print("No new module data") + try: + transposed_deck_upload_rows = list(map(list, zip(*deck_upload_rows))) + google_sheet_deck.batch_update_cells( + transposed_deck_upload_rows, + "A", + google_sheet_deck.get_index_row() + 1, + "1332568460", + ) + except Exception: + print("No new deck data") + + +def upload_calibration_offsets( + calibration_data: List[Dict[str, Any]], + storage_directory: str, + google_sheet_instruments: google_sheets_tool.google_sheet, + google_sheet_modules: google_sheets_tool.google_sheet, + google_sheet_deck: google_sheets_tool.google_sheet, + google_sheet_name: str, +) -> None: + """Upload calibration data to google_sheet.""" + # Common Headers + headers_beg = list(calibration_data[0].keys())[:4] + headers_end = list(["X", "Y", "Z", "lastModified"]) + sheets = [google_sheet_instruments, google_sheet_modules, google_sheet_deck] + instruments_upload_rows: List[Any] = [] + modules_upload_rows: List[Any] = [] + deck_upload_rows: List[Any] = [] + inst_sheet_serials: Set[str] = set() + inst_sheet_modify_dates: Set[str] = set() + module_sheet_serials: Set[str] = set() + deck_sheet_serials: Set[str] = set() + deck_sheet_modify_dates: Set[str] = set() + + # Get current serials, and modified info from google sheet + for i, sheet in enumerate(sheets): + if i == 0: + inst_sheet_serials = sheet.get_column(8) + inst_sheet_modify_dates = sheet.get_column(15) + if i == 1: + module_sheet_serials = sheet.get_column(8) + module_modify_dates = sheet.get_column(15) + elif i == 2: + deck_sheet_serials = sheet.get_column(6) + deck_sheet_modify_dates = sheet.get_column(10) + + # Go through caliration logs and deterine what should be added to the sheet + for calibration_log in calibration_data: + for sheet_ind, sheet in enumerate(sheets): + if sheet_ind == 0: + instruments_upload_rows += instrument_helper( + headers_beg, + headers_end, + calibration_log, + google_sheet_name, + inst_sheet_serials, + inst_sheet_modify_dates, + storage_directory, + ) + elif sheet_ind == 1: + modules_upload_rows += module_helper( + headers_beg, + headers_end, + calibration_log, + google_sheet_name, + module_sheet_serials, + module_modify_dates, + storage_directory, + ) + elif sheet_ind == 2: + deck_upload_rows += deck_helper( + headers_beg, + headers_end, + calibration_log, + google_sheet_name, + deck_sheet_serials, + deck_sheet_modify_dates, + storage_directory, + ) + send_batch_update( + instruments_upload_rows, + google_sheet_instruments, + modules_upload_rows, + google_sheet_modules, + deck_upload_rows, + google_sheet_deck, + ) + + +def run( + storage_directory: str, folder_name: str, google_sheet_name_param: str, email: str +) -> None: + """Main control function.""" + # Connect to google drive. + google_sheet_name = google_sheet_name_param + try: + credentials_path = os.path.join(storage_directory, "credentials.json") + except FileNotFoundError: + print(f"Add credentials.json file to: {storage_directory}.") + sys.exit() + google_drive = google_drive_tool.google_drive(credentials_path, folder_name, email) + # Connect to google sheet + google_sheet_instruments = google_sheets_tool.google_sheet( + credentials_path, google_sheet_name, 0 + ) + google_sheet_modules = google_sheets_tool.google_sheet( + credentials_path, google_sheet_name, 1 + ) + google_sheet_deck = google_sheets_tool.google_sheet( + credentials_path, google_sheet_name, 2 + ) + ip_json_file = os.path.join(storage_directory, "IPs.json") + try: + ip_file = json.load(open(ip_json_file)) + except FileNotFoundError: + print(f"Add .json file with robot IPs to: {storage_directory}.") + sys.exit() + ip_or_all = "" + while not ip_or_all: + ip_or_all = input("IP Address or ALL: ") + calibration_data = [] + if ip_or_all.upper() == "ALL": + ip_address_list = ip_file["ip_address_list"] + for ip in ip_address_list: + saved_file_path, calibration = read_robot_logs.get_calibration_offsets( + ip, storage_directory + ) + calibration_data.append(calibration) + # upload_calibration_offsets(calibration, storage_directory) + else: + try: + ( + saved_file_path, + calibration, + ) = read_robot_logs.get_calibration_offsets( + ip_or_all, storage_directory + ) + calibration_data.append(calibration) + except Exception: + print("Invalid IP try again") + ip_or_all = "" + try: + upload_calibration_offsets( + calibration_data, + storage_directory, + google_sheet_instruments, + google_sheet_modules, + google_sheet_deck, + google_sheet_name, ) + print("Successfully uploaded callibration data!") + except Exception: + print("No calibration data to upload: ") + traceback.print_exc() + sys.exit(1) + google_drive.upload_missing_files(storage_directory) if __name__ == "__main__": @@ -160,42 +358,3 @@ def upload_calibration_offsets( folder_name = args.folder_name[0] google_sheet_name = args.google_sheet_name[0] email = args.email[0] - # Connect to google drive. - try: - credentials_path = os.path.join(storage_directory, "credentials.json") - except FileNotFoundError: - print(f"Add credentials.json file to: {storage_directory}.") - sys.exit() - google_drive = google_drive_tool.google_drive(credentials_path, folder_name, email) - # Connect to google sheet - google_sheet_instruments = google_sheets_tool.google_sheet( - credentials_path, google_sheet_name, 0 - ) - google_sheet_modules = google_sheets_tool.google_sheet( - credentials_path, google_sheet_name, 1 - ) - google_sheet_deck = google_sheets_tool.google_sheet( - credentials_path, google_sheet_name, 2 - ) - ip_json_file = os.path.join(storage_directory, "IPs.json") - try: - ip_file = json.load(open(ip_json_file)) - except FileNotFoundError: - print(f"Add .json file with robot IPs to: {storage_directory}.") - sys.exit() - ip_or_all = input("IP Address or ALL: ") - - if ip_or_all == "ALL": - ip_address_list = ip_file["ip_address_list"] - for ip in ip_address_list: - saved_file_path, calibration = read_robot_logs.get_calibration_offsets( - ip, storage_directory - ) - upload_calibration_offsets(calibration, storage_directory) - else: - saved_file_path, calibration = read_robot_logs.get_calibration_offsets( - ip_or_all, storage_directory - ) - upload_calibration_offsets(calibration, storage_directory) - - google_drive.upload_missing_files(storage_directory) diff --git a/abr-testing/abr_testing/data_collection/abr_google_drive.py b/abr-testing/abr_testing/data_collection/abr_google_drive.py index e1924e3c53e..88ed55cab82 100644 --- a/abr-testing/abr_testing/data_collection/abr_google_drive.py +++ b/abr-testing/abr_testing/data_collection/abr_google_drive.py @@ -158,38 +158,10 @@ def create_data_dictionary( return transposed_runs_and_robots, headers, transposed_runs_and_lpc, headers_lpc -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Read run logs on google drive.") - parser.add_argument( - "storage_directory", - metavar="STORAGE_DIRECTORY", - type=str, - nargs=1, - help="Path to long term storage directory for run logs.", - ) - parser.add_argument( - "folder_name", - metavar="FOLDER_NAME", - type=str, - nargs=1, - help="Google Drive folder name. Open desired folder and copy string after drive/folders/.", - ) - parser.add_argument( - "google_sheet_name", - metavar="GOOGLE_SHEET_NAME", - type=str, - nargs=1, - help="Google sheet name.", - ) - parser.add_argument( - "email", metavar="EMAIL", type=str, nargs=1, help="opentrons gmail." - ) - args = parser.parse_args() - folder_name = args.folder_name[0] - storage_directory = args.storage_directory[0] - google_sheet_name = args.google_sheet_name[0] - email = args.email[0] - +def run( + storage_directory: str, folder_name: str, google_sheet_name: str, email: str +) -> None: + """Main control function.""" try: credentials_path = os.path.join(storage_directory, "credentials.json") except FileNotFoundError: @@ -203,7 +175,6 @@ def create_data_dictionary( # Get run ids on google sheet run_ids_on_gs = set(google_sheet.get_column(2)) # Get robots on google sheet - robots = list(set(google_sheet.get_column(1))) # Uploads files that are not in google drive directory google_drive.upload_missing_files(storage_directory) @@ -229,7 +200,6 @@ def create_data_dictionary( hellma_plate_standards=file_values, ) start_row = google_sheet.get_index_row() + 1 - print(start_row) google_sheet.batch_update_cells(transposed_runs_and_robots, "A", start_row, "0") # Add LPC to google sheet @@ -238,6 +208,40 @@ def create_data_dictionary( google_sheet_lpc.batch_update_cells( transposed_runs_and_lpc, "A", start_row_lpc, "0" ) - robots = list(set(google_sheet.get_column(1))) # Calculate Robot Lifetimes sync_abr_sheet.determine_lifetime(google_sheet) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Read run logs on google drive.") + parser.add_argument( + "storage_directory", + metavar="STORAGE_DIRECTORY", + type=str, + nargs=1, + help="Path to long term storage directory for run logs.", + ) + parser.add_argument( + "folder_name", + metavar="FOLDER_NAME", + type=str, + nargs=1, + help="Google Drive folder name. Open desired folder and copy string after drive/folders/.", + ) + parser.add_argument( + "google_sheet_name", + metavar="GOOGLE_SHEET_NAME", + type=str, + nargs=1, + help="Google sheet name.", + ) + parser.add_argument( + "email", metavar="EMAIL", type=str, nargs=1, help="opentrons gmail." + ) + args = parser.parse_args() + folder_name = args.folder_name[0] + storage_directory = args.storage_directory[0] + google_sheet_name = args.google_sheet_name[0] + email = args.email[0] + + run(storage_directory, folder_name, google_sheet_name, email) diff --git a/abr-testing/abr_testing/data_collection/get_run_logs.py b/abr-testing/abr_testing/data_collection/get_run_logs.py index 3d8eb851197..24d5aaf4f3b 100644 --- a/abr-testing/abr_testing/data_collection/get_run_logs.py +++ b/abr-testing/abr_testing/data_collection/get_run_logs.py @@ -92,7 +92,9 @@ def save_runs(runs_to_save: Set[str], ip: str, storage_directory: str) -> Set[st return saved_file_paths -def get_all_run_logs(storage_directory: str) -> None: +def get_all_run_logs( + storage_directory: str, google_drive: google_drive_tool.google_drive +) -> None: """GET ALL RUN LOGS. Connect to each ABR robot to read run log data. @@ -114,6 +116,17 @@ def get_all_run_logs(storage_directory: str) -> None: google_drive.upload_missing_files(storage_directory) +def run(storage_directory: str, folder_name: str, email: str) -> None: + """Main control function.""" + try: + credentials_path = os.path.join(storage_directory, "credentials.json") + except FileNotFoundError: + print(f"Add credentials.json file to: {storage_directory}.") + sys.exit() + google_drive = google_drive_tool.google_drive(credentials_path, folder_name, email) + get_all_run_logs(storage_directory, google_drive) + + if __name__ == "__main__": """Get run logs.""" parser = argparse.ArgumentParser(description="Pulls run logs from ABR robots.") @@ -138,10 +151,4 @@ def get_all_run_logs(storage_directory: str) -> None: storage_directory = args.storage_directory[0] folder_name = args.folder_name[0] email = args.email[0] - try: - credentials_path = os.path.join(storage_directory, "credentials.json") - except FileNotFoundError: - print(f"Add credentials.json file to: {storage_directory}.") - sys.exit() - google_drive = google_drive_tool.google_drive(credentials_path, folder_name, email) - get_all_run_logs(storage_directory) + run(storage_directory, folder_name, email) diff --git a/abr-testing/abr_testing/data_collection/read_robot_logs.py b/abr-testing/abr_testing/data_collection/read_robot_logs.py index be74294fbe5..ff650335d84 100644 --- a/abr-testing/abr_testing/data_collection/read_robot_logs.py +++ b/abr-testing/abr_testing/data_collection/read_robot_logs.py @@ -13,7 +13,6 @@ import time as t import json import requests -import sys from abr_testing.tools import plate_reader @@ -695,7 +694,7 @@ def get_calibration_offsets( print(f"Connected to {ip}") except Exception: print(f"ERROR: Failed to read IP address: {ip}") - sys.exit() + raise health_data = response.json() robot_name = health_data.get("name", "") api_version = health_data.get("api_version", "") diff --git a/abr-testing/abr_testing/tools/abr_setup.py b/abr-testing/abr_testing/tools/abr_setup.py new file mode 100644 index 00000000000..853f1c53ced --- /dev/null +++ b/abr-testing/abr_testing/tools/abr_setup.py @@ -0,0 +1,139 @@ +"""Automate ABR data collection.""" +import os +import time +import configparser +import traceback +import sys +from hardware_testing.scripts import ABRAsairScript # type: ignore +from abr_testing.data_collection import ( + get_run_logs, + abr_google_drive, + abr_calibration_logs, +) + + +def run_temp_sensor(ip_file: str) -> None: + """Run temperature sensors on all robots.""" + processes = ABRAsairScript.run(ip_file) + for process in processes: + process.start() + time.sleep(20) + for process in processes: + process.join() + + +def get_abr_logs(storage_directory: str, folder_name: str, email: str) -> None: + """Retrieve run logs on all robots and record missing run logs in google drive.""" + try: + get_run_logs.run(storage_directory, folder_name, email) + except Exception as e: + print("Cannot Get Run Logs", e) + traceback.print_exc + + +def record_abr_logs( + storage_directory: str, folder_name: str, google_sheet_name: str, email: str +) -> None: + """Write run logs to ABR run logs in sheets.""" + try: + abr_google_drive.run(storage_directory, folder_name, google_sheet_name, email) + except Exception as e: + print(e) + + +def get_calibration_data( + storage_directory: str, folder_name: str, google_sheet_name: str, email: str +) -> None: + """Download calibration logs and write to ABR-calibration-data in sheets.""" + try: + abr_calibration_logs.run( + storage_directory, folder_name, google_sheet_name, email + ) + except Exception as e: + print("Cannot get callibration data", e) + traceback.print_exc() + + +def main(configurations: configparser.ConfigParser) -> None: + """Main function.""" + ip_file = None + storage_directory = None + email = None + drive_folder = None + sheet_name = None + + has_defaults = False + # If default is not specified get all values + default = configurations["DEFAULT"] + if len(default) > 0: + has_defaults = True + try: + if has_defaults: + storage_directory = default["Storage"] + email = default["Email"] + drive_folder = default["Drive_Folder"] + sheet_name = default["Sheet_Name"] + except KeyError as e: + print("Cannot read config file\n" + str(e)) + + # Run Temperature Sensors + if not has_defaults: + ip_file = configurations["TEMP-SENSOR"]["Robo_List"] + print("Starting temp sensors...") + if ip_file: + run_temp_sensor(ip_file) + print("Temp Sensors Started") + else: + print("Missing ip_file location, please fix configs") + sys.exit(1) + # Get Run Logs and Record + if not has_defaults: + storage_directory = configurations["RUN-LOG"]["Storage"] + email = configurations["RUN-LOG"]["Email"] + drive_folder = configurations["RUN-LOG"]["Drive_Folder"] + sheet_name = configurations["RUN-LOG"]["Sheet_Name"] + print(sheet_name) + if storage_directory and drive_folder and sheet_name and email: + print("Retrieving robot run logs...") + get_abr_logs(storage_directory, drive_folder, email) + print("Recording robot run logs...") + record_abr_logs(storage_directory, drive_folder, sheet_name, email) + print("Run logs updated") + else: + print("Storage, Email, or Drive Folder is missing, please fix configs") + sys.exit(1) + + # Collect calibration data + if not has_defaults: + storage_directory = configurations["CALIBRATION"]["Storage"] + email = configurations["CALIBRATION"]["Email"] + drive_folder = configurations["CALIBRATION"]["Drive_Folder"] + sheet_name = configurations["CALIBRATION"]["Sheet_Name"] + if storage_directory and drive_folder and sheet_name and email: + print("Retrieving and recording robot calibration data...") + get_calibration_data(storage_directory, drive_folder, sheet_name, email) + print("Calibration logs updated") + else: + print( + "Storage, Email, Drive Folder, or Sheet name is missing, please fix configs" + ) + sys.exit(1) + + +if __name__ == "__main__": + configurations = None + configs_file = None + while not configs_file: + configs_file = input("Please enter path to config.ini: ") + if os.path.exists(configs_file): + break + else: + configs_file = None + print("Please enter a valid path") + try: + configurations = configparser.ConfigParser() + configurations.read(configs_file) + except configparser.ParsingError as e: + print("Cannot read configuration file\n" + str(e)) + if configurations: + main(configurations) diff --git a/abr-testing/abr_testing/tools/sync_abr_sheet.py b/abr-testing/abr_testing/tools/sync_abr_sheet.py index aca116292a8..569f0f9b834 100644 --- a/abr-testing/abr_testing/tools/sync_abr_sheet.py +++ b/abr-testing/abr_testing/tools/sync_abr_sheet.py @@ -7,6 +7,8 @@ import csv import sys import os +import time +import traceback from typing import Dict, Tuple, Any, List from statistics import mean, StatisticsError @@ -27,76 +29,94 @@ def determine_lifetime(abr_google_sheet: Any) -> None: ) # Goes through dataframe per robot for index, run in df_sheet_data.iterrows(): - end_time = run["End_Time"] - robot = run["Robot"] - robot_lifetime = ( - float(run["Robot Lifetime (%)"]) if run["Robot Lifetime (%)"] != "" else 0 + max_retries = 5 + retries = 0 + while retries < max_retries: + try: + update_df(abr_google_sheet, lifetime_index, df_sheet_data, dict(run)) + break + except Exception as e: + if "Quota exceeded for quota metric" in str(e): + retries += 1 + print( + f"Read/write limit reached on attempt: {retries}, pausing then retrying..." + ) + time.sleep(65) + else: + print("unrecoverable error:", e) + traceback.print_exc() + sys.exit(1) + + +def update_df( + abr_google_sheet: Any, lifetime_index: int, df_sheet_data: Any, run: Dict[Any, Any] +) -> None: + """Update google sheets with new run log data.""" + end_time = run["End_Time"] + robot = run["Robot"] + robot_lifetime = ( + float(run["Robot Lifetime (%)"]) if run["Robot Lifetime (%)"] != "" else 0 + ) + if robot_lifetime < 1 and len(run["Run_ID"]) > 1: + # Get Robot % Lifetime + robot_runs_before = df_sheet_data[ + (df_sheet_data["End_Time"] <= end_time) & (df_sheet_data["Robot"] == robot) + ] + robot_percent_lifetime = ( + (robot_runs_before["Run_Time (min)"].sum() / 60) / 3750 * 100 ) - if robot_lifetime < 1 and len(run["Run_ID"]) > 1: - # Get Robot % Lifetime - robot_runs_before = df_sheet_data[ + # Get Left Pipette % Lifetime + left_pipette = run["Left Mount"] + if len(left_pipette) > 1: + left_pipette_runs_before = df_sheet_data[ (df_sheet_data["End_Time"] <= end_time) - & (df_sheet_data["Robot"] == robot) + & ( + (df_sheet_data["Left Mount"] == left_pipette) + | (df_sheet_data["Right Mount"] == left_pipette) + ) ] - robot_percent_lifetime = ( - (robot_runs_before["Run_Time (min)"].sum() / 60) / 3750 * 100 + left_pipette_percent_lifetime = ( + (left_pipette_runs_before["Run_Time (min)"].sum() / 60) / 1248 * 100 ) - # Get Left Pipette % Lifetime - left_pipette = run["Left Mount"] - if len(left_pipette) > 1: - left_pipette_runs_before = df_sheet_data[ - (df_sheet_data["End_Time"] <= end_time) - & ( - (df_sheet_data["Left Mount"] == left_pipette) - | (df_sheet_data["Right Mount"] == left_pipette) - ) - ] - left_pipette_percent_lifetime = ( - (left_pipette_runs_before["Run_Time (min)"].sum() / 60) / 1248 * 100 - ) - else: - left_pipette_percent_lifetime = "" - # Get Right Pipette % Lifetime - right_pipette = run["Right Mount"] - if len(right_pipette) > 1: - right_pipette_runs_before = df_sheet_data[ - (df_sheet_data["End_Time"] <= end_time) - & ( - (df_sheet_data["Left Mount"] == right_pipette) - | (df_sheet_data["Right Mount"] == right_pipette) - ) - ] - right_pipette_percent_lifetime = ( - (right_pipette_runs_before["Run_Time (min)"].sum() / 60) - / 1248 - * 100 - ) - else: - right_pipette_percent_lifetime = "" - # Get Gripper % Lifetime - gripper = run["Extension"] - if len(gripper) > 1: - gripper_runs_before = df_sheet_data[ - (df_sheet_data["End_Time"] <= end_time) - & (df_sheet_data["Extension"] == gripper) - ] - gripper_percent_lifetime = ( - (gripper_runs_before["Run_Time (min)"].sum() / 60) / 3750 * 100 + else: + left_pipette_percent_lifetime = "" + # Get Right Pipette % Lifetime + right_pipette = run["Right Mount"] + if len(right_pipette) > 1: + right_pipette_runs_before = df_sheet_data[ + (df_sheet_data["End_Time"] <= end_time) + & ( + (df_sheet_data["Left Mount"] == right_pipette) + | (df_sheet_data["Right Mount"] == right_pipette) ) - else: - gripper_percent_lifetime = "" - run_id = run["Run_ID"] - row_num = abr_google_sheet.get_row_index_with_value(run_id, 2) - update_list = [ - [robot_percent_lifetime], - [left_pipette_percent_lifetime], - [right_pipette_percent_lifetime], - [gripper_percent_lifetime], ] - abr_google_sheet.batch_update_cells( - update_list, lifetime_index, row_num, "0" + right_pipette_percent_lifetime = ( + (right_pipette_runs_before["Run_Time (min)"].sum() / 60) / 1248 * 100 ) - print(f"Updated row {row_num} for run: {run_id}") + else: + right_pipette_percent_lifetime = "" + # Get Gripper % Lifetime + gripper = run["Extension"] + if len(gripper) > 1: + gripper_runs_before = df_sheet_data[ + (df_sheet_data["End_Time"] <= end_time) + & (df_sheet_data["Extension"] == gripper) + ] + gripper_percent_lifetime = ( + (gripper_runs_before["Run_Time (min)"].sum() / 60) / 3750 * 100 + ) + else: + gripper_percent_lifetime = "" + run_id = run["Run_ID"] + row_num = abr_google_sheet.get_row_index_with_value(run_id, 2) + update_list = [ + [robot_percent_lifetime], + [left_pipette_percent_lifetime], + [right_pipette_percent_lifetime], + [gripper_percent_lifetime], + ] + abr_google_sheet.batch_update_cells(update_list, lifetime_index, row_num, "0") + print(f"Updated row {row_num} for run: {run_id}") def compare_run_to_temp_data( diff --git a/abr-testing/protocol_simulation/simulation_metrics.py b/abr-testing/protocol_simulation/simulation_metrics.py index 544bc3fb4bc..c110011d57e 100644 --- a/abr-testing/protocol_simulation/simulation_metrics.py +++ b/abr-testing/protocol_simulation/simulation_metrics.py @@ -12,7 +12,11 @@ from typing import Set, Dict, Any, Tuple, List, Union from abr_testing.tools import plate_reader + + + def look_for_air_gaps(protocol_file_path: str) -> int: + """Search Protocol for Air Gaps""" instances = 0 try: with open(protocol_file_path, "r") as open_file: @@ -26,34 +30,18 @@ def look_for_air_gaps(protocol_file_path: str) -> int: except Exception as error: print("Error reading protocol:", error.with_traceback()) return instances - -def set_api_level(protocol_file_path) -> None: - with open(protocol_file_path, "r") as file: - file_contents = file.readlines() - # Look for current'apiLevel:' - for i, line in enumerate(file_contents): - print(line) - if 'apiLevel' in line: - print(f"The current API level of this protocol is: {line}") - change = input("Would you like to simulate with a different API level? (Y/N) ").strip().upper() - if change == "Y": - api_level = input("Protocol API Level to Simulate with: ") - # Update new API level - file_contents[i] = f'apiLevel: {api_level}\n' - print(f"Updated line: {file_contents[i]}") - break - with open(protocol_file_path, "w") as file: - file.writelines(file_contents) - print("File updated successfully.") -original_exit = sys.exit +# Mock sys.exit to avoid program termination +original_exit = sys.exit # Save the original sys.exit function -def mock_exit(code=None) -> None: +def mock_exit(code: Any = None) -> None: + """Prevents program from exiting after analyze""" print(f"sys.exit() called with code: {code}") - raise SystemExit(code) + raise SystemExit(code) # Raise the exception but catch it to prevent termination def get_labware_name(id: str, object_dict: dict, json_data: dict) -> str: + """Recursively find the labware_name""" slot = "" for obj in object_dict: if obj['id'] == id: @@ -62,6 +50,7 @@ def get_labware_name(id: str, object_dict: dict, json_data: dict) -> str: slot = obj['location']['slotName'] return " SLOT: " + slot except KeyError: + # Handle KeyError when location or slotName is missing location = obj.get('location', {}) # Check if location contains 'moduleId' @@ -74,15 +63,18 @@ def get_labware_name(id: str, object_dict: dict, json_data: dict) -> str: return " Labware not found" + def parse_results_volume(json_data_file: str) -> Tuple[ List[str], List[str], List[str], List[str], List[str], List[str], List[str], List[str], List[str], List[str], List[str] ]: + """Pars run log and extract neccessay information""" json_data = [] with open(json_data_file, "r") as json_file: json_data = json.load(json_file) commands = json_data.get("commands", []) + start_time = datetime.fromisoformat(commands[0]["createdAt"]) end_time = datetime.fromisoformat(commands[len(commands)-1]["completedAt"]) header = ["", "Protocol Name", "Date", "Time"] @@ -127,6 +119,7 @@ def parse_results_volume(json_data_file: str) -> Tuple[ "Average Liquid Probe Time (sec)", ] values_row = ["Value"] + labware_well_dict = {} hs_dict, temp_module_dict, thermo_cycler_dict, plate_reader_dict, instrument_dict = {}, {}, {}, {}, {} try: @@ -140,53 +133,52 @@ def parse_results_volume(json_data_file: str) -> Tuple[ metrics = [hs_dict, temp_module_dict, thermo_cycler_dict, plate_reader_dict, instrument_dict] - # Iterate through all the commands executed in the protocol run log for x, command in enumerate(commands): if x != 0: prev_command = commands[x-1] if command["commandType"] == "aspirate": - if not (prev_command["commandType"] == "comment" and (prev_command['params']['message'] == "AIR GAP" or prev_command['params']['message'] == "MIXING")): - labware_id = command["params"]["labwareId"] - labware_name = "" - for labware in json_data.get("labware"): - if labware["id"] == labware_id: - labware_name = (labware["loadName"]) + get_labware_name(labware["id"], json_data["labware"], json_data) - well_name = command["params"]["wellName"] + labware_id = command["params"]["labwareId"] + labware_name = "" + for labware in json_data.get("labware"): + if labware["id"] == labware_id: + labware_name = (labware["loadName"]) + get_labware_name(labware["id"], json_data["labware"], json_data) + well_name = command["params"]["wellName"] - if labware_id not in labware_well_dict: - labware_well_dict[labware_id] = {} + if labware_id not in labware_well_dict: + labware_well_dict[labware_id] = {} - if well_name not in labware_well_dict[labware_id]: - labware_well_dict[labware_id][well_name] = (labware_name, 0, 0, "") + if well_name not in labware_well_dict[labware_id]: + labware_well_dict[labware_id][well_name] = (labware_name, 0, 0, "") - vol = int(command["params"]["volume"]) + vol = int(command["params"]["volume"]) - labware_name, added_volumes, subtracted_volumes, log = labware_well_dict[labware_id][well_name] + labware_name, added_volumes, subtracted_volumes, log = labware_well_dict[labware_id][well_name] + + subtracted_volumes += vol + log+=(f"aspirated {vol} ") + labware_well_dict[labware_id][well_name] = (labware_name, added_volumes, subtracted_volumes, log) - subtracted_volumes += vol - log+=(f"aspirated {vol} ") - labware_well_dict[labware_id][well_name] = (labware_name, added_volumes, subtracted_volumes, log) elif command["commandType"] == "dispense": - if not (prev_command["commandType"] == "comment" and (prev_command['params']['message'] == "MIXING")): - labware_id = command["params"]["labwareId"] - labware_name = "" - for labware in json_data.get("labware"): - if labware["id"] == labware_id: - labware_name = (labware["loadName"]) + get_labware_name(labware["id"], json_data["labware"], json_data) - well_name = command["params"]["wellName"] + labware_id = command["params"]["labwareId"] + labware_name = "" + for labware in json_data.get("labware"): + if labware["id"] == labware_id: + labware_name = (labware["loadName"]) + get_labware_name(labware["id"], json_data["labware"], json_data) + well_name = command["params"]["wellName"] + + if labware_id not in labware_well_dict: + labware_well_dict[labware_id] = {} - if labware_id not in labware_well_dict: - labware_well_dict[labware_id] = {} + if well_name not in labware_well_dict[labware_id]: + labware_well_dict[labware_id][well_name] = (labware_name, 0, 0, "") - if well_name not in labware_well_dict[labware_id]: - labware_well_dict[labware_id][well_name] = (labware_name, 0, 0, "") + vol = int(command["params"]["volume"]) - vol = int(command["params"]["volume"]) - labware_name, added_volumes, subtracted_volumes, log = labware_well_dict[labware_id][well_name] - added_volumes += vol - log+=(f"dispensed {vol} ") - labware_well_dict[labware_id][well_name] = (labware_name, added_volumes, subtracted_volumes, log) - # file_date_formatted = file_date.strftime("%Y-%m-%d_%H-%M-%S") + labware_name, added_volumes, subtracted_volumes, log = labware_well_dict[labware_id][well_name] + + added_volumes += vol + log+=(f"dispensed {vol} ") + labware_well_dict[labware_id][well_name] = (labware_name, added_volumes, subtracted_volumes, log) with open(f"{os.path.dirname(json_data_file)}\\{protocol_name}_well_volumes_{file_date_formatted}.json", "w") as output_file: json.dump(labware_well_dict, output_file) output_file.close() @@ -224,9 +216,10 @@ def parse_results_volume(json_data_file: str) -> Tuple[ metrics_row, values_row) -def main(storage_directory, google_sheet_name, protocol_file_path): - sys.exit = mock_exit +def main(protocol_file_path: Path, save: bool, storage_directory: str = os.curdir, google_sheet_name: str = "") -> None: + """Main module control""" + sys.exit = mock_exit # Replace sys.exit with the mock function # Read file path from arguments protocol_file_path = Path(protocol_file_path) global protocol_name @@ -236,27 +229,41 @@ def main(storage_directory, google_sheet_name, protocol_file_path): file_date = datetime.now() global file_date_formatted file_date_formatted = file_date.strftime("%Y-%m-%d_%H-%M-%S") - # Prepare output file - json_file_path = f"{storage_directory}\\{protocol_name}_{file_date_formatted}.json" - json_file_output = open(json_file_path, "wb+") - error_output = f"{storage_directory}\\error_log" + error_output = f"{storage_directory}\\test_debug" # Run protocol simulation try: with Context(analyze) as ctx: - ctx.invoke( - analyze, - files=[protocol_file_path], - json_output=json_file_output, - human_json_output=None, - log_output=error_output, - log_level="ERROR", - check=False - ) + if save: + # Prepare output file + json_file_path = f"{storage_directory}\\{protocol_name}_{file_date_formatted}.json" + json_file_output = open(json_file_path, "wb+") + # log_output_file = f"{protocol_name}_log" + ctx.invoke( + analyze, + files=[protocol_file_path], + json_output=json_file_output, + human_json_output=None, + log_output=error_output, + log_level="ERROR", + check=False + ) + json_file_output.close() + else: + ctx.invoke( + analyze, + files=[protocol_file_path], + json_output=None, + human_json_output=None, + log_output=error_output, + log_level="ERROR", + check=True + ) + except SystemExit as e: print(f"SystemExit caught with code: {e}") finally: + # Reset sys.exit to the original behavior sys.exit = original_exit - json_file_output.close() with open(error_output, "r") as open_file: try: errors = open_file.readlines() @@ -267,32 +274,30 @@ def main(storage_directory, google_sheet_name, protocol_file_path): except: print("error simulating ...") sys.exit() + if save: + try: + credentials_path = os.path.join(storage_directory, "credentials.json") + print(credentials_path) - try: - credentials_path = os.path.join(storage_directory, "credentials.json") - print(credentials_path) - except FileNotFoundError: - print(f"Add credentials.json file to: {storage_directory}.") - sys.exit() - - global hellma_plate_standards - - try: - hellma_plate_standards = plate_reader.read_hellma_plate_files(storage_directory, 101934) - except: - print(f"Add helma plate standard files to {storage_directory}.") - sys.exit() - - google_sheet = google_sheets_tool.google_sheet( - credentials_path, google_sheet_name, 0 - ) - - google_sheet.write_to_row([]) - - for row in parse_results_volume(json_file_path): - print("Writing results to", google_sheet_name) - print(str(row)) - google_sheet.write_to_row(row) + except FileNotFoundError: + print(f"Add credentials.json file to: {storage_directory}.") + sys.exit() + + global hellma_plate_standards + try: + hellma_plate_standards = plate_reader.read_hellma_plate_files(storage_directory, 101934) + + except: + print(f"Add helma plate standard files to {storage_directory}.") + sys.exit() + google_sheet = google_sheets_tool.google_sheet( + credentials_path, google_sheet_name, 0 + ) + google_sheet.write_to_row([]) + for row in parse_results_volume(json_file_path): + print("Writing results to", google_sheet_name) + print(str(row)) + google_sheet.write_to_row(row) if __name__ == "__main__": CLEAN_PROTOCOL = True @@ -343,11 +348,13 @@ def main(storage_directory, google_sheet_name, protocol_file_path): choice = "" print("Please enter a valid response.") SETUP = False - + + # set_api_level() if CLEAN_PROTOCOL: + # set_api_level(Path(protocol_file_path)) main( - storage_directory, - sheet_name, protocol_file_path, - ) + True, + storage_directory, + sheet_name,) else: sys.exit(0) \ No newline at end of file diff --git a/hardware-testing/hardware_testing/scripts/ABRAsairScript.py b/hardware-testing/hardware_testing/scripts/ABRAsairScript.py index 2324e330dc7..8ab86522952 100644 --- a/hardware-testing/hardware_testing/scripts/ABRAsairScript.py +++ b/hardware-testing/hardware_testing/scripts/ABRAsairScript.py @@ -15,19 +15,8 @@ def execute(client: pmk.SSHClient, command: str, args: list) -> Optional[int]: stdin, stdout, stderr = client.exec_command(command, get_pty=True) stdout_lines: List[str] = [] stderr_lines: List[str] = [] - time.sleep(15) + time.sleep(25) - # check stdout, stderr - - # Check the exit status of the command. - # while not stdout.channel.exit_status_ready(): - # if stdout.channel.recv_ready(): - # stdout_lines = stdout.readlines() - # print(f"{args[0]} output:", "".join(stdout_lines)) - # if stderr.channel.recv_ready(): - # stderr_lines = stderr.readlines() - # print(f"{args[0]} ERROR:", "".join(stdout_lines)) - # return 1 if stderr.channel.recv_ready: stderr_lines = stderr.readlines() if stderr_lines != []: @@ -58,24 +47,7 @@ def connect_ssh(ip: str) -> pmk.SSHClient: return client -# Load Robot IPs -file_name = sys.argv[1] -robot_ips = [] -robot_names = [] - -with open(file_name) as file: - for line in file.readlines(): - info = line.split(",") - if "Y" in info[2]: - robot_ips.append(info[0]) - robot_names.append(info[1]) - -cmd = "nohup python3 -m hardware_testing.scripts.abr_asair_sensor {name} {duration} {frequency}" -cd = "cd /opt/opentrons-robot-server && " -print("Executing Script on All Robots:") - - -def run_command_on_ip(index: int) -> None: +def run_command_on_ip(index: int, robot_ips, robot_names, cd, cmd) -> None: """Execute ssh command and start abr_asair script on the specified robot.""" curr_ip = robot_ips[index] try: @@ -87,15 +59,34 @@ def run_command_on_ip(index: int) -> None: print(f"Error running command on {curr_ip}: {e}") -# Launch the processes for each robot. -processes = [] -for index in range(len(robot_ips)): - process = multiprocessing.Process(target=run_command_on_ip, args=(index,)) - processes.append(process) +def run(file_name): + # Load Robot IPs + cmd = "nohup python3 -m hardware_testing.scripts.abr_asair_sensor {name} {duration} {frequency}" + cd = "cd /opt/opentrons-robot-server && " + robot_ips = [] + robot_names = [] + with open(file_name) as file: + for line in file.readlines(): + info = line.split(",") + if "Y" in info[2]: + robot_ips.append(info[0]) + robot_names.append(info[1]) + print("Executing Script on All Robots:") + # Launch the processes for each robot. + processes = [] + for index in range(len(robot_ips)): + process = multiprocessing.Process( + target=run_command_on_ip, args=(index, robot_ips, robot_names, cd, cmd) + ) + processes.append(process) + return processes if __name__ == "__main__": # Wait for all processes to finish. + file_name = sys.argv[1] + processes = run(file_name) + for process in processes: process.start() time.sleep(20) diff --git a/hardware-testing/hardware_testing/scripts/abr_asair_sensor.py b/hardware-testing/hardware_testing/scripts/abr_asair_sensor.py index 1e8fca0358c..ba41f9399f1 100644 --- a/hardware-testing/hardware_testing/scripts/abr_asair_sensor.py +++ b/hardware-testing/hardware_testing/scripts/abr_asair_sensor.py @@ -80,7 +80,7 @@ def __init__(self, robot: str, duration: int, frequency: int) -> None: break # write to google sheet try: - if google_sheet.creditals.access_token_expired: + if google_sheet.credentials.access_token_expired: google_sheet.gc.login() google_sheet.write_header(header) google_sheet.update_row_index()