Skip to content

Commit

Permalink
Merge pull request #494 from nats-io/subj-transforms
Browse files Browse the repository at this point in the history
[FEAT] subject transforms
  • Loading branch information
aricart authored Mar 15, 2023
2 parents fb4ba36 + c0265ba commit 338ce74
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
21 changes: 21 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,17 @@ export interface StreamInfo extends ApiPaged {
alternates?: StreamAlternate[];
}

export interface SubjectTransformConfig {
/**
* The source pattern
*/
src?: string;
/**
* The destination pattern
*/
dest: string;
}

export interface StreamConfig extends StreamUpdateConfig {
/**
* A unique name for the Stream
Expand Down Expand Up @@ -1918,6 +1929,11 @@ export interface StreamUpdateConfig {
* 2.10.x and better.
*/
metadata?: Record<string, string>;
/**
* Apply a subject transform to incoming messages before doing anything else.
* This feature only supported on 2.10.x and better.
*/
subject_transform?: SubjectTransformConfig;
}

export interface Republish {
Expand Down Expand Up @@ -1976,6 +1992,11 @@ export interface StreamSource {
* if external is set.
*/
domain?: string;
/**
* Apply a subject transform to sourced messages before doing anything else.
* This feature only supported on 2.10.x and better.
*/
subject_transform_dest?: string;
}

export interface Placement {
Expand Down
81 changes: 81 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4389,3 +4389,84 @@ Deno.test("jetstream - jsmsg decode", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - input transform", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const name = nuid.next();
const jsm = await nc.jetstreamManager();

const si = await jsm.streams.add({
name,
subjects: ["foo"],
subject_transform: {
src: ">",
dest: "transformed.>",
},
storage: StorageType.Memory,
});

assertEquals(si.config.subject_transform, {
src: ">",
dest: "transformed.>",
});

const js = nc.jetstream();
const pa = await js.publish("foo", Empty);
assertEquals(pa.seq, 1);

const m = await jsm.streams.getMessage(si.config.name, { seq: 1 });
assertEquals(m.subject, "transformed.foo");

await cleanup(ns, nc);
});

Deno.test("jetstream - source transform", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const name = nuid.next();
const jsm = await nc.jetstreamManager();

const proms = ["foo", "bar", "baz"].map((subj) => {
return jsm.streams.add({
name: subj,
subjects: [subj],
storage: StorageType.Memory,
});
});
await Promise.all(proms);

const js = nc.jetstream();
await Promise.all([
js.publish("foo", Empty),
js.publish("bar", Empty),
js.publish("baz", Empty),
]);

await jsm.streams.add({
name: "sourced",
storage: StorageType.Memory,
sources: [
{ name: "foo", subject_transform_dest: "foo2.>" },
{ name: "bar" },
{ name: "baz" },
],
});

while (true) {
const si = await jsm.streams.info("sourced");
if (si.state.messages === 3) {
break;
}
await delay(100);
}

const m = await jsm.streams.getMessage("sourced", { seq: 1 });
assertEquals(m.subject, "foo2.foo");

await cleanup(ns, nc);
});

0 comments on commit 338ce74

Please sign in to comment.