Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] js_PullSubscribeAsync #785

Merged
merged 27 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6355f75
Squashed and merged
levb Aug 8, 2024
daf2c86
Added limits.h
levb Aug 8, 2024
fbc612c
RelWithDebInfo on Travis
levb Aug 8, 2024
84d79b4
Reduced numMsgs in test_AssignSubToDispatch to make Travis happier
levb Aug 8, 2024
954e923
fixed a Travis NOTLS gcc warning
levb Aug 8, 2024
6d8caed
PR feedback: mostly formatting/whitespace
levb Aug 13, 2024
8a3126e
PR feedback: NATS_CALLOC error handling
levb Aug 13, 2024
5c97305
PR feedback: do not cache sub->closed, draining in nats_dispatchThrea…
levb Aug 14, 2024
571af79
Merge branch 'main' of github.com:nats-io/nats.c into lev-sub-pull-as…
levb Aug 14, 2024
1f94770
nit: renamed dedicated->own
levb Aug 14, 2024
a8a9179
Make sure fetch onComplete is always invoked when sub closes
levb Aug 14, 2024
33d0e7f
Reconnect tests
levb Aug 15, 2024
f9c0749
Fixed dispatch pool reallocation
levb Aug 16, 2024
4f04996
(?) fixed flapping PullSubscribeAsync 'Fetch with a short expiration'…
levb Aug 16, 2024
74f7853
Fixed flappers, refactored 'lifetime'
levb Aug 21, 2024
5683c02
Merge branch 'main' of github.com:nats-io/nats.c into lev-sub-pull-as…
levb Aug 21, 2024
74dce5c
Fixes:
levb Aug 22, 2024
c4150e2
Fix _testBatchCompleted timeout at 1s
levb Aug 23, 2024
cb7205f
PR feedback: naming/nits
levb Aug 23, 2024
2212a2f
Increased 2 more (test wait) timeouts
levb Aug 23, 2024
51a5f53
Increased sleep in JetStreamSubscribePull_Reconnect, flapping on Travis
levb Aug 23, 2024
2c340a9
PR feedback: no typedef for jsOptionsPullSubscribeAsync
levb Aug 24, 2024
25e9d4a
PR feedback: removed jsOpts defaulting trickery
levb Aug 24, 2024
ec3b946
PR feedback: use actual defaults in the js-sub example
levb Aug 28, 2024
773d801
PR feedback: disallow use of KeepAhead with NoWait
levb Aug 28, 2024
046552f
PR feedback: extra comments for NoWait
levb Aug 28, 2024
1fc9179
Merge branch 'main' into lev-sub-pull-async-try2
levb Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions examples/js-sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ static const char *usage = ""\
"-fc enable flow control\n" \
"-count number of expected messages\n";

#define SECOND_NANO (int64_t)1E9
#define MINUTE_NANO (SECOND_NANO * 60)
#define HOUR_NANO (MINUTE_NANO * 60)

static bool fetchCompleteCalled = false;
static bool subCompleteCalled = false;

