From 47b2faa6ffe099f47c2520e17e219493a0921d26 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 29 Nov 2022 15:40:08 -0400 Subject: [PATCH 1/2] 2.9.1 [BUMP] version 2.9.1 (#536) [BUMP] nbc to 1.9.2 [TEST] added test verifying client behavior under node --- package.json | 4 +-- src/node_transport.ts | 4 +-- test/auth.js | 2 +- test/basics.js | 67 ++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 71 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index 266039c9..642ab4c3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nats", - "version": "2.9.0", + "version": "2.9.1", "description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", "keywords": [ "nats", @@ -40,7 +40,7 @@ "build": "tsc", "cjs": "deno run --allow-all bin/cjs-fix-imports.ts -o nats-base-client/ ./.deps/nats.deno/nats-base-client/", "clean": "shx rm -Rf ./lib/* ./nats-base-client ./.deps", - "clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.9.0 https://github.com/nats-io/nats.deno.git", + "clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.9.2 https://github.com/nats-io/nats.deno.git", "fmt": "deno fmt ./src/ ./examples/ ./test/", "prepack": "npm run clone-nbc && npm run cjs && npm run check-package && npm run build", "ava": "nyc ava --verbose -T 60000", diff --git a/src/node_transport.ts b/src/node_transport.ts index 2e2e98d3..948064eb 100644 --- a/src/node_transport.ts +++ b/src/node_transport.ts @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 The NATS Authors + * Copyright 2020-2022 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -34,7 +34,7 @@ const { resolve } = require("path"); const { readFile, existsSync } = require("fs"); const dns = require("dns"); -const VERSION = "2.9.0"; +const VERSION = "2.9.1"; const LANG = "nats.js"; export class NodeTransport implements Transport { diff --git a/test/auth.js b/test/auth.js index 9a5828d9..93a0cc7c 100644 --- a/test/auth.js +++ b/test/auth.js @@ -301,7 +301,7 @@ test("auth - custom error", async (t) => { ).then(() => { t.fail("shouldn't have connected"); }).catch((err) => { - t.is(err.code, ErrorCode.BadAuthentication); + t.is(err.message, "user code exploded"); }); await ns.stop(); }); diff --git a/test/basics.js b/test/basics.js index c332b5fe..8b599ea8 100644 --- a/test/basics.js +++ b/test/basics.js @@ -20,12 +20,15 @@ const { StringCodec, Empty, jwtAuthenticator, + AckPolicy, } = require( "../lib/src/mod", ); const net = require("net"); -const { deferred, delay } = require("../lib/nats-base-client/internal_mod"); +const { deferred, delay, nuid } = require( + "../lib/nats-base-client/internal_mod", +); const { Lock } = require("./helpers/lock"); const { NatsServer } = require("./helpers/launcher"); const { jetstreamServerConf } = require("./helpers/jsutil.js"); @@ -788,3 +791,65 @@ test("basics - resolve", async (t) => { t.true(srv.resolves && srv.resolves.length > 1); await nc.close(); }); + +test("basics - js fetch on stopped server doesn't close", async (t) => { + let ns = await NatsServer.start(jetstreamServerConf()); + const nc = await connect({ + port: ns.port, + maxReconnectAttempts: -1, + }); + const status = nc.status(); + (async () => { + let reconnects = 0; + for await (const s of status) { + switch (s.type) { + case "reconnecting": + reconnects++; + if (reconnects === 2) { + ns.restart().then((s) => { + ns = s; + }); + } + break; + case "reconnect": + setTimeout(() => { + loop = false; + }); + break; + default: + // nothing + } + } + })().then(); + + const jsm = await nc.jetstreamManager(); + const si = await jsm.streams.add({ name: nuid.next(), subjects: ["test"] }); + const { name: stream } = si.config; + await jsm.consumers.add(stream, { + durable_name: "dur", + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + setTimeout(() => { + ns.stop(); + }, 2000); + + let loop = true; + while (true) { + try { + const iter = js.fetch(stream, "dur", { batch: 1, expires: 500 }); + for await (const m of iter) { + m.ack(); + } + if (!loop) { + break; + } + } catch (err) { + t.fail(`shouldn't have errored: ${err.message}`); + } + } + t.pass(); + await nc.close(); + await ns.stop(); +}); From e672fe6c7ff8465de58fd05d436689563dd2a70d Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Thu, 1 Dec 2022 16:49:33 -0400 Subject: [PATCH 2/2] [BUMP] version 2.9.2 (#537) [BUMP] nbc to 1.9.3 [TEST] fixed a flapping test --- package.json | 4 +-- src/node_transport.ts | 2 +- test/basics.js | 61 +++++-------------------------------------- 3 files changed, 10 insertions(+), 57 deletions(-) diff --git a/package.json b/package.json index 642ab4c3..67f685cb 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nats", - "version": "2.9.1", + "version": "2.9.2", "description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", "keywords": [ "nats", @@ -40,7 +40,7 @@ "build": "tsc", "cjs": "deno run --allow-all bin/cjs-fix-imports.ts -o nats-base-client/ ./.deps/nats.deno/nats-base-client/", "clean": "shx rm -Rf ./lib/* ./nats-base-client ./.deps", - "clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.9.2 https://github.com/nats-io/nats.deno.git", + "clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.9.3 https://github.com/nats-io/nats.deno.git", "fmt": "deno fmt ./src/ ./examples/ ./test/", "prepack": "npm run clone-nbc && npm run cjs && npm run check-package && npm run build", "ava": "nyc ava --verbose -T 60000", diff --git a/src/node_transport.ts b/src/node_transport.ts index 948064eb..c91a5090 100644 --- a/src/node_transport.ts +++ b/src/node_transport.ts @@ -34,7 +34,7 @@ const { resolve } = require("path"); const { readFile, existsSync } = require("fs"); const dns = require("dns"); -const VERSION = "2.9.1"; +const VERSION = "2.9.2"; const LANG = "nats.js"; export class NodeTransport implements Transport { diff --git a/test/basics.js b/test/basics.js index 8b599ea8..5c9d9f32 100644 --- a/test/basics.js +++ b/test/basics.js @@ -487,62 +487,15 @@ test("basics - server gone", async (t) => { }); test("basics - server error", async (t) => { - const PING = { re: /^PING\r\n/i, out: "PONG\r\n" }; - const CONNECT = { re: /^CONNECT\s+([^\r\n]+)\r\n/i, out: "" }; - - const CMDS = [PING, CONNECT]; - - let inbound; - - const pp = deferred(); - const server = net.createServer(); - server.on("connection", (conn) => { - const buf = Buffer.from( - `INFO {"server_id":"FAKE","server_name":"FAKE","version":"2.9.4","proto":1,"go":"go1.19.2","host":"127.0.0.1","port":${port},"headers":true,"max_payload":1048576,"jetstream":true,"client_id":4,"client_ip":"127.0.0.1"}\r\n`, - ); - conn.write(buf); - - conn.on("data", (data) => { - if (inbound) { - inbound = Buffer.concat([inbound, data]); - } else { - inbound = data; - } - while (data.length > 0) { - let m = null; - for (let i = 0; i < CMDS.length; i++) { - m = CMDS[i].re.exec(inbound); - if (m) { - const len = m[0].length; - if (len <= inbound.length) { - inbound = inbound.slice(len); - conn.write(Buffer.from(CMDS[i].out)); - if (i === 0) { - // fail as if the server sent an error - conn.write(Buffer.from("-ERR 'here'\r\n")); - } - break; - } - } - } - if (m === null) { - break; - } - } - }); - }); - - server.listen(0, (v) => { - const p = server.address().port; - pp.resolve(p); + const ns = await NatsServer.start(); + t.plan(1); + const nc = await connect({ port: ns.port, reconnect: false }); + setTimeout(() => { + nc.protocol.sendCommand("X\r\n"); }); - - const port = await pp; - const nc = await connect({ port, reconnect: false }); const err = await nc.closed(); - t.is(err.message, "'here'"); - - server.close(); + t.is(err?.code, ErrorCode.ProtocolError); + await ns.stop(); }); test("basics - subscription with timeout", async (t) => {