Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watcher support for all history, last values or updates as well as get notifications on deletes #577

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions jetstream/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export type {
KvOptions,
KvPutOptions,
KvStatus,
KvWatchInclude,
KvWatchOptions,
ObjectInfo,
ObjectResult,
Expand Down
44 changes: 31 additions & 13 deletions jetstream/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
*/

import {
ErrorCode,
MsgHdrs,
NatsConnection,
NatsError,
Payload,
QueuedIterator,
} from "../nats-base-client/core.ts";
import { millis, nanos } from "./jsutil.ts";
import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts";
Expand All @@ -39,12 +41,13 @@ import {
KvPutOptions,
KvRemove,
KvStatus,
KvWatchInclude,
KvWatchOptions,
StoredMsg,
} from "./types.ts";
import { compare, Feature, parseSemVer } from "../nats-base-client/semver.ts";
import { deferred } from "../nats-base-client/util.ts";
import { Empty } from "../nats-base-client/encoders.ts";
import { ErrorCode, QueuedIterator } from "../nats-base-client/core.ts";
import {
AckPolicy,
ConsumerConfig,
Expand Down Expand Up @@ -612,16 +615,22 @@ export class Bucket implements KV, KvRemove {

_buildCC(
k: string,
history = false,
content: KvWatchInclude,
opts: Partial<ConsumerConfig> = {},
): Partial<ConsumerConfig> {
const ek = this.encodeKey(k);
this.validateSearchKey(k);

let deliver_policy = DeliverPolicy.LastPerSubject;
if (content === KvWatchInclude.AllHistory) {
deliver_policy = DeliverPolicy.All;
}
if (content === KvWatchInclude.UpdatesOnly) {
deliver_policy = DeliverPolicy.New;
}

return Object.assign({
"deliver_policy": history
? DeliverPolicy.All
: DeliverPolicy.LastPerSubject,
deliver_policy,
"ack_policy": AckPolicy.None,
"filter_subject": this.fullKeyName(ek),
"flow_control": true,
Expand All @@ -647,7 +656,7 @@ export class Bucket implements KV, KvRemove {
};
let count = 0;

const cc = this._buildCC(k, true, co);
const cc = this._buildCC(k, KvWatchInclude.AllHistory, co);
const subj = cc.filter_subject!;
const copts = consumerOpts(cc);
copts.bindStream(this.stream);
Expand Down Expand Up @@ -711,21 +720,25 @@ export class Bucket implements KV, KvRemove {
}

async watch(
opts: {
key?: string;
headers_only?: boolean;
initializedFn?: () => void;
} = {},
opts: KvWatchOptions = {},
): Promise<QueuedIterator<KvEntry>> {
const k = opts.key ?? ">";
const qi = new QueuedIteratorImpl<KvEntry>();
const co = {} as Partial<ConsumerConfig>;
co.headers_only = opts.headers_only || false;

let content = KvWatchInclude.LastValue;
if (opts.include === KvWatchInclude.AllHistory) {
content = KvWatchInclude.AllHistory;
} else if (opts.include === KvWatchInclude.UpdatesOnly) {
content = KvWatchInclude.UpdatesOnly;
}
const ignoreDeletes = opts.ignoreDeletes === true;

let fn = opts.initializedFn;
let count = 0;

const cc = this._buildCC(k, false, co);
const cc = this._buildCC(k, content, co);
const subj = cc.filter_subject!;
const copts = consumerOpts(cc);
copts.bindStream(this.stream);
Expand All @@ -738,6 +751,9 @@ export class Bucket implements KV, KvRemove {
}
if (jm) {
const e = this.jmToEntry(jm);
if (ignoreDeletes && e.operation === "DEL") {
return;
}
qi.push(e);
qi.received++;

Expand Down Expand Up @@ -793,7 +809,9 @@ export class Bucket implements KV, KvRemove {

async keys(k = ">"): Promise<QueuedIterator<string>> {
const keys = new QueuedIteratorImpl<string>();
const cc = this._buildCC(k, false, { headers_only: true });
const cc = this._buildCC(k, KvWatchInclude.LastValue, {
headers_only: true,
});
const subj = cc.filter_subject!;
const copts = consumerOpts(cc);
copts.bindStream(this.stream);
Expand Down
1 change: 1 addition & 0 deletions jetstream/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export type {
KvOptions,
KvPutOptions,
KvStatus,
KvWatchInclude,
KvWatchOptions,
LastForMsgRequest,
Lister,
Expand Down
116 changes: 115 additions & 1 deletion jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ import { QueuedIteratorImpl } from "../../nats-base-client/queued_iterator.ts";
import { connect } from "../../src/mod.ts";
import { JSONCodec } from "https://deno.land/x/[email protected]/nats-base-client/codec.ts";
import { JetStreamOptions } from "../../nats-base-client/core.ts";
import { JetStreamSubscriptionInfoable, kvPrefix } from "../types.ts";
import {
JetStreamSubscriptionInfoable,
kvPrefix,
KvWatchInclude,
} from "../types.ts";

Deno.test("kv - key validation", () => {
const bad = [
Expand Down Expand Up @@ -1780,3 +1784,113 @@ Deno.test("kv - metadata", async () => {
assertEquals(status.metadata?.hello, "world");
await cleanup(ns, nc);
});

Deno.test("kv - watch updates only", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));

const js = nc.jetstream();
const kv = await js.views.kv("K");

await kv.put("a", "a");
await kv.put("b", "b");

const d = deferred();
const iter = await kv.watch({
include: KvWatchInclude.UpdatesOnly,
initializedFn: () => {
d.resolve();
},
});

const notifications: string[] = [];
(async () => {
for await (const e of iter) {
notifications.push(e.key);
}
})().then();
await d;
await kv.put("c", "c");
await delay(1000);

assertEquals(notifications.length, 1);
assertEquals(notifications[0], "c");

await cleanup(ns, nc);
});

Deno.test("kv - watch history", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));

const js = nc.jetstream();
const kv = await js.views.kv("K", { history: 10 });

await kv.put("a", "a");
await kv.put("a", "aa");
await kv.put("a", "aaa");
await kv.delete("a");

const iter = await kv.watch({
include: KvWatchInclude.AllHistory,
});

const notifications: string[] = [];
(async () => {
for await (const e of iter) {
if (e.operation === "DEL") {
notifications.push(`${e.key}=del`);
} else {
notifications.push(`${e.key}=${e.string()}`);
}
}
})().then();
await kv.put("c", "c");
await delay(1000);

assertEquals(notifications.length, 5);
assertEquals(notifications[0], "a=a");
assertEquals(notifications[1], "a=aa");
assertEquals(notifications[2], "a=aaa");
assertEquals(notifications[3], "a=del");
assertEquals(notifications[4], "c=c");

await cleanup(ns, nc);
});

Deno.test("kv - watch history no deletes", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));

const js = nc.jetstream();
const kv = await js.views.kv("K", { history: 10 });

await kv.put("a", "a");
await kv.put("a", "aa");
await kv.put("a", "aaa");
await kv.delete("a");

const iter = await kv.watch({
include: KvWatchInclude.AllHistory,
ignoreDeletes: true,
});

const notifications: string[] = [];
(async () => {
for await (const e of iter) {
if (e.operation === "DEL") {
notifications.push(`${e.key}=del`);
} else {
notifications.push(`${e.key}=${e.string()}`);
}
}
})().then();
await kv.put("c", "c");
await kv.delete("c");
await delay(1000);

assertEquals(notifications.length, 4);
assertEquals(notifications[0], "a=a");
assertEquals(notifications[1], "a=aa");
assertEquals(notifications[2], "a=aaa");
assertEquals(notifications[3], "c=c");

await cleanup(ns, nc);
});
26 changes: 25 additions & 1 deletion jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,22 @@ export interface KvRemove {
remove(k: string): Promise<void>;
}

export enum KvWatchInclude {
/**
* Include the last value for all the keys
*/
LastValue = "",
/**
* Include all available history for all keys
*/
AllHistory = "history",
/**
* Don't include history or last values, only notify
* of updates
*/
UpdatesOnly = "updates",
}

export type KvWatchOptions = {
/**
* A key or wildcarded key following keys as if they were NATS subject names.
Expand All @@ -1099,9 +1115,17 @@ export type KvWatchOptions = {
/**
* A callback that notifies when the watch has yielded all the initial values.
* Subsequent notifications are updates since the initial watch was established.
* @deprecated
*/
initializedFn?: () => void;
/**
* Skips notifying deletes.
* @default: false
*/
ignoreDeletes?: boolean;
/**
* Specify what to include in the watcher, by default all last values.
*/
include?: KvWatchInclude;
};

export interface RoKV {
Expand Down