Skip to content

Commit

Permalink
Merge pull request #596 from nats-io/fix_ordered_consumers
Browse files Browse the repository at this point in the history
[FIXED] JetStream: Ordered consumers may not be reset in some cases
  • Loading branch information
kozlovic authored Oct 3, 2022
2 parents 0abde6f + bb347da commit 8a04792
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 28 deletions.
16 changes: 14 additions & 2 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2602,6 +2602,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
int jct = 0;
natsMsgFilter mf = NULL;
void *mfc = NULL;
bool unlock = false;

natsMutex_Lock(nc->subsMu);

Expand Down Expand Up @@ -2647,6 +2648,11 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
mu = ldw->lock;
cond = ldw->cond;
list = &(ldw->msgList);
if (sub->jsi != NULL)
{
natsSub_Lock(sub);
unlock = true;
}
}
else
{
Expand All @@ -2659,6 +2665,8 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
if (sub->closed || sub->drainSkip)
{
natsMutex_Unlock(mu);
if (unlock)
natsSub_Unlock(sub);
natsMsg_Destroy(msg);
return NATS_OK;
}
Expand All @@ -2679,10 +2687,12 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
{
bool replaced = false;

s = jsSub_checkOrderedMsg(sub, mu, msg, &replaced);
s = jsSub_checkOrderedMsg(sub, msg, &replaced);
if ((s != NATS_OK) || replaced)
{
natsMutex_Unlock(mu);
if (unlock)
natsSub_Unlock(sub);
natsMsg_Destroy(msg);
return s;
}
Expand Down Expand Up @@ -2747,7 +2757,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
else if ((jct == jsCtrlHeartbeat) && (msg->reply == NULL))
{
// Handle control heartbeat messages.
s = jsSub_processSequenceMismatch(sub, mu, msg, &sm);
s = jsSub_processSequenceMismatch(sub, msg, &sm);
}
else if ((jct == jsCtrlFlowControl) && (msg->reply != NULL))
{
Expand All @@ -2765,6 +2775,8 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
}

natsMutex_Unlock(mu);
if (unlock)
natsSub_Unlock(sub);

if ((s == NATS_OK) && fcReply)
s = natsConnection_Publish(nc, fcReply, NULL, 0);
Expand Down
68 changes: 48 additions & 20 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,7 @@ jsSub_trackSequences(jsSub *jsi, const char *reply)
}

natsStatus
jsSub_processSequenceMismatch(natsSubscription *sub, natsMutex *mu, natsMsg *msg, bool *sm)
jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm)
{
jsSub *jsi = sub->jsi;
const char *str = NULL;
Expand Down Expand Up @@ -1585,7 +1585,7 @@ jsSub_processSequenceMismatch(natsSubscription *sub, natsMutex *mu, natsMsg *msg
{
if (jsi->ordered)
{
s = jsSub_resetOrderedConsumer(sub, mu, jsi->sseq+1);
s = jsSub_resetOrderedConsumer(sub, jsi->sseq+1);
}
else if (!jsi->ssmn)
{
Expand Down Expand Up @@ -2019,9 +2019,12 @@ _hbTimerFired(natsTimer *timer, void* closure)
jsSub *jsi = sub->jsi;
bool alert= false;
natsConnection *nc = NULL;
bool oc = false;
natsStatus s = NATS_OK;

natsSub_Lock(sub);
alert = !jsi->active;
oc = jsi->ordered;
jsi->active = false;
if (alert && jsi->pull)
{
Expand All @@ -2046,12 +2049,29 @@ _hbTimerFired(natsTimer *timer, void* closure)
if (!alert)
return;

// For ordered consumers, we will need to reset
if (oc)
{
natsSub_Lock(sub);
if (!sub->closed)
{
// If we fail in that call, we will report to async err callback
// (if one is specified).
s = jsSub_resetOrderedConsumer(sub, sub->jsi->sseq+1);
}
natsSub_Unlock(sub);
}

natsConn_Lock(nc);
// We did create the timer only knowing that there was a async err
// handler, but check anyway in case we decide to have timer set
// regardless.
if (nc->opts->asyncErrCb != NULL)
natsAsyncCb_PostErrHandler(nc, sub, NATS_MISSED_HEARTBEAT, NULL);
{
// Even if we have called resetOrderedConsumer, we will post something
// to the async error callback, either "missed heartbeats", or the error
// that occurred trying to do the reset.
if (s == NATS_OK)
s = NATS_MISSED_HEARTBEAT;
natsAsyncCb_PostErrHandler(nc, sub, s, NULL);
}
natsConn_Unlock(nc);
}

Expand Down Expand Up @@ -2463,6 +2483,8 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
cfg->AckWait = NATS_SECONDS_TO_NANOS(24*60*60); // Just set to something known, not utilized.
if (opts->Config.Heartbeat <= 0)
cfg->Heartbeat = jsOrderedHBInterval;
cfg->MemoryStorage = true;
cfg->Replicas = 1;
}
else
{
Expand Down Expand Up @@ -2534,13 +2556,15 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
opts->Queue, 0, cb, cbClosure, false, jsi));
if ((s == NATS_OK) && (hbi > 0) && !isPullMode)
{
bool ct = false; // create timer or not.
// We will create a timer if we use create an ordered consumer, or
// if the async error callback is registered.
bool ct = opts->Ordered;

// Check to see if it is even worth creating a timer to check
// on missed heartbeats, since the way to notify the user will be
// through async callback.
natsConn_Lock(nc);
ct = (nc->opts->asyncErrCb != NULL ? true : false);
ct = ct || (nc->opts->asyncErrCb != NULL ? true : false);
natsConn_Unlock(nc);

if (ct)
Expand Down Expand Up @@ -2928,13 +2952,13 @@ natsMsg_isJSCtrl(natsMsg *msg, int *ctrlType)
// Update and replace sid.
// Lock should be held on entry but will be unlocked to prevent lock inversion.
int64_t
applyNewSID(natsSubscription *sub, natsMutex *mu)
applyNewSID(natsSubscription *sub)
{
int64_t osid = 0;
int64_t nsid = 0;
natsConnection *nc = sub->conn;

natsMutex_Unlock(mu);
natsSub_Unlock(sub);

natsMutex_Lock(nc->subsMu);
osid = sub->sid;
Expand All @@ -2945,7 +2969,7 @@ applyNewSID(natsSubscription *sub, natsMutex *mu)
natsHash_Set(nc->subs, nsid, sub, NULL);
natsMutex_Unlock(nc->subsMu);

natsMutex_Lock(mu);
natsSub_Lock(sub);
sub->sid = nsid;
return osid;
}
Expand All @@ -2962,6 +2986,9 @@ _recreateOrderedCons(void *closure)
jsConsumerConfig cc;
natsStatus s;

// Note: if anything fail here, the reset/recreate of the ordered consumer
// will happen again based on the missed HB timer.

// Unsubscribe and subscribe with new inbox and sid.
// Remap a new low level sub into this sub since its client accessible.
// This is done here in this thread to prevent lock inversion.
Expand All @@ -2980,7 +3007,7 @@ _recreateOrderedCons(void *closure)

if (!oci->done && (s == NATS_OK))
{
natsSubAndLdw_Lock(sub);
natsSub_Lock(sub);
t = oci->thread;
jsi = sub->jsi;
// Reset some items in jsi.
Expand All @@ -3001,7 +3028,7 @@ _recreateOrderedCons(void *closure)
cc.DeliverSubject = sub->subject;
cc.DeliverPolicy = js_DeliverByStartSequence;
cc.OptStartSeq = oci->sseq;
natsSubAndLdw_Unlock(sub);
natsSub_Unlock(sub);

s = js_AddConsumer(&ci, jsi->js, jsi->stream, &cc, NULL, NULL);
if (s == NATS_OK)
Expand All @@ -3021,13 +3048,11 @@ _recreateOrderedCons(void *closure)
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
snprintf(tmp, sizeof(tmp), "failed recreating ordered consumer: %u (%s)",
snprintf(tmp, sizeof(tmp), "failed recreating ordered consumer: %u (%s), will try again",
s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
}
natsConn_Unlock(nc);

natsConn_unsubscribe(nc, sub, 0, true, 0);
}

NATS_FREE(oci);
Expand All @@ -3040,7 +3065,7 @@ _recreateOrderedCons(void *closure)
// We will create a new consumer and rewire the low level subscription.
// Lock should be held.
natsStatus
jsSub_resetOrderedConsumer(natsSubscription *sub, natsMutex *mu, uint64_t sseq)
jsSub_resetOrderedConsumer(natsSubscription *sub, uint64_t sseq)
{
natsStatus s = NATS_OK;
natsConnection *nc = sub->conn;
Expand All @@ -3053,6 +3078,9 @@ jsSub_resetOrderedConsumer(natsSubscription *sub, natsMutex *mu, uint64_t sseq)
if ((sub->jsi == NULL) || (nc == NULL) || sub->closed)
return NATS_OK;

// Note: if anything fail here, the reset/recreate of the ordered consumer
// will happen again based on the missed HB timer.

// If there was an AUTO_UNSUB, we need to adjust the new value and send
// an UNSUB for the new sid with new value.
if (sub->max > 0)
Expand All @@ -3071,7 +3099,7 @@ jsSub_resetOrderedConsumer(natsSubscription *sub, natsMutex *mu, uint64_t sseq)
return NATS_UPDATE_ERR_STACK(s);

// Quick unsubscribe. Since we know this is a simple push subscriber we do in place.
osid = applyNewSID(sub, mu);
osid = applyNewSID(sub);

NATS_FREE(sub->subject);
sub->subject = (char*) newDeliver;
Expand Down Expand Up @@ -3110,7 +3138,7 @@ jsSub_resetOrderedConsumer(natsSubscription *sub, natsMutex *mu, uint64_t sseq)
// The caller has verified that sub.jsi != nil and that this is not a control message.
// Lock should be held.
natsStatus
jsSub_checkOrderedMsg(natsSubscription *sub, natsMutex *mu, natsMsg *msg, bool *reset)
jsSub_checkOrderedMsg(natsSubscription *sub, natsMsg *msg, bool *reset)
{
natsStatus s = NATS_OK;
jsSub *jsi = NULL;
Expand All @@ -3131,7 +3159,7 @@ jsSub_checkOrderedMsg(natsSubscription *sub, natsMutex *mu, natsMsg *msg, bool *
if (dseq != jsi->dseq)
{
*reset = true;
s = jsSub_resetOrderedConsumer(sub, mu, jsi->sseq+1);
s = jsSub_resetOrderedConsumer(sub, jsi->sseq+1);
}
else
{
Expand Down
6 changes: 3 additions & 3 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ natsStatus
jsSub_trackSequences(jsSub *jsi, const char *reply);

natsStatus
jsSub_processSequenceMismatch(natsSubscription *sub, natsMutex *mu, natsMsg *msg, bool *sm);
jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm);

char*
jsSub_checkForFlowControlResponse(natsSubscription *sub);
Expand All @@ -885,10 +885,10 @@ natsStatus
jsSub_scheduleFlowControlResponse(jsSub *jsi, const char *reply);

natsStatus
jsSub_checkOrderedMsg(natsSubscription *sub, natsMutex *mu, natsMsg *msg, bool *reset);
jsSub_checkOrderedMsg(natsSubscription *sub, natsMsg *msg, bool *reset);

natsStatus
jsSub_resetOrderedConsumer(natsSubscription *sub, natsMutex *mu, uint64_t sseq);
jsSub_resetOrderedConsumer(natsSubscription *sub, uint64_t sseq);

bool
natsMsg_isJSCtrl(natsMsg *msg, int *ctrlType);
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ JetStreamSubscribeHeadersOnly
JetStreamOrderedCons
JetStreamOrderedConsWithErrors
JetStreamOrderedConsAutoUnsub
JetStreamOrderedConsSrvRestart
JetStreamSubscribeWithFWC
JetStreamStreamsSealAndRollup
JetStreamGetMsgAndLastMsg
Expand Down
Loading

0 comments on commit 8a04792

Please sign in to comment.