diff --git a/jetstream/jsmstream_api.ts b/jetstream/jsmstream_api.ts index a16b846f..a7f74f51 100644 --- a/jetstream/jsmstream_api.ts +++ b/jetstream/jsmstream_api.ts @@ -254,7 +254,7 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI { context: string, src: Partial, ): void { - const count = src.subject_transforms?.length || 0; + const count = src?.subject_transforms?.length || 0; if (count > 0) { const { min, ok } = nci.features.get( Feature.JS_STREAM_SOURCE_SUBJECT_TRANSFORM, diff --git a/jetstream/kv.ts b/jetstream/kv.ts index 1199d488..c1af622a 100644 --- a/jetstream/kv.ts +++ b/jetstream/kv.ts @@ -267,14 +267,25 @@ export class Bucket implements KV, KvRemove { } else if (opts.sources) { const sources = opts.sources.map((s) => { const c = Object.assign({}, s) as StreamSource; + const srcBucketName = c.name.startsWith(kvPrefix) + ? c.name.substring(kvPrefix.length) + : c.name; if (!c.name.startsWith(kvPrefix)) { c.name = `${kvPrefix}${c.name}`; } + if (!s.external && srcBucketName !== this.bucket) { + c.subject_transforms = [ + { src: `$KV.${srcBucketName}.>`, dest: `$KV.${this.bucket}.>` }, + ]; + } + return c; }); sc.sources = sources as unknown[] as StreamSource[]; + sc.subjects = [this.subjectForBucket()]; } else { sc.subjects = [this.subjectForBucket()]; } + if (opts.metadata) { sc.metadata = opts.metadata; } diff --git a/jetstream/tests/kv_test.ts b/jetstream/tests/kv_test.ts index c3d6b94b..dbe055f5 100644 --- a/jetstream/tests/kv_test.ts +++ b/jetstream/tests/kv_test.ts @@ -2210,3 +2210,30 @@ Deno.test("kv - keys filter", async () => { await cleanup(ns, nc); }); + +Deno.test("kv - sourced", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}), + ); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + + const js = nc.jetstream(); + const source = await js.views.kv("source"); + const target = await js.views.kv("target", { + sources: [{ name: "source" }], + }); + + await source.put("hello", "world"); + for (let i = 0; i < 10; i++) { + const v = await target.get("hello"); + if (v === null) { + await delay(250); + continue; + } + assertEquals(v.string(), "world"); + } + + await cleanup(ns, nc); +});