Expand Down Expand Up @@ -177,26 +173,24 @@ int main(int argc, char **argv)
{
if (pull && async)
{
jsOpts.PullSubscribeAsync.MaxMessages = (int) total;
levb marked this conversation as resolved.
Show resolved Hide resolved
jsOpts.PullSubscribeAsync.NoWait = true;
jsOpts.PullSubscribeAsync.TimeoutMillis = 3600 * 1000; // 1 hour
jsOpts.PullSubscribeAsync.FetchSize = 17;
levb marked this conversation as resolved.
Show resolved Hide resolved
jsOpts.PullSubscribeAsync.KeepAhead = 7;
levb marked this conversation as resolved.
Show resolved Hide resolved
jsOpts.PullSubscribeAsync.CompleteHandler = _completeFetchCb;

jsFetchRequest lifetime;
jsFetchRequest_Init(&lifetime);
lifetime.NoWait = true;
lifetime.Expires = 1 * HOUR_NANO;
lifetime.Batch = (int) total;
jsOpts.PullSubscribeAsync.CompleteHandlerClosure = NULL;

// Uncomment to use a 1 second heartbeat.
levb marked this conversation as resolved.
Show resolved Hide resolved
// lifetime.Heartbeat = 1 * SECOND_NANO;
// jsOpts.PullSubscribeAsync.HeartbeatMillis = 1000; // 1 second

// Uncomment to provide custom control over next fetch size.
// jsOpts.PullSubscribeAsync.NextHandler = nextFetchCb;

// Uncomment to turn off AutoACK on delivered messages.
// so.ManualAck = true;

s = js_PullSubscribeAsync(&sub, js, subj, durable, onMsg, NULL, &lifetime, &jsOpts, &so, &jerr);
s = js_PullSubscribeAsync(&sub, js, subj, durable, onMsg, NULL, &jsOpts, &so, &jerr);
}
else if (pull)
s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
Expand Down
8 changes: 4 additions & 4 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ _preProcessUserMessage(

if (fetch)
{
bool overMaxBytes = ((fetch->lifetime.MaxBytes > 0) && ((fetch->deliveredBytes) > fetch->lifetime.MaxBytes));
bool overMaxFetch = ((fetch->deliveredMsgs >= fetch->lifetime.Batch) || overMaxBytes);
bool overMaxBytes = ((fetch->maxBytes > 0) && ((fetch->deliveredBytes) > fetch->maxBytes));
bool overMaxFetch = ((fetch->deliveredMsgs >= fetch->maxMessages) || overMaxBytes);

*lastMessageInFetch = (fetch->deliveredMsgs == (fetch->lifetime.Batch - 1) || overMaxBytes);
*lastMessageInFetch = (fetch->deliveredMsgs == (fetch->maxMessages - 1) || overMaxBytes);

// See if we want to override fetch status based on our own data.
if (fetchStatus == NATS_OK)
Expand All @@ -142,7 +142,7 @@ _preProcessUserMessage(
{
fetchStatus = NATS_MAX_DELIVERED_MSGS;
}
if (overMaxBytes)
else if (overMaxBytes)
{
fetchStatus = NATS_LIMIT_REACHED;
}
Expand Down
79 changes: 30 additions & 49 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2917,9 +2917,10 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch)

// These are not changeable by the callback, only Batch and MaxBytes can be updated.
int64_t now = nats_Now();
req.Heartbeat = fetch->lifetime.Heartbeat;
req.Expires = fetch->lifetime.Expires - (now - fetch->startTimeMilli) * 10*1000*1000;
req.NoWait = fetch->lifetime.NoWait;
if (fetch->timeoutMillis != 0)
req.Expires = (fetch->timeoutMillis - (now - fetch->startTimeMillis)) * 1000 * 1000; // ns, go time.Duration
req.NoWait = fetch->noWait;
req.Heartbeat = fetch->heartbeatMillis * 1000 * 1000; // ns, go time.Duration

char buffer[128];
natsBuffer buf;
Expand All @@ -2945,6 +2946,7 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch)
return NATS_UPDATE_ERR_STACK(s);
}

// Sets Batch and MaxBytes for the next fetch request.
static bool
_autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure)
{
Expand All @@ -2963,15 +2965,15 @@ _autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure)

if (maybeMore)
{
// fetch->lifetime.Batch is always > 0
remainingUnrequested = fetch->lifetime.Batch - fetch->requestedMsgs;
// fetch->maxMessages is always > 0
remainingUnrequested = fetch->maxMessages - fetch->requestedMsgs;
if (remainingUnrequested <= 0)
maybeMore = false;
}

if (maybeMore && (fetch->lifetime.MaxBytes > 0))
if (maybeMore && (fetch->maxBytes > 0))
{
remainingBytes = fetch->lifetime.MaxBytes - fetch->receivedBytes;
remainingBytes = fetch->maxBytes - fetch->receivedBytes;
if (remainingBytes <= 0)
maybeMore = false;
}
Expand All @@ -2990,7 +2992,6 @@ _autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure)
if (!maybeMore)
return false;

*req = fetch->lifetime; // copy bytes, reading immutable data
req->Batch = want;
// FIXME discuss in PR - this seems wrong, we don't know how many bytes we will have
levb marked this conversation as resolved.
Show resolved Hide resolved
// received from what is already requested. Still, can serve as a safe
Expand All @@ -3002,31 +3003,19 @@ _autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure)
natsStatus
js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject, const char *durable,
natsMsgHandler msgCB, void *msgCBClosure,
jsFetchRequest *lifetime,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
natsSubscription *sub = NULL;
jsSub *jsi = NULL;
jsFetch *fetch = NULL;

