Skip to content

Commit

Permalink
Expose MaxPendingBytes on the natsOptions (#700)
Browse files Browse the repository at this point in the history
* Expose MaxPendingBytes on the natsOptions

* Fix tests and declare new method in header

* Fix comments

* Preserve old behaviour exactly

* Fix Comments

---------

Co-authored-by: Laurens Vergote <[email protected]>
Co-authored-by: Lev <[email protected]>
  • Loading branch information
3 people authored Apr 5, 2024
1 parent 8a00e4b commit ff8528f
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 13 deletions.
16 changes: 16 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -2651,6 +2651,22 @@ natsOptions_SetReconnectBufSize(natsOptions *opts, int reconnectBufSize);
NATS_EXTERN natsStatus
natsOptions_SetMaxPendingMsgs(natsOptions *opts, int maxPending);

/** \brief Sets the maximum number of pending bytes per subscription.
*
* Specifies the maximum number of inbound bytes that can be buffered in the
* library, for each subscription, before inbound messages are dropped and
* #NATS_SLOW_CONSUMER status is reported to the #natsErrHandler callback (if
* one has been set).
*
* @see natsOptions_SetErrorHandler()
*
* @param opts the pointer to the #natsOptions object.
* @param maxPending the number of bytes allowed to be buffered by the
* library before triggering a slow consumer scenario.
*/
NATS_EXTERN natsStatus
natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending);

