Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOC] [JS] heatbeat example was not up-to-date, updated example showing behaviour of missed heartbeats on JS push subscriber. #446

Merged
merged 1 commit into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 35 additions & 43 deletions examples/jetstream/10_heartbeats.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
import {
AckPolicy,
connect,
isHeartbeatMsg,
JsHeaders,
nanos,
nuid,
toJsMsg,
} from "../../src/mod.ts";
import { connect, consumerOpts, nuid } from "../../src/mod.ts";

const nc = await connect();
const jsm = await nc.jetstreamManager();
Expand All @@ -15,41 +7,41 @@ const stream = nuid.next();
const subj = nuid.next();
await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] });

// create a regular subscription (this is an ephemeral consumer, so start the sub)
let missed = 0;
const sub = nc.subscribe("my.messages", {
callback: (_err, msg) => {
missed = 0;
// simply checking if has headers and code === 100, with no reply
// subject set. if it has a reply it would be a flow control message
// which will get acknowledged at the end.
if (isHeartbeatMsg(msg)) {
// the heartbeat has additional information:
const lastSeq = msg.headers?.get(JsHeaders.LastStreamSeqHdr);
const consSeq = msg.headers?.get(JsHeaders.LastConsumerSeqHdr);
console.log(
`alive - last stream seq: ${lastSeq} - last consumer seq: ${consSeq}`,
);
return;
}
// do something with the message
const m = toJsMsg(msg);
const js = nc.jetstream();
let opts = consumerOpts()
.deliverTo("push")
.manualAck()
.ackExplicit()
.idleHeartbeat(500)
.durable("iter-dur");
const iter = await js.subscribe(`${subj}.>`, opts);
// if 2 heartbeats are missed, the iterator will end with an error
// simply re-do the js.subscribe() and attempt again
const done = (async () => {
for await (const m of iter) {
m.ack();
},
});

setInterval(() => {
missed++;
if (missed > 3) {
console.error("JetStream stopped sending heartbeats!");
}
}, 30000);

// create a consumer that delivers to the subscription
await jsm.consumers.add(stream, {
ack_policy: AckPolicy.Explicit,
deliver_subject: "my.messages",
idle_heartbeat: nanos(10000),
})();
done.catch((err) => {
console.log(`iterator closed: ${err}`);
});

await sub.closed;
opts = consumerOpts()
.deliverTo("push")
.manualAck()
.ackExplicit()
.idleHeartbeat(500)
.durable("callback-dur")
.callback((err, m) => {
if (err) {
// the callback will also report a heartbeat error, however because the
// callback can receive errors, it continues active. If the server returns
// the client will automatically resume receiving messages
console.log(err);
} else {
m?.ack();
}
});

const sub = await js.subscribe(`${subj}.>`, opts);
await sub.closed.then(() => console.log("sub closed"));
74 changes: 38 additions & 36 deletions jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -426,47 +426,49 @@ By creating a consumer that enables heartbeats, you can request JetStream to
send you heartbeat messages every so often. This way your client can reconcile
if the lack of messages means that you should be restarting your consumer.

Currently, the library doesn't provide a notification for missed heartbeats, but
this is not too difficult to do:

```typescript
let missed = 0;
// this is a plain nats subscription
const sub = nc.subscribe("my.messages", {
callback: (err, msg) => {
// if we got a message, we simply reset
missed = 0;
// simply checking if has headers and code === 100 and a description === "Idle Heartbeat"
if (isHeartbeatMsg(msg)) {
// the heartbeat has additional information:
const lastSeq = msg.headers.get(JsHeaders.LastStreamSeqHdr);
const consSeq = msg.headers.get(JsHeaders.LastConsumerSeqHdr);
console.log(
`alive - last stream seq: ${lastSeq} - last consumer seq: ${consSeq}`,
);
return;
}
// do something with the message
const m = toJsMsg(msg);
m.ack();
},
});
const stream = nuid.next();
const subj = nuid.next();
await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] });

setInterval(() => {
missed++;
if (missed > 3) {
console.error("JetStream stopped sending heartbeats!");
const js = nc.jetstream();
let opts = consumerOpts()
.deliverTo("push")
.manualAck()
.ackExplicit()
.idleHeartbeat(500)
.durable("iter-dur");
const iter = await js.subscribe(`${subj}.>`, opts);
// if 2 heartbeats are missed, the iterator will end with an error
// simply re-do the js.subscribe() and attempt again
const done = (async () => {
for await (const m of iter) {
m.ack();
}
}, 30000);

// create a consumer that delivers to the subscription
await jsm.consumers.add(stream, {
ack_policy: AckPolicy.Explicit,
deliver_subject: "my.messages",
idle_heartbeat: nanos(10000),
})();
done.catch((err) => {
console.log(`iterator closed: ${err}`);
});

await sub.closed;
opts = consumerOpts()
.deliverTo("push")
.manualAck()
.ackExplicit()
.idleHeartbeat(500)
.durable("callback-dur")
.callback((err, m) => {
if (err) {
// the callback will also report a heartbeat error, however because the
// callback can receive errors, it continues active. If the server returns
// the client will automatically resume receiving messages
console.log(err);
} else {
m?.ack();
}
});

const sub = await js.subscribe(`${subj}.>`, opts);
await sub.closed.then(() => console.log("sub closed"));
```

#### JetStream Ordered Consumers
Expand Down