Skip to content

Commit

Permalink
Emit primer pairs in penalty order. (#87)
Browse files Browse the repository at this point in the history
This PR does two things:

1. Fix a bug where amplicons that were too _small_ were still emitted
2. Ensure that primer pairs are emitted in penalty order

(2) requires materializing a small tuple for all _valid_ pairs, and then
sorting by score. The tuple contains two ints (indices into the primer
sequences) and two floats (the penalty and the tm, the last for
convenience so we don't have to recompute it). It the sorts the tuples
by penalty, and starts generating PrimerPairs in penalty order.

If you have e.g. 500 left and 500 right primers, this could construct
~250k tuples and calculate 250k Tms, but in reality the number is
probably substantially smaller constrained by amplicon sizes.
  • Loading branch information
tfenne authored Dec 13, 2024
1 parent fbd1aa1 commit b781cd6
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 43 deletions.
124 changes: 83 additions & 41 deletions prymer/api/picking.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pathlib import Path
from typing import Iterator
from typing import Optional
from typing import Tuple

from pysam import FastaFile

Expand Down Expand Up @@ -80,18 +81,19 @@ def score(
# The penalty for the amplicon melting temperature.
# The difference in melting temperature between the calculated and optimal is weighted by the
# product melting temperature.
tm = amplicon_tm
tm_penalty: float
if tm > amplicon_tms.opt:
tm_penalty = (tm - amplicon_tms.opt) * weights.product_tm_gt
if amplicon_tms.opt == 0.0:
tm_penalty = 0.0
elif amplicon_tm > amplicon_tms.opt:
tm_penalty = (amplicon_tm - amplicon_tms.opt) * weights.product_tm_gt
else:
tm_penalty = (amplicon_tms.opt - tm) * weights.product_tm_lt
tm_penalty = (amplicon_tms.opt - amplicon_tm) * weights.product_tm_lt

# Put it all together
return left_primer.penalty + right_primer.penalty + size_penalty + tm_penalty


def build_primer_pairs(
def build_primer_pairs( # noqa: C901
left_primers: Sequence[Oligo],
right_primers: Sequence[Oligo],
target: Span,
Expand Down Expand Up @@ -119,7 +121,8 @@ def build_primer_pairs(
fasta_path: the path to the FASTA file from which the amplicon sequence will be retrieved.
Returns:
an iterator over all the valid primer pairs, unsorted
An iterator over all the valid primer pairs, sorted by primer pair penalty.
Primer pairs with smaller penalties are returned first.
"""
# Short circuit if we have no left primers or no right primers
if not any(left_primers) or not any(right_primers):
Expand All @@ -131,50 +134,89 @@ def build_primer_pairs(
if any(p.span.refname != target.refname for p in right_primers):
raise ValueError("Right primers exist on different reference than target.")

# Sort the left and right primers
left_primers = sorted(left_primers, key=lambda p: p.span.start)
right_primers = sorted(right_primers, key=lambda p: p.span.end)

# Grab the sequence we'll use to fill in the amplicon sequence
with FastaFile(f"{fasta_path}") as fasta:
region_start = min(p.span.start for p in left_primers)
region_end = max(p.span.end for p in right_primers)
bases = fasta.fetch(target.refname, region_start - 1, region_end)

# Each tuple is left_idx, right_idx, penalty, tm
pairings: list[Tuple[int, int, float, float]] = []

# generate all the primer pairs that don't violate hard size and Tm constraints
first_right_primer_idx = 0

# Nested loops over indices are used here so that we can skip potentially large chunks of
# the cartesian product, based on the fact that we're sorting the left and right primers.
# Two things are relied upon:
# 1. If we encounter a left/right combo that either has the right primer leftward of the
# left primer _or_ generates a too-short amplicon, the neither that right primer nor
# any previous right primer can make a valid combination with any subsequent left primer.
# 2. If we encounter a left/right combo that generates a too-large amplicon, then no
# subsequent right-primer can make a valid combination with that left primer
for i in range(0, len(left_primers)):
for j in range(first_right_primer_idx, len(right_primers)):
lp = left_primers[i]
rp = right_primers[j]

# If the right primer isn't "to the right" of the left primer, move on
if rp.span.start < lp.span.start or lp.span.end > rp.span.end:
first_right_primer_idx = max(first_right_primer_idx, j+1)
continue

amp_span = PrimerPair.calculate_amplicon_span(lp, rp)

if amp_span.length < amplicon_sizes.min:
first_right_primer_idx = max(first_right_primer_idx, j+1)
continue

if amp_span.length > amplicon_sizes.max:
break # break in this case because all subsequent rps will yield longer amplicons

# Since the amplicon span and the region_start are both 1-based, the minuend
# becomes a zero-based offset
amp_bases = bases[amp_span.start - region_start : amp_span.end - region_start + 1]
amp_tm = calculate_long_seq_tm(amp_bases)

if amp_tm < amplicon_tms.min or amp_tm > amplicon_tms.max:
continue

penalty = score(
left_primer=lp,
right_primer=rp,
amplicon=amp_span,
amplicon_tm=amp_tm,
amplicon_sizes=amplicon_sizes,
amplicon_tms=amplicon_tms,
weights=weights,
)

pairings.append((i, j, penalty, amp_tm))

# Sort by the penalty, ascending
pairings.sort(key=lambda tup: tup[2])

with NtThermoAlign() as ntthal:
# generate all the primer pairs that don't violate hard size and Tm constraints
for lp in left_primers:
for rp in right_primers:
amp_span = PrimerPair.calculate_amplicon_span(lp, rp)
for i, j, penalty, tm in pairings:
lp = left_primers[i]
rp = right_primers[j]

# Ignore pairings with amplicon sizes out of the range specified
if not amplicon_sizes.min <= amp_span.length <= amplicon_sizes.max:
if max_heterodimer_tm is not None:
if ntthal.duplex_tm(lp.bases, rp.bases) > max_heterodimer_tm:
continue

# Since the amplicon span and the region_start are both 1-based, the minuend
# becomes a zero-based offset
amp_bases = bases[amp_span.start - region_start : amp_span.end - region_start + 1]
amp_tm = calculate_long_seq_tm(amp_bases)
amp_bases = bases[lp.span.start - region_start : rp.span.end - region_start + 1]

if amp_tm < amplicon_tms.min or amp_tm > amplicon_tms.max:
continue
pp = PrimerPair(
left_primer=lp,
right_primer=rp,
amplicon_sequence=amp_bases,
amplicon_tm=tm,
penalty=penalty,
)

if max_heterodimer_tm is not None:
if ntthal.duplex_tm(lp.bases, rp.bases) > max_heterodimer_tm:
continue

penalty = score(
left_primer=lp,
right_primer=rp,
amplicon=amp_span,
amplicon_tm=amp_tm,
amplicon_sizes=amplicon_sizes,
amplicon_tms=amplicon_tms,
weights=weights,
)

pp = PrimerPair(
left_primer=lp,
right_primer=rp,
amplicon_sequence=amp_bases,
amplicon_tm=amp_tm,
penalty=penalty,
)

yield pp
yield pp
4 changes: 2 additions & 2 deletions tests/api/test_picking.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ def test_build_primers_amplicon_size_filtering(
) -> None:
pairs = list(
picking.build_primer_pairs(
left_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(1, 151, 10)],
right_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(151, 301, 10)],
left_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(51, 301, 10)],
right_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(1, 401, 10)],
target=Span("chr1", 150, 160),
amplicon_sizes=MinOptMax(100, 150, 200),
amplicon_tms=MinOptMax(0.0, 0.0, 100.0),
Expand Down

0 comments on commit b781cd6

Please sign in to comment.