Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] sync subscription utility #602

Merged
merged 2 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
PullConsumerMessagesImpl,
} from "../consumer.ts";
import { deadline } from "../../nats-base-client/util.ts";
import { syncIterator } from "../../nats-base-client/core.ts";

Deno.test("consumers - min supported server", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
Expand Down Expand Up @@ -1051,3 +1052,56 @@ Deno.test("consumers - inboxPrefix is respected", async () => {
await done;
await cleanup(ns, nc);
});

Deno.test("consumers - consume sync", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();
await js.publish("hello");
await js.publish("hello");

await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

const consumer = await js.consumers.get("messages", "c");
const iter = await consumer.consume() as PullConsumerMessagesImpl;
const sync = syncIterator(iter);
assertExists(await sync.next());
assertExists(await sync.next());
iter.stop();
assertEquals(await sync.next(), null);
await cleanup(ns, nc);
});

Deno.test("consumers - fetch sync", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();
await js.publish("hello");
await js.publish("hello");

await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

const consumer = await js.consumers.get("messages", "c");
const iter = await consumer.fetch({ max_messages: 2 });
const sync = syncIterator(iter);
assertExists(await sync.next());
assertExists(await sync.next());
assertEquals(await sync.next(), null);
await cleanup(ns, nc);
});
90 changes: 90 additions & 0 deletions jetstream/tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import {
ConsumerOptsBuilderImpl,
JetStreamSubscriptionInfoable,
} from "../types.ts";
import { syncIterator } from "../../nats-base-client/core.ts";

function callbackConsume(debug = false): JsMsgCallback {
return (err: NatsError | null, jm: JsMsg | null) => {
Expand Down Expand Up @@ -4609,3 +4610,92 @@ Deno.test("jetstream - pullSub iter consumer deleted", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - fetch sync", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const name = nuid.next();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name,
subjects: [name],
storage: StorageType.Memory,
});
await jsm.consumers.add(name, {
durable_name: name,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();

await js.publish(name);
await js.publish(name);

const iter = js.fetch(name, name, { batch: 2, no_wait: true });
const sync = syncIterator(iter);
assertExists(await sync.next());
assertExists(await sync.next());
assertEquals(await sync.next(), null);

await cleanup(ns, nc);
});

Deno.test("jetstream - push sync", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const name = nuid.next();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name,
subjects: [name],
storage: StorageType.Memory,
});
await jsm.consumers.add(name, {
durable_name: name,
ack_policy: AckPolicy.Explicit,
deliver_subject: "here",
});

const js = nc.jetstream();

await js.publish(name);
await js.publish(name);

const sub = await js.subscribe(name, consumerOpts().bind(name, name));
const sync = syncIterator(sub);
assertExists(await sync.next());
assertExists(await sync.next());

await cleanup(ns, nc);
});

Deno.test("jetstream - pull sync", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const name = nuid.next();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name,
subjects: [name],
storage: StorageType.Memory,
});
await jsm.consumers.add(name, {
durable_name: name,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();

await js.publish(name);
await js.publish(name);

const sub = await js.pullSubscribe(name, consumerOpts().bind(name, name));
sub.pull({ batch: 2, no_wait: true });
const sync = syncIterator(sub);

assertExists(await sync.next());
assertExists(await sync.next());
// if don't unsubscribe, the call will hang because
// we are waiting for the sub.pull() to happen
sub.unsubscribe();
assertEquals(await sync.next(), null);

await cleanup(ns, nc);
});
23 changes: 23 additions & 0 deletions nats-base-client/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,29 @@ export interface Msg {
string(): string;
}

export type SyncIterator<T> = {
next(): Promise<T | null>;
};

/**
* syncIterator is a utility function that allows an AsyncIterator to be triggered
* by calling next() - the utility will yield null if the underlying iterator is closed.
* Note it is possibly an error to call use this function on an AsyncIterable that has
* already been started (Symbol.asyncIterator() has been called) from a looping construct.
*/
export function syncIterator<T>(src: AsyncIterable<T>): SyncIterator<T> {
const iter = src[Symbol.asyncIterator]();
return {
async next(): Promise<T | null> {
const m = await iter.next();
if (m.done) {
return Promise.resolve(null);
}
return Promise.resolve(m.value);
},
};
}

