Skip to content

Commit

Permalink
[FIX] Lister was filtering stream responses for objectstore and kv, b…
Browse files Browse the repository at this point in the history
…ut offset calculations are made as a whole. Added check of total count that lister results are not duplicated.
  • Loading branch information
aricart committed Mar 3, 2023
1 parent baf5edf commit 07b1233
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 24 deletions.
30 changes: 28 additions & 2 deletions nats-base-client/jslister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ApiPaged, ApiPagedRequest, Lister } from "./types.ts";
import {
ApiPaged,
ApiPagedRequest,
ApiResponse,
ConsumerListResponse,
Lister,
StreamListResponse,
} from "./types.ts";
import { BaseApiClient } from "./jsbaseclient_api.ts";

export type ListerFieldFilter<T> = (v: unknown) => T[];
Expand Down Expand Up @@ -62,15 +69,34 @@ export class ListerImpl<T> implements Lister<T>, AsyncIterable<T> {
{ timeout: this.jsm.timeout },
);
this.pageInfo = r as ApiPaged;
// offsets are reported in total, so need to count
// all the entries returned
this.offset += this.countResponse(r as ApiResponse);
const a = this.filter(r);
this.offset += a.length;
return a;
} catch (err) {
this.err = err;
throw err;
}
}

countResponse(r?: ApiResponse): number {
switch (r?.type) {
case "io.nats.jetstream.api.v1.stream_names_response":
case "io.nats.jetstream.api.v1.stream_list_response":
return (r as StreamListResponse).streams.length;
case "io.nats.jetstream.api.v1.consumer_list_response":
return (r as ConsumerListResponse).consumers.length;
default:
console.error(
`jslister.ts: unknown API response for paged output: ${r?.type}`,
);
// has to be a stream...
return (r as StreamListResponse).streams?.length || 0;
}
return 0;
}

async *[Symbol.asyncIterator]() {
let page = await this.next();
while (page.length > 0) {
Expand Down
4 changes: 2 additions & 2 deletions nats-base-client/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
const listerFilter: ListerFieldFilter<string> = (
v: unknown,
): string[] => {
const slr = v as StreamNames;
return slr.streams;
const sr = v as StreamNames;
return sr.streams;
};
const subj = `${this.prefix}.STREAM.NAMES`;
return new ListerImpl<string>(subj, listerFilter, this, payload);
Expand Down
33 changes: 13 additions & 20 deletions tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ Deno.test("jsm - consumer name is validated", async () => {
await cleanup(ns, nc);
});

Deno.test("jsm - list kvs", async () => {
Deno.test("jsm - list all", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
Expand All @@ -1532,31 +1532,24 @@ Deno.test("jsm - list kvs", async () => {
await jsm.streams.add({ name: "A", subjects: ["a"] });
let kvs = await jsm.streams.listKvs().next();
assertEquals(kvs.length, 0);
let obs = await jsm.streams.listObjectStores().next();
assertEquals(obs.length, 0);

const js = nc.jetstream();
await js.views.kv("A");
await js.views.os("os");
await js.views.kv("kv");

kvs = await jsm.streams.listKvs().next();
assertEquals(kvs.length, 1);
assertEquals(kvs[0].bucket, `A`);
assertEquals(kvs[0].bucket, `kv`);

await cleanup(ns, nc);
});
obs = await jsm.streams.listObjectStores().next();
assertEquals(obs.length, 1);
assertEquals(obs[0].bucket, `os`);

Deno.test("jsm - list objectstores", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["a"] });
let objs = await jsm.streams.listObjectStores().next();
assertEquals(objs.length, 0);

const js = nc.jetstream();
await js.views.os("A");
objs = await jsm.streams.listObjectStores().next();
assertEquals(objs.length, 1);
assertEquals(objs[0].bucket, "A");
const names = await (await jsm.streams.names()).next();
assertEquals(names.length, 3);
assertArrayIncludes(names, ["A", "KV_kv", "OBJ_os"]);

await cleanup(ns, nc);
});
Expand Down

0 comments on commit 07b1233

Please sign in to comment.