Skip to content

Commit

Permalink
Extend the term advisory with a reason (#4697)
Browse files Browse the repository at this point in the history
 * Support getting a reason from the client doing a term
 * Supply some reasons when terminating messages during limits processing

Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar authored Oct 26, 2023
2 parents 4bad329 + c7a1445 commit 2fb1b1b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 8 deletions.
22 changes: 15 additions & 7 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ var (
AckNext = []byte("+NXT")
// Terminate delivery of the message.
AckTerm = []byte("+TERM")

// reasons to supply when terminating messages using limits
ackTermLimitsReason = "Message deleted by stream limits"
ackTermUnackedLimitsReason = "Unacknowledged message was deleted"
)

// Calculate accurate replicas for the consumer config with the parent stream config.
Expand Down Expand Up @@ -1991,8 +1995,12 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
o.processNak(sseq, dseq, dc, msg)
case bytes.Equal(msg, AckProgress):
o.progressUpdate(sseq)
case bytes.Equal(msg, AckTerm):
o.processTerm(sseq, dseq, dc)
case bytes.HasPrefix(msg, AckTerm):
var reason string
if buf := msg[len(AckTerm):]; len(buf) > 0 {
reason = string(bytes.TrimSpace(buf))
}
o.processTerm(sseq, dseq, dc, reason)
}

// Ack the ack if requested.
Expand Down Expand Up @@ -2319,7 +2327,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
}

// Process a TERM
func (o *consumer) processTerm(sseq, dseq, dc uint64) {
func (o *consumer) processTerm(sseq, dseq, dc uint64, reason string) {
// Treat like an ack to suppress redelivery.
o.processAckMsg(sseq, dseq, dc, false)

Expand All @@ -2338,6 +2346,7 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64) {
ConsumerSeq: dseq,
StreamSeq: sseq,
Deliveries: dc,
Reason: reason,
Domain: o.srv.getOpts().JetStreamDomain,
}

Expand Down Expand Up @@ -3650,7 +3659,7 @@ func (o *consumer) checkAckFloor() {
o.mu.RUnlock()
// If it was pending for us, get rid of it.
if isPending {
o.processTerm(seq, p.Sequence, rdc)
o.processTerm(seq, p.Sequence, rdc, ackTermLimitsReason)
}
}
} else if numPending > 0 {
Expand All @@ -3675,7 +3684,7 @@ func (o *consumer) checkAckFloor() {

for i := 0; i < len(toTerm); i += 3 {
seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2]
o.processTerm(seq, dseq, rdc)
o.processTerm(seq, dseq, rdc, ackTermLimitsReason)
}
}

Expand Down Expand Up @@ -5151,11 +5160,10 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
o.mu.Unlock()

// If it was pending process it like an ack.
// TODO(dlc) - we could do a term here instead with a reason to generate the advisory.
if wasPending {
// We could have lock for stream so do this in a go routine.
// TODO(dlc) - We should do this with ipq vs naked go routines.
go o.processTerm(sseq, p.Sequence, rdc)
go o.processTerm(sseq, p.Sequence, rdc, ackTermUnackedLimitsReason)
}
}

Expand Down
1 change: 1 addition & 0 deletions server/jetstream_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type JSConsumerDeliveryTerminatedAdvisory struct {
ConsumerSeq uint64 `json:"consumer_seq"`
StreamSeq uint64 `json:"stream_seq"`
Deliveries uint64 `json:"deliveries"`
Reason string `json:"reason,omitempty"`
Domain string `json:"domain,omitempty"`
}

Expand Down
5 changes: 4 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2935,7 +2935,7 @@ func TestJetStreamWorkQueueTerminateDelivery(t *testing.T) {
// We should get 1 back.
m := getMsg(1, 2)
// Now terminate
m.Respond(AckTerm)
m.Respond([]byte(fmt.Sprintf("%s with reason", string(AckTerm))))
time.Sleep(ackWait * 2)

// We should get 2 here, not 1 since we have indicated we wanted to terminate.
Expand Down Expand Up @@ -2963,6 +2963,9 @@ func TestJetStreamWorkQueueTerminateDelivery(t *testing.T) {
if adv.Deliveries != 2 {
t.Fatalf("Expected delivery count of %d, got %d", 2, adv.Deliveries)
}
if adv.Reason != "with reason" {
t.Fatalf("Advisory did not have a reason")
}
})
}
}
Expand Down

0 comments on commit 2fb1b1b

Please sign in to comment.