Skip to content

Commit

Permalink
Added support for enabling Nats JetStream (#877)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonjomckay authored Nov 29, 2024
1 parent 686a8c6 commit 3cf1da3
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 26 deletions.
4 changes: 4 additions & 0 deletions docs/modules/nats.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ npm install @testcontainers/nats --save-dev
<!--codeinclude-->
[Set credentials:](../../packages/modules/nats/src/nats-container.test.ts) inside_block:credentials
<!--/codeinclude-->

<!--codeinclude-->
[Enable JetStream:](../../packages/modules/nats/src/nats-container.test.ts) inside_block:jetstream
<!--/codeinclude-->
39 changes: 38 additions & 1 deletion packages/modules/nats/src/nats-container.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,43 @@ describe("NatsContainer", () => {
});
// }

// jetstream {
it("should start with JetStream ", async () => {
// enable JetStream
const container = await new NatsContainer().withJetStream().start();

const nc = await connect(container.getConnectionOptions());

// ensure JetStream is enabled, otherwise this will throw an error
await nc.jetstream().jetstreamManager();

// close the connection
await nc.close();
// check if the close was OK
const err = await nc.closed();
expect(err).toBe(undefined);

await container.stop();
});

it("should fail without JetStream ", async () => {
const container = await new NatsContainer().start();

const nc = await connect(container.getConnectionOptions());

// ensure JetStream is not enabled, as this will throw an error
await expect(nc.jetstream().jetstreamManager()).rejects.toThrow("503");

// close the connection
await nc.close();
// check if the close was OK
const err = await nc.closed();
expect(err).toBe(undefined);

await container.stop();
});
// }

it("should immediately end when started with version argument ", async () => {
// for the complete list of available arguments see:
// See Command Line Options section inside [NATS docker image documentation](https://hub.docker.com/_/nats)
Expand All @@ -75,6 +112,6 @@ describe("NatsContainer", () => {
await connect(container.getConnectionOptions());
}

await expect(outputVersionAndExit()).rejects.toThrowError();
await expect(outputVersionAndExit()).rejects.toThrow();
});
});
75 changes: 50 additions & 25 deletions packages/modules/nats/src/nats-container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,51 @@ const HTTP_MANAGEMENT_PORT = 8222;
const USER_ARGUMENT_KEY = "--user";
const PASS_ARGUMENT_KEY = "--pass";

function buildCmdsFromArgs(args: { [p: string]: string }): string[] {
const result: string[] = [];
result.push("nats-server");

for (const argsKey in args) {
result.push(argsKey);
result.push(args[argsKey]);
}
return result;
}

export class NatsContainer extends GenericContainer {
private args: { [name: string]: string } = {};
private args = new Set<string>();
private values = new Map<string, string | undefined>();

constructor(image = "nats:2.8.4-alpine") {
super(image);

this.args[USER_ARGUMENT_KEY] = "test";
this.args[PASS_ARGUMENT_KEY] = "test";
this.withUsername("test");
this.withPass("test");

this.withExposedPorts(CLIENT_PORT, ROUTING_PORT_FOR_CLUSTERING, HTTP_MANAGEMENT_PORT)
.withWaitStrategy(Wait.forLogMessage(/.*Server is ready.*/))
.withStartupTimeout(120_000);
}

/**
* Enable JetStream
*
* @returns {this}
*/
public withJetStream(): this {
this.withArg("--jetstream");
return this;
}

public withUsername(user: string): this {
this.args[USER_ARGUMENT_KEY] = user;
this.withArg(USER_ARGUMENT_KEY, user);
return this;
}

public withPass(pass: string): this {
this.args[PASS_ARGUMENT_KEY] = pass;
this.withArg(PASS_ARGUMENT_KEY, pass);
return this;
}

public withArg(name: string, value: string) {
name = NatsContainer.ensureDashInFrontOfArgumentName(name);
this.args[name] = value;
public withArg(name: string, value: string): this;
public withArg(name: string): this;
public withArg(...args: [string, string] | [string]): this {
const [name, value] = args;

const correctName = NatsContainer.ensureDashInFrontOfArgumentName(name);
this.args.add(correctName);
if (args.length === 2) {
this.values.set(correctName, value);
}
return this;
}

Expand All @@ -61,23 +68,41 @@ export class NatsContainer extends GenericContainer {
}

public override async start(): Promise<StartedNatsContainer> {
this.withCommand(buildCmdsFromArgs(this.args));
this.withCommand(this.getNormalizedCommand());
return new StartedNatsContainer(await super.start(), this.getUser(), this.getPass());
}

private getUser(): string {
return this.args[USER_ARGUMENT_KEY];
private getUser(): string | undefined {
return this.values.get(USER_ARGUMENT_KEY);
}

private getPass(): string {
return this.args[PASS_ARGUMENT_KEY];
private getPass(): string | undefined {
return this.values.get(PASS_ARGUMENT_KEY);
}

private getNormalizedCommand(): string[] {
const result: string[] = ["nats-server"];
for (const arg of this.args) {
result.push(arg);
if (this.values.has(arg)) {
const value = this.values.get(arg);
if (value) {
result.push(value);
}
}
}
return result;
}
}

export class StartedNatsContainer extends AbstractStartedContainer {
private readonly connectionOptions: NatsConnectionOptions;

constructor(startedTestContainer: StartedTestContainer, readonly username: string, readonly password: string) {
constructor(
startedTestContainer: StartedTestContainer,
readonly username: string | undefined,
readonly password: string | undefined
) {
super(startedTestContainer);
const port = startedTestContainer.getMappedPort(CLIENT_PORT);
this.connectionOptions = {
Expand Down

0 comments on commit 3cf1da3

Please sign in to comment.