Skip to content

Commit

Permalink
Merge pull request #2914 from nats-io/fix_2913
Browse files Browse the repository at this point in the history
[FIXED] Consumer with no activity can lose quorum
  • Loading branch information
kozlovic authored Mar 9, 2022
2 parents 9a2da9e + 3216eb5 commit 0cb0f6d
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 10 deletions.
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1760,7 +1760,7 @@ func (o *consumer) readStoredState() error {
return nil
}
state, err := o.store.State()
if err == nil && state != nil {
if err == nil && state != nil && state.Delivered.Consumer != 0 {
o.applyState(state)
if len(o.rdc) > 0 {
o.checkRedelivered()
Expand Down
3 changes: 1 addition & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5509,11 +5509,10 @@ func (o *consumerFileStore) State() (*ConsumerState, error) {
o.mu.Lock()
defer o.mu.Unlock()

var state *ConsumerState
state := &ConsumerState{}

// See if we have a running state or if we need to read in from disk.
if o.state.Delivered.Consumer != 0 {
state = &ConsumerState{}
state.Delivered = o.state.Delivered
state.AckFloor = o.state.AckFloor
if len(o.state.Pending) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1892,7 +1892,7 @@ func TestFileStoreConsumer(t *testing.T) {
if err != nil {
t.Fatalf("Unexepected error: %v", err)
}
if state, err := o.State(); state != nil || err != nil {
if state, err := o.State(); err != nil || state.Delivered.Consumer != 0 {
t.Fatalf("Unexpected state or error: %v", err)
}

Expand Down
5 changes: 0 additions & 5 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3106,11 +3106,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
lastSnap = snap
}
}
} else {
// If we are here we may have no state but may be processing updates as a filtered consumer.
// Make sure the store does not grow too much, so similar to memory logic above, just compact.
_, _, applied := n.Progress()
n.Compact(applied)
}
}

Expand Down
16 changes: 15 additions & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4309,7 +4309,7 @@ func TestJetStreamClusterStreamLeaderStepDown(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
if cdResp.Error != nil {
t.Fatalf("Unexpected error: %+v", sdResp.Error)
t.Fatalf("Unexpected error: %+v", cdResp.Error)
}

checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
Expand Down Expand Up @@ -10918,6 +10918,20 @@ func TestJetStreamClusterFilteredAndIdleConsumerNRGGrowth(t *testing.T) {
if entries, _ := o.raftNode().Size(); entries > compactNumMin {
t.Fatalf("Expected <= %d entries, got %d", compactNumMin, entries)
}

// Now make the consumer leader stepdown and make sure we have the proper snapshot.
resp, err := nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "dlc"), nil, time.Second)
require_NoError(t, err)

var cdResp JSApiConsumerLeaderStepDownResponse
if err := json.Unmarshal(resp.Data, &cdResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if cdResp.Error != nil {
t.Fatalf("Unexpected error: %+v", cdResp.Error)
}

c.waitOnConsumerLeader("$G", "TEST", "dlc")
}

func TestJetStreamClusterMirrorOrSourceNotActiveReporting(t *testing.T) {
Expand Down

0 comments on commit 0cb0f6d

Please sign in to comment.