Skip to content

Commit

Permalink
Added UpdatesOnly to kwWatchOptions (#818)
Browse files Browse the repository at this point in the history
* Added UpdatesOnly to kwWatchOptions

* PR feedback:  the freed pointer
  • Loading branch information
levb authored Nov 13, 2024
1 parent 54d247a commit adb9074
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/kv.c
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,8 @@ kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int
{
if (opts->MetaOnly)
so.Config.HeadersOnly = true;
if (opts->UpdatesOnly)
so.Config.DeliverPolicy = js_DeliverNew;
if (opts->IgnoreDeletes)
w->ignoreDel = true;
}
Expand All @@ -1148,10 +1150,17 @@ kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int
natsSubscription *sub = w->sub;

natsSub_Lock(sub);
if ((sub->jsi != NULL) && (sub->jsi->pending == 0))
if ((opts == NULL) || !opts->UpdatesOnly)
{
if ((sub->jsi != NULL) && (sub->jsi->pending == 0))
{
w->initDone = true;
w->retMarker = true;
}
}
else
{
w->initDone = true;
w->retMarker = true;
}
natsSub_Unlock(sub);
}
Expand Down
1 change: 1 addition & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,7 @@ typedef struct kvWatchOptions
bool IncludeHistory;
bool MetaOnly;
int64_t Timeout; ///< How long to wait (in milliseconds) for some operations to complete.
bool UpdatesOnly; ///< Only receive updates, no initial snapshot.

} kvWatchOptions;

Expand Down
11 changes: 11 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -32121,6 +32121,17 @@ void test_KeyValueWatch(void)
natsThread_Join(t);
natsThread_Destroy(t);
kvWatcher_Destroy(w);
w = NULL;

// Now try with UpdatesOnly and make sure we don't get the initial values.
test("Create watcher with UpdatesOnly: ");
kvWatchOptions o = {.UpdatesOnly = true};
s = kvStore_Watch(&w, kv, "t.*", &o);
testCond(s == NATS_OK);

IFOK(s, kvStore_PutString(NULL, kv, "t.age", "57"));
testCond(_expectUpdate(w, "t.age", "57", 11));
kvWatcher_Destroy(w);
kvStore_Destroy(kv);

JS_TEARDOWN;
Expand Down

0 comments on commit adb9074

Please sign in to comment.