From 0454aaa66dc1b7dc18a9f6e2fbe5fbd643bfc44b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 1 Feb 2023 17:01:57 -0700 Subject: [PATCH] [FIXED] JetStream: natsSubscription_Fetch() not honoring timeout In some conditions, it could be possible for natsSubscription_Fetch() to return with a NATS_TIMEOUT error after said timeout elapsed, but the next call would return almost immediately with a NATS_TIMEOUT again, instead of waiting for the specified timeout. Resolves #631 Signed-off-by: Ivan Kozlovic --- src/js.c | 39 +++++++++++++++++++++++++++------- src/natsp.h | 1 + test/test.c | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 91 insertions(+), 9 deletions(-) diff --git a/src/js.c b/src/js.c index 0e306bf96..74def9f08 100644 --- a/src/js.c +++ b/src/js.c @@ -1696,7 +1696,7 @@ jsSub_scheduleFlowControlResponse(jsSub *jsi, const char *reply) } static natsStatus -_checkMsg(natsMsg *msg, bool checkSts, bool *usrMsg, natsMsg *mhMsg) +_checkMsg(natsMsg *msg, bool checkSts, bool *usrMsg, natsMsg *mhMsg, const char* reqSubj) { natsStatus s = NATS_OK; const char *val = NULL; @@ -1733,6 +1733,13 @@ _checkMsg(natsMsg *msg, bool checkSts, bool *usrMsg, natsMsg *mhMsg) if (strncmp(val, CTRL_STATUS, HDR_STATUS_LEN) == 0) return NATS_OK; + // Before checking for "errors", if the incoming status message is + // for a previous request (message's subject is not reqSubj), then + // simply return NATS_OK. The caller will destroy the message and + // proceed as if nothing was received. + if (strcmp(natsMsg_GetSubject(msg), reqSubj) != 0) + return NATS_OK; + // 404 indicating that there are no messages. 
if (strncmp(val, NOT_FOUND_STATUS, HDR_STATUS_LEN) == 0) return NATS_NOT_FOUND; @@ -1798,6 +1805,7 @@ _fetch(natsMsgList *list, natsSubscription *sub, jsFetchRequest *req, bool simpl bool sendReq = true; jsSub *jsi = NULL; natsMsg *mhMsg = NULL; + char *reqSubj = NULL; bool noWait; if (list == NULL) @@ -1831,11 +1839,15 @@ _fetch(natsMsgList *list, natsSubscription *sub, jsFetchRequest *req, bool simpl } natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); nc = sub->conn; - rply = (const char*) sub->subject; subj = jsi->nxtMsgSubj; pmc = (sub->msgList.msgs > 0); jsi->inFetch = true; - if (req->Heartbeat) + jsi->fetchID++; + if (nats_asprintf(&reqSubj, "%.*s%" PRIu64, (int) strlen(sub->subject)-1, sub->subject, jsi->fetchID) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + rply = (const char*) reqSubj; + if ((s == NATS_OK) && req->Heartbeat) { int64_t hbi = req->Heartbeat / 1000000; sub->refs++; @@ -1882,8 +1894,9 @@ _fetch(natsMsgList *list, natsSubscription *sub, jsFetchRequest *req, bool simpl } if (s == NATS_OK) { - // Here we care only about user messages. - s = _checkMsg(msg, false, &usrMsg, mhMsg); + // Here we care only about user messages. We don't need to pass + // the request subject since it is not even checked in this case. 
+ s = _checkMsg(msg, false, &usrMsg, mhMsg, NULL); if ((s == NATS_OK) && usrMsg) { msgs[count++] = msg; @@ -1928,7 +1941,7 @@ _fetch(natsMsgList *list, natsSubscription *sub, jsFetchRequest *req, bool simpl IFOK(s, natsSub_nextMsg(&msg, sub, timeout, true)); if (s == NATS_OK) { - s = _checkMsg(msg, true, &usrMsg, mhMsg); + s = _checkMsg(msg, true, &usrMsg, mhMsg, rply); if ((s == NATS_OK) && usrMsg) { msgs[count++] = msg; @@ -1978,6 +1991,8 @@ _fetch(natsMsgList *list, natsSubscription *sub, jsFetchRequest *req, bool simpl natsTimer_Stop(jsi->hbTimer); natsSub_Unlock(sub); + NATS_FREE(reqSubj); + return NATS_UPDATE_ERR_STACK(s); } @@ -2558,10 +2573,19 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha } if (s == NATS_OK) { + char *pullWCInbox = NULL; + if (isPullMode) { + // Create a wildcard inbox. s = natsConn_newInbox(nc, &inbox); - deliver = (const char*) inbox; + if (s == NATS_OK) + { + if (nats_asprintf(&pullWCInbox, "%s.*", (const char*) inbox) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + } + if (s == NATS_OK) + deliver = (const char*) pullWCInbox; } // Create the NATS subscription on given deliver subject. Note that // cb/cbClosure will be NULL for sync or pull subscriptions. @@ -2576,6 +2600,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha sub->refs--; natsSub_Unlock(sub); } + NATS_FREE(pullWCInbox); } if ((s == NATS_OK) && create) { diff --git a/src/natsp.h b/src/natsp.h index 226e18d08..c98bbb283 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -377,6 +377,7 @@ typedef struct __jsSub bool ordered; bool dc; // delete JS consumer in Unsub()/Drain() bool ackNone; + uint64_t fetchID; // This is ConsumerInfo's Pending+Consumer.Delivered that we get from the // add consumer response. 
Note that some versions of the server gather the diff --git a/test/test.c b/test/test.c index 7b1dc274e..56d9528c0 100644 --- a/test/test.c +++ b/test/test.c @@ -27637,15 +27637,28 @@ _sendToPullSub(void *closure) { struct threadArg *args = (struct threadArg*) closure; natsMsg *msg = NULL; + char *subj = NULL; + natsSubscription *sub = NULL; + uint64_t id = 0; natsStatus s; nats_Sleep(250); natsMutex_Lock(args->m); - s = natsMsg_create(&msg, args->sub->subject, (int) strlen(args->sub->subject), - NULL, 0, args->string, (int) strlen(args->string), (int) strlen(args->string)); + sub = args->sub; + natsSub_Lock(sub); + id = sub->jsi->fetchID; + if (args->sum != 0) + id = (uint64_t) args->sum; + if (nats_asprintf(&subj, "%.*s%" PRIu64, (int) strlen(sub->subject)-1, sub->subject, id) < 0) + s = NATS_NO_MEMORY; + else + s = natsMsg_create(&msg, subj, (int) strlen(subj), + NULL, 0, args->string, (int) strlen(args->string), (int) strlen(args->string)); + natsSub_Unlock(sub); IFOK(s, natsConnection_PublishMsg(args->nc, msg)); natsMutex_Unlock(args->m); natsMsg_Destroy(msg); + free(subj); } static void @@ -27710,6 +27723,18 @@ _dropIdleHBs(natsConnection *nc, natsMsg **msg, void* closure) *msg = NULL; } +static void +_dropTimeoutProto(natsConnection *nc, natsMsg **msg, void* closure) +{ + const char *val = NULL; + + if (natsMsgHeader_Get(*msg, STATUS_HDR, &val) != NATS_OK) + return; + + natsMsg_Destroy(*msg); + *msg = NULL; +} + static void test_JetStreamSubscribePull(void) { @@ -28075,6 +28100,37 @@ test_JetStreamSubscribePull(void) testCond((s == NATS_OK) && (list.Msgs != NULL) && (list.Count == 1) && (jerr == 0)); natsMsgList_Destroy(&list); + + test("Fetch returns before 408: "); + natsConn_setFilter(nc, _dropTimeoutProto); + start = nats_Now(); + s = natsSubscription_Fetch(&list, sub, 1, 1000, NULL); + dur = nats_Now() - start; + testCond((s == NATS_TIMEOUT) && (list.Count == 0) && (dur >= 600)); + nats_clearLastError(); + + test("Next Fetch waits for proper timeout: 
"); + nats_Sleep(100); + natsConn_setFilter(nc, NULL); + natsMutex_Lock(args.m); + args.nc = nc; + args.sub = sub; + args.string = "NATS/1.0 408 Request Timeout\r\n\r\n"; + // Will make the 408 sent to a subject ID with 1 while we are likely at 2 or above. + // So this will be considered a "late" 408 timeout and should be ignored. + args.sum = 1; + natsMutex_Unlock(args.m); + s = natsThread_Create(&t, _sendToPullSub, (void*) &args); + start = nats_Now(); + IFOK(s, natsSubscription_Fetch(&list, sub, 1, 1000, NULL)); + dur = nats_Now() - start; + testCond((s == NATS_TIMEOUT) && (list.Count == 0) && (dur >= 600)); + nats_clearLastError(); + + natsThread_Join(t); + natsThread_Destroy(t); + t = NULL; + natsSubscription_Destroy(sub); sub = NULL;