Skip to content

Commit

Permalink
Server might crash if a pull consumer with inactivity threshold acks …
Browse files Browse the repository at this point in the history
…a msg then immediately deletes the consumer.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Nov 21, 2022
1 parent dde34ce commit 5e8c199
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
6 changes: 5 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3130,10 +3130,14 @@ func (o *consumer) suppressDeletion() {
o.mu.Lock()
defer o.mu.Unlock()

if o.closed {
return
}

if o.isPushMode() && o.dtmr != nil {
// if dtmr is not nil we have started the countdown, simply reset to threshold.
o.dtmr.Reset(o.dthresh)
} else if o.isPullMode() {
} else if o.isPullMode() && o.waiting != nil {
// Pull mode always has timer running, just update last on waiting queue.
o.waiting.last = time.Now()
}
Expand Down
34 changes: 34 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19827,3 +19827,37 @@ func TestJetStreamMsgIDHeaderCollision(t *testing.T) {

require_True(t, si.State.Msgs == 5)
}

// https://github.com/nats-io/nats-server/issues/3657
func TestJetStreamServerCrashOnPullConsumerDeleteWithInactiveThresholdAfterAck(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

sendStreamMsg(t, nc, "foo", "msg")

sub, err := js.PullSubscribe("foo", "dlc", nats.InactiveThreshold(10*time.Second))
require_NoError(t, err)

msgs := fetchMsgs(t, sub, 1, time.Second)
require_True(t, len(msgs) == 1)
msgs[0].Ack()
err = js.DeleteConsumer("TEST", "dlc")
require_NoError(t, err)

// If server crashes this will fail.
_, err = js.StreamInfo("TEST")
require_NoError(t, err)
}

0 comments on commit 5e8c199

Please sign in to comment.