Skip to content

Commit

Permalink
When a consumer has no state we are now compacting the log, but were …
Browse files Browse the repository at this point in the history
…not snapshotting.

This caused issues on leader change and losing quorum.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Mar 9, 2022
1 parent 7a98563 commit 9fc2321
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
3 changes: 1 addition & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5509,11 +5509,10 @@ func (o *consumerFileStore) State() (*ConsumerState, error) {
o.mu.Lock()
defer o.mu.Unlock()

var state *ConsumerState
state := &ConsumerState{}

// See if we have a running state or if we need to read in from disk.
if o.state.Delivered.Consumer != 0 {
state = &ConsumerState{}
state.Delivered = o.state.Delivered
state.AckFloor = o.state.AckFloor
if len(o.state.Pending) > 0 {
Expand Down
5 changes: 0 additions & 5 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3106,11 +3106,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
lastSnap = snap
}
}
} else {
// If we are here we may have no state but may be processing updates as a filtered consumer.
// Make sure the store does not grow too much, so similar to memory logic above, just compact.
_, _, applied := n.Progress()
n.Compact(applied)
}
}

Expand Down
16 changes: 15 additions & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4309,7 +4309,7 @@ func TestJetStreamClusterStreamLeaderStepDown(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
if cdResp.Error != nil {
t.Fatalf("Unexpected error: %+v", sdResp.Error)
t.Fatalf("Unexpected error: %+v", cdResp.Error)
}

checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
Expand Down Expand Up @@ -10918,6 +10918,20 @@ func TestJetStreamClusterFilteredAndIdleConsumerNRGGrowth(t *testing.T) {
if entries, _ := o.raftNode().Size(); entries > compactNumMin {
t.Fatalf("Expected <= %d entries, got %d", compactNumMin, entries)
}

// Now make the consumer leader stepdown and make sure we have the proper snapshot.
resp, err := nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "dlc"), nil, time.Second)
require_NoError(t, err)

var cdResp JSApiConsumerLeaderStepDownResponse
if err := json.Unmarshal(resp.Data, &cdResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if cdResp.Error != nil {
t.Fatalf("Unexpected error: %+v", cdResp.Error)
}

c.waitOnConsumerLeader("$G", "TEST", "dlc")
}

func TestJetStreamClusterMirrorOrSourceNotActiveReporting(t *testing.T) {
Expand Down

0 comments on commit 9fc2321

Please sign in to comment.