Skip to content

Commit

Permalink
more consistent pending count for messages in an iterator that doesn'…
Browse files Browse the repository at this point in the history
…t include any pushed functions or the current message being processed by the iterator. (#194)

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart authored Jan 27, 2025
1 parent c276e2e commit c02a380
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 14 deletions.
7 changes: 5 additions & 2 deletions core/src/queued_iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
}
return;
}
if (typeof v === "function") {
this.pendingFiltered++;
}
this.yields.push(v);
this.signal.resolve();
}
Expand Down Expand Up @@ -104,6 +107,7 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
this.yields = [];
for (let i = 0; i < yields.length; i++) {
if (typeof yields[i] === "function") {
this.pendingFiltered--;
const fn = yields[i] as CallbackFn;
try {
fn();
Expand All @@ -120,11 +124,10 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
}

this.processed++;
this.inflight--;
const start = this.profile ? Date.now() : 0;
yield yields[i] as T;
this.time = this.profile ? Date.now() - start : 0;

this.inflight--;
}
// yielding could have paused and microtask
// could have added messages. Prevent allocations
Expand Down
33 changes: 21 additions & 12 deletions core/tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,8 @@ Deno.test("basics - subs pending count", async () => {
for await (const _m of sub) {
count++;
assertEquals(count, sub.getProcessed());
assertEquals(sub.getProcessed() + sub.getPending(), 12);
console.log({ processed: sub.getProcessed(), pending: sub.getPending() });
assertEquals(sub.getProcessed() + sub.getPending(), 10);
}
})();

Expand Down Expand Up @@ -1547,7 +1548,7 @@ Deno.test("basics - slow", async () => {
for await (const m of nc.status()) {
//@ts-ignore: test
if (m.type === "slowConsumer") {
console.log(m);
console.log(`sub: ${m.sub.getID()}`);
slow++;
}
}
Expand All @@ -1567,37 +1568,45 @@ Deno.test("basics - slow", async () => {

// send one more, no more notifications until we drop below 10
nc.publish("test", "");
await delay(100); // 12
await nc.flush(); // 12
await delay(100);
assertEquals(sub.getPending(), 12);
assertEquals(slow, 0);

await s.next(); // 12
await s.next(); // 11
await s.next(); // 10
await s.next(); // 9

nc.publish("test", ""); // 11
nc.publish("test", ""); // 10
await nc.flush();
await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(sub.getPending(), 10);
assertEquals(slow, 0);

// now this will notify
await s.next(); // 11
await s.next(); // 10
await s.next(); // 9
await s.next(); // 8
await nc.flush();
await delay(100);
assertEquals(sub.getPending(), 9);
assertEquals(sub.getPending(), 8);

await s.next(); // 9
nc.publish("test", "");
await s.next(); // 7
nc.publish("test", ""); // 8
await nc.flush();
await delay(100);
assertEquals(sub.getPending(), 9);
assertEquals(sub.getPending(), 8);
assertEquals(slow, 0);

nc.publish("test", ""); // 9
nc.publish("test", ""); // 10
await nc.flush();
await delay(100);

assertEquals(sub.getPending(), 10);
assertEquals(slow, 0);

nc.publish("test", ""); // 11
await nc.flush();
await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 1);
Expand Down
33 changes: 33 additions & 0 deletions jetstream/tests/consume_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,3 +467,36 @@ Deno.test("consume - connection close exits", async () => {

await cleanup(ns, nc);
});

Deno.test("consume - one pending is none", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "A", subjects: ["a"] });

const js = jsm.jetstream();
const buf = [];
for (let i = 0; i < 100; i++) {
buf.push(js.publish("a", `${i}`));
}
await Promise.all(buf);

await jsm.consumers.add("A", {
durable_name: "a",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
});

const c = await js.consumers.get("A", "a");
const iter = await c.consume({ bind: true, max_messages: 1 });
for await (const m of iter) {
assertEquals(iter.getPending(), 0);
assertEquals(iter.getReceived(), m.seq);
m.ack();
if (m.info.pending === 0) {
break;
}
}

await cleanup(ns, nc);
});

0 comments on commit c02a380

Please sign in to comment.