Skip to content

Commit

Permalink
feat(core,node,deno): added a check on the connect function that the …
Browse files Browse the repository at this point in the history
…client wasn't given a ws/s:// url as these are difficult issues to diagnose #129

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 9, 2024
1 parent c7d578b commit 97e7e31
Show file tree
Hide file tree
Showing 40 changed files with 235 additions and 58 deletions.
2 changes: 1 addition & 1 deletion core/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/nats-core",
"version": "3.0.0-36",
"version": "3.0.0-38",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down
2 changes: 1 addition & 1 deletion core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/nats-core",
"version": "3.0.0-36",
"version": "3.0.0-38",
"files": [
"lib/",
"LICENSE",
Expand Down
1 change: 1 addition & 0 deletions core/src/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export {
checkUnsupportedOption,
DEFAULT_MAX_RECONNECT_ATTEMPTS,
defaultOptions,
hasWsProtocol,
parseOptions,
} from "./options.ts";
export { RequestOne } from "./request.ts";
Expand Down
1 change: 1 addition & 0 deletions core/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export {
Empty,
errors,
Events,
hasWsProtocol,
headers,
InvalidArgumentError,
InvalidOperationError,
Expand Down
18 changes: 18 additions & 0 deletions core/src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ export function defaultOptions(): ConnectionOptions {
} as ConnectionOptions;
}

export function hasWsProtocol(opts?: ConnectionOptions): boolean {
if (opts) {
let { servers } = opts;
if (typeof servers === "string") {
servers = [servers];
}
if (servers) {
for (let i = 0; i < servers.length; i++) {
const s = servers[i].toLowerCase();
if (s.startsWith("ws://") || s.startsWith("wss://")) {
return true;
}
}
}
}
return false;
}

export function buildAuthenticator(
opts: ConnectionOptions,
): Authenticator {
Expand Down
2 changes: 1 addition & 1 deletion core/src/version.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// This file is generated - do not edit
export const version = "3.0.0-36";
export const version = "3.0.0-38";
15 changes: 15 additions & 0 deletions core/tests/properties_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
assert,
assertEquals,
assertExists,
assertFalse,
assertMatch,
} from "jsr:@std/assert";

Expand All @@ -35,6 +36,7 @@ import {
} from "../src/internal_mod.ts";

import { NatsServer } from "../../test_helpers/launcher.ts";
import { hasWsProtocol } from "../src/options.ts";

Deno.test("properties - VERSION is semver", async () => {
const ns = await NatsServer.start();
Expand Down Expand Up @@ -192,3 +194,16 @@ Deno.test("properties - multi", () => {
assert(cc.sig.length > 0);
assertExists(cc.nkey);
});

Deno.test("properties - hasWsProtocol", () => {
assertFalse(hasWsProtocol({ servers: "127.0.0.1:4222" }));
assert(hasWsProtocol({ servers: "WS://127.0.0.1:4222" }));
assert(hasWsProtocol({ servers: "ws://127.0.0.1:4222" }));
assert(hasWsProtocol({ servers: "WSS://127.0.0.1:4222" }));
assert(hasWsProtocol({ servers: "ws://127.0.0.1:4222" }));
assert(
hasWsProtocol({
servers: ["nats://127.0.0.1:4222", "ws://127.0.0.1:4222"],
}),
);
});
2 changes: 1 addition & 1 deletion jetstream/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@
"test": "deno test -A --parallel --reload --trace-leaks --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36"
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-38"
}
}
4 changes: 2 additions & 2 deletions jetstream/examples/01_consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* limitations under the License.
*/

import { connect } from "jsr:@nats-io/transport-deno@3.0.0-7";
import { jetstream } from "jsr:@nats-io/jetstream@3.0.0-18";
import { connect } from "@nats-io/transport-deno";
import { jetstream } from "@nats-io/jetstream";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
Expand Down
4 changes: 2 additions & 2 deletions jetstream/examples/02_next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* limitations under the License.
*/

import { connect } from "jsr:@nats-io/transport-deno@3.0.0-7";
import { jetstream } from "jsr:@nats-io/jetstream@3.0.0-18";
import { connect } from "@nats-io/transport-deno";
import { jetstream } from "@nats-io/jetstream";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
Expand Down
4 changes: 2 additions & 2 deletions jetstream/examples/03_batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* limitations under the License.
*/

import { connect } from "jsr:@nats-io/transport-deno@3.0.0-7";
import { jetstream } from "jsr:@nats-io/jetstream@3.0.0-18";
import { connect } from "@nats-io/transport-deno";
import { jetstream } from "@nats-io/jetstream";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
Expand Down
4 changes: 2 additions & 2 deletions jetstream/examples/04_consume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* limitations under the License.
*/

import { connect } from "jsr:@nats-io/transport-deno@3.0.0-7";
import { jetstream } from "jsr:@nats-io/jetstream@3.0.0-18";
import { connect } from "@nats-io/transport-deno";
import { jetstream } from "@nats-io/jetstream";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
Expand Down
4 changes: 2 additions & 2 deletions jetstream/examples/05_consume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* limitations under the License.
*/

import { connect } from "jsr:@nats-io/transport-deno@3.0.0-7";
import { jetstream } from "jsr:@nats-io/jetstream@3.0.0-18";
import { connect } from "@nats-io/transport-deno";
import { jetstream } from "@nats-io/jetstream";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
Expand Down
4 changes: 2 additions & 2 deletions jetstream/examples/06_heartbeats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* limitations under the License.
*/

import { connect } from "jsr:@nats-io/transport-deno@3.0.0-7";
import { ConsumerEvents, jetstream } from "jsr:@nats-io/jetstream@3.0.0-18";
import { connect } from "@nats-io/transport-deno";
import { ConsumerEvents, jetstream } from "@nats-io/jetstream";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
Expand Down
8 changes: 4 additions & 4 deletions jetstream/examples/07_consume_jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
* limitations under the License.
*/

import { connect, delay } from "jsr:@nats-io/transport-deno@3.0.0-7";
import { SimpleMutex } from "jsr:@nats-io/nats-core@3.0.0-17/internal";
import { jetstream } from "jsr:@nats-io/jetstream@3.0.0-18";
import type { JsMsg } from "jsr:@nats-io/jetstream@3.0.0-18";
import { connect, delay } from "@nats-io/transport-deno";
import { SimpleMutex } from "@nats-io/nats-core/internal";
import { jetstream } from "@nats-io/jetstream";
import type { JsMsg } from "@nats-io/jetstream";
import { setupStreamAndConsumer } from "./util.ts";

// create a connection
Expand Down
4 changes: 2 additions & 2 deletions jetstream/examples/08_consume_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* limitations under the License.
*/

import { connect } from "jsr:@nats-io/transport-deno@3.0.0-7";
import { connect } from "@nats-io/transport-deno";
import { setupStreamAndConsumer } from "./util.ts";
import { jetstream } from "jsr:@nats-io/jetstream@3.0.0-18";
import { jetstream } from "@nats-io/jetstream";

// create a connection
const nc = await connect();
Expand Down
4 changes: 2 additions & 2 deletions jetstream/examples/js_readme_publish_examples.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* limitations under the License.
*/

import { connect, Empty } from "jsr:@nats-io/transport-deno@3.0.0-7";
import { jetstream, jetstreamManager } from "jsr:@nats-io/jetstream@3.0.0-18";
import { connect, Empty } from "@nats-io/transport-deno";
import { jetstream, jetstreamManager } from "@nats-io/jetstream";
import type { PubAck } from "../src/mod.ts";

const nc = await connect();
Expand Down
6 changes: 3 additions & 3 deletions jetstream/examples/jsm_readme_jsm_example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* limitations under the License.
*/

import { connect, Empty } from "jsr:@nats-io/nats-transport-deno@3.0.0-5";
import { AckPolicy, jetstreamManager } from "jsr:@nats-io/jetstream@3.0.0-18";
import { connect, Empty } from "@nats-io/transport-deno";
import { AckPolicy, jetstreamManager } from "@nats-io/jetstream";

const nc = await connect();
const jsm = await jetstreamManager(nc);
Expand Down Expand Up @@ -48,7 +48,7 @@ await jsm.streams.update(name, si.config);
// get a particular stored message in the stream by sequence
// this is not associated with a consumer
const sm = await jsm.streams.getMessage("mystream", { seq: 1 });
console.log(sm.seq);
console.log(sm?.seq);

// delete the 5th message in the stream, securely erasing it
await jsm.streams.deleteMessage("mystream", 5);
Expand Down
4 changes: 2 additions & 2 deletions jetstream/examples/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
*/

import { createConsumer, fill, initStream } from "../tests/jstest_util.ts";
import type { NatsConnection } from "jsr:@nats-io/nats-core@3.0.0-31";
import { nuid } from "jsr:@nats-io/nats-core@3.0.0-31";
import type { NatsConnection } from "@nats-io/nats-core";
import { nuid } from "@nats-io/nats-core";

export async function setupStreamAndConsumer(
nc: NatsConnection,
Expand Down
4 changes: 2 additions & 2 deletions jetstream/import_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"imports": {
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-36/internal",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-38",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-38/internal",
"test_helpers": "../test_helpers/mod.ts",
"@std/io": "jsr:@std/[email protected]"
}
Expand Down
2 changes: 1 addition & 1 deletion jetstream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
},
"description": "jetstream library - this library implements all the base functionality for NATS JetStream for javascript clients",
"dependencies": {
"@nats-io/nats-core": "3.0.0-36"
"@nats-io/nats-core": "3.0.0-38"
},
"devDependencies": {
"@types/node": "^22.7.6",
Expand Down
4 changes: 2 additions & 2 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,8 @@ export type ConsumerUpdateConfig = PriorityGroups & {
*/
"inactive_threshold"?: Nanos;
/**
* List of durations in nanoseconds format that represents a retry timescale for
* NaK'd messages or those being normally retried
* List of durations in nanoseconds that represents a retry timescale for
* the redelivery of messages
*/
"backoff"?: Nanos[];
/**
Expand Down
59 changes: 57 additions & 2 deletions jetstream/tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
} from "@nats-io/nats-core";
import {
assert,
assertAlmostEquals,
assertEquals,
assertExists,
assertInstanceOf,
Expand Down Expand Up @@ -707,7 +708,8 @@ Deno.test("jetstream - backoff", async () => {
const { stream, subj } = await initStream(nc);
const jsm = await jetstreamManager(nc);

const backoff = [nanos(250), nanos(1000), nanos(3000)];
const ms = [250, 1000, 3000];
const backoff = ms.map((n) => nanos(n));
const ci = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
Expand All @@ -723,10 +725,11 @@ Deno.test("jetstream - backoff", async () => {
const js = jetstream(nc);
await js.publish(subj);

const when: number[] = [];
const c = await js.consumers.get(stream, "me");
const iter = await c.consume({
callback: (m) => {
console.log(m.info.redeliveryCount);
when.push(Date.now());
if (m.info.redeliveryCount === 4) {
iter.stop();
}
Expand All @@ -735,6 +738,58 @@ Deno.test("jetstream - backoff", async () => {

await iter.closed();

const offset = when.map((n, idx) => {
const p = idx > 0 ? idx - 1 : 0;
return n - when[p];
});

offset.slice(1).forEach((n, idx) => {
assertAlmostEquals(n, ms[idx], 20);
});

await cleanup(ns, nc);
});

Deno.test("jetstream - redelivery", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.7.2")) {
return;
}

const { stream, subj } = await initStream(nc);
const jsm = await jetstreamManager(nc);

const ci = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
max_deliver: 4,
ack_wait: nanos(1000),
});

assertEquals(ci.config.max_deliver, 4);

const js = jetstream(nc);
await js.publish(subj);

const c = await js.consumers.get(stream, "me");

let redeliveries = 0;
const iter = await c.consume({
callback: (m) => {
if (m.redelivered) {
redeliveries++;
}
if (m.info.redeliveryCount === 4) {
setTimeout(() => {
iter.stop();
}, 2000);
}
},
});

await iter.closed();
assertEquals(redeliveries, 3);

await cleanup(ns, nc);
});

Expand Down
2 changes: 1 addition & 1 deletion kv/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-38",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-25"
}
}
4 changes: 2 additions & 2 deletions kv/import_map.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-36/internal",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-38",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-38/internal",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-25",
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-25/internal",
"test_helpers": "../test_helpers/mod.ts",
Expand Down
2 changes: 1 addition & 1 deletion kv/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"description": "kv library - this library implements all the base functionality for NATS KV javascript clients",
"dependencies": {
"@nats-io/jetstream": "3.0.0-25",
"@nats-io/nats-core": "3.0.0-36"
"@nats-io/nats-core": "3.0.0-38"
},
"devDependencies": {
"@types/node": "^22.7.6",
Expand Down
2 changes: 1 addition & 1 deletion obj/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-36",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-38",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-25"
}
}
Loading

0 comments on commit 97e7e31

Please sign in to comment.