jsFetchRequest defaultLifetime = {
.Batch = INT_MAX, // no limit
.Expires = INT64_MAX, // never
.Heartbeat = 0, // none
.MaxBytes = 0, // no limit
.NoWait = false, // wait forever
};

if ((newsub == NULL) || (msgCB == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

if (errCode != NULL)
*errCode = 0;

if (lifetime == NULL)
lifetime = &defaultLifetime;

// Do a basic pull subscribe first, but with a callback so it is treated as
// "async" and assigned to a dispatcher. Since we don't fetch anything, it
// will not be active yet.
Expand All @@ -3041,30 +3030,22 @@ js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject,
// Initialize fetch parameters.
if (s == NATS_OK)
{
fetch->startTimeMilli = nats_Now();
fetch->lifetime = *lifetime;

if (fetch->lifetime.Batch == 0)
fetch->lifetime.Batch = INT_MAX;

fetch->nextf = _autoNextFetchRequest;
fetch->nextClosure = (void *)fetch;
if (jsOpts != NULL)
{
fetch->fetchSize = jsOpts->PullSubscribeAsync.FetchSize;
fetch->keepAhead = jsOpts->PullSubscribeAsync.KeepAhead;
fetch->completeCB = jsOpts->PullSubscribeAsync.CompleteHandler;
fetch->completeCBClosure = jsOpts->PullSubscribeAsync.CompleteHandlerClosure;

if (jsOpts->PullSubscribeAsync.NextHandler != NULL)
{
fetch->nextf = jsOpts->PullSubscribeAsync.NextHandler;
fetch->nextClosure = jsOpts->PullSubscribeAsync.NextHandlerClosure;
}
}
fetch->status = NATS_OK;
fetch->startTimeMillis = nats_Now();

if (fetch->fetchSize == 0)
fetch->fetchSize = NATS_DEFAULT_ASYNC_FETCH_SIZE;
#define _set(_f, _v, _nil, _def) fetch->_f = ((jsOpts != NULL) && (jsOpts->PullSubscribeAsync._v != _nil)) ? jsOpts->PullSubscribeAsync._v : _def
levb marked this conversation as resolved.
Show resolved Hide resolved
_set(completeCB, CompleteHandler, NULL, NULL);
_set(completeCBClosure, CompleteHandlerClosure, NULL, NULL);
_set(fetchSize, FetchSize, 0, NATS_DEFAULT_ASYNC_FETCH_SIZE);
_set(heartbeatMillis, HeartbeatMillis, 0, 0);
_set(keepAhead, KeepAhead, 0, 0);
_set(maxBytes, MaxBytes, 0, 0);
_set(maxMessages, MaxMessages, 0, INT_MAX);
_set(nextClosure, NextHandlerClosure, NULL, fetch);
_set(nextf, NextHandler, NULL, _autoNextFetchRequest);
_set(noWait, NoWait, false, false);
_set(timeoutMillis, TimeoutMillis, 0, INT64_MAX);
#undef _set
}

// Set up the sub to process fetch results.
Expand All @@ -3079,27 +3060,27 @@ js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject,

// Start the timers. They will live for the entire length of the
// subscription (the missed heartbeat timer may be reset as needed).
if (lifetime->Expires > 0)
if (fetch->timeoutMillis > 0)
{
sub->refs++;
s = natsTimer_Create(&fetch->expiresTimer, _fetchExpiredFired, _releaseSubWhenStopped,
lifetime->Expires, (void *)sub);
fetch->timeoutMillis, (void *)sub);
if (s != NATS_OK)
sub->refs--;
}

if ((s == NATS_OK) && (lifetime->Heartbeat > 0))
if ((s == NATS_OK) && (fetch->heartbeatMillis > 0))
{
int64_t milli = (lifetime->Heartbeat / 1000000) * 2;
int64_t dur = fetch->heartbeatMillis * 2;
sub->refs++;
if (jsi->hbTimer == NULL)
{
s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _releaseSubWhenStopped, milli, (void *)sub);
s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _releaseSubWhenStopped, dur, (void *)sub);
if (s != NATS_OK)
sub->refs--;
}
else
natsTimer_Reset(jsi->hbTimer, milli);
natsTimer_Reset(jsi->hbTimer, dur);
}

natsSub_Unlock(sub);
Expand Down
47 changes: 28 additions & 19 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -1281,26 +1281,39 @@ typedef struct jsOptions

