diff --git a/src/nats.h b/src/nats.h index 1ad558700..3b1433d73 100644 --- a/src/nats.h +++ b/src/nats.h @@ -2651,6 +2651,22 @@ natsOptions_SetReconnectBufSize(natsOptions *opts, int reconnectBufSize); NATS_EXTERN natsStatus natsOptions_SetMaxPendingMsgs(natsOptions *opts, int maxPending); +/** \brief Sets the maximum number of pending bytes per subscription. + * + * Specifies the maximum number of inbound bytes that can be buffered in the + * library, for each subscription, before inbound messages are dropped and + * #NATS_SLOW_CONSUMER status is reported to the #natsErrHandler callback (if + * one has been set). + * + * @see natsOptions_SetErrorHandler() + * + * @param opts the pointer to the #natsOptions object. + * @param maxPending the number of bytes allowed to be buffered by the + * library before triggering a slow consumer scenario. + */ +NATS_EXTERN natsStatus +natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending); + /** \brief Sets the error handler for asynchronous events. * * Specifies the callback to invoke when an asynchronous error diff --git a/src/natsp.h b/src/natsp.h index acd4648c6..c749b84bb 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -265,6 +265,7 @@ struct __natsOptions int64_t pingInterval; int maxPingsOut; int maxPendingMsgs; + int64_t maxPendingBytes; natsSSLCtx *sslCtx; diff --git a/src/opts.c b/src/opts.c index 8955d8d7d..1c7864635 100644 --- a/src/opts.c +++ b/src/opts.c @@ -883,6 +883,18 @@ natsOptions_SetMaxPendingMsgs(natsOptions *opts, int maxPending) return NATS_OK; } +natsStatus +natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending) +{ + LOCK_AND_CHECK_OPTIONS(opts, (maxPending <= 0)); + + opts->maxPendingBytes = maxPending; + + UNLOCK_OPTS(opts); + + return NATS_OK; +} + natsStatus natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler, void *closure) @@ -1517,6 +1529,7 @@ natsOptions_Create(natsOptions **newOpts) opts->maxPingsOut = NATS_OPTS_DEFAULT_MAX_PING_OUT; opts->ioBufSize = NATS_OPTS_DEFAULT_IO_BUF_SIZE; opts->maxPendingMsgs = NATS_OPTS_DEFAULT_MAX_PENDING_MSGS; + opts->maxPendingBytes = -1; opts->timeout = NATS_OPTS_DEFAULT_TIMEOUT; opts->libMsgDelivery = natsLib_isLibHandlingMsgDeliveryByDefault(); opts->writeDeadline = natsLib_defaultWriteDeadline(); diff --git a/src/opts.h b/src/opts.h index 3955634e1..81a39aa27 100644 --- a/src/opts.h +++ b/src/opts.h @@ -40,7 +40,8 @@ static inline void natsOptions_unlock(natsOptions *opts) #define NATS_OPTS_DEFAULT_PING_INTERVAL (2 * 60 * 1000) // 2 minutes #define NATS_OPTS_DEFAULT_MAX_PING_OUT (2) #define NATS_OPTS_DEFAULT_IO_BUF_SIZE (32 * 1024) // 32 KB -#define NATS_OPTS_DEFAULT_MAX_PENDING_MSGS (65536) +#define NATS_OPTS_DEFAULT_MAX_PENDING_MSGS (65536) // 65536 messages +#define NATS_OPTS_DEFAULT_MAX_PENDING_BYTES (64 * 1024 * 1024) // 64 MB #define NATS_OPTS_DEFAULT_RECONNECT_BUF_SIZE (8 * 1024 * 1024) // 8 MB #define NATS_OPTS_DEFAULT_RECONNECT_JITTER (100) // 100 ms #define NATS_OPTS_DEFAULT_RECONNECT_JITTER_TLS (1000) // 1 second diff --git a/src/sub.c b/src/sub.c index 3c68c0ed6..499909797 100644 --- a/src/sub.c +++ b/src/sub.c @@ -22,6 +22,7 @@ #include "msg.h" #include "util.h" #include "js.h" +#include "opts.h" #ifdef DEV_MODE @@ -430,10 +431,6 @@ natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj, { natsStatus s = NATS_OK; natsSubscription *sub = NULL; - int bytesLimit = nc->opts->maxPendingMsgs * 1024; - - if (bytesLimit <= 0) - return nats_setError(NATS_INVALID_ARG, "Invalid bytes limit of %d", bytesLimit); sub = (natsSubscription*) NATS_CALLOC(1, sizeof(natsSubscription)); if (sub == NULL) @@ -454,7 +451,7 @@ natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj, sub->msgCb = cb; sub->msgCbClosure = cbClosure; sub->msgsLimit = nc->opts->maxPendingMsgs; - sub->bytesLimit = bytesLimit; + sub->bytesLimit = nc->opts->maxPendingBytes == -1 ? nc->opts->maxPendingMsgs * 1024 : nc->opts->maxPendingBytes;; sub->jsi = jsi; sub->subject = NATS_STRDUP(subj); diff --git a/test/list.txt b/test/list.txt index af5eb0c9e..18f5b898d 100644 --- a/test/list.txt +++ b/test/list.txt @@ -146,7 +146,8 @@ AsyncSubscriptionPending AsyncSubscriptionPendingDrain SyncSubscriptionPending SyncSubscriptionPendingDrain -AsyncErrHandler +AsyncErrHandlerMaxPendingMsgs +AsyncErrHandlerMaxPendingBytes AsyncErrHandlerSubDestroyed AsyncSubscriberStarvation AsyncSubscriberOnClose @@ -266,7 +267,8 @@ MicroBasics MicroStartStop MicroServiceStopsOnClosedConn MicroServiceStopsWhenServerStops -MicroAsyncErrorHandler +MicroAsyncErrorHandlerMaxPendingMsgs +MicroAsyncErrorHandlerMaxPendingBytes StanPBufAllocator StanConnOptions StanSubOptions diff --git a/test/test.c b/test/test.c index 0fe1ea7c9..d0a85759d 100644 --- a/test/test.c +++ b/test/test.c @@ -2581,6 +2581,7 @@ test_natsOptions(void) && (opts->maxPingsOut == 2) && (opts->ioBufSize == 32 * 1024) && (opts->maxPendingMsgs == 65536) + && (opts->maxPendingBytes == -1) && (opts->user == NULL) && (opts->password == NULL) && (opts->token == NULL) @@ -2812,6 +2813,10 @@ test_natsOptions(void) s = natsOptions_SetMaxPendingMsgs(opts, 10000); testCond((s == NATS_OK) && (opts->maxPendingMsgs == 10000)); + test("Set Max Pending Bytes : "); + s = natsOptions_SetMaxPendingBytes(opts, 1000000); + testCond((s == NATS_OK) && (opts->maxPendingBytes == 1000000)) + test("Set Error Handler: "); s = natsOptions_SetErrorHandler(opts, _dummyErrHandler, NULL); testCond((s == NATS_OK) && (opts->asyncErrCb == _dummyErrHandler)); @@ -14340,13 +14345,13 @@ _asyncErrCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void* clo arg->closed = true; arg->done = true; - natsCondition_Signal(arg->c); + natsCondition_Broadcast(arg->c); natsMutex_Unlock(arg->m); } static void -test_AsyncErrHandler(void) +test_AsyncErrHandler_MaxPendingMsgs(void) { natsStatus s; natsConnection *nc = NULL; @@ -14408,6 +14413,71 @@ test_AsyncErrHandler(void) _stopServer(serverPid); } +static void +test_AsyncErrHandler_MaxPendingBytes(void) +{ + natsStatus s; + natsConnection* nc = NULL; + natsOptions* opts = NULL; + natsSubscription* sub = NULL; + natsPid serverPid = NATS_INVALID_PID; + struct threadArg arg; + int data_len = 10; + const char msg[] = { 0,1,2,3,4,5,6,7,8,9 }; //10 bytes long message + int64_t pendingBytesLimit = 100; + int i = 0; + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s != NATS_OK) + FAIL("Unable to setup test!"); + + arg.status = NATS_OK; + arg.control = 7; + + s = natsOptions_Create(&opts); + IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL)); + IFOK(s, natsOptions_SetMaxPendingBytes(opts, pendingBytesLimit)); + IFOK(s, natsOptions_SetErrorHandler(opts, _asyncErrCb, (void*)&arg)); + + if (s != NATS_OK) + FAIL("Unable to create options for test AsyncErrHandler"); + + serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); + CHECK_SERVER_STARTED(serverPid); + + s = natsConnection_Connect(&nc, opts); + IFOK(s, natsConnection_Subscribe(&sub, nc, "async_test", _recvTestString, (void*)&arg)); + + natsMutex_Lock(arg.m); + arg.sub = sub; + natsMutex_Unlock(arg.m); + for (i=0; + (s == NATS_OK) && (i < (pendingBytesLimit + 100)); i+=data_len) //increment by 10 (message size) each iteration + { + s = natsConnection_Publish(nc, "async_test", msg, data_len); + } + IFOK(s, natsConnection_Flush(nc)); + + // Wait for async err callback + natsMutex_Lock(arg.m); + while ((s != NATS_TIMEOUT) && !arg.done) + s = natsCondition_TimedWait(arg.c, arg.m, 2000); + natsMutex_Unlock(arg.m); + + test("Aync fired properly, and all checks are good: "); + testCond((s == NATS_OK) + && arg.done + && arg.closed + && (arg.status == NATS_OK)); + + natsOptions_Destroy(opts); + natsSubscription_Destroy(sub); + natsConnection_Destroy(nc); + + _destroyDefaultThreadArgs(&arg); + + _stopServer(serverPid); +} static void _asyncErrBlockingCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void* closure) @@ -33537,7 +33607,7 @@ _microAsyncErrorRequestHandler(microRequest *req) } static void -test_MicroAsyncErrorHandler(void) +test_MicroAsyncErrorHandler_MaxPendingMsgs(void) { natsStatus s; struct threadArg arg; @@ -33615,6 +33685,89 @@ test_MicroAsyncErrorHandler(void) _stopServer(serverPid); } +static void +test_MicroAsyncErrorHandler_MaxPendingBytes(void) +{ + natsStatus s; + struct threadArg arg; + natsConnection* nc = NULL; + natsOptions* opts = NULL; + natsPid serverPid = NATS_INVALID_PID; + microService* m = NULL; + microEndpoint* ep = NULL; + microEndpointConfig ep_cfg = { + .Name = "do", + .Subject = "async_test", + .Handler = _microAsyncErrorRequestHandler, + }; + microServiceConfig cfg = { + .Name = "test", + .Version = "1.0.0", + .ErrHandler = _microAsyncErrorHandler, + .State = &arg, + .DoneHandler = _microServiceDoneHandler, + }; + int data_len = 10; + const char msg[] = { 0,1,2,3,4,5,6,7,8,9 }; //10 bytes long message + int64_t pendingBytesLimit = 100; + int i = 0; + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s != NATS_OK) + FAIL("Unable to setup test!"); + + s = natsOptions_Create(&opts); + IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL)); + IFOK(s, natsOptions_SetMaxPendingBytes(opts, pendingBytesLimit)); + if (s != NATS_OK) + FAIL("Unable to create options for test AsyncErrHandler"); + + serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); + CHECK_SERVER_STARTED(serverPid); + + test("Connect to NATS: "); + testCond(NATS_OK == natsConnection_Connect(&nc, opts)); + + _startMicroservice(&m, nc, &cfg, NULL, 0, &arg); + + test("Test microservice is running: "); + testCond(!microService_IsStopped(m)) + + test("Add test endpoint: "); + testCond(NULL == micro_add_endpoint(&ep, m, NULL, &ep_cfg, true)); + + natsMutex_Lock(arg.m); + arg.status = NATS_OK; + natsMutex_Unlock(arg.m); + + test("Cause an error by sending too many messages: "); + for (i=0; + (s == NATS_OK) && (i < (pendingBytesLimit + 100)); i+=data_len) //increment by 10 (message size) each iteration + { + s = natsConnection_Publish(nc, "async_test", msg, data_len); + } + testCond((NATS_OK == s) + && (NATS_OK == natsConnection_Flush(nc))); + + test("Wait for async err callback: "); + natsMutex_Lock(arg.m); + while ((s != NATS_TIMEOUT) && !arg.closed) + s = natsCondition_TimedWait(arg.c, arg.m, 1000); + natsMutex_Unlock(arg.m); + testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); + + microService_Destroy(m); + _waitForMicroservicesAllDone(&arg); + + test("Destroy the test connection: "); + natsConnection_Destroy(nc); + testCond(NATS_OK == _waitForConnClosed(&arg)); + + natsOptions_Destroy(opts); + _destroyDefaultThreadArgs(&arg); + _stopServer(serverPid); +} + #if defined(NATS_HAS_STREAMING) static int @@ -35996,7 +36149,8 @@ static testInfo allTests[] = {"AsyncSubscriptionPendingDrain", test_AsyncSubscriptionPendingDrain}, {"SyncSubscriptionPending", test_SyncSubscriptionPending}, {"SyncSubscriptionPendingDrain", test_SyncSubscriptionPendingDrain}, - {"AsyncErrHandler", test_AsyncErrHandler}, + {"AsyncErrHandlerMaxPendingMsgs", test_AsyncErrHandler_MaxPendingMsgs}, + {"AsyncErrHandlerMaxPendingBytes", test_AsyncErrHandler_MaxPendingBytes }, {"AsyncErrHandlerSubDestroyed", test_AsyncErrHandlerSubDestroyed}, {"AsyncSubscriberStarvation", test_AsyncSubscriberStarvation}, {"AsyncSubscriberOnClose", test_AsyncSubscriberOnClose}, @@ -36122,7 +36276,8 @@ static testInfo allTests[] = {"MicroStartStop", test_MicroStartStop}, {"MicroServiceStopsOnClosedConn", test_MicroServiceStopsOnClosedConn}, {"MicroServiceStopsWhenServerStops", test_MicroServiceStopsWhenServerStops}, - {"MicroAsyncErrorHandler", test_MicroAsyncErrorHandler}, + {"MicroAsyncErrorHandlerMaxPendingMsgs", test_MicroAsyncErrorHandler_MaxPendingMsgs}, + {"MicroAsyncErrorHandlerMaxPendingBytes", test_MicroAsyncErrorHandler_MaxPendingBytes }, #if defined(NATS_HAS_STREAMING) {"StanPBufAllocator", test_StanPBufAllocator},