/** \brief Sets the error handler for asynchronous events.
*
* Specifies the callback to invoke when an asynchronous error
Expand Down
1 change: 1 addition & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ struct __natsOptions
int64_t pingInterval;
int maxPingsOut;
int maxPendingMsgs;
int64_t maxPendingBytes;

natsSSLCtx *sslCtx;

Expand Down
13 changes: 13 additions & 0 deletions src/opts.c
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,18 @@ natsOptions_SetMaxPendingMsgs(natsOptions *opts, int maxPending)
return NATS_OK;
}

natsStatus
natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending)
{
LOCK_AND_CHECK_OPTIONS(opts, (maxPending <= 0));

opts->maxPendingBytes = maxPending;

UNLOCK_OPTS(opts);

return NATS_OK;
}

natsStatus
natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler,
void *closure)
Expand Down Expand Up @@ -1517,6 +1529,7 @@ natsOptions_Create(natsOptions **newOpts)
opts->maxPingsOut = NATS_OPTS_DEFAULT_MAX_PING_OUT;
opts->ioBufSize = NATS_OPTS_DEFAULT_IO_BUF_SIZE;
opts->maxPendingMsgs = NATS_OPTS_DEFAULT_MAX_PENDING_MSGS;
opts->maxPendingBytes = -1;
opts->timeout = NATS_OPTS_DEFAULT_TIMEOUT;
opts->libMsgDelivery = natsLib_isLibHandlingMsgDeliveryByDefault();
opts->writeDeadline = natsLib_defaultWriteDeadline();
Expand Down
3 changes: 2 additions & 1 deletion src/opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ static inline void natsOptions_unlock(natsOptions *opts)
#define NATS_OPTS_DEFAULT_PING_INTERVAL (2 * 60 * 1000) // 2 minutes
#define NATS_OPTS_DEFAULT_MAX_PING_OUT (2)
#define NATS_OPTS_DEFAULT_IO_BUF_SIZE (32 * 1024) // 32 KB
#define NATS_OPTS_DEFAULT_MAX_PENDING_MSGS (65536)
#define NATS_OPTS_DEFAULT_MAX_PENDING_MSGS (65536) // 65536 messages
#define NATS_OPTS_DEFAULT_MAX_PENDING_BYTES (64 * 1024 * 1024) // 64 MB
#define NATS_OPTS_DEFAULT_RECONNECT_BUF_SIZE (8 * 1024 * 1024) // 8 MB
#define NATS_OPTS_DEFAULT_RECONNECT_JITTER (100) // 100 ms
#define NATS_OPTS_DEFAULT_RECONNECT_JITTER_TLS (1000) // 1 second
Expand Down
7 changes: 2 additions & 5 deletions src/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "msg.h"
#include "util.h"
#include "js.h"
#include "opts.h"

#ifdef DEV_MODE

Expand Down Expand Up @@ -430,10 +431,6 @@ natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj,
{
natsStatus s = NATS_OK;
natsSubscription *sub = NULL;
int bytesLimit = nc->opts->maxPendingMsgs * 1024;

if (bytesLimit <= 0)
return nats_setError(NATS_INVALID_ARG, "Invalid bytes limit of %d", bytesLimit);

sub = (natsSubscription*) NATS_CALLOC(1, sizeof(natsSubscription));
if (sub == NULL)
Expand All @@ -454,7 +451,7 @@ natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj,
sub->msgCb = cb;
sub->msgCbClosure = cbClosure;
sub->msgsLimit = nc->opts->maxPendingMsgs;
sub->bytesLimit = bytesLimit;
sub->bytesLimit = nc->opts->maxPendingBytes == -1 ? nc->opts->maxPendingMsgs * 1024 : nc->opts->maxPendingBytes;;
sub->jsi = jsi;

sub->subject = NATS_STRDUP(subj);
Expand Down
6 changes: 4 additions & 2 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ AsyncSubscriptionPending
AsyncSubscriptionPendingDrain
SyncSubscriptionPending
SyncSubscriptionPendingDrain
AsyncErrHandler
AsyncErrHandlerMaxPendingMsgs
AsyncErrHandlerMaxPendingBytes
AsyncErrHandlerSubDestroyed
AsyncSubscriberStarvation
AsyncSubscriberOnClose
Expand Down Expand Up @@ -266,7 +267,8 @@ MicroBasics
MicroStartStop
MicroServiceStopsOnClosedConn
MicroServiceStopsWhenServerStops
MicroAsyncErrorHandler
MicroAsyncErrorHandlerMaxPendingMsgs
MicroAsyncErrorHandlerMaxPendingBytes
StanPBufAllocator
StanConnOptions
StanSubOptions
Expand Down
165 changes: 160 additions & 5 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -2581,6 +2581,7 @@ test_natsOptions(void)
&& (opts->maxPingsOut == 2)
&& (opts->ioBufSize == 32 * 1024)
&& (opts->maxPendingMsgs == 65536)
&& (opts->maxPendingBytes == -1)
&& (opts->user == NULL)
&& (opts->password == NULL)
&& (opts->token == NULL)
Expand Down Expand Up @@ -2812,6 +2813,10 @@ test_natsOptions(void)
s = natsOptions_SetMaxPendingMsgs(opts, 10000);
testCond((s == NATS_OK) && (opts->maxPendingMsgs == 10000));

test("Set Max Pending Bytes : ");
s = natsOptions_SetMaxPendingBytes(opts, 1000000);
testCond((s == NATS_OK) && (opts->maxPendingBytes == 1000000))

test("Set Error Handler: ");
s = natsOptions_SetErrorHandler(opts, _dummyErrHandler, NULL);
testCond((s == NATS_OK) && (opts->asyncErrCb == _dummyErrHandler));
Expand Down Expand Up @@ -14340,13 +14345,13 @@ _asyncErrCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void* clo

arg->closed = true;
arg->done = true;
natsCondition_Signal(arg->c);
natsCondition_Broadcast(arg->c);

natsMutex_Unlock(arg->m);
}

static void
test_AsyncErrHandler(void)
test_AsyncErrHandler_MaxPendingMsgs(void)
{
natsStatus s;
natsConnection *nc = NULL;
Expand Down Expand Up @@ -14408,6 +14413,71 @@ test_AsyncErrHandler(void)
_stopServer(serverPid);
}

static void
test_AsyncErrHandler_MaxPendingBytes(void)
{
natsStatus s;
natsConnection* nc = NULL;
natsOptions* opts = NULL;
natsSubscription* sub = NULL;
natsPid serverPid = NATS_INVALID_PID;
struct threadArg arg;
int data_len = 10;
const char msg[] = { 0,1,2,3,4,5,6,7,8,9 }; //10 bytes long message
int64_t pendingBytesLimit = 100;
int i = 0;

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("Unable to setup test!");

arg.status = NATS_OK;
arg.control = 7;

s = natsOptions_Create(&opts);
IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL));
IFOK(s, natsOptions_SetMaxPendingBytes(opts, pendingBytesLimit));
IFOK(s, natsOptions_SetErrorHandler(opts, _asyncErrCb, (void*)&arg));

if (s != NATS_OK)
FAIL("Unable to create options for test AsyncErrHandler");

serverPid = _startServer("nats://127.0.0.1:4222", NULL, true);
CHECK_SERVER_STARTED(serverPid);

s = natsConnection_Connect(&nc, opts);
IFOK(s, natsConnection_Subscribe(&sub, nc, "async_test", _recvTestString, (void*)&arg));

natsMutex_Lock(arg.m);
arg.sub = sub;
natsMutex_Unlock(arg.m);
for (i=0;
(s == NATS_OK) && (i < (pendingBytesLimit + 100)); i+=data_len) //increment by 10 (message size) each iteration
{
s = natsConnection_Publish(nc, "async_test", msg, data_len);
}
IFOK(s, natsConnection_Flush(nc));

// Wait for async err callback
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.done)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
natsMutex_Unlock(arg.m);

test("Aync fired properly, and all checks are good: ");
testCond((s == NATS_OK)
&& arg.done
&& arg.closed
&& (arg.status == NATS_OK));

natsOptions_Destroy(opts);
natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);

_destroyDefaultThreadArgs(&arg);

_stopServer(serverPid);
}

static void
_asyncErrBlockingCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void* closure)
Expand Down Expand Up @@ -33537,7 +33607,7 @@ _microAsyncErrorRequestHandler(microRequest *req)
}

static void
test_MicroAsyncErrorHandler(void)
test_MicroAsyncErrorHandler_MaxPendingMsgs(void)
{
natsStatus s;
struct threadArg arg;
Expand Down Expand Up @@ -33615,6 +33685,89 @@ test_MicroAsyncErrorHandler(void)
_stopServer(serverPid);
}

static void
test_MicroAsyncErrorHandler_MaxPendingBytes(void)
{
natsStatus s;
struct threadArg arg;
natsConnection* nc = NULL;
natsOptions* opts = NULL;
natsPid serverPid = NATS_INVALID_PID;
microService* m = NULL;
microEndpoint* ep = NULL;
microEndpointConfig ep_cfg = {
.Name = "do",
.Subject = "async_test",
.Handler = _microAsyncErrorRequestHandler,
};
microServiceConfig cfg = {
.Name = "test",
.Version = "1.0.0",
.ErrHandler = _microAsyncErrorHandler,
.State = &arg,
.DoneHandler = _microServiceDoneHandler,
};
int data_len = 10;
const char msg[] = { 0,1,2,3,4,5,6,7,8,9 }; //10 bytes long message
int64_t pendingBytesLimit = 100;
int i = 0;

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("Unable to setup test!");

s = natsOptions_Create(&opts);
IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL));
IFOK(s, natsOptions_SetMaxPendingBytes(opts, pendingBytesLimit));
if (s != NATS_OK)
FAIL("Unable to create options for test AsyncErrHandler");

serverPid = _startServer("nats://127.0.0.1:4222", NULL, true);
CHECK_SERVER_STARTED(serverPid);

test("Connect to NATS: ");
testCond(NATS_OK == natsConnection_Connect(&nc, opts));

_startMicroservice(&m, nc, &cfg, NULL, 0, &arg);

test("Test microservice is running: ");
testCond(!microService_IsStopped(m))

test("Add test endpoint: ");
testCond(NULL == micro_add_endpoint(&ep, m, NULL, &ep_cfg, true));

natsMutex_Lock(arg.m);
arg.status = NATS_OK;
natsMutex_Unlock(arg.m);

test("Cause an error by sending too many messages: ");
for (i=0;
(s == NATS_OK) && (i < (pendingBytesLimit + 100)); i+=data_len) //increment by 10 (message size) each iteration
{
s = natsConnection_Publish(nc, "async_test", msg, data_len);
}
testCond((NATS_OK == s)
&& (NATS_OK == natsConnection_Flush(nc)));

test("Wait for async err callback: ");
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.closed)
s = natsCondition_TimedWait(arg.c, arg.m, 1000);
natsMutex_Unlock(arg.m);
testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER));

microService_Destroy(m);
_waitForMicroservicesAllDone(&arg);

test("Destroy the test connection: ");
natsConnection_Destroy(nc);
testCond(NATS_OK == _waitForConnClosed(&arg));

natsOptions_Destroy(opts);
_destroyDefaultThreadArgs(&arg);
_stopServer(serverPid);
}

#if defined(NATS_HAS_STREAMING)

static int
Expand Down Expand Up @@ -35996,7 +36149,8 @@ static testInfo allTests[] =
{"AsyncSubscriptionPendingDrain", test_AsyncSubscriptionPendingDrain},
{"SyncSubscriptionPending", test_SyncSubscriptionPending},
{"SyncSubscriptionPendingDrain", test_SyncSubscriptionPendingDrain},
{"AsyncErrHandler", test_AsyncErrHandler},
{"AsyncErrHandlerMaxPendingMsgs", test_AsyncErrHandler_MaxPendingMsgs},
{"AsyncErrHandlerMaxPendingBytes", test_AsyncErrHandler_MaxPendingBytes },
{"AsyncErrHandlerSubDestroyed", test_AsyncErrHandlerSubDestroyed},
{"AsyncSubscriberStarvation", test_AsyncSubscriberStarvation},
{"AsyncSubscriberOnClose", test_AsyncSubscriberOnClose},
Expand Down Expand Up @@ -36122,7 +36276,8 @@ static testInfo allTests[] =
{"MicroStartStop", test_MicroStartStop},
{"MicroServiceStopsOnClosedConn", test_MicroServiceStopsOnClosedConn},
{"MicroServiceStopsWhenServerStops", test_MicroServiceStopsWhenServerStops},
{"MicroAsyncErrorHandler", test_MicroAsyncErrorHandler},
{"MicroAsyncErrorHandlerMaxPendingMsgs", test_MicroAsyncErrorHandler_MaxPendingMsgs},
{"MicroAsyncErrorHandlerMaxPendingBytes", test_MicroAsyncErrorHandler_MaxPendingBytes },

#if defined(NATS_HAS_STREAMING)
{"StanPBufAllocator", test_StanPBufAllocator},
Expand Down

0 comments on commit ff8528f

Please sign in to comment.