/**
* Basic interface to a Subscription type
*/
Expand Down
63 changes: 29 additions & 34 deletions nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ export {
export type { Codec } from "./codec.ts";
export { JSONCodec, StringCodec } from "./codec.ts";
export * from "./nkeys.ts";
export type { DispatchedFn } from "./queued_iterator.ts";
export { QueuedIteratorImpl } from "./queued_iterator.ts";
export type {
DispatchedFn,
IngestionFilterFn,
IngestionFilterFnResult,
ProtocolFilterFn,
} from "./queued_iterator.ts";
export { QueuedIteratorImpl } from "./queued_iterator.ts";
export type { ParserEvent } from "./parser.ts";
export { Kind, Parser, State } from "./parser.ts";
export { DenoBuffer, MAX_SIZE, readAll, writeAll } from "./denobuffer.ts";
Expand All @@ -60,35 +60,6 @@ export { compare, parseSemVer } from "./semver.ts";
export { Empty } from "./types.ts";
export { extractProtocolMessage } from "./transport.ts";

export type {
Msg,
Nanos,
NatsConnection,
Payload,
PublishOptions,
RequestManyOptions,
RequestOptions,
ReviverFn,
Server,
ServerInfo,
ServersChanged,
Stats,
Status,
Sub,
SubOpts,
Subscription,
SubscriptionOptions,
} from "./core.ts";

export {
DebugEvents,
Events,
RequestStrategy,
ServiceResponseType,
} from "./core.ts";

export { ServiceErrorCodeHeader, ServiceErrorHeader } from "./core.ts";

export type {
ApiError,
Auth,
Expand All @@ -100,12 +71,23 @@ export type {
EndpointOptions,
EndpointStats,
JwtAuth,
Msg,
MsgHdrs,
NamedEndpointStats,
Nanos,
NatsConnection,
NKeyAuth,
NoAuth,
Payload,
PublishOptions,
QueuedIterator,
Request,
RequestManyOptions,
RequestOptions,
ReviverFn,
Server,
ServerInfo,
ServersChanged,
Service,
ServiceConfig,
ServiceGroup,
Expand All @@ -117,18 +99,31 @@ export type {
ServiceResponse,
ServicesAPI,
ServiceStats,
Stats,
Status,
Sub,
SubOpts,
Subscription,
SubscriptionOptions,
SyncIterator,
TlsOptions,
TokenAuth,
UserPass,
} from "./core.ts";
export {
createInbox,
DebugEvents,
ErrorCode,
Events,
isNatsError,
Match,
NatsError,
RequestStrategy,
ServiceError,
ServiceErrorCodeHeader,
ServiceErrorHeader,
ServiceResponseType,
ServiceVerb,
syncIterator,
} from "./core.ts";
export { SubscriptionImpl } from "./protocol.ts";
export { Subscriptions } from "./protocol.ts";
export { ServiceVerb } from "./core.ts";
export { SubscriptionImpl, Subscriptions } from "./protocol.ts";
2 changes: 2 additions & 0 deletions nats-base-client/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export {
ServiceResponseType,
ServiceVerb,
StringCodec,
syncIterator,
tokenAuthenticator,
usernamePasswordAuthenticator,
} from "./internal_mod.ts";
Expand Down Expand Up @@ -83,6 +84,7 @@ export type {
SubOpts,
Subscription,
SubscriptionOptions,
SyncIterator,
TlsOptions,
TokenAuth,
TypedCallback,
Expand Down
6 changes: 6 additions & 0 deletions nats-base-client/queued_iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T> {
_data?: unknown; //data is for use by extenders in any way they like
err?: Error;
time: number;
yielding: boolean;

constructor() {
this.inflight = 0;
Expand All @@ -79,6 +80,7 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T> {
this.yields = [];
this.iterClosed = deferred<void>();
this.time = 0;
this.yielding = false;
}

[Symbol.asyncIterator]() {
Expand Down Expand Up @@ -111,6 +113,10 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T> {
if (this.noIterator) {
throw new NatsError("unsupported iterator", ErrorCode.ApiError);
}
if (this.yielding) {
throw new NatsError("already yielding", ErrorCode.ApiError);
}
this.yielding = true;
try {
while (true) {
if (this.yields.length === 0) {
Expand Down
19 changes: 19 additions & 0 deletions tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
SubscriptionImpl,
} from "../nats-base-client/internal_mod.ts";
import { Feature } from "../nats-base-client/semver.ts";
import { syncIterator } from "../nats-base-client/core.ts";

Deno.test("basics - connect port", async () => {
const ns = await NatsServer.start();
Expand Down Expand Up @@ -1361,3 +1362,21 @@ Deno.test("basics - json reviver", async () => {

await cleanup(ns, nc);
});

Deno.test("basics - sync subscription", async () => {
const { ns, nc } = await setup();
const subj = nuid.next();

const sub = nc.subscribe(subj);
const sync = syncIterator(sub);
nc.publish(subj);

let m = await sync.next();
assertExists(m);

sub.unsubscribe();
m = await sync.next();
assertEquals(m, null);

await cleanup(ns, nc);
});
Loading