depthcharge upgrade - all unit tests pass
Lilferrit committed Oct 7, 2024
1 parent e58b0c0 commit 46e9606
Showing 4 changed files with 106 additions and 68 deletions.
3 changes: 2 additions & 1 deletion casanovo/config.yaml
@@ -128,7 +128,8 @@ residues:
"P": 97.052764
"V": 99.068414
"T": 101.047670
"C[Carbamidomethyl]": 160.030649 # 103.009185 + 57.021464 "L": 113.084064
"C[Carbamidomethyl]": 160.030649 # 103.009185 + 57.021464
"L": 113.084064
"I": 113.084064
"N": 114.042927
"D": 115.026943
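The carbamidomethylated cysteine entry above stores the combined residue mass rather than only the modification delta. A quick, standalone check of the arithmetic in the comment (illustrative only, not part of the commit):

```python
# Values copied from the residue table above.
cysteine = 103.009185        # monoisotopic mass of the Cys residue
carbamidomethyl = 57.021464  # monoisotopic mass of the carbamidomethyl modification

total = cysteine + carbamidomethyl
assert abs(total - 160.030649) < 1e-6  # matches "C[Carbamidomethyl]" in config.yaml
print(f"C[Carbamidomethyl] = {total:.6f}")
```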
4 changes: 2 additions & 2 deletions casanovo/denovo/model.py
@@ -358,7 +358,7 @@ def _finish_beams(
minimum peptide length).
"""
# Check for tokens with a negative mass (i.e. neutral loss).
aa_neg_mass_idx = []
aa_neg_mass_idx = [None]
for aa, mass in self.tokenizer.residues.items():
if mass < 0:
# aa_neg_mass.append(aa)
@@ -369,7 +369,7 @@ def _finish_beams(
[
self.tokenizer.index[aa]
for aa in self.tokenizer.index
if aa.startswith(("+", "-", "[+", "[-"))
if aa.startswith("[") and aa.endswith("]-")
]
).to(self.decoder.device)

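The predicate change above matches the ProForma-style token names used after the depthcharge upgrade: N-terminal modifications now look like "[Acetyl]-" rather than "+42.011". A minimal sketch of what the new filter selects, using a hypothetical residue dictionary modeled on the test config below (the real code iterates over self.tokenizer.index instead):

```python
# Hypothetical residue vocabulary mirroring tests/conftest.py; illustrative only.
residues = {
    "G": 57.021464,
    "A": 71.037114,
    "M[Oxidation]": 147.035400,
    "[Acetyl]-": 42.010565,
    "[Carbamyl]-": 43.005814,
    "[Ammonia-loss]-": -17.026549,
    "[+25.980265]-": 25.980265,
}

# New predicate from the diff: an N-terminal modification token is wrapped in
# brackets and ends with "-" (e.g. "[Acetyl]-"), unlike plain residues or
# internal modifications such as "M[Oxidation]".
n_term_mods = [aa for aa in residues if aa.startswith("[") and aa.endswith("]-")]
assert n_term_mods == ["[Acetyl]-", "[Carbamyl]-", "[Ammonia-loss]-", "[+25.980265]-"]

# The negative-mass check in the same hunk flags tokens such as "[Ammonia-loss]-".
neg_mass = [aa for aa, mass in residues.items() if mass < 0]
assert neg_mass == ["[Ammonia-loss]-"]
```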
18 changes: 10 additions & 8 deletions tests/conftest.py
@@ -260,7 +260,7 @@ def tiny_config(tmp_path):
"P": 97.052764,
"V": 99.068414,
"T": 101.047670,
"C+57.021": 160.030649,
"C[Carbamidomethyl]": 160.030649, # 103.009185 + 57.021464
"L": 113.084064,
"I": 113.084064,
"N": 114.042927,
@@ -274,13 +274,15 @@ def tiny_config(tmp_path):
"R": 156.101111,
"Y": 163.063329,
"W": 186.079313,
"M+15.995": 147.035400,
"N+0.984": 115.026943,
"Q+0.984": 129.042594,
"+42.011": 42.010565,
"+43.006": 43.005814,
"-17.027": -17.026549,
"+43.006-17.027": 25.980265,
# Amino acid modifications.
"M[Oxidation]": 147.035400, # Met oxidation: 131.040485 + 15.994915
"N[Deamidated]": 115.026943, # Asn deamidation: 114.042927 + 0.984016
"Q[Deamidated]": 129.042594, # Gln deamidation: 128.058578 + 0.984016
# N-terminal modifications.
"[Acetyl]-": 42.010565, # Acetylation
"[Carbamyl]-": 43.005814, # Carbamylation "+43.006"
"[Ammonia-loss]-": -17.026549, # NH3 loss
"[+25.980265]-": 25.980265, # Carbamylation and NH3 loss
},
}

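The tests below build depthcharge tokenizers directly from this residue table (e.g. PeptideTokenizer(residues=config.residues)). A rough sketch of the tokenize/detokenize round trip that the updated assertions rely on; the residue subset is an assumption (E and K use standard monoisotopic masses, as they fall outside the visible hunk), and exact depthcharge behaviour may differ in detail:

```python
import depthcharge.tokenizers.peptides

# Small subset of the tiny_config residue table above; illustrative only.
residues = {
    "P": 97.052764,
    "E": 129.042593,
    "K": 128.094963,
    "M": 131.040485,
    "M[Oxidation]": 147.035400,  # 131.040485 + 15.994915
}
tokenizer = depthcharge.tokenizers.peptides.PeptideTokenizer(residues=residues)

# tokenize() returns integer token tensors; detokenize() maps them back to
# peptide strings, and tokenizer.index exposes individual token ids.
tokens = tokenizer.tokenize("PEPK")[0]
print(tokenizer.index["M[Oxidation]"])               # id of the modified Met token
print(tokenizer.detokenize(tokens.unsqueeze(0))[0])  # expected: "PEPK"
```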
149 changes: 92 additions & 57 deletions tests/unit_tests/test_unit.py
@@ -510,14 +510,15 @@ def test_beam_search_decode(tiny_config):
beam_fits_precursor,
pred_cache,
)

# Verify that the correct peptides have been cached.
correct_cached = 0
for _, _, _, pep in pred_cache[0]:
if torch.equal(pep, torch.tensor([4, 14, 4, 13])):
if torch.equal(pep, model.tokenizer.tokenize("PEPK")[0]):
correct_cached += 1
elif torch.equal(pep, torch.tensor([4, 14, 4, 18])):
elif torch.equal(pep, model.tokenizer.tokenize("PEPR")[0]):
correct_cached += 1
elif torch.equal(pep, torch.tensor([4, 14, 4])):
elif torch.equal(pep, model.tokenizer.tokenize("PEP")[0]):
correct_cached += 1
else:
pytest.fail(
@@ -529,57 +530,63 @@ def test_beam_search_decode(tiny_config):
# Return the candidate peptide with the highest score
test_cache = collections.OrderedDict((i, []) for i in range(batch))
heapq.heappush(
test_cache[0], (0.93, 0.1, 4 * [0.93], torch.tensor([4, 14, 4, 19]))
test_cache[0],
(0.93, 0.1, 4 * [0.93], model.tokenizer.tokenize("PEPY")[0]),
)
heapq.heappush(
test_cache[0], (0.95, 0.2, 4 * [0.95], torch.tensor([4, 14, 4, 13]))
test_cache[0],
(0.95, 0.2, 4 * [0.95], model.tokenizer.tokenize("PEPK")[0]),
)
heapq.heappush(
test_cache[0], (0.94, 0.3, 4 * [0.94], torch.tensor([4, 14, 4, 4]))
test_cache[0],
(0.94, 0.3, 4 * [0.94], model.tokenizer.tokenize("PEPP")[0]),
)

assert list(model._get_top_peptide(test_cache))[0][0][-1] == "PEPK"
assert torch.equal(
next(model._get_top_peptide(test_cache))[0][-1],
model.tokenizer.tokenize(["PEPK"])[0],
)
# Test that empty predictions are returned when no beams have been
# finished.
empty_cache = collections.OrderedDict((i, []) for i in range(batch))
assert len(list(model._get_top_peptide(empty_cache))[0]) == 0
# Test multiple PSMs per spectrum and that they are the highest-scoring peptides.
model.top_match = 2
assert set(
[pep[-1] for pep in list(model._get_top_peptide(test_cache))[0]]
[
model.tokenizer.detokenize(pep[-1].unsqueeze(0))[0]
for pep in list(model._get_top_peptide(test_cache))[0]
]
) == {"PEPK", "PEPP"}

# Test _get_topk_beams().
# Set scores to proceed generating the unfinished beam.
step = 4
scores[2, step, :] = 0
scores[2, step, range(1, 5)] = torch.tensor([1.0, 2.0, 3.0, 4.0])
next_tokens = model.tokenizer.tokenize(["P", "S", "A", "G"]).flatten()
scores[2, step, next_tokens] = torch.tensor([4.0, 3.0, 2.0, 1.0])
# Modify finished beams array to allow decoding from only one beam
test_finished_beams = torch.tensor([True, True, False, True])
new_tokens, new_scores = model._get_topk_beams(
tokens, scores, test_finished_beams, batch, step
)
expected_tokens = torch.tensor(
[
[4, 14, 4, 1, 4],
[4, 14, 4, 1, 3],
[4, 14, 4, 1, 2],
[4, 14, 4, 1, 1],
]
expected_tokens = model.tokenizer.tokenize(
["PEPGP", "PEPGS", "PEPGA", "PEPGG"]
)

# Only the expected scores of the final step.
expected_scores = torch.zeros(beam, vocab)
expected_scores[:, range(1, 5)] = torch.tensor([1.0, 2.0, 3.0, 4.0])
expected_scores[:, next_tokens] = torch.tensor([4.0, 3.0, 2.0, 1.0])

assert torch.equal(new_tokens[:, : step + 1], expected_tokens)
assert torch.equal(new_scores[:, step, :], expected_scores)

# Test output if decoding loop isn't stopped with termination of all beams.
model.max_length = 0
# 1 spectrum with 5 peaks (2 values: m/z and intensity).
spectra = torch.zeros(1, 5, 2)
mzs = ints = torch.zeros(1, 5)
precursors = torch.tensor([[469.25364, 2.0, 235.63410]])
assert len(list(model.beam_search_decode(spectra, precursors))[0]) == 0
assert len(list(model.beam_search_decode(mzs, ints, precursors))[0]) == 0
model.max_length = 100

# Re-initialize scores and tokens to further test caching functionality.
@@ -590,8 +597,9 @@ def test_beam_search_decode(tiny_config):
tokens = torch.zeros(batch * beam, length, dtype=torch.int64)

scores[:, : step + 1, :] = 0
for i, peptide in enumerate(["PKKP$", "EPPK$", "PEPK$", "PMKP$"]):
tokens[i, : step + 1] = torch.tensor([aa2idx[aa] for aa in peptide])
tokens[:, : step + 1] = model.tokenizer.tokenize(
["PKKP", "EPPK", "PEPK", "PMKP"], add_stop=True
)
i, j, s = np.arange(step), np.arange(4), torch.Tensor([4, 0.5, 3, 0.4])
scores[:, i, :] = 1
scores[j, i, tokens[j, i]] = s
@@ -612,10 +620,16 @@ def test_beam_search_decode(tiny_config):
assert negative_score == 2

# Test using a single beam only.
model = Spec2Pep(n_beams=1, residues="massivekb", min_peptide_len=2)
model = Spec2Pep(
n_beams=1,
min_peptide_len=2,
tokenizer=depthcharge.tokenizers.peptides.MskbPeptideTokenizer(
residues=config.residues
),
)
vocab = len(model.tokenizer) + 1
beam = model.n_beams # S
model.decoder.reverse = False # For simplicity.
aa2idx = model.decoder._aa2idx
step = 4

# Initialize scores and tokens.
@@ -628,12 +642,14 @@ def test_beam_search_decode(tiny_config):
pred_cache = collections.OrderedDict((i, []) for i in range(batch))

# Ground truth peptide is "PEPK".
true_peptide = "PEPK$"
true_peptide = "PEPK"
precursors = torch.tensor([469.25364, 2.0, 235.63410]).repeat(
beam * batch, 1
)
scores[:, range(step), :] = 1
tokens[0, : step + 1] = torch.tensor([aa2idx[aa] for aa in true_peptide])
tokens[0, : step + 1] = model.tokenizer.tokenize(
true_peptide, add_stop=True
)[0]

# Test _finish_beams().
finished_beams, beam_fits_precursor, discarded_beams = model._finish_beams(
@@ -649,7 +665,9 @@ def test_beam_search_decode(tiny_config):
tokens, scores, step, finished_beams, beam_fits_precursor, pred_cache
)

assert torch.equal(pred_cache[0][0][-1], torch.tensor([4, 14, 4, 13]))
assert torch.equal(
pred_cache[0][0][-1], model.tokenizer.tokenize(true_peptide)[0]
)

# Test _get_topk_beams().
step = 1
@@ -680,18 +698,21 @@ assert torch.equal(new_tokens[:, : step + 1], expected_tokens)
assert torch.equal(new_tokens[:, : step + 1], expected_tokens)

# Test _finish_beams() for tokens with a negative mass.
model = Spec2Pep(n_beams=2, residues="massivekb")
model = Spec2Pep(
n_beams=2,
tokenizer=depthcharge.tokenizers.peptides.MskbPeptideTokenizer(
residues=config.residues
),
)
beam = model.n_beams # S
aa2idx = model.decoder._aa2idx
step = 1

# Ground truth peptide is "-17.027GK".
precursors = torch.tensor([186.10044, 2.0, 94.05750]).repeat(
beam * batch, 1
)
tokens = torch.zeros(batch * beam, length, dtype=torch.int64)
for i, peptide in enumerate(["GK", "AK"]):
tokens[i, : step + 1] = torch.tensor([aa2idx[aa] for aa in peptide])
tokens[:, : step + 1] = model.tokenizer.tokenize(["GK", "AK"])

# Test _finish_beams().
finished_beams, beam_fits_precursor, discarded_beams = model._finish_beams(
@@ -702,26 +723,34 @@ def test_beam_search_decode(tiny_config):
assert torch.equal(discarded_beams, torch.tensor([False, False]))

# Test _finish_beams() for multiple/internal N-mods and dummy predictions.
model = Spec2Pep(n_beams=3, residues="massivekb", min_peptide_len=3)
model = Spec2Pep(
n_beams=3,
min_peptide_len=3,
tokenizer=depthcharge.tokenizers.peptides.PeptideTokenizer(
residues=config.residues
),
)
beam = model.n_beams # S
model.decoder.reverse = True
aa2idx = model.decoder._aa2idx
step = 4

# Ground truth peptide is irrelevant for this test.
precursors = torch.tensor([1861.0044, 2.0, 940.5750]).repeat(
beam * batch, 1
)

# sequences with invalid mass modifications will raise an exception if
# tokenized using tokenizer.tokenize
tokens = torch.zeros(batch * beam, length, dtype=torch.int64)
# Reverse decoding
for i, peptide in enumerate(
[
["K", "A", "A", "A", "+43.006-17.027"],
["K", "A", "A", "+42.011", "A"],
["K", "A", "A", "+43.006", "+42.011"],
]
):
tokens[i, : step + 1] = torch.tensor([aa2idx[aa] for aa in peptide])
sequences = [
["K", "A", "A", "A", "[+25.980265]-"],
["K", "A", "A", "[Acetyl]-", "A"],
["K", "A", "A", "[Carbamyl]-", "[Ammonia-loss]-"],
]

for i, seq in enumerate(sequences):
tokens[i, : step + 1] = torch.tensor(
[model.tokenizer.index[aa] for aa in seq]
)

# Test _finish_beams(). All should be discarded
finished_beams, beam_fits_precursor, discarded_beams = model._finish_beams(
@@ -734,14 +763,19 @@ def test_beam_search_decode(tiny_config):
assert torch.equal(discarded_beams, torch.tensor([False, True, True]))

# Test _get_topk_beams() with finished beams in the batch.
model = Spec2Pep(n_beams=1, residues="massivekb", min_peptide_len=3)
model = Spec2Pep(
n_beams=1,
min_peptide_len=3,
tokenizer=depthcharge.tokenizers.peptides.PeptideTokenizer(
residues=config.residues
),
)

# Sizes and other variables.
batch = 2 # B
beam = model.n_beams # S
model.decoder.reverse = True
length = model.max_length + 1 # L
vocab = model.decoder.vocab_size + 1 # V
vocab = len(model.tokenizer) + 1 # V
step = 4

# Initialize dummy scores and tokens.
@@ -756,8 +790,8 @@ def test_beam_search_decode(tiny_config):
scores[:, step, range(1, 4)] = torch.tensor([1.0, 2.0, 3.0])

# Simulate one finished and one unfinished beam in the same batch.
tokens[0, :step] = torch.tensor([4, 14, 4, 28])
tokens[1, :step] = torch.tensor([4, 14, 4, 1])
tokens[0, :step] = model.tokenizer.tokenize("PEP", add_stop=True)[0]
tokens[1, :step] = model.tokenizer.tokenize("PEPG")[0]

# Set finished beams array to allow decoding from only one beam.
test_finished_beams = torch.tensor([True, False])
@@ -767,22 +801,23 @@ def test_beam_search_decode(tiny_config):
)

# Only the second peptide should have a new token predicted.
expected_tokens = torch.tensor(
[
[4, 14, 4, 28, 0],
[4, 14, 4, 1, 3],
]
)
expected_tokens = tokens.clone()
expected_tokens[1, len("PEPG")] = 3

assert torch.equal(new_tokens[:, : step + 1], expected_tokens)
assert torch.equal(new_tokens, expected_tokens)

# Test that duplicate peptide scores don't lead to a conflict in the cache.
model = Spec2Pep(n_beams=5, residues="massivekb", min_peptide_len=3)
model = Spec2Pep(
n_beams=1,
min_peptide_len=3,
tokenizer=depthcharge.tokenizers.peptides.PeptideTokenizer(
residues=config.residues
),
)
batch = 2 # B
beam = model.n_beams # S
model.decoder.reverse = True
length = model.max_length + 1 # L
vocab = model.decoder.vocab_size + 1 # V
vocab = len(model.tokenizer) + 1 # V
step = 4

# Simulate beams with identical amino acid scores but different tokens.
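Taken together, the test changes replace hard-coded token-index tensors (e.g. torch.tensor([4, 14, 4, 13])) and the old residues="massivekb" argument with a depthcharge tokenizer handed to the model. A condensed sketch of the new construction pattern; it mirrors the calls that appear in the diff but is not a verbatim excerpt, and the three-residue table is a stand-in for the full config:

```python
import depthcharge.tokenizers.peptides

from casanovo.denovo.model import Spec2Pep

# Stand-in residue table (standard monoisotopic masses); the tests use the
# full tiny_config dict instead.
residues = {"P": 97.052764, "E": 129.042593, "K": 128.094963}

# New pattern: the tokenizer is built from the residue table and passed to
# the model, instead of residues="massivekb".
model = Spec2Pep(
    n_beams=1,
    min_peptide_len=2,
    tokenizer=depthcharge.tokenizers.peptides.MskbPeptideTokenizer(
        residues=residues
    ),
)

# Vocabulary size for the score tensors now comes from the tokenizer, and
# peptide token tensors come from tokenize() rather than hand-built indices.
vocab = len(model.tokenizer) + 1
pepk = model.tokenizer.tokenize("PEPK")[0]
print(vocab, pepk)
```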
