diff --git a/jetstream/internal_mod.ts b/jetstream/internal_mod.ts index e88bf2d5..fc37bde7 100644 --- a/jetstream/internal_mod.ts +++ b/jetstream/internal_mod.ts @@ -53,6 +53,7 @@ export type { KvOptions, KvPutOptions, KvStatus, + KvWatchInclude, KvWatchOptions, ObjectInfo, ObjectResult, diff --git a/jetstream/kv.ts b/jetstream/kv.ts index 6ec1b6b6..e751d5c7 100644 --- a/jetstream/kv.ts +++ b/jetstream/kv.ts @@ -14,10 +14,12 @@ */ import { + ErrorCode, MsgHdrs, NatsConnection, NatsError, Payload, + QueuedIterator, } from "../nats-base-client/core.ts"; import { millis, nanos } from "./jsutil.ts"; import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts"; @@ -39,12 +41,13 @@ import { KvPutOptions, KvRemove, KvStatus, + KvWatchInclude, + KvWatchOptions, StoredMsg, } from "./types.ts"; import { compare, Feature, parseSemVer } from "../nats-base-client/semver.ts"; import { deferred } from "../nats-base-client/util.ts"; import { Empty } from "../nats-base-client/encoders.ts"; -import { ErrorCode, QueuedIterator } from "../nats-base-client/core.ts"; import { AckPolicy, ConsumerConfig, @@ -612,16 +615,22 @@ export class Bucket implements KV, KvRemove { _buildCC( k: string, - history = false, + content: KvWatchInclude, opts: Partial = {}, ): Partial { const ek = this.encodeKey(k); this.validateSearchKey(k); + let deliver_policy = DeliverPolicy.LastPerSubject; + if (content === KvWatchInclude.AllHistory) { + deliver_policy = DeliverPolicy.All; + } + if (content === KvWatchInclude.UpdatesOnly) { + deliver_policy = DeliverPolicy.New; + } + return Object.assign({ - "deliver_policy": history - ? DeliverPolicy.All - : DeliverPolicy.LastPerSubject, + deliver_policy, "ack_policy": AckPolicy.None, "filter_subject": this.fullKeyName(ek), "flow_control": true, @@ -647,7 +656,7 @@ export class Bucket implements KV, KvRemove { }; let count = 0; - const cc = this._buildCC(k, true, co); + const cc = this._buildCC(k, KvWatchInclude.AllHistory, co); const subj = cc.filter_subject!; const copts = consumerOpts(cc); copts.bindStream(this.stream); @@ -711,21 +720,25 @@ export class Bucket implements KV, KvRemove { } async watch( - opts: { - key?: string; - headers_only?: boolean; - initializedFn?: () => void; - } = {}, + opts: KvWatchOptions = {}, ): Promise> { const k = opts.key ?? ">"; const qi = new QueuedIteratorImpl(); const co = {} as Partial; co.headers_only = opts.headers_only || false; + let content = KvWatchInclude.LastValue; + if (opts.include === KvWatchInclude.AllHistory) { + content = KvWatchInclude.AllHistory; + } else if (opts.include === KvWatchInclude.UpdatesOnly) { + content = KvWatchInclude.UpdatesOnly; + } + const ignoreDeletes = opts.ignoreDeletes === true; + let fn = opts.initializedFn; let count = 0; - const cc = this._buildCC(k, false, co); + const cc = this._buildCC(k, content, co); const subj = cc.filter_subject!; const copts = consumerOpts(cc); copts.bindStream(this.stream); @@ -738,6 +751,9 @@ export class Bucket implements KV, KvRemove { } if (jm) { const e = this.jmToEntry(jm); + if (ignoreDeletes && e.operation === "DEL") { + return; + } qi.push(e); qi.received++; @@ -793,7 +809,9 @@ export class Bucket implements KV, KvRemove { async keys(k = ">"): Promise> { const keys = new QueuedIteratorImpl(); - const cc = this._buildCC(k, false, { headers_only: true }); + const cc = this._buildCC(k, KvWatchInclude.LastValue, { + headers_only: true, + }); const subj = cc.filter_subject!; const copts = consumerOpts(cc); copts.bindStream(this.stream); diff --git a/jetstream/mod.ts b/jetstream/mod.ts index 802cead3..29d27d2e 100644 --- a/jetstream/mod.ts +++ b/jetstream/mod.ts @@ -85,6 +85,7 @@ export type { KvOptions, KvPutOptions, KvStatus, + KvWatchInclude, KvWatchOptions, LastForMsgRequest, Lister, diff --git a/jetstream/tests/kv_test.ts b/jetstream/tests/kv_test.ts index ec46f5e3..14326cd8 100644 --- a/jetstream/tests/kv_test.ts +++ b/jetstream/tests/kv_test.ts @@ -66,7 +66,11 @@ import { QueuedIteratorImpl } from "../../nats-base-client/queued_iterator.ts"; import { connect } from "../../src/mod.ts"; import { JSONCodec } from "https://deno.land/x/nats@v1.10.2/nats-base-client/codec.ts"; import { JetStreamOptions } from "../../nats-base-client/core.ts"; -import { JetStreamSubscriptionInfoable, kvPrefix } from "../types.ts"; +import { + JetStreamSubscriptionInfoable, + kvPrefix, + KvWatchInclude, +} from "../types.ts"; Deno.test("kv - key validation", () => { const bad = [ @@ -1780,3 +1784,113 @@ Deno.test("kv - metadata", async () => { assertEquals(status.metadata?.hello, "world"); await cleanup(ns, nc); }); + +Deno.test("kv - watch updates only", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + + const js = nc.jetstream(); + const kv = await js.views.kv("K"); + + await kv.put("a", "a"); + await kv.put("b", "b"); + + const d = deferred(); + const iter = await kv.watch({ + include: KvWatchInclude.UpdatesOnly, + initializedFn: () => { + d.resolve(); + }, + }); + + const notifications: string[] = []; + (async () => { + for await (const e of iter) { + notifications.push(e.key); + } + })().then(); + await d; + await kv.put("c", "c"); + await delay(1000); + + assertEquals(notifications.length, 1); + assertEquals(notifications[0], "c"); + + await cleanup(ns, nc); +}); + +Deno.test("kv - watch history", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + + const js = nc.jetstream(); + const kv = await js.views.kv("K", { history: 10 }); + + await kv.put("a", "a"); + await kv.put("a", "aa"); + await kv.put("a", "aaa"); + await kv.delete("a"); + + const iter = await kv.watch({ + include: KvWatchInclude.AllHistory, + }); + + const notifications: string[] = []; + (async () => { + for await (const e of iter) { + if (e.operation === "DEL") { + notifications.push(`${e.key}=del`); + } else { + notifications.push(`${e.key}=${e.string()}`); + } + } + })().then(); + await kv.put("c", "c"); + await delay(1000); + + assertEquals(notifications.length, 5); + assertEquals(notifications[0], "a=a"); + assertEquals(notifications[1], "a=aa"); + assertEquals(notifications[2], "a=aaa"); + assertEquals(notifications[3], "a=del"); + assertEquals(notifications[4], "c=c"); + + await cleanup(ns, nc); +}); + +Deno.test("kv - watch history no deletes", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + + const js = nc.jetstream(); + const kv = await js.views.kv("K", { history: 10 }); + + await kv.put("a", "a"); + await kv.put("a", "aa"); + await kv.put("a", "aaa"); + await kv.delete("a"); + + const iter = await kv.watch({ + include: KvWatchInclude.AllHistory, + ignoreDeletes: true, + }); + + const notifications: string[] = []; + (async () => { + for await (const e of iter) { + if (e.operation === "DEL") { + notifications.push(`${e.key}=del`); + } else { + notifications.push(`${e.key}=${e.string()}`); + } + } + })().then(); + await kv.put("c", "c"); + await kv.delete("c"); + await delay(1000); + + assertEquals(notifications.length, 4); + assertEquals(notifications[0], "a=a"); + assertEquals(notifications[1], "a=aa"); + assertEquals(notifications[2], "a=aaa"); + assertEquals(notifications[3], "c=c"); + + await cleanup(ns, nc); +}); diff --git a/jetstream/types.ts b/jetstream/types.ts index e233158e..311dff94 100644 --- a/jetstream/types.ts +++ b/jetstream/types.ts @@ -1087,6 +1087,22 @@ export interface KvRemove { remove(k: string): Promise; } +export enum KvWatchInclude { + /** + * Include the last value for all the keys + */ + LastValue = "", + /** + * Include all available history for all keys + */ + AllHistory = "history", + /** + * Don't include history or last values, only notify + * of updates + */ + UpdatesOnly = "updates", +} + export type KvWatchOptions = { /** * A key or wildcarded key following keys as if they were NATS subject names. @@ -1099,9 +1115,17 @@ export type KvWatchOptions = { /** * A callback that notifies when the watch has yielded all the initial values. * Subsequent notifications are updates since the initial watch was established. - * @deprecated */ initializedFn?: () => void; + /** + * Skips notifying deletes. + * @default: false + */ + ignoreDeletes?: boolean; + /** + * Specify what to include in the watcher, by default all last values. + */ + include?: KvWatchInclude; }; export interface RoKV {