Skip to content

Commit

Permalink
Merge pull request #583 from nats-io/msg_hdr_data_len
Browse files Browse the repository at this point in the history
[CHANGED] Count message headers toward subscription limits
  • Loading branch information
kozlovic authored Sep 19, 2022
2 parents 75166b4 + 79a13a8 commit 5b8f3aa
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 15 deletions.
12 changes: 5 additions & 7 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2592,7 +2592,6 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
natsMsgDlvWorker *ldw = NULL;
bool sc = false;
bool sm = false;
int dl = 0;
nats_MsgList *list = NULL;
natsMutex *mu = NULL;
natsCondition *cond = NULL;
Expand Down Expand Up @@ -2626,10 +2625,9 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
s = _createMsg(&msg, nc, buf, bufLen, nc->ps->ma.hdr);
if (s != NATS_OK)
return s;
// bufLen is the overall buffer len. In presence of headers, we need
// to capture the real message payload data length, which has been
// computed as the bufLen - header size.
dl = msg->dataLen;
// bufLen is the total length of headers + data. Since headers become
// more and more prevalent, it makes sense to count them both toward
// the subscription's pending limit. So use bufLen for accounting.

if (mf != NULL)
{
Expand Down Expand Up @@ -2692,7 +2690,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
if (!ctrlMsg)
{
sub->msgList.msgs++;
sub->msgList.bytes += dl;
sub->msgList.bytes += bufLen;

if (((sub->msgsLimit > 0) && (sub->msgList.msgs > sub->msgsLimit))
|| ((sub->bytesLimit > 0) && (sub->msgList.bytes > sub->bytesLimit)))
Expand All @@ -2706,7 +2704,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)

// Undo stats from above.
sub->msgList.msgs--;
sub->msgList.bytes -= dl;
sub->msgList.bytes -= bufLen;
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ _timeoutPubAsync(natsTimer *t, void *closure)
}
js->rsub->msgList.tail = m;
js->rsub->msgList.msgs++;
js->rsub->msgList.bytes += m->dataLen;
js->rsub->msgList.bytes += natsMsg_dataAndHdrLen(m);
natsSub_Unlock(js->rsub);

js->pmHead = pm->next;
Expand Down
2 changes: 2 additions & 0 deletions src/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#define natsMsg_isTimeout(m) (((m)->flags & (1 << 3)) != 0)
#define natsMsg_clearTimeout(m) ((m)->flags &= ~(1 << 3))

#define natsMsg_dataAndHdrLen(m) ((m)->dataLen + (m)->hdrLen)

struct __natsMsg
{
natsGCItem gc;
Expand Down
2 changes: 1 addition & 1 deletion src/nats.c
Original file line number Diff line number Diff line change
Expand Up @@ -1750,7 +1750,7 @@ _deliverMsgs(void *arg)

// Update before checking closed state.
sub->msgList.msgs--;
sub->msgList.bytes -= msg->dataLen;
sub->msgList.bytes -= natsMsg_dataAndHdrLen(msg);

// Need to check for closed subscription again here.
// The subscription could have been unsubscribed from a callback
Expand Down
7 changes: 4 additions & 3 deletions src/pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,16 @@ natsConn_publish(natsConnection *nc, natsMsg *msg, const char *reply, bool direc
totalLen = hdrl;
}
}
// This will represent headers + data
totalLen += msg->dataLen;

if (!nc->initc && ((int64_t) msg->dataLen > nc->info.maxPayload))
if (!nc->initc && ((int64_t) totalLen > nc->info.maxPayload))
{
natsConn_Unlock(nc);

return nats_setError(NATS_MAX_PAYLOAD,
"Payload %d greater than maximum allowed: %" PRId64,
msg->dataLen, nc->info.maxPayload);
totalLen, nc->info.maxPayload);
}

// Check if we are reconnecting, and if so check if
Expand All @@ -142,7 +144,6 @@ natsConn_publish(natsConnection *nc, natsMsg *msg, const char *reply, bool direc
}
}

totalLen += msg->dataLen;
GETBYTES_SIZE(totalLen, dlb, dli)
dlSize = (BYTES_SIZE_MAX - dli);

Expand Down
4 changes: 2 additions & 2 deletions src/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ natsSub_deliverMsgs(void *arg)
sub->msgList.tail = NULL;

sub->msgList.msgs--;
sub->msgList.bytes -= msg->dataLen;
sub->msgList.bytes -= natsMsg_dataAndHdrLen(msg);

msg->next = NULL;

Expand Down Expand Up @@ -731,7 +731,7 @@ natsSub_nextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout, bool
sub->msgList.tail = NULL;

sub->msgList.msgs--;
sub->msgList.bytes -= msg->dataLen;
sub->msgList.bytes -= natsMsg_dataAndHdrLen(msg);

msg->next = NULL;

Expand Down
2 changes: 1 addition & 1 deletion test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -24142,7 +24142,7 @@ test_JetStreamPublishAsync(void)
rsub->msgList.head = msg;
rsub->msgList.tail = msg;
rsub->msgList.msgs = 1;
rsub->msgList.bytes = msg->dataLen;
rsub->msgList.bytes = natsMsg_dataAndHdrLen(msg);
natsCondition_Signal(rsub->cond);
natsSub_Unlock(rsub);

Expand Down

0 comments on commit 5b8f3aa

Please sign in to comment.