Skip to content

Commit

Permalink
Merge branch 'main' into dev
Browse files Browse the repository at this point in the history
# Conflicts:
#	package.json
#	src/node_transport.ts
  • Loading branch information
aricart committed Dec 1, 2022
2 parents 0c37d28 + e672fe6 commit 8055ea7
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 57 deletions.
2 changes: 1 addition & 1 deletion src/node_transport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 The NATS Authors
* Copyright 2020-2022 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion test/auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ test("auth - custom error", async (t) => {
).then(() => {
t.fail("shouldn't have connected");
}).catch((err) => {
t.is(err.code, ErrorCode.BadAuthentication);
t.is(err.message, "user code exploded");
});
await ns.stop();
});
Expand Down
128 changes: 73 additions & 55 deletions test/basics.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ const {
StringCodec,
Empty,
jwtAuthenticator,
AckPolicy,
} = require(
"../lib/src/mod",
);
const net = require("net");

const { deferred, delay } = require("../lib/nats-base-client/internal_mod");
const { deferred, delay, nuid } = require(
"../lib/nats-base-client/internal_mod",
);
const { Lock } = require("./helpers/lock");
const { NatsServer } = require("./helpers/launcher");
const { jetstreamServerConf } = require("./helpers/jsutil.js");
Expand Down Expand Up @@ -484,62 +487,15 @@ test("basics - server gone", async (t) => {
});

test("basics - server error", async (t) => {
const PING = { re: /^PING\r\n/i, out: "PONG\r\n" };
const CONNECT = { re: /^CONNECT\s+([^\r\n]+)\r\n/i, out: "" };

const CMDS = [PING, CONNECT];

let inbound;

const pp = deferred();
const server = net.createServer();
server.on("connection", (conn) => {
const buf = Buffer.from(
`INFO {"server_id":"FAKE","server_name":"FAKE","version":"2.9.4","proto":1,"go":"go1.19.2","host":"127.0.0.1","port":${port},"headers":true,"max_payload":1048576,"jetstream":true,"client_id":4,"client_ip":"127.0.0.1"}\r\n`,
);
conn.write(buf);

conn.on("data", (data) => {
if (inbound) {
inbound = Buffer.concat([inbound, data]);
} else {
inbound = data;
}
while (data.length > 0) {
let m = null;
for (let i = 0; i < CMDS.length; i++) {
m = CMDS[i].re.exec(inbound);
if (m) {
const len = m[0].length;
if (len <= inbound.length) {
inbound = inbound.slice(len);
conn.write(Buffer.from(CMDS[i].out));
if (i === 0) {
// fail as if the server sent an error
conn.write(Buffer.from("-ERR 'here'\r\n"));
}
break;
}
}
}
if (m === null) {
break;
}
}
});
});

server.listen(0, (v) => {
const p = server.address().port;
pp.resolve(p);
const ns = await NatsServer.start();
t.plan(1);
const nc = await connect({ port: ns.port, reconnect: false });
setTimeout(() => {
nc.protocol.sendCommand("X\r\n");
});

const port = await pp;
const nc = await connect({ port, reconnect: false });
const err = await nc.closed();
t.is(err.message, "'here'");

server.close();
t.is(err?.code, ErrorCode.ProtocolError);
await ns.stop();
});

test("basics - subscription with timeout", async (t) => {
Expand Down Expand Up @@ -788,3 +744,65 @@ test("basics - resolve", async (t) => {
t.true(srv.resolves && srv.resolves.length > 1);
await nc.close();
});

test("basics - js fetch on stopped server doesn't close", async (t) => {
let ns = await NatsServer.start(jetstreamServerConf());
const nc = await connect({
port: ns.port,
maxReconnectAttempts: -1,
});
const status = nc.status();
(async () => {
let reconnects = 0;
for await (const s of status) {
switch (s.type) {
case "reconnecting":
reconnects++;
if (reconnects === 2) {
ns.restart().then((s) => {
ns = s;
});
}
break;
case "reconnect":
setTimeout(() => {
loop = false;
});
break;
default:
// nothing
}
}
})().then();

const jsm = await nc.jetstreamManager();
const si = await jsm.streams.add({ name: nuid.next(), subjects: ["test"] });
const { name: stream } = si.config;
await jsm.consumers.add(stream, {
durable_name: "dur",
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
setTimeout(() => {
ns.stop();
}, 2000);

let loop = true;
while (true) {
try {
const iter = js.fetch(stream, "dur", { batch: 1, expires: 500 });
for await (const m of iter) {
m.ack();
}
if (!loop) {
break;
}
} catch (err) {
t.fail(`shouldn't have errored: ${err.message}`);
}
}
t.pass();
await nc.close();
await ns.stop();
});

0 comments on commit 8055ea7

Please sign in to comment.