diff --git a/src/kv.c b/src/kv.c index f1396362..81f8e453 100644 --- a/src/kv.c +++ b/src/kv.c @@ -1135,6 +1135,8 @@ kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int { if (opts->MetaOnly) so.Config.HeadersOnly = true; + if (opts->UpdatesOnly) + so.Config.DeliverPolicy = js_DeliverNew; if (opts->IgnoreDeletes) w->ignoreDel = true; } @@ -1148,10 +1150,17 @@ kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int natsSubscription *sub = w->sub; natsSub_Lock(sub); - if ((sub->jsi != NULL) && (sub->jsi->pending == 0)) + if ((opts == NULL) || !opts->UpdatesOnly) + { + if ((sub->jsi != NULL) && (sub->jsi->pending == 0)) + { + w->initDone = true; + w->retMarker = true; + } + } + else { w->initDone = true; - w->retMarker = true; } natsSub_Unlock(sub); } diff --git a/src/nats.h b/src/nats.h index c3bfbb23..e368d89a 100644 --- a/src/nats.h +++ b/src/nats.h @@ -1426,6 +1426,7 @@ typedef struct kvWatchOptions bool IncludeHistory; bool MetaOnly; int64_t Timeout; ///< How long to wait (in milliseconds) for some operations to complete. + bool UpdatesOnly; ///< Only receive updates, no initial snapshot. } kvWatchOptions; diff --git a/test/test.c b/test/test.c index 57ed6adc..c6c95142 100644 --- a/test/test.c +++ b/test/test.c @@ -32121,6 +32121,17 @@ void test_KeyValueWatch(void) natsThread_Join(t); natsThread_Destroy(t); kvWatcher_Destroy(w); + w = NULL; + + // Now try with UpdatesOnly and make sure we don't get the initial values. + test("Create watcher with UpdatesOnly: "); + kvWatchOptions o = {.UpdatesOnly = true}; + s = kvStore_Watch(&w, kv, "t.*", &o); + testCond(s == NATS_OK); + + IFOK(s, kvStore_PutString(NULL, kv, "t.age", "57")); + testCond(_expectUpdate(w, "t.age", "57", 11)); + kvWatcher_Destroy(w); kvStore_Destroy(kv); JS_TEARDOWN;