From f50b52c63bad619a2147b693ea0eda5ad9117c3c Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Fri, 29 Sep 2023 09:33:59 -0500 Subject: [PATCH 1/2] [FEAT] added a utility function syncIterator() to wrap an AsyncIterables so that elements can be extracted in a `sync` faction - this is syntactic sugar, as the iterator can be had via `src[Symbol.asyncIterator]()`. This can be useful, to process subscription messages one at a time. --- jetstream/tests/consumers_test.ts | 54 +++++++++++++++++ jetstream/tests/jetstream_test.ts | 90 +++++++++++++++++++++++++++++ nats-base-client/core.ts | 23 ++++++++ nats-base-client/internal_mod.ts | 63 ++++++++++---------- nats-base-client/mod.ts | 2 + nats-base-client/queued_iterator.ts | 6 ++ tests/basics_test.ts | 19 ++++++ 7 files changed, 223 insertions(+), 34 deletions(-) diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index 5af76a5d..7ea6983f 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -51,6 +51,7 @@ import { PullConsumerMessagesImpl, } from "../consumer.ts"; import { deadline } from "../../nats-base-client/util.ts"; +import { syncIterator } from "../../nats-base-client/core.ts"; Deno.test("consumers - min supported server", async () => { const { ns, nc } = await setup(jetstreamServerConf({})); @@ -1051,3 +1052,56 @@ Deno.test("consumers - inboxPrefix is respected", async () => { await done; await cleanup(ns, nc); }); + +Deno.test("consumers - consume sync", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager(); + await jsm.streams.add({ name: "messages", subjects: ["hello"] }); + + const js = nc.jetstream(); + await js.publish("hello"); + await js.publish("hello"); + + await jsm.consumers.add("messages", { + durable_name: "c", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + ack_wait: nanos(3000), + max_waiting: 500, + }); + + const consumer = await js.consumers.get("messages", "c"); + const iter = await consumer.consume() as PullConsumerMessagesImpl; + const sync = syncIterator(iter); + assertExists(await sync.next()); + assertExists(await sync.next()); + iter.stop(); + assertEquals(await sync.next(), null); + await cleanup(ns, nc); +}); + +Deno.test("consumers - fetch sync", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager(); + await jsm.streams.add({ name: "messages", subjects: ["hello"] }); + + const js = nc.jetstream(); + await js.publish("hello"); + await js.publish("hello"); + + await jsm.consumers.add("messages", { + durable_name: "c", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + ack_wait: nanos(3000), + max_waiting: 500, + }); + + const consumer = await js.consumers.get("messages", "c"); + const iter = await consumer.fetch({ max_messages: 2 }); + const sync = syncIterator(iter); + assertExists(await sync.next()); + assertExists(await sync.next()); + assertEquals(await sync.next(), null); + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 5db42cbf..7fa1718e 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -83,6 +83,7 @@ import { ConsumerOptsBuilderImpl, JetStreamSubscriptionInfoable, } from "../types.ts"; +import { syncIterator } from "../../nats-base-client/core.ts"; function callbackConsume(debug = false): JsMsgCallback { return (err: NatsError | null, jm: JsMsg | null) => { @@ -4609,3 +4610,92 @@ Deno.test("jetstream - pullSub iter consumer deleted", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - fetch sync", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const name = nuid.next(); + const jsm = await nc.jetstreamManager(); + await jsm.streams.add({ + name, + subjects: [name], + storage: StorageType.Memory, + }); + await jsm.consumers.add(name, { + durable_name: name, + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + + await js.publish(name); + await js.publish(name); + + const iter = js.fetch(name, name, { batch: 2, no_wait: true }); + const sync = syncIterator(iter); + assertExists(await sync.next()); + assertExists(await sync.next()); + assertEquals(await sync.next(), null); + + await cleanup(ns, nc); +}); + +Deno.test("jetstream - push sync", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const name = nuid.next(); + const jsm = await nc.jetstreamManager(); + await jsm.streams.add({ + name, + subjects: [name], + storage: StorageType.Memory, + }); + await jsm.consumers.add(name, { + durable_name: name, + ack_policy: AckPolicy.Explicit, + deliver_subject: "here", + }); + + const js = nc.jetstream(); + + await js.publish(name); + await js.publish(name); + + const sub = await js.subscribe(name, consumerOpts().bind(name, name)); + const sync = syncIterator(sub); + assertExists(await sync.next()); + assertExists(await sync.next()); + + await cleanup(ns, nc); +}); + +Deno.test("jetstream - pull sync", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const name = nuid.next(); + const jsm = await nc.jetstreamManager(); + await jsm.streams.add({ + name, + subjects: [name], + storage: StorageType.Memory, + }); + await jsm.consumers.add(name, { + durable_name: name, + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + + await js.publish(name); + await js.publish(name); + + const sub = await js.pullSubscribe(name, consumerOpts().bind(name, name)); + sub.pull({ batch: 2, no_wait: true }); + const sync = syncIterator(sub); + + assertExists(await sync.next()); + assertExists(await sync.next()); + // if don't unsubscribe, the call will hang because + // we are waiting for the sub.pull() to happen + sub.unsubscribe(); + assertEquals(await sync.next(), null); + + await cleanup(ns, nc); +}); diff --git a/nats-base-client/core.ts b/nats-base-client/core.ts index be221861..a94e06de 100644 --- a/nats-base-client/core.ts +++ b/nats-base-client/core.ts @@ -705,6 +705,29 @@ export interface Msg { string(): string; } +export type SyncIterator = { + next(): Promise; +}; + +/** + * syncIterator is a utility function that allows an AsyncIterator to be triggered + * by calling next() - the utility will yield null if the underlying iterator is closed. + * Note it is possibly an error to call use this function on an AsyncIterable that has + * already been started (Symbol.asyncIterator() has been called) from a looping construct. + */ +export function syncIterator(src: AsyncIterable): SyncIterator { + const iter = src[Symbol.asyncIterator](); + return { + async next(): Promise { + const m = await iter.next(); + if (m.done) { + return Promise.resolve(null); + } + return Promise.resolve(m.value); + }, + }; +} + /** * Basic interface to a Subscription type */ diff --git a/nats-base-client/internal_mod.ts b/nats-base-client/internal_mod.ts index c962c07e..686d0c25 100644 --- a/nats-base-client/internal_mod.ts +++ b/nats-base-client/internal_mod.ts @@ -30,13 +30,13 @@ export { export type { Codec } from "./codec.ts"; export { JSONCodec, StringCodec } from "./codec.ts"; export * from "./nkeys.ts"; -export type { DispatchedFn } from "./queued_iterator.ts"; -export { QueuedIteratorImpl } from "./queued_iterator.ts"; export type { + DispatchedFn, IngestionFilterFn, IngestionFilterFnResult, ProtocolFilterFn, } from "./queued_iterator.ts"; +export { QueuedIteratorImpl } from "./queued_iterator.ts"; export type { ParserEvent } from "./parser.ts"; export { Kind, Parser, State } from "./parser.ts"; export { DenoBuffer, MAX_SIZE, readAll, writeAll } from "./denobuffer.ts"; @@ -60,35 +60,6 @@ export { compare, parseSemVer } from "./semver.ts"; export { Empty } from "./types.ts"; export { extractProtocolMessage } from "./transport.ts"; -export type { - Msg, - Nanos, - NatsConnection, - Payload, - PublishOptions, - RequestManyOptions, - RequestOptions, - ReviverFn, - Server, - ServerInfo, - ServersChanged, - Stats, - Status, - Sub, - SubOpts, - Subscription, - SubscriptionOptions, -} from "./core.ts"; - -export { - DebugEvents, - Events, - RequestStrategy, - ServiceResponseType, -} from "./core.ts"; - -export { ServiceErrorCodeHeader, ServiceErrorHeader } from "./core.ts"; - export type { ApiError, Auth, @@ -100,12 +71,23 @@ export type { EndpointOptions, EndpointStats, JwtAuth, + Msg, MsgHdrs, NamedEndpointStats, + Nanos, + NatsConnection, NKeyAuth, NoAuth, + Payload, + PublishOptions, QueuedIterator, Request, + RequestManyOptions, + RequestOptions, + ReviverFn, + Server, + ServerInfo, + ServersChanged, Service, ServiceConfig, ServiceGroup, @@ -117,18 +99,31 @@ export type { ServiceResponse, ServicesAPI, ServiceStats, + Stats, + Status, + Sub, + SubOpts, + Subscription, + SubscriptionOptions, + SyncIterator, TlsOptions, TokenAuth, UserPass, } from "./core.ts"; export { createInbox, + DebugEvents, ErrorCode, + Events, isNatsError, Match, NatsError, + RequestStrategy, ServiceError, + ServiceErrorCodeHeader, + ServiceErrorHeader, + ServiceResponseType, + ServiceVerb, + syncIterator, } from "./core.ts"; -export { SubscriptionImpl } from "./protocol.ts"; -export { Subscriptions } from "./protocol.ts"; -export { ServiceVerb } from "./core.ts"; +export { SubscriptionImpl, Subscriptions } from "./protocol.ts"; diff --git a/nats-base-client/mod.ts b/nats-base-client/mod.ts index afdc3b20..73f7246c 100644 --- a/nats-base-client/mod.ts +++ b/nats-base-client/mod.ts @@ -27,6 +27,7 @@ export { ServiceResponseType, ServiceVerb, StringCodec, + syncIterator, tokenAuthenticator, usernamePasswordAuthenticator, } from "./internal_mod.ts"; @@ -83,6 +84,7 @@ export type { SubOpts, Subscription, SubscriptionOptions, + SyncIterator, TlsOptions, TokenAuth, TypedCallback, diff --git a/nats-base-client/queued_iterator.ts b/nats-base-client/queued_iterator.ts index d0e47b81..788a90db 100644 --- a/nats-base-client/queued_iterator.ts +++ b/nats-base-client/queued_iterator.ts @@ -66,6 +66,7 @@ export class QueuedIteratorImpl implements QueuedIterator { _data?: unknown; //data is for use by extenders in any way they like err?: Error; time: number; + yielding: boolean; constructor() { this.inflight = 0; @@ -79,6 +80,7 @@ export class QueuedIteratorImpl implements QueuedIterator { this.yields = []; this.iterClosed = deferred(); this.time = 0; + this.yielding = false; } [Symbol.asyncIterator]() { @@ -111,6 +113,10 @@ export class QueuedIteratorImpl implements QueuedIterator { if (this.noIterator) { throw new NatsError("unsupported iterator", ErrorCode.ApiError); } + if (this.yielding) { + throw new NatsError("already yielding", ErrorCode.ApiError); + } + this.yielding = true; try { while (true) { if (this.yields.length === 0) { diff --git a/tests/basics_test.ts b/tests/basics_test.ts index c8b8d7f1..3080fc7e 100644 --- a/tests/basics_test.ts +++ b/tests/basics_test.ts @@ -55,6 +55,7 @@ import { SubscriptionImpl, } from "../nats-base-client/internal_mod.ts"; import { Feature } from "../nats-base-client/semver.ts"; +import { syncIterator } from "../nats-base-client/core.ts"; Deno.test("basics - connect port", async () => { const ns = await NatsServer.start(); @@ -1361,3 +1362,21 @@ Deno.test("basics - json reviver", async () => { await cleanup(ns, nc); }); + +Deno.test("basics - sync subscription", async () => { + const { ns, nc } = await setup(); + const subj = nuid.next(); + + const sub = nc.subscribe(subj); + const sync = syncIterator(sub); + nc.publish(subj); + + let m = await sync.next(); + assertExists(m); + + sub.unsubscribe(); + m = await sync.next(); + assertEquals(m, null); + + await cleanup(ns, nc); +}); From 8524c4f04f765425afb0ec98435b5f5631cc4021 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Fri, 29 Sep 2023 09:53:03 -0500 Subject: [PATCH 2/2] added more tests --- tests/iterators_test.ts | 56 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/tests/iterators_test.ts b/tests/iterators_test.ts index c9b81d9f..66c6ea1e 100644 --- a/tests/iterators_test.ts +++ b/tests/iterators_test.ts @@ -12,13 +12,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { connect, createInbox, ErrorCode } from "../src/mod.ts"; -import { assertEquals } from "https://deno.land/std@0.200.0/assert/mod.ts"; +import { connect, createInbox, ErrorCode, syncIterator } from "../src/mod.ts"; +import { + assertEquals, + assertRejects, +} from "https://deno.land/std@0.200.0/assert/mod.ts"; import { assertErrorCode, Lock, NatsServer } from "./helpers/mod.ts"; import { assert } from "../nats-base-client/denobuffer.ts"; import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts"; import { NatsConnectionImpl } from "../nats-base-client/nats.ts"; import { cleanup, setup } from "./helpers/mod.ts"; +import { nuid } from "../nats-base-client/nuid.ts"; +import { delay } from "../nats-base-client/util.ts"; Deno.test("iterators - unsubscribe breaks and closes", async () => { const { ns, nc } = await setup(); @@ -211,3 +216,50 @@ Deno.test("iterators - break cleans up", async () => { await cleanup(ns, nc); }); + +Deno.test("iterators - sync iterator", async () => { + const { ns, nc } = await setup(); + const subj = nuid.next(); + const sub = nc.subscribe(subj); + const sync = syncIterator(sub); + nc.publish(subj, "a"); + let m = await sync.next(); + assertEquals(m?.string(), "a"); + nc.publish(subj, "b"); + m = await sync.next(); + assertEquals(m?.string(), "b"); + const p = sync.next(); + // blocks until next message + const v = await Promise.race([ + delay(250).then(() => { + return "timer"; + }), + p, + ]); + assertEquals(v, "timer"); + await assertRejects( + async () => { + for await (const _m of sub) { + // should fail + } + }, + Error, + "already yielding", + ); + + const sub2 = nc.subscribe("foo", { + callback: () => { + }, + }); + await assertRejects( + async () => { + for await (const _m of sub2) { + // should fail + } + }, + Error, + "unsupported iterator", + ); + + await cleanup(ns, nc); +});