Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVITI manifest fixes #393

Merged
163 changes: 123 additions & 40 deletions scripts/generate_aviti_run_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from genologics.config import BASEURI, PASSWORD, USERNAME
from genologics.entities import Process
from genologics.lims import Lims
from Levenshtein import hamming as distance
from Levenshtein import distance

from data.Chromium_10X_indexes import Chromium_10X_indexes
from scilifelab_epps.epp import upload_file
Expand Down Expand Up @@ -145,7 +145,9 @@ def dict_to_manifest_col(d: dict) -> str:
return s


def get_manifests(process: Process, manifest_root_name: str) -> list[tuple[str, str]]:
def get_manifests(
process: Process, manifest_root_name: str
) -> list[tuple[str, str | None]]:
"""Generate multiple manifests, grouping samples by index multiplicity and length,
adding PhiX controls of appropriate lengths as needed.
"""
Expand All @@ -158,30 +160,27 @@ def get_manifests(process: Process, manifest_root_name: str) -> list[tuple[str,

# Assert lanes
lanes = [art_out.location[1].split(":")[0] for art_out in arts_out]
lanes.sort()
assert set(lanes) == {"1"} or set(lanes) == {
"1",
"2",
}, "Expected a single-lane or dual-lane flowcell."

# Iterate over pool / lane
sample_rows = []
for pool, lane in zip(arts_out, lanes):
for pool, lane in sorted(zip(arts_out, lanes), key=lambda x: x[1]):
# Get sample-label linkage via database
sample2label: dict[str, str] = get_pool_sample_label_mapping(pool)
assert len(set(pool.reagent_labels)) == len(pool.reagent_labels), (
"Detected non-unique reagent labels."
f"Detected non-unique reagent labels in lane {lane}"
)

# Record PhiX UDFs for each output artifact
phix_loaded: bool = pool.udf["% phiX"] != 0
phix_set_name = pool.udf.get("Element PhiX Set", None)
if phix_loaded:
assert phix_set_name is not None, (
"PhiX controls loaded but no kit specified."
)
phix_loaded: float = pool.udf.get("% phiX", 0)
phix_set_name: str = pool.udf.get("Element PhiX Set", "")
if phix_loaded != 0:
assert phix_set_name != "", "PhiX controls loaded but no kit specified."
else:
assert phix_set_name is None, "PhiX controls specified but not loaded."
assert phix_set_name == "", "PhiX controls specified but not loaded."

# Collect rows for each sample
for sample in pool.samples:
Expand Down Expand Up @@ -274,7 +273,7 @@ def get_manifests(process: Process, manifest_root_name: str) -> list[tuple[str,
check_distances(rows_to_check)

# Start building manifests
manifests: list[tuple[str, str]] = []
manifests: list[tuple[str, str | None]] = []
for manifest_type in ["untrimmed", "trimmed", "phix", "empty"]:
manifest_name, manifest_contents = make_manifest(
df_samples_and_controls,
Expand All @@ -292,7 +291,13 @@ def make_manifest(
process: Process,
manifest_root_name: str,
manifest_type: str,
) -> tuple[str, str]:
) -> tuple[str, str | None]:
logging.info(f"Building {manifest_type} manifest...")

# Get the index cycles from the step fields
idx1_cycles = int(process.udf.get("Index Read 1"))
idx2_cycles = int(process.udf.get("Index Read 2"))

# Make copy of input df and subset columns to include in manifest
df = df_samples_and_controls[
[
Expand All @@ -319,26 +324,36 @@ def make_manifest(
]
)

settings_section = "\n".join(
[
"[SETTINGS]",
"SettingName, Value",
]
)

# Build the [SAMPLES] section of the manifest, depending on the manifest type.
if manifest_type == "untrimmed":
samples_section = f"[SAMPLES]\n{df.to_csv(index=None, header=True)}"

elif manifest_type == "trimmed":
min_idx1_len = df["Index1"].apply(len).min()
min_idx2_len = df["Index2"].apply(len).min()
df["Index1"] = df["Index1"].apply(lambda x: x[:min_idx1_len])
df["Index2"] = df["Index2"].apply(lambda x: x[:min_idx2_len])

samples_section = f"[SAMPLES]\n{df.to_csv(index=None, header=True)}"
elif manifest_type in ["trimmed", "phix"]:
if manifest_type == "phix":
# Subset to PhiX controls
df = df[df["Project"] == "Control"]

# Make index lengths conform to number of cycles
for idx, cycles in zip(["Index1", "Index2"], [idx1_cycles, idx2_cycles]):
# If there are any indexes shorter than the number of cycles
if not df[df[idx].apply(len) < cycles].empty:
for row in df[df[idx].apply(len) < cycles].to_dict(orient="records"):
logging.error(
f"'{row['SampleName']}' has {idx} '{row[idx]}' of length {len(row[idx])} shorter than {cycles} cycles."
)
logging.error(
f"Could not generate {manifest_type} manifest because indexes are shorter than the number of index cycles. Skipping."
)
return (file_name, None)
# If there are any indexes longer than the number of cycles
if not df[df[idx].apply(len) > cycles].empty:
# For each one, log how it's trimmed
for row in df[df[idx].apply(len) > cycles].to_dict(orient="records"):
logging.info(
f"Trimming '{row['SampleName']}' {idx} '{row[idx]}' of length {len(row[idx])} to {cycles} cycles."
)
df[idx] = df[idx].apply(lambda x: x[:cycles])

elif manifest_type == "phix":
df = df[df["Project"] == "Control"]
samples_section = f"[SAMPLES]\n{df.to_csv(index=None, header=True)}"

elif manifest_type == "empty":
Expand All @@ -347,13 +362,82 @@ def make_manifest(
else:
raise AssertionError("Invalid manifest type.")

settings_section = "\n".join(
[
"[SETTINGS]",
"SettingName, Value",
]
)

# Customize mismatch thresholds, if necessary
if manifest_type not in ["untrimmed", "empty"]:
try:
logging.info(
f"Getting custom mismatch thresholds for {manifest_type} manifest..."
)
i1_mismatch, i2_mismatch = get_custom_mistmatch_thresholds(df)
except AssertionError as e:
logging.error(e)
logging.error(
f"Could not generate {manifest_type} manifest without index collisions. Skipping."
)
return (file_name, None)

settings_section += "\n" + "\n".join(
[
f"I1MismatchThreshold, {i1_mismatch}",
f"I2MismatchThreshold, {i2_mismatch}",
]
)

# Write manifest
manifest_contents = "\n\n".join(
[runValues_section, settings_section, samples_section]
)

return (file_name, manifest_contents)


def get_custom_mistmatch_thresholds(df: pd.DataFrame) -> tuple[int, int]:
# Defaults, according to Element documentation
i1MismatchThreshold = 1
i2MismatchThreshold = 1

# Collect distances
idx1_dists = []
idx2_dists = []
total_dists = []
# Iterate across all sample pairings per lane
for lane in df["Lane"].unique():
df_lane = df[df["Lane"] == lane]
df_lane.reset_index(drop=True, inplace=True)
for i in range(0, len(df_lane)):
for j in range(i + 1, len(df_lane)):
# TODO skip NNN
idx1_dist = distance(df_lane["Index1"][i], df_lane["Index1"][j])
idx2_dist = distance(df_lane["Index2"][i], df_lane["Index2"][j])

# Collect distances between all sample pairings on index and index-pair level
idx1_dists.append(idx1_dist)
idx2_dists.append(idx2_dist)
total_dists.append(idx1_dist + idx2_dist)

if min(total_dists) == 0:
raise AssertionError("Total index distance of 0 detected.")
if min(idx1_dists) <= 2:
logging.warning(
"Minimum distance between Index1 sequences is at or below 2. Reducing allowed mismatches from 1 to 0."
)
i1MismatchThreshold = 0
if min(idx2_dists) <= 2:
logging.warning(
"Minimum distance between Index2 sequences is at or below 2. Reducing allowed mismatches from 1 to 0."
)
i2MismatchThreshold = 0

return (i1MismatchThreshold, i2MismatchThreshold)


def check_distances(rows: list[dict], threshold=2) -> None:
"""Iterator function to check index distances between all pairs of samples."""
for i in range(len(rows)):
Expand Down Expand Up @@ -459,19 +543,18 @@ def main(args: Namespace):
manifest_root_name = f"AVITI_run_manifest_{flowcell_id}_{process.id}_{TIMESTAMP}_{process.technician.name.replace(' ', '')}"

# Create manifest(s)
manifests: list[tuple[str, str]] = get_manifests(process, manifest_root_name)
manifests: list[tuple[str, str | None]] = get_manifests(process, manifest_root_name)

# Write manifest(s)
for file, content in manifests:
open(file, "w").write(content)

# Zip manifest(s)
# Write and zip manifest(s)
zip_file = f"{manifest_root_name}.zip"
files = [file for file, _ in manifests]
with ZipFile(zip_file, "w") as zip_stream:
for file in files:
zip_stream.write(file)
os.remove(file)
for file, content in manifests:
if content:
open(file, "w").write(content)
zip_stream.write(file)
os.remove(file)
else:
logging.warning(f"Not writing {file} due to missing contents.")

# Upload manifest(s)
logging.info("Uploading run manifest to LIMS...")
Expand Down