Skip to content

Commit

Permalink
When a consumer had not filtered subject and was attached to a intere…
Browse files Browse the repository at this point in the history
…st policy retention stream we could incorrectly drop messages.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Feb 1, 2022
1 parent a55efc4 commit 12f5ea3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 2 deletions.
47 changes: 47 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14642,6 +14642,53 @@ func TestJetStreamMaxMsgsPerSubjectWithDiscardNew(t *testing.T) {
}
}

// Issue #2836
func TestJetStreamInterestRetentionBug(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>"},
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "c1", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

test := func(token string, fseq, msgs uint64) {
t.Helper()
subj := fmt.Sprintf("foo.%s", token)
_, err = js.Publish(subj, nil)
require_NoError(t, err)
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
if si.State.FirstSeq != fseq {
t.Fatalf("Expected first to be %d, got %d", fseq, si.State.FirstSeq)
}
if si.State.Msgs != msgs {
t.Fatalf("Expected msgs to be %d, got %d", msgs, si.State.Msgs)
}
}

test("bar", 1, 1)

// Create second filtered consumer.
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "c2", FilterSubject: "foo.foo", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

test("bar", 1, 2)

}

///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2923,10 +2923,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if numConsumers == 0 {
noInterest = true
} else if mset.numFilter > 0 {
// Assume none.
// Assume no interest and check to disqualify.
noInterest = true
for _, o := range mset.consumers {
if o.cfg.FilterSubject != _EMPTY_ && subjectIsSubsetMatch(subject, o.cfg.FilterSubject) {
if o.cfg.FilterSubject == _EMPTY_ || subjectIsSubsetMatch(subject, o.cfg.FilterSubject) {
noInterest = false
break
}
Expand Down

0 comments on commit 12f5ea3

Please sign in to comment.