From e021f43bf5e68418d64f74265b540a5a932195e5 Mon Sep 17 00:00:00 2001 From: aricart Date: Fri, 23 Dec 2022 17:05:12 -0400 Subject: [PATCH] [DOC] [JS] heatbeat example was not up-to-date, updated example showing behaviour of missed heartbeats on JS push subscriber. --- examples/jetstream/10_heartbeats.ts | 78 +++++++++++++---------------- jetstream.md | 74 ++++++++++++++------------- 2 files changed, 73 insertions(+), 79 deletions(-) diff --git a/examples/jetstream/10_heartbeats.ts b/examples/jetstream/10_heartbeats.ts index c0aaa20a..9cc37a59 100644 --- a/examples/jetstream/10_heartbeats.ts +++ b/examples/jetstream/10_heartbeats.ts @@ -1,12 +1,4 @@ -import { - AckPolicy, - connect, - isHeartbeatMsg, - JsHeaders, - nanos, - nuid, - toJsMsg, -} from "../../src/mod.ts"; +import { connect, consumerOpts, nuid } from "../../src/mod.ts"; const nc = await connect(); const jsm = await nc.jetstreamManager(); @@ -15,41 +7,41 @@ const stream = nuid.next(); const subj = nuid.next(); await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] }); -// create a regular subscription (this is an ephemeral consumer, so start the sub) -let missed = 0; -const sub = nc.subscribe("my.messages", { - callback: (_err, msg) => { - missed = 0; - // simply checking if has headers and code === 100, with no reply - // subject set. if it has a reply it would be a flow control message - // which will get acknowledged at the end. - if (isHeartbeatMsg(msg)) { - // the heartbeat has additional information: - const lastSeq = msg.headers?.get(JsHeaders.LastStreamSeqHdr); - const consSeq = msg.headers?.get(JsHeaders.LastConsumerSeqHdr); - console.log( - `alive - last stream seq: ${lastSeq} - last consumer seq: ${consSeq}`, - ); - return; - } - // do something with the message - const m = toJsMsg(msg); +const js = nc.jetstream(); +let opts = consumerOpts() + .deliverTo("push") + .manualAck() + .ackExplicit() + .idleHeartbeat(500) + .durable("iter-dur"); +const iter = await js.subscribe(`${subj}.>`, opts); +// if 2 heartbeats are missed, the iterator will end with an error +// simply re-do the js.subscribe() and attempt again +const done = (async () => { + for await (const m of iter) { m.ack(); - }, -}); - -setInterval(() => { - missed++; - if (missed > 3) { - console.error("JetStream stopped sending heartbeats!"); } -}, 30000); - -// create a consumer that delivers to the subscription -await jsm.consumers.add(stream, { - ack_policy: AckPolicy.Explicit, - deliver_subject: "my.messages", - idle_heartbeat: nanos(10000), +})(); +done.catch((err) => { + console.log(`iterator closed: ${err}`); }); -await sub.closed; +opts = consumerOpts() + .deliverTo("push") + .manualAck() + .ackExplicit() + .idleHeartbeat(500) + .durable("callback-dur") + .callback((err, m) => { + if (err) { + // the callback will also report a heartbeat error, however because the + // callback can receive errors, it continues active. If the server returns + // the client will automatically resume receiving messages + console.log(err); + } else { + m?.ack(); + } + }); + +const sub = await js.subscribe(`${subj}.>`, opts); +await sub.closed.then(() => console.log("sub closed")); diff --git a/jetstream.md b/jetstream.md index e499b942..bae72478 100644 --- a/jetstream.md +++ b/jetstream.md @@ -426,47 +426,49 @@ By creating a consumer that enables heartbeats, you can request JetStream to send you heartbeat messages every so often. This way your client can reconcile if the lack of messages means that you should be restarting your consumer. -Currently, the library doesn't provide a notification for missed heartbeats, but -this is not too difficult to do: - ```typescript -let missed = 0; -// this is a plain nats subscription -const sub = nc.subscribe("my.messages", { - callback: (err, msg) => { - // if we got a message, we simply reset - missed = 0; - // simply checking if has headers and code === 100 and a description === "Idle Heartbeat" - if (isHeartbeatMsg(msg)) { - // the heartbeat has additional information: - const lastSeq = msg.headers.get(JsHeaders.LastStreamSeqHdr); - const consSeq = msg.headers.get(JsHeaders.LastConsumerSeqHdr); - console.log( - `alive - last stream seq: ${lastSeq} - last consumer seq: ${consSeq}`, - ); - return; - } - // do something with the message - const m = toJsMsg(msg); - m.ack(); - }, -}); +const stream = nuid.next(); +const subj = nuid.next(); +await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] }); -setInterval(() => { - missed++; - if (missed > 3) { - console.error("JetStream stopped sending heartbeats!"); +const js = nc.jetstream(); +let opts = consumerOpts() + .deliverTo("push") + .manualAck() + .ackExplicit() + .idleHeartbeat(500) + .durable("iter-dur"); +const iter = await js.subscribe(`${subj}.>`, opts); +// if 2 heartbeats are missed, the iterator will end with an error +// simply re-do the js.subscribe() and attempt again +const done = (async () => { + for await (const m of iter) { + m.ack(); } -}, 30000); - -// create a consumer that delivers to the subscription -await jsm.consumers.add(stream, { - ack_policy: AckPolicy.Explicit, - deliver_subject: "my.messages", - idle_heartbeat: nanos(10000), +})(); +done.catch((err) => { + console.log(`iterator closed: ${err}`); }); -await sub.closed; +opts = consumerOpts() + .deliverTo("push") + .manualAck() + .ackExplicit() + .idleHeartbeat(500) + .durable("callback-dur") + .callback((err, m) => { + if (err) { + // the callback will also report a heartbeat error, however because the + // callback can receive errors, it continues active. If the server returns + // the client will automatically resume receiving messages + console.log(err); + } else { + m?.ack(); + } + }); + +const sub = await js.subscribe(`${subj}.>`, opts); +await sub.closed.then(() => console.log("sub closed")); ``` #### JetStream Ordered Consumers