Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: add Replica.WaitForLeaseAppliedIndex() #117968

Merged
merged 1 commit into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,32 @@ func waitForReplicasInit(
})
}

// WaitForLeaseAppliedIndex waits for the replica to reach the given lease
// applied index, or until the context is cancelled or the replica is destroyed.
// Note that the lease applied index may regress across restarts, since we don't
// sync state machine application to disk.
//
// TODO(erikgrinaker): it would be nice if we could be notified about LAI
// updates instead, but polling will do for now.
func (r *Replica) WaitForLeaseAppliedIndex(
ctx context.Context, lai kvpb.LeaseAppliedIndex,
) (kvpb.LeaseAppliedIndex, error) {
retryOpts := retry.Options{
InitialBackoff: 10 * time.Millisecond,
Multiplier: 2,
MaxBackoff: time.Second,
}
for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); {
if currentLAI := r.GetLeaseAppliedIndex(); currentLAI >= lai {
return currentLAI, nil
}
if _, err := r.IsDestroyed(); err != nil {
return 0, err
}
}
return 0, ctx.Err()
}

// ChangeReplicas atomically changes the replicas that are members of a range.
// The change is performed in a distributed transaction and takes effect when
// that transaction is committed. This transaction confirms that the supplied
Expand Down
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/replica_command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ package kvserver
import (
"context"
"encoding/binary"
math "math"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -23,6 +25,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -608,3 +613,80 @@ func TestSynthesizeTargetsByChangeType(t *testing.T) {
})
}
}

func TestWaitForLeaseAppliedIndex(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const maxLAI = math.MaxUint64

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

tc := testContext{}
tc.Start(ctx, t, stopper)
db := tc.store.DB()

// Submit a write and read it back to bump the initial LAI.
write := func(key, value string) {
require.NoError(t, db.Put(ctx, key, value))
_, err := db.Get(ctx, key)
require.NoError(t, err)
}
write("foo", "bar")

// Should return immediately when already at or past the LAI.
currentLAI := tc.repl.GetLeaseAppliedIndex()
require.NotZero(t, currentLAI)
resultLAI, err := tc.repl.WaitForLeaseAppliedIndex(ctx, currentLAI)
require.NoError(t, err)
require.GreaterOrEqual(t, resultLAI, currentLAI)

// Should wait for a future LAI to be reached.
const numWrites = 10
waitLAI := tc.repl.GetLeaseAppliedIndex() + numWrites
laiC := make(chan kvpb.LeaseAppliedIndex, 1)
go func() {
lai, err := tc.repl.WaitForLeaseAppliedIndex(ctx, waitLAI)
assert.NoError(t, err) // can't use require in goroutine
laiC <- lai
}()

select {
case lai := <-laiC:
t.Fatalf("unexpected early LAI %d", lai)
case <-time.After(time.Second):
}

for i := 0; i < numWrites; i++ {
write("foo", "bar")
}

select {
case lai := <-laiC:
require.GreaterOrEqual(t, lai, waitLAI)
require.GreaterOrEqual(t, tc.repl.GetLeaseAppliedIndex(), lai)
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for LAI %d", waitLAI)
}

// Should error on context cancellation.
cancelCtx, cancel := context.WithCancel(ctx)
cancel()
_, err = tc.repl.WaitForLeaseAppliedIndex(cancelCtx, maxLAI)
require.Error(t, err)
require.Equal(t, cancelCtx.Err(), err)

// Should error on destroyed replicas.
stopper.Stop(ctx)

destroyErr := errors.New("destroyed")
tc.repl.mu.Lock()
tc.repl.mu.destroyStatus.Set(destroyErr, destroyReasonRemoved)
tc.repl.mu.Unlock()

_, err = tc.repl.WaitForLeaseAppliedIndex(ctx, maxLAI)
require.Error(t, err)
require.Equal(t, destroyErr, err)
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/stores_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (is Server) CollectChecksum(
//
// It is the caller's responsibility to cancel or set a timeout on the context.
// If the context is never canceled, WaitForApplication will retry forever.
//
// TODO(erikgrinaker): consider using Replica.WaitForLeaseAppliedIndex().
func (is Server) WaitForApplication(
ctx context.Context, req *WaitForApplicationRequest,
) (*WaitForApplicationResponse, error) {
Expand Down