From 12f5ea365577ca370835e2bf782c823eb6a4ed91 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 1 Feb 2022 14:03:53 -0800 Subject: [PATCH] When a consumer had not filtered subject and was attached to a interest policy retention stream we could incorrectly drop messages. Signed-off-by: Derek Collison --- server/jetstream_test.go | 47 ++++++++++++++++++++++++++++++++++++++++ server/stream.go | 4 ++-- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 2eee7c5576f..00e75bc05b2 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -14642,6 +14642,53 @@ func TestJetStreamMaxMsgsPerSubjectWithDiscardNew(t *testing.T) { } } +// Issue #2836 +func TestJetStreamInterestRetentionBug(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>"}, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "c1", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + test := func(token string, fseq, msgs uint64) { + t.Helper() + subj := fmt.Sprintf("foo.%s", token) + _, err = js.Publish(subj, nil) + require_NoError(t, err) + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + if si.State.FirstSeq != fseq { + t.Fatalf("Expected first to be %d, got %d", fseq, si.State.FirstSeq) + } + if si.State.Msgs != msgs { + t.Fatalf("Expected msgs to be %d, got %d", msgs, si.State.Msgs) + } + } + + test("bar", 1, 1) + + // Create second filtered consumer. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "c2", FilterSubject: "foo.foo", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + test("bar", 1, 2) + +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/stream.go b/server/stream.go index 6e579dc0815..4e5c9c4193a 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2923,10 +2923,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if numConsumers == 0 { noInterest = true } else if mset.numFilter > 0 { - // Assume none. + // Assume no interest and check to disqualify. noInterest = true for _, o := range mset.consumers { - if o.cfg.FilterSubject != _EMPTY_ && subjectIsSubsetMatch(subject, o.cfg.FilterSubject) { + if o.cfg.FilterSubject == _EMPTY_ || subjectIsSubsetMatch(subject, o.cfg.FilterSubject) { noInterest = false break }