Skip to content

Commit

Permalink
Merge branch 'edge' into plate_reader_csv_engine_file_provider
Browse files Browse the repository at this point in the history
  • Loading branch information
CaseyBatten committed Oct 18, 2024
2 parents 27b4c88 + 7b023dc commit a564912
Show file tree
Hide file tree
Showing 637 changed files with 92,870 additions and 50,883 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/analyses-snapshot-lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Setup Python
uses: 'actions/setup-python@v5'
with:
python-version: '3.13.0-rc.3'
python-version: '3.13.0'
cache: 'pipenv'
cache-dependency-path: analyses-snapshot-testing/Pipfile.lock
- name: Setup
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/analyses-snapshot-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
- name: Set up Python 3.13
uses: actions/setup-python@v5
with:
python-version: '3.13.0-rc.3'
python-version: '3.13.0'
cache: 'pipenv'
cache-dependency-path: analyses-snapshot-testing/Pipfile.lock

Expand Down
10 changes: 9 additions & 1 deletion abr-testing/abr_testing/automation/google_sheets_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,15 @@ def update_cell(
self, sheet_title: str, row: int, column: int, single_data: Any
) -> Tuple[int, int, Any]:
"""Update ONE individual cell according to a row and column."""
self.spread_sheet.worksheet(sheet_title).update_cell(row, column, single_data)
try:
self.spread_sheet.worksheet(sheet_title).update_cell(
row, column, single_data
)
except gspread.exceptions.APIError:
t.sleep(30)
self.spread_sheet.worksheet(sheet_title).update_cell(
row, column, single_data
)
return row, column, single_data

