From 3216eb5ee5cac6d3b90a49bac4c8043246c170f0 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 9 Mar 2022 06:43:20 -0500 Subject: [PATCH] When a consumer has no state we are now compacting the log, but were not snapshotting. This caused issues on leader change and losing quorum. Signed-off-by: Derek Collison --- server/consumer.go | 2 +- server/filestore.go | 3 +-- server/filestore_test.go | 2 +- server/jetstream_cluster.go | 5 ----- server/jetstream_cluster_test.go | 16 +++++++++++++++- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index b3b99eb092f..73bd2e428e1 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1760,7 +1760,7 @@ func (o *consumer) readStoredState() error { return nil } state, err := o.store.State() - if err == nil && state != nil { + if err == nil && state != nil && state.Delivered.Consumer != 0 { o.applyState(state) if len(o.rdc) > 0 { o.checkRedelivered() diff --git a/server/filestore.go b/server/filestore.go index 2c37c43f837..28ac847f69d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5509,11 +5509,10 @@ func (o *consumerFileStore) State() (*ConsumerState, error) { o.mu.Lock() defer o.mu.Unlock() - var state *ConsumerState + state := &ConsumerState{} // See if we have a running state or if we need to read in from disk. if o.state.Delivered.Consumer != 0 { - state = &ConsumerState{} state.Delivered = o.state.Delivered state.AckFloor = o.state.AckFloor if len(o.state.Pending) > 0 { diff --git a/server/filestore_test.go b/server/filestore_test.go index a6316024305..a4a16da947d 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1892,7 +1892,7 @@ func TestFileStoreConsumer(t *testing.T) { if err != nil { t.Fatalf("Unexepected error: %v", err) } - if state, err := o.State(); state != nil || err != nil { + if state, err := o.State(); err != nil || state.Delivered.Consumer != 0 { t.Fatalf("Unexpected state or error: %v", err) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a6dc64170eb..0e85c1e8638 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3106,11 +3106,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { lastSnap = snap } } - } else { - // If we are here we may have no state but may be processing updates as a filtered consumer. - // Make sure the store does not grow too much, so similar to memory logic above, just compact. - _, _, applied := n.Progress() - n.Compact(applied) } } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 192da6261a0..cd11afc0d79 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -4309,7 +4309,7 @@ func TestJetStreamClusterStreamLeaderStepDown(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } if cdResp.Error != nil { - t.Fatalf("Unexpected error: %+v", sdResp.Error) + t.Fatalf("Unexpected error: %+v", cdResp.Error) } checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { @@ -10918,6 +10918,20 @@ func TestJetStreamClusterFilteredAndIdleConsumerNRGGrowth(t *testing.T) { if entries, _ := o.raftNode().Size(); entries > compactNumMin { t.Fatalf("Expected <= %d entries, got %d", compactNumMin, entries) } + + // Now make the consumer leader stepdown and make sure we have the proper snapshot. + resp, err := nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "dlc"), nil, time.Second) + require_NoError(t, err) + + var cdResp JSApiConsumerLeaderStepDownResponse + if err := json.Unmarshal(resp.Data, &cdResp); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if cdResp.Error != nil { + t.Fatalf("Unexpected error: %+v", cdResp.Error) + } + + c.waitOnConsumerLeader("$G", "TEST", "dlc") } func TestJetStreamClusterMirrorOrSourceNotActiveReporting(t *testing.T) {