struct jsOptionsSubscribePullAsync
{
// Options to control automatic Fetch flow control.
//
// The number of messages to ask for in a single request, and if
// we should try to fetch ahead, KeepAhead more than we need to
// finish the current request. Fetch this many messages ahead of
// time.
int FetchSize;
int KeepAhead;
// Lifetime of the subscription (completes when any one of the
// targets is reached).
int64_t TimeoutMillis;
levb marked this conversation as resolved.
Show resolved Hide resolved
int MaxMessages;
int64_t MaxBytes;

// Manual fetch flow control. If provided gets called before
// each message is deliverered to msgCB, and overrides the
// default algorithm for sending Next requests.
natsFetchNextHandler NextHandler;
void *NextHandlerClosure;
// If NoWait is set, the subscription will receive the messages
// already stored on the server subject to the limits, but will
// not wait for more messages.
bool NoWait;
levb marked this conversation as resolved.
Show resolved Hide resolved

// Fetch complete handler that receives the exit status code,
// the subscription's Complete handler is also invoked, but does
// not have the status code.
natsFetchCompleteHandler CompleteHandler;
void *CompleteHandlerClosure;
void *CompleteHandlerClosure;

// Have server sends heartbeats to help detect communication failures.
int64_t HeartbeatMillis;
levb marked this conversation as resolved.
Show resolved Hide resolved

// Options to control automatic Fetch flow control. The number
// of messages to ask for in a single request, and if we should
// try to fetch ahead, KeepAhead more than we need to finish the
// current request. Fetch this many messages ahead of time.
int FetchSize;
int KeepAhead;

// Manual fetch flow control. If provided gets called before
// each message is deliverered to msgCB, and overrides the
// default algorithm for sending Next requests.
natsFetchNextHandler NextHandler;
void *NextHandlerClosure;

} PullSubscribeAsync;

/**
Expand Down Expand Up @@ -6597,18 +6610,14 @@ jsFetchRequest_Init(jsFetchRequest *request);
* @param durable the optional durable name.
* @param msgCB the #natsMsgHandler callback.
* @param msgCBClosure a pointer to an user defined object (can be `NULL`).
* @param lifetime the pointer to the #jsFetchRequest configuration used to set
* the sub's lifetime limits on messages, bytes, and elapsed time. It also
* allows to specify the heartbeat frequency. The default behavior would be to
* terminate the subscription if it fails. #jsOpts provides finer control.
* @param jsOpts the pointer to the #jsOptions object, possibly `NULL`.
* @param opts the subscribe options, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code,
* or `NULL` if not needed.
*/
NATS_EXTERN natsStatus
js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject, const char *durable,
natsMsgHandler msgCB, void *msgCBClosure, jsFetchRequest *lifetime,
natsMsgHandler msgCB, void *msgCBClosure,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode);

/** \brief Fetches messages for a pull subscription with a complete request configuration
Expand Down
39 changes: 23 additions & 16 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,31 +374,38 @@ struct __jsCtx

typedef struct __jsFetch
{
natsFetchCompleteHandler completeCB;
void *completeCBClosure;
natsStatus status;

// Lifetime control
jsFetchRequest lifetime;
int64_t startTimeMilli;
int receivedMsgs;
int64_t receivedBytes;
int deliveredMsgs;
int64_t deliveredBytes;
int requestedMsgs;
int64_t timeoutMillis;
int maxMessages;
int64_t maxBytes;
bool noWait;

int keepAhead;
int fetchSize;
// On complete
natsFetchCompleteHandler completeCB;
void *completeCBClosure;
natsStatus status;

// Flow control
natsFetchNextHandler nextf;
void *nextClosure;
void *nextClosure;
int keepAhead;
int fetchSize;
int64_t heartbeatMillis;

// Stats
int64_t startTimeMillis;
int receivedMsgs;
int64_t receivedBytes;
int deliveredMsgs;
int64_t deliveredBytes;
int requestedMsgs;

// Timer for the fetch expiration. We leverage the existing jsi->hbTimer for
// checking missed heartbeats.
natsTimer *expiresTimer;
natsTimer *expiresTimer;

// Matches jsi->fetchID
char replySubject[NATS_DEFAULT_INBOX_PRE_LEN + NUID_BUFFER_LEN + 32]; // big enough for {INBOX}.number
char replySubject[NATS_DEFAULT_INBOX_PRE_LEN + NUID_BUFFER_LEN + 32]; // big enough for {INBOX}.number
} jsFetch;

typedef struct __jsSub
Expand Down
Loading