def get_all_data(
Expand Down
6 changes: 4 additions & 2 deletions abr-testing/abr_testing/data_collection/abr_google_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def create_data_dictionary(
print(f"Run {run_id} is incomplete. Skipping run.")
continue
if run_id in runs_to_save:
print("started reading run.")
print(f"started reading run {run_id}.")
robot = file_results.get("robot_name")
protocol_name = file_results["protocol"]["metadata"].get("protocolName", "")
software_version = file_results.get("API_Version", "")
Expand Down Expand Up @@ -114,7 +114,9 @@ def create_data_dictionary(
tc_dict = read_robot_logs.thermocycler_commands(file_results)
hs_dict = read_robot_logs.hs_commands(file_results)
tm_dict = read_robot_logs.temperature_module_commands(file_results)
pipette_dict = read_robot_logs.instrument_commands(file_results)
pipette_dict = read_robot_logs.instrument_commands(
file_results, labware_name="opentrons_tough_pcr_auto_sealing_lid"
)
plate_reader_dict = read_robot_logs.plate_reader_commands(
file_results, hellma_plate_standards
)
Expand Down
5 changes: 4 additions & 1 deletion abr-testing/abr_testing/data_collection/abr_robot_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,10 @@ def get_run_error_info_from_robot(
# TODO: make argument or see if I can get rid of with using board_id.
project_key = "RABR"
print(robot)
parent_key = project_key + "-" + robot.split("ABR")[1]
try:
parent_key = project_key + "-" + robot.split("ABR")[1]
except IndexError:
parent_key = ""

# Grab all previous issues
all_issues = ticket.issues_on_board(project_key)
Expand Down
150 changes: 105 additions & 45 deletions abr-testing/abr_testing/data_collection/read_robot_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datetime import datetime
import os
from abr_testing.data_collection.error_levels import ERROR_LEVELS_PATH
from typing import List, Dict, Any, Tuple, Set
from typing import List, Dict, Any, Tuple, Set, Optional
import time as t
import json
import requests
Expand Down Expand Up @@ -107,7 +107,44 @@ def count_command_in_run_data(
return total_command, avg_time


def instrument_commands(file_results: Dict[str, Any]) -> Dict[str, float]:
def identify_labware_ids(
file_results: Dict[str, Any], labware_name: Optional[str]
) -> List[str]:
"""Determine what type of labware is being picked up."""
if labware_name:
labwares = file_results.get("labware", "")
list_of_labware_ids = []
if len(labwares) > 1:
for labware in labwares:
load_name = labware["loadName"]
if load_name == labware_name:
labware_id = labware["id"]
list_of_labware_ids.append(labware_id)
return list_of_labware_ids


def match_pipette_to_action(
command_dict: Dict[str, Any],
commandTypes: List[str],
right_pipette: Optional[str],
left_pipette: Optional[str],
) -> Tuple[int, int]:
"""Match pipette id to id in command."""
right_pipette_add = 0
left_pipette_add = 0
for command in commandTypes:
command_type = command_dict["commandType"]
command_pipette = command_dict.get("pipetteId", "")
if command_type == command and command_pipette == right_pipette:
right_pipette_add = 1
elif command_type == command and command_pipette == left_pipette:
left_pipette_add = 1
return left_pipette_add, right_pipette_add


def instrument_commands(
file_results: Dict[str, Any], labware_name: Optional[str]
) -> Dict[str, float]:
"""Count number of pipette and gripper commands per run."""
pipettes = file_results.get("pipettes", "")
commandData = file_results.get("commands", "")
Expand All @@ -120,38 +157,44 @@ def instrument_commands(file_results: Dict[str, Any]) -> Dict[str, float]:
right_pipette_id = ""
left_pipette_id = ""
gripper_pickups = 0.0
gripper_labware_of_interest = 0.0
avg_liquid_probe_time_sec = 0.0
list_of_labware_ids = identify_labware_ids(file_results, labware_name)
# Match pipette mount to id
for pipette in pipettes:
if pipette["mount"] == "right":
right_pipette_id = pipette["id"]
elif pipette["mount"] == "left":
left_pipette_id = pipette["id"]
for command in commandData:
commandType = command["commandType"]
# Count tip pick ups
if commandType == "pickUpTip":
if command["params"].get("pipetteId", "") == right_pipette_id:
right_tip_pick_up += 1
elif command["params"].get("pipetteId", "") == left_pipette_id:
left_tip_pick_up += 1
# Count pick ups
single_left_pickup, single_right_pickup = match_pipette_to_action(
command, ["pickUpTip"], right_pipette_id, left_pipette_id
)
right_tip_pick_up += single_right_pickup
left_tip_pick_up += single_left_pickup
# Count aspirates
elif commandType == "aspirate":
if command["params"].get("pipetteId", "") == right_pipette_id:
right_aspirate += 1
elif command["params"].get("pipetteId", "") == left_pipette_id:
left_aspirate += 1
single_left_aspirate, single_right_aspirate = match_pipette_to_action(
command, ["aspirate"], right_pipette_id, left_pipette_id
)
right_aspirate += single_right_aspirate
left_aspirate += single_left_aspirate
# count dispenses/blowouts
elif commandType == "dispense" or commandType == "blowout":
if command["params"].get("pipetteId", "") == right_pipette_id:
right_dispense += 1
elif command["params"].get("pipetteId", "") == left_pipette_id:
left_dispense += 1
elif (
single_left_dispense, single_right_dispense = match_pipette_to_action(
command, ["blowOut", "dispense"], right_pipette_id, left_pipette_id
)
right_dispense += single_right_dispense
left_dispense += single_left_dispense
# count gripper actions
commandType = command["commandType"]
if (
commandType == "moveLabware"
and command["params"]["strategy"] == "usingGripper"
):
gripper_pickups += 1
labware_moving = command["params"]["labwareId"]
if labware_moving in list_of_labware_ids:
gripper_labware_of_interest += 1
liquid_probes, avg_liquid_probe_time_sec = count_command_in_run_data(
commandData, "liquidProbe", True
)
Expand All @@ -163,6 +206,7 @@ def instrument_commands(file_results: Dict[str, Any]) -> Dict[str, float]:
"Right Pipette Total Aspirates": right_aspirate,
"Right Pipette Total Dispenses": right_dispense,
"Gripper Pick Ups": gripper_pickups,
f"Gripper Pick Ups of {labware_name}": gripper_labware_of_interest,
"Total Liquid Probes": liquid_probes,
"Average Liquid Probe Time (sec)": avg_liquid_probe_time_sec,
}
Expand All @@ -178,11 +222,12 @@ def plate_reader_commands(
initialize_count: int = 0
read = "no"
final_result = {}
# Count Number of Reads
read_num = 0
# Count Number of Reads per measure mode
read_count, avg_read_time = count_command_in_run_data(
commandData, "absorbanceReader/read", True
)
# Count Number of Initializations
# Count Number of Initializations per measure mode
initialize_count, avg_initialize_time = count_command_in_run_data(
commandData, "absorbanceReader/initialize", True
)
Expand All @@ -198,28 +243,37 @@ def plate_reader_commands(
read = "yes"
elif read == "yes" and commandType == "comment":
result = command["params"].get("message", "")
wavelength = result.split("result: {")[1].split(":")[0]
wavelength_str = wavelength + ": "
rest_of_string = result.split(wavelength_str)[1][:-1]
result_dict = eval(rest_of_string)
result_ndarray = plate_reader.convert_read_dictionary_to_array(result_dict)
for item in hellma_plate_standards:
wavelength_of_interest = item["wavelength"]
if str(wavelength) == str(wavelength_of_interest):
error_cells = plate_reader.check_byonoy_data_accuracy(
result_ndarray, item, False
)
if len(error_cells[0]) > 0:
percent = (96 - len(error_cells)) / 96 * 100
for cell in error_cells:
print("FAIL: Cell " + str(cell) + " out of accuracy spec.")
else:
percent = 100
print(
f"PASS: {wavelength_of_interest} meet accuracy specification"
formatted_result = result.split("result: ")[1]
result_dict = eval(formatted_result)
result_dict_keys = list(result_dict.keys())
if len(result_dict_keys) > 1:
read_type = "multi"
else:
read_type = "single"
for wavelength in result_dict_keys:
one_wavelength_dict = result_dict.get(wavelength)
result_ndarray = plate_reader.convert_read_dictionary_to_array(
one_wavelength_dict
)
for item in hellma_plate_standards:
wavelength_of_interest = item["wavelength"]
if str(wavelength) == str(wavelength_of_interest):
error_cells = plate_reader.check_byonoy_data_accuracy(
result_ndarray, item, False
)
final_result[wavelength] = percent
input("###########################")
if len(error_cells[0]) > 0:
percent = (96 - len(error_cells)) / 96 * 100
for cell in error_cells:
print(
"FAIL: Cell " + str(cell) + " out of accuracy spec."
)
else:
percent = 100
print(
f"PASS: {wavelength_of_interest} meet accuracy specification"
)
final_result[read_type, wavelength, read_num] = percent
read_num += 1
read = "no"
plate_dict = {
"Plate Reader # of Reads": read_count,
Expand Down Expand Up @@ -372,7 +426,10 @@ def thermocycler_commands(file_results: Dict[str, Any]) -> Dict[str, float]:
or commandType == "thermocycler/closeLid"
):
lid_engagements += 1
if commandType == "thermocycler/setTargetBlockTemperature":
if (
commandType == "thermocycler/setTargetBlockTemperature"
and command["status"] != "queued"
):
block_temp = command["params"]["celsius"]
block_temp_changes += 1
block_on_time = datetime.strptime(
Expand Down Expand Up @@ -502,7 +559,10 @@ def get_error_info(file_results: Dict[str, Any]) -> Dict[str, Any]:
error_code = error_details.get("errorCode", "")
error_instrument = error_details.get("detail", "")
# Determine error level
error_level = error_levels.get(error_code, "4")
if end_run_errors > 0:
error_level = error_levels.get(error_code, "4")
else:
error_level = ""
# Create dictionary with all error descriptions
error_dict = {
"Total Recoverable Error(s)": total_recoverable_errors,
Expand Down
1 change: 1 addition & 0 deletions abr-testing/protocol_simulation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""The package holding code for simulating protocols."""
Loading

0 comments on commit a564912

Please sign in to comment.