From 07b123355ca25b1bc4a3b24ce1dea5897eb60b07 Mon Sep 17 00:00:00 2001 From: aricart Date: Wed, 1 Mar 2023 14:31:42 -0400 Subject: [PATCH 1/2] [FIX] Lister was filtering stream responses for objectstore and kv, but offset calculations are made as a whole. Added check of total count that lister results are not duplicated. --- nats-base-client/jslister.ts | 30 ++++++++++++++++++++++++++-- nats-base-client/jsmstream_api.ts | 4 ++-- tests/jsm_test.ts | 33 ++++++++++++------------------- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/nats-base-client/jslister.ts b/nats-base-client/jslister.ts index fb260d6e..c72ca9a0 100644 --- a/nats-base-client/jslister.ts +++ b/nats-base-client/jslister.ts @@ -12,7 +12,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { ApiPaged, ApiPagedRequest, Lister } from "./types.ts"; +import { + ApiPaged, + ApiPagedRequest, + ApiResponse, + ConsumerListResponse, + Lister, + StreamListResponse, +} from "./types.ts"; import { BaseApiClient } from "./jsbaseclient_api.ts"; export type ListerFieldFilter = (v: unknown) => T[]; @@ -62,8 +69,10 @@ export class ListerImpl implements Lister, AsyncIterable { { timeout: this.jsm.timeout }, ); this.pageInfo = r as ApiPaged; + // offsets are reported in total, so need to count + // all the entries returned + this.offset += this.countResponse(r as ApiResponse); const a = this.filter(r); - this.offset += a.length; return a; } catch (err) { this.err = err; @@ -71,6 +80,23 @@ export class ListerImpl implements Lister, AsyncIterable { } } + countResponse(r?: ApiResponse): number { + switch (r?.type) { + case "io.nats.jetstream.api.v1.stream_names_response": + case "io.nats.jetstream.api.v1.stream_list_response": + return (r as StreamListResponse).streams.length; + case "io.nats.jetstream.api.v1.consumer_list_response": + return (r as ConsumerListResponse).consumers.length; + default: + console.error( + `jslister.ts: unknown API response for paged output: ${r?.type}`, + ); + // has to be a stream... + return (r as StreamListResponse).streams?.length || 0; + } + return 0; + } + async *[Symbol.asyncIterator]() { let page = await this.next(); while (page.length > 0) { diff --git a/nats-base-client/jsmstream_api.ts b/nats-base-client/jsmstream_api.ts index dd8d6df5..cda0e652 100644 --- a/nats-base-client/jsmstream_api.ts +++ b/nats-base-client/jsmstream_api.ts @@ -295,8 +295,8 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI { const listerFilter: ListerFieldFilter = ( v: unknown, ): string[] => { - const slr = v as StreamNames; - return slr.streams; + const sr = v as StreamNames; + return sr.streams; }; const subj = `${this.prefix}.STREAM.NAMES`; return new ListerImpl(subj, listerFilter, this, payload); diff --git a/tests/jsm_test.ts b/tests/jsm_test.ts index 411f036e..a7b2c21a 100644 --- a/tests/jsm_test.ts +++ b/tests/jsm_test.ts @@ -1523,7 +1523,7 @@ Deno.test("jsm - consumer name is validated", async () => { await cleanup(ns, nc); }); -Deno.test("jsm - list kvs", async () => { +Deno.test("jsm - list all", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); @@ -1532,31 +1532,24 @@ Deno.test("jsm - list kvs", async () => { await jsm.streams.add({ name: "A", subjects: ["a"] }); let kvs = await jsm.streams.listKvs().next(); assertEquals(kvs.length, 0); + let obs = await jsm.streams.listObjectStores().next(); + assertEquals(obs.length, 0); const js = nc.jetstream(); - await js.views.kv("A"); + await js.views.os("os"); + await js.views.kv("kv"); + kvs = await jsm.streams.listKvs().next(); assertEquals(kvs.length, 1); - assertEquals(kvs[0].bucket, `A`); + assertEquals(kvs[0].bucket, `kv`); - await cleanup(ns, nc); -}); + obs = await jsm.streams.listObjectStores().next(); + assertEquals(obs.length, 1); + assertEquals(obs[0].bucket, `os`); -Deno.test("jsm - list objectstores", async () => { - const { ns, nc } = await setup( - jetstreamServerConf({}, true), - ); - - const jsm = await nc.jetstreamManager(); - await jsm.streams.add({ name: "A", subjects: ["a"] }); - let objs = await jsm.streams.listObjectStores().next(); - assertEquals(objs.length, 0); - - const js = nc.jetstream(); - await js.views.os("A"); - objs = await jsm.streams.listObjectStores().next(); - assertEquals(objs.length, 1); - assertEquals(objs[0].bucket, "A"); + const names = await (await jsm.streams.names()).next(); + assertEquals(names.length, 3); + assertArrayIncludes(names, ["A", "KV_kv", "OBJ_os"]); await cleanup(ns, nc); }); From 60dabbb7c99cf72b662771d21113f4d0c0b50e40 Mon Sep 17 00:00:00 2001 From: aricart Date: Thu, 2 Mar 2023 20:31:18 -0400 Subject: [PATCH 2/2] trigger ci --- tests/jsm_test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/jsm_test.ts b/tests/jsm_test.ts index a7b2c21a..2920f268 100644 --- a/tests/jsm_test.ts +++ b/tests/jsm_test.ts @@ -1547,6 +1547,7 @@ Deno.test("jsm - list all", async () => { assertEquals(obs.length, 1); assertEquals(obs[0].bucket, `os`); + // test names as well const names = await (await jsm.streams.names()).next(); assertEquals(names.length, 3); assertArrayIncludes(names, ["A", "KV_kv", "OBJ_os"]);