Skip to content

Commit

Permalink
[FIXED] Invalid heartbeat timer for Consumer.Messages (#1786)
Browse files Browse the repository at this point in the history
This fixes an issue where for `Consumer.Messages` the heartbeat error would be returned after 4*idle heartbeat (instead of 2*)

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Jan 28, 2025
1 parent 5945afd commit f7dfee9
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 17 deletions.
2 changes: 1 addition & 1 deletion jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (s *pullSubscription) Next() (Msg, error) {
if closed && !drainMode {
return nil, ErrMsgIteratorClosed
}
hbMonitor := s.scheduleHeartbeatCheck(2 * s.consumeOpts.Heartbeat)
hbMonitor := s.scheduleHeartbeatCheck(s.consumeOpts.Heartbeat)
defer func() {
if hbMonitor != nil {
hbMonitor.Stop()
Expand Down
86 changes: 70 additions & 16 deletions jetstream/test/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1790,6 +1790,52 @@ func TestPullConsumerMessages(t *testing.T) {
}
})

t.Run("with idle heartbeat", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// remove consumer to force missing heartbeats
if err := s.DeleteConsumer(ctx, c.CachedInfo().Name); err != nil {
t.Fatalf("Error deleting consumer: %s", err)
}

it, err := c.Messages(jetstream.PullHeartbeat(500 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer it.Stop()
now := time.Now()
_, err = it.Next()
elapsed := time.Since(now)
if !errors.Is(err, jetstream.ErrNoHeartbeat) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNoHeartbeat, err)
}
// we should get missing heartbeat error after approximately 2*heartbeat interval
if elapsed < time.Second || elapsed > 1500*time.Millisecond {
t.Fatalf("Unexpected elapsed time; want 1-1.5s; got %v", elapsed)
}
})

t.Run("no messages received after stop", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
Expand Down Expand Up @@ -2488,7 +2534,7 @@ func TestPullConsumerConsume(t *testing.T) {
}
})

t.Run("with idle heartbeat", func(t *testing.T) {
t.Run("with missing heartbeat", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
Expand All @@ -2513,27 +2559,35 @@ func TestPullConsumerConsume(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

msgs := make([]jetstream.Msg, 0)
wg := &sync.WaitGroup{}
wg.Add(len(testMsgs))
l, err := c.Consume(func(msg jetstream.Msg) {
msgs = append(msgs, msg)
wg.Done()
}, jetstream.PullMaxBytes(1*time.Second))
// delete consumer to force missing heartbeat error
if err := s.DeleteConsumer(ctx, c.CachedInfo().Name); err != nil {
t.Fatalf("Error deleting consumer: %s", err)
}

errs := make(chan error, 1)
now := time.Now()
var elapsed time.Duration
l, err := c.Consume(func(msg jetstream.Msg) {},
jetstream.PullHeartbeat(500*time.Millisecond),
jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
errs <- err
}))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer l.Stop()

publishTestMsgs(t, js)
wg.Wait()
if len(msgs) != len(testMsgs) {
t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs))
}
for i, msg := range msgs {
if string(msg.Data()) != testMsgs[i] {
t.Fatalf("Invalid msg on index %d; expected: %s; got: %s", i, testMsgs[i], string(msg.Data()))
select {
case err := <-errs:
if !errors.Is(err, jetstream.ErrNoHeartbeat) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNoHeartbeat, err)
}
elapsed = time.Since(now)
if elapsed < time.Second || elapsed > 1500*time.Millisecond {
t.Fatalf("Unexpected elapsed time; want between 1s and 1.5s; got %v", elapsed)
}
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for %v", jetstream.ErrNoHeartbeat)
}
})

Expand Down

0 comments on commit f7dfee9

Please sign in to comment.