Better accounting for max-bytes for pull consumers
Signed-off-by: Derek Collison <[email protected]>
derekcollison committed Sep 8, 2022
1 parent a774402 commit b32814d
Showing 2 changed files with 67 additions and 51 deletions.
98 changes: 57 additions & 41 deletions server/consumer.go
@@ -3070,10 +3070,12 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
for {
var (
-pmsg *jsPubMsg
-dc uint64
-dsubj string
-delay time.Duration
+pmsg *jsPubMsg
+dc uint64
+dsubj string
+ackReply string
+delay time.Duration
+sz int
)
o.mu.Lock()
// consumer is closed when mset is set to nil.
@@ -3112,9 +3114,25 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
}

+// Update our cached num pending here first.
+if dc == 1 && o.npcm > 0 {
+o.npc--
+}
+// Pre-calculate ackReply
+ackReply = o.ackReply(pmsg.seq, o.dseq, dc, pmsg.ts, o.numPending())
+
+// If headers only do not send msg payload.
+// Add in msg size itself as header.
+if o.cfg.HeadersOnly {
+convertToHeadersOnly(pmsg)
+}
+// Calculate payload size. This can be calculated on client side.
+// We do not include transport subject here since not generally known on client.
+sz = len(pmsg.subj) + len(ackReply) + len(pmsg.hdr) + len(pmsg.msg)

if o.isPushMode() {
dsubj = o.dsubj
-} else if wr := o.nextWaiting(len(pmsg.hdr) + len(pmsg.msg)); wr != nil {
+} else if wr := o.nextWaiting(sz); wr != nil {
dsubj = wr.reply
if done := wr.recycleIfDone(); done && o.node != nil {
o.removeClusterPendingRequest(dsubj)
@@ -3124,6 +3142,9 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
} else {
// We will redo this one.
o.sseq--
+if dc == 1 && o.npcm > 0 {
+o.npc++
+}
pmsg.returnToPool()
goto waitForMsgs
}
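For context, the new sz value charges the message subject, the precomputed ack reply subject, the headers, and the payload against a pull request's max-bytes budget, while deliberately excluding the transport subject, which a client can't generally predict. The ack reply is now built before a waiting request is selected precisely so its length can be counted. A minimal, self-contained sketch of that accounting; the struct and function names below are illustrative stand-ins for jsPubMsg, not the server's actual types, and the ack-reply literal only approximates a realistic length:

```go
package main

import "fmt"

// storedMsg is an illustrative stand-in for the server's jsPubMsg.
type storedMsg struct {
	subj string // original message subject
	hdr  []byte // serialized headers, if any
	msg  []byte // payload
}

// accountedSize mirrors the sz calculation above: subject + ack reply +
// headers + payload. The transport (delivery) subject is excluded.
func accountedSize(m storedMsg, ackReply string) int {
	return len(m.subj) + len(ackReply) + len(m.hdr) + len(m.msg)
}

func main() {
	m := storedMsg{subj: "TEST", msg: make([]byte, 99_950)}
	// Ack reply subjects look like $JS.ACK.<stream>.<consumer>.<...>;
	// this literal is only an example of a plausible length.
	ack := "$JS.ACK.TEST.dur.1.1.1.1662600000000000000.19"
	fmt.Println(accountedSize(m, ack)) // ≈100,000 for this example
}
```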
@@ -3147,8 +3168,8 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {

// If we have a rate limit set make sure we check that here.
if o.rlimit != nil {
-now, sm := time.Now(), &pmsg.StoreMsg
-r := o.rlimit.ReserveN(now, len(sm.msg)+len(sm.hdr)+len(sm.subj)+len(dsubj)+len(o.ackReplyT))
+now := time.Now()
+r := o.rlimit.ReserveN(now, sz)
delay := r.DelayFrom(now)
if delay > 0 {
o.mu.Unlock()
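The rate-limit branch now reserves the same accounted size rather than re-deriving one from message fields and the ack-reply template. For readers unfamiliar with the pattern, a self-contained sketch of the ReserveN/DelayFrom flow from golang.org/x/time/rate; the limiter settings and message size here are arbitrary demo values:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Allow ~1 MB/s with a 64 KB burst; values are arbitrary for the demo.
	lim := rate.NewLimiter(rate.Limit(1<<20), 64<<10)

	now := time.Now()
	sz := 32 << 10 // pretend this is the accounted message size
	r := lim.ReserveN(now, sz)
	if !r.OK() {
		fmt.Println("size exceeds burst; would never be allowed")
		return
	}
	// Mirrors the hunk above: sleep only if the reservation says
	// delivering now would exceed the configured rate.
	if delay := r.DelayFrom(now); delay > 0 {
		time.Sleep(delay)
	}
	fmt.Println("message sent within rate limit")
}
```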
@@ -3163,7 +3184,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}

// Do actual delivery.
-o.deliverMsg(dsubj, pmsg, dc, rp)
+o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)

// Reset our idle heartbeat timer if set.
if hb != nil {
@@ -3283,49 +3304,44 @@ func (o *consumer) streamNumPending() uint64 {
return o.npc
}

+func convertToHeadersOnly(pmsg *jsPubMsg) {
+// If headers only do not send msg payload.
+// Add in msg size itself as header.
+hdr, msg := pmsg.hdr, pmsg.msg
+var bb bytes.Buffer
+if len(hdr) == 0 {
+bb.WriteString(hdrLine)
+} else {
+bb.Write(hdr)
+bb.Truncate(len(hdr) - LEN_CR_LF)
+}
+bb.WriteString(JSMsgSize)
+bb.WriteString(": ")
+bb.WriteString(strconv.FormatInt(int64(len(msg)), 10))
+bb.WriteString(CR_LF)
+bb.WriteString(CR_LF)
+// Replace underlying buf which we can use directly when we send.
+// TODO(dlc) - Probably just use directly when forming bytes.Buffer?
+pmsg.buf = pmsg.buf[:0]
+pmsg.buf = append(pmsg.buf, bb.Bytes()...)
+// Replace with new header.
+pmsg.hdr = pmsg.buf
+// Cancel msg payload
+pmsg.msg = nil
+}

// Deliver a msg to the consumer.
// Lock should be held and o.mset validated to be non-nil.
-func (o *consumer) deliverMsg(dsubj string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) {
+func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) {
if o.mset == nil {
pmsg.returnToPool()
return
}

-// Update our cached num pending.
-if dc == 1 && o.npcm > 0 {
-o.npc--
-}

dseq := o.dseq
o.dseq++

-// If headers only do not send msg payload.
-// Add in msg size itself as header.
-if o.cfg.HeadersOnly {
-hdr, msg := pmsg.hdr, pmsg.msg
-var bb bytes.Buffer
-if len(hdr) == 0 {
-bb.WriteString(hdrLine)
-} else {
-bb.Write(hdr)
-bb.Truncate(len(hdr) - LEN_CR_LF)
-}
-bb.WriteString(JSMsgSize)
-bb.WriteString(": ")
-bb.WriteString(strconv.FormatInt(int64(len(msg)), 10))
-bb.WriteString(CR_LF)
-bb.WriteString(CR_LF)
-// Replace underlying buf which we can use directly when we send.
-// TODO(dlc) - Probably just use directly when forming bytes.Buffer?
-pmsg.buf = pmsg.buf[:0]
-pmsg.buf = append(pmsg.buf, bb.Bytes()...)
-// Replace with new header.
-pmsg.hdr = pmsg.buf
-// Cancel msg payload
-pmsg.msg = nil
-}

-pmsg.dsubj, pmsg.reply, pmsg.o = dsubj, o.ackReply(pmsg.seq, dseq, dc, pmsg.ts, o.numPending()), o
+pmsg.dsubj, pmsg.reply, pmsg.o = dsubj, ackReply, o
psz := pmsg.size()

if o.maxpb > 0 {
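The extracted convertToHeadersOnly helper strips the payload and records its original length in a JSMsgSize header, which the server spells "Nats-Msg-Size". For illustration, a minimal nats.go client sketch that reads that header back from a headers-only pull consumer; the stream and durable names are placeholders, and the consumer is assumed to have been created with HeadersOnly: true:

```go
package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		panic(err)
	}

	// "TEST" / "dlv" are placeholder stream and durable names; the
	// consumer is assumed to be configured with HeadersOnly: true.
	sub, err := js.PullSubscribe("TEST", "dlv")
	if err != nil {
		panic(err)
	}

	msgs, err := sub.Fetch(1, nats.MaxWait(time.Second))
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		// Payload was stripped; its original size arrives as a header.
		fmt.Printf("headers-only msg, original payload size: %s\n",
			m.Header.Get("Nats-Msg-Size"))
		m.Ack()
	}
}
```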
20 changes: 10 additions & 10 deletions server/jetstream_test.go
@@ -17327,8 +17327,8 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
require_NoError(t, err)

// Put in ~2MB, each ~100k
-msz := 99_980
-total, msg := 20, []byte(strings.Repeat("Z", msz))
+msz, dsz := 100_000, 99_950
+total, msg := 20, []byte(strings.Repeat("Z", dsz))

for i := 0; i < total; i++ {
if _, err := js.Publish("TEST", msg); err != nil {
@@ -17377,7 +17377,7 @@ func TestJetStreamPullMaxBytes(t *testing.T) {

m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
-require_True(t, len(m.Data) == msz)
+require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
checkSubsPending(t, sub, 0)

@@ -17389,33 +17389,33 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
for i := 0; i < 5; i++ {
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
-require_True(t, len(m.Data) == msz)
+require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
}
checkSubsPending(t, sub, 0)

// Now ask for large batch but make sure we are limited by batch size.
-req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz * 5, NoWait: true}
+req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz * 4, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
checkSubsPending(t, sub, 5)
for i := 0; i < 5; i++ {
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
-require_True(t, len(m.Data) == msz)
+require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
}
checkSubsPending(t, sub, 0)

-req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz + 20, NoWait: true}
+req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
-checkSubsPending(t, sub, 1)
+checkSubsPending(t, sub, 2)
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
-require_True(t, len(m.Data) == msz)
+require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
-checkSubsPending(t, sub, 0)
+checkSubsPending(t, sub, 1)
}

func TestJetStreamStreamRepublishCycle(t *testing.T) {
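Under the revised accounting, each delivered message charges len(subject) + len(ack reply) + len(headers) + len(payload) against MaxBytes, which is why the test shrinks payloads to dsz = 99,950 bytes so that a whole message accounts to roughly msz = 100,000. A sketch of issuing the same kind of raw pull request outside the test harness; "TEST" and "dur" are placeholder stream/consumer names, and the expected reply count assumes a setup like the test's:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Drain()

	// Raw next-message request, mirroring JSApiConsumerGetNextRequest.
	req, _ := json.Marshal(map[string]interface{}{
		"batch":     1000,
		"max_bytes": 100_000,
		"no_wait":   true,
	})

	inbox := nats.NewInbox()
	sub, err := nc.SubscribeSync(inbox)
	if err != nil {
		panic(err)
	}
	// $JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer> is the pull subject.
	if err := nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", inbox, req); err != nil {
		panic(err)
	}

	// With ~99,950-byte payloads, expect one message (accounted size
	// ≈100,000) followed by a status message terminating the batch.
	for {
		m, err := sub.NextMsg(time.Second)
		if err != nil {
			break
		}
		fmt.Printf("subject=%s data=%d hdr=%v\n", m.Subject, len(m.Data), m.Header)
	}
}
```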
