Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] [KV] delete and purge at seq #656

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions jetstream/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
KV,
KvCodec,
KvCodecs,
KvDeleteOptions,
KvEntry,
KvOptions,
kvPrefix,
Expand Down Expand Up @@ -545,12 +546,12 @@ export class Bucket implements KV, KvRemove {
}
}

purge(k: string): Promise<void> {
return this._deleteOrPurge(k, "PURGE");
purge(k: string, opts?: Partial<KvDeleteOptions>): Promise<void> {
return this._deleteOrPurge(k, "PURGE", opts);
}

delete(k: string): Promise<void> {
return this._deleteOrPurge(k, "DEL");
delete(k: string, opts?: Partial<KvDeleteOptions>): Promise<void> {
return this._deleteOrPurge(k, "DEL", opts);
}

async purgeDeletes(
Expand Down Expand Up @@ -590,9 +591,13 @@ export class Bucket implements KV, KvRemove {
});
}

async _deleteOrPurge(k: string, op: "DEL" | "PURGE"): Promise<void> {
async _deleteOrPurge(
k: string,
op: "DEL" | "PURGE",
opts?: Partial<KvDeleteOptions>,
): Promise<void> {
if (!this.hasWildcards(k)) {
return this._doDeleteOrPurge(k, op);
return this._doDeleteOrPurge(k, op, opts);
}
const iter = await this.keys(k);
const buf: Promise<void>[] = [];
Expand All @@ -608,14 +613,21 @@ export class Bucket implements KV, KvRemove {
}
}

async _doDeleteOrPurge(k: string, op: "DEL" | "PURGE"): Promise<void> {
async _doDeleteOrPurge(
k: string,
op: "DEL" | "PURGE",
opts?: Partial<KvDeleteOptions>,
): Promise<void> {
const ek = this.encodeKey(k);
this.validateKey(ek);
const h = headers();
h.set(kvOperationHdr, op);
if (op === "PURGE") {
h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);
}
if (opts?.previousSeq) {
h.set(PubHeaders.ExpectedLastSubjectSequenceHdr, `${opts.previousSeq}`);
}
await this.js.publish(this.subjectForKey(ek, true), Empty, { headers: h });
}

Expand Down
48 changes: 48 additions & 0 deletions jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1966,3 +1966,51 @@ Deno.test("kv - watch start at", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - delete key if revision", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const b = await js.views.kv(nuid.next());
const seq = await b.create("a", Empty);
await assertRejects(
async () => {
await b.delete("a", { previousSeq: 100 });
},
Error,
"wrong last sequence: 1",
undefined,
);

await b.delete("a", { previousSeq: seq });

await cleanup(ns, nc);
});

Deno.test("kv - purge key if revision", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const b = await js.views.kv(nuid.next());
const seq = await b.create("a", Empty);

await assertRejects(
async () => {
await b.purge("a", { previousSeq: 2 });
},
Error,
"wrong last sequence: 1",
undefined,
);

await b.purge("a", { previousSeq: seq });
await cleanup(ns, nc);
});
14 changes: 12 additions & 2 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1238,15 +1238,17 @@ export interface KV extends RoKV {
* a key or the soft delete marker to be removed without
* additional notification on a watch.
* @param k
* @param opts
*/
delete(k: string): Promise<void>;
delete(k: string, opts?: Partial<KvDeleteOptions>): Promise<void>;

/**
* Deletes and purges the specified key and any value
* history.
* @param k
* @param opts
*/
purge(k: string): Promise<void>;
purge(k: string, opts?: Partial<KvDeleteOptions>): Promise<void>;

/**
* Destroys the underlying stream used by the KV. This
Expand All @@ -1263,6 +1265,14 @@ export interface KvPutOptions {
previousSeq: number;
}

export interface KvDeleteOptions {
/**
* If set the KV must be at the current sequence or the
* put will fail.
*/
previousSeq: number;
}

export type ObjectStoreLink = {
/**
* name of object store storing the data
Expand Down
Loading