From 16f1eb855e99f96e48e943c3dec65bd42d93e02c Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 11 Dec 2024 10:11:32 -0600 Subject: [PATCH] Add version checks and debugging improvements in tests Introduce server version checks for direct API functionality to ensure compatibility with server requirements. Enhance `direct_consumer_test` with additional debug and status tracking logic. These changes improve test robustness and developer insight during testing. Fixed an issue where underling error was being thrown instead of rejecting. Signed-off-by: Alberto Ricart --- jetstream/src/jsm_direct.ts | 2 +- jetstream/tests/direct_consumer_test.ts | 20 ++++++++++++++- jetstream/tests/jsm_direct_test.ts | 34 +++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/jetstream/src/jsm_direct.ts b/jetstream/src/jsm_direct.ts index a43c87b6..f781f44b 100644 --- a/jetstream/src/jsm_direct.ts +++ b/jetstream/src/jsm_direct.ts @@ -130,7 +130,7 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl ): Promise> { const { min, ok } = this.nc.features.get(Feature.JS_BATCH_DIRECT_GET); if (!ok) { - throw new Error(`batch direct require server ${min}`); + return Promise.reject(new Error(`batch direct require server ${min}`)); } validateStreamName(stream); const callback = typeof opts.callback === "function" ? opts.callback : null; diff --git a/jetstream/tests/direct_consumer_test.ts b/jetstream/tests/direct_consumer_test.ts index eb0b5da4..9ed8791d 100644 --- a/jetstream/tests/direct_consumer_test.ts +++ b/jetstream/tests/direct_consumer_test.ts @@ -12,7 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { assertEquals } from "jsr:@std/assert"; +import { assert, assertEquals } from "jsr:@std/assert"; import { jetstreamManager, type StoredMsg } from "../src/mod.ts"; import { @@ -125,6 +125,23 @@ Deno.test("direct consumer - consume", async () => { new DirectStreamAPIImpl(nc), { seq: 0 }, ); + + dc.debug(); + + let nexts = 0; + + (async () => { + for await (const s of dc.status()) { + switch (s.type) { + case "next": + nexts += s.options.batch; + break; + default: + // nothing + } + } + })().catch(); + const iter = await dc.consume({ batch: 7 }); for await (const m of iter) { if (m.pending === 0) { @@ -133,6 +150,7 @@ Deno.test("direct consumer - consume", async () => { } assertEquals(iter.getProcessed(), 100); + assert(nexts > 100); await cleanup(ns, nc); }); diff --git a/jetstream/tests/jsm_direct_test.ts b/jetstream/tests/jsm_direct_test.ts index bdcb60d9..5091338c 100644 --- a/jetstream/tests/jsm_direct_test.ts +++ b/jetstream/tests/jsm_direct_test.ts @@ -49,6 +49,40 @@ import { TimeoutError, } from "@nats-io/nats-core/internal"; +Deno.test("direct - version checks", async () => { + const { ns, nc } = await setup(jetstreamServerConf({})); + assertExists(nc.info); + nc.info.version = "2.0.0"; + + const jsm = await jetstreamManager(nc) as JetStreamManagerImpl; + + await assertRejects( + () => { + return jsm.direct.getMessage("A", { start_time: new Date() }); + }, + Error, + "start_time direct option require server 2.11.0", + ); + + await assertRejects( + () => { + return jsm.direct.getBatch("A", { seq: 1, batch: 100 }); + }, + Error, + "batch direct require server 2.11.0", + ); + + await assertRejects( + () => { + return jsm.direct.getBatch("A", { seq: 1, batch: 100 }); + }, + Error, + "batch direct require server 2.11.0", + ); + + await cleanup(ns, nc); +}); + Deno.test("direct - decoder", async (t) => { const { ns, nc } = await setup(jetstreamServerConf({})); if (await notCompatible(ns, nc, "2.9.0")) {