From 47cd435ec86c555c2fdf03081ca4727da31109c0 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 15 Jan 2024 16:26:13 +0000 Subject: [PATCH 1/4] kvpb: mark `BarrierRequest` as `isAlone` Otherwise, `BatchRequest.RequiresConsensus()` may return `false` and not submit the barrier through Raft. Similarly, `shouldWaitOnLatchesWithoutAcquiring` will return `false` so it will contend with later writes. Barriers are not used in recent releases, so this does not have any mixed-version concerns. Epic: none Release note: None --- pkg/kv/kvpb/api.go | 2 +- pkg/kv/kvpb/batch.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 85f91a2be0f5..6fa96e5a23d9 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -1767,7 +1767,7 @@ func (*RangeStatsRequest) flags() flag { return isRead } func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot } -func (*BarrierRequest) flags() flag { return isWrite | isRange } +func (*BarrierRequest) flags() flag { return isWrite | isRange | isAlone } func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform diff --git a/pkg/kv/kvpb/batch.go b/pkg/kv/kvpb/batch.go index c54da8265ab2..7d810fc65234 100644 --- a/pkg/kv/kvpb/batch.go +++ b/pkg/kv/kvpb/batch.go @@ -215,6 +215,12 @@ func (ba *BatchRequest) IsSingleRequest() bool { return len(ba.Requests) == 1 } +// IsSingleBarrierRequest returns true iff the batch contains a single request, +// and that request is a Barrier. +func (ba *BatchRequest) IsSingleBarrierRequest() bool { + return ba.isSingleRequestWithMethod(Barrier) +} + // IsSingleSkipsLeaseCheckRequest returns true iff the batch contains a single // request, and that request has the skipsLeaseCheck flag set. func (ba *BatchRequest) IsSingleSkipsLeaseCheckRequest() bool { @@ -349,7 +355,7 @@ func (ba *BatchRequest) IsSingleExportRequest() bool { // a no-op. The Barrier request requires consensus even though its evaluation // is a no-op. func (ba *BatchRequest) RequiresConsensus() bool { - return ba.isSingleRequestWithMethod(Barrier) || ba.isSingleRequestWithMethod(Probe) + return ba.IsSingleBarrierRequest() || ba.IsSingleProbeRequest() } // IsCompleteTransaction determines whether a batch contains every write in a From 9373cfa2030f6f33d71efd064b1bfe8b6002512d Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 15 Jan 2024 16:29:43 +0000 Subject: [PATCH 2/4] batcheval: add `BarrierRequest.WithLeaseAppliedIndex` This can be used to detect whether a replica has applied the barrier command yet. Epic: none Release note: None --- pkg/kv/batch.go | 3 +- pkg/kv/db.go | 44 ++- pkg/kv/kvpb/api.go | 14 +- pkg/kv/kvpb/api.proto | 31 +- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_barrier.go | 12 +- pkg/kv/kvserver/batcheval/cmd_barrier_test.go | 316 ++++++++++++++++++ pkg/kv/kvserver/batcheval/result/result.go | 27 +- pkg/kv/kvserver/replica_application_result.go | 11 + 9 files changed, 439 insertions(+), 20 deletions(-) create mode 100644 pkg/kv/kvserver/batcheval/cmd_barrier_test.go diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 92080093f422..48d891f54f0a 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -1116,7 +1116,7 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) { b.initResult(1, 0, notRaw, nil) } -func (b *Batch) barrier(s, e interface{}) { +func (b *Batch) barrier(s, e interface{}, withLAI bool) { begin, err := marshalKey(s) if err != nil { b.initResult(0, 0, notRaw, err) @@ -1132,6 +1132,7 @@ func (b *Batch) barrier(s, e interface{}) { Key: begin, EndKey: end, }, + WithLeaseAppliedIndex: withLAI, } b.appendReqs(req) b.initResult(1, 0, notRaw, nil) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index d9e2c74f2679..798c5444d2fb 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -889,23 +889,47 @@ func (db *DB) QueryResolvedTimestamp( // writes on the specified key range to finish. func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) { b := &Batch{} - b.barrier(begin, end) - err := getOneErr(db.Run(ctx, b), b) - if err != nil { + b.barrier(begin, end, false /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { return hlc.Timestamp{}, err } - responses := b.response.Responses - if len(responses) == 0 { - return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier") + if l := len(b.response.Responses); l != 1 { + return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l) } - resp, ok := responses[0].GetInner().(*kvpb.BarrierResponse) - if !ok { - return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier", - responses[0].GetInner()) + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) } return resp.Timestamp, nil } +// BarrierWithLAI is like Barrier, but also returns the lease applied index and +// range descriptor at which the barrier was applied. In this case, the barrier +// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned. +// +// NB: the protocol support for this was added in a patch release, and is not +// guaranteed to be present with nodes prior to 24.1. In this case, the request +// will return an empty result. +func (db *DB) BarrierWithLAI( + ctx context.Context, begin, end interface{}, +) (kvpb.LeaseAppliedIndex, roachpb.RangeDescriptor, error) { + b := &Batch{} + b.barrier(begin, end, true /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { + return 0, roachpb.RangeDescriptor{}, err + } + if l := len(b.response.Responses); l != 1 { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l) + } + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) + } + return resp.LeaseAppliedIndex, resp.RangeDesc, nil +} + // sendAndFill is a helper which sends the given batch and fills its results, // returning the appropriate error which is either from the first failing call, // or an "internal" error. diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 6fa96e5a23d9..972923a59eca 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -663,6 +663,12 @@ func (r *BarrierResponse) combine(_ context.Context, c combinable, _ *BatchReque return err } r.Timestamp.Forward(otherR.Timestamp) + if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex") + } + if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc") + } } return nil } @@ -1767,7 +1773,13 @@ func (*RangeStatsRequest) flags() flag { return isRead } func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot } -func (*BarrierRequest) flags() flag { return isWrite | isRange | isAlone } +func (r *BarrierRequest) flags() flag { + flags := isWrite | isRange | isAlone + if r.WithLeaseAppliedIndex { + flags |= isUnsplittable // the LAI is only valid for a single range + } + return flags +} func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 89f3ed2667ac..09a0339eb5e5 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2423,11 +2423,28 @@ message QueryResolvedTimestampResponse { (gogoproto.nullable) = false, (gogoproto.customname) = "ResolvedTS"]; } -// BarrierRequest is the request for a Barrier operation. This goes through Raft -// and has the purpose of waiting until all conflicting in-flight operations on -// this range have completed, without blocking any new operations. +// BarrierRequest is the request for a Barrier operation. This guarantees that +// all past and ongoing writes to a key span have completed and applied on the +// leaseholder. It does this by waiting for all conflicting write latches and +// then submitting a noop write through Raft, waiting for it to apply. Later +// writes are not affected -- in particular, it does not actually take out a +// latch, so writers don't have to wait for it to complete and can write below +// the barrier. message BarrierRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + + // WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier + // command in the response, allowing the caller to wait for the barrier to + // apply on an arbitrary replica. It also returns the range descriptor, so the + // caller can detect any unexpected range changes. + // + // When enabled, the barrier request can no longer span multiple ranges, and + // will instead return RangeKeyMismatchError. The caller must be prepared to + // handle this. + // + // NB: This field was added in a patch release. Nodes prior to 24.1 are not + // guaranteed to support it, returning a zero LeaseAppliedIndex instead. + bool with_lease_applied_index = 2; } // BarrierResponse is the response for a Barrier operation. @@ -2437,6 +2454,14 @@ message BarrierResponse { // Timestamp at which this Barrier was evaluated. Can be used to guarantee // future operations happen on the same or newer leaseholders. util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; + + // LeaseAppliedIndex at which this Barrier was applied. Only returned when + // requested via WithLeaseAppliedIndex. + uint64 lease_applied_index = 3 [(gogoproto.casttype) = "LeaseAppliedIndex"]; + + // RangeDesc at the time the barrier was applied. Only returned when requested + // via WithLeaseAppliedIndex. + RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false]; } // A RequestUnion contains exactly one of the requests. diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 5a38c4ec1998..734a187c35a5 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -101,6 +101,7 @@ go_test( size = "medium", srcs = [ "cmd_add_sstable_test.go", + "cmd_barrier_test.go", "cmd_clear_range_test.go", "cmd_delete_range_gchint_test.go", "cmd_delete_range_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier.go b/pkg/kv/kvserver/batcheval/cmd_barrier.go index a03f992292d5..1681a1476e29 100644 --- a/pkg/kv/kvserver/batcheval/cmd_barrier.go +++ b/pkg/kv/kvserver/batcheval/cmd_barrier.go @@ -47,13 +47,19 @@ func declareKeysBarrier( return nil } -// Barrier evaluation is a no-op, as all the latch waiting happens in -// the latch manager. +// Barrier evaluation is a no-op, but it still goes through Raft because of +// BatchRequest.RequiresConsensus(). The latch waiting happens in the latch +// manager, and the WithLeaseAppliedIndex info is populated during application. func Barrier( _ context.Context, _ storage.ReadWriter, cArgs CommandArgs, response kvpb.Response, ) (result.Result, error) { + args := cArgs.Args.(*kvpb.BarrierRequest) resp := response.(*kvpb.BarrierResponse) resp.Timestamp = cArgs.EvalCtx.Clock().Now() - return result.Result{}, nil + return result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: args.WithLeaseAppliedIndex, + }, + }, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier_test.go b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go new file mode 100644 index 000000000000..82a65da1f5bd --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go @@ -0,0 +1,316 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package batcheval_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// TestBarrierEval tests basic Barrier evaluation. +func TestBarrierEval(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + start := roachpb.Key("a") + end := roachpb.Key("b") + + clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Now())) + ts := clock.Now() + evalCtx := (&batcheval.MockEvalCtx{Clock: clock}).EvalContext() + + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + resp := kvpb.BarrierResponse{} + res, err := batcheval.Barrier(ctx, nil, batcheval.CommandArgs{ + EvalCtx: evalCtx, + Args: &kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + }, + }, &resp) + require.NoError(t, err) + + require.Equal(t, result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: withLAI, + }, + }, res) + + // Ignore the logical timestamp component, which is incremented per reading. + resp.Timestamp.Logical = 0 + + require.Equal(t, kvpb.BarrierResponse{ + Timestamp: ts, + }, resp) + }) +} + +// TestBarrier is an integration test for Barrier. It tests that it processes +// the request and response properly, within a single range and across multiple +// ranges. +func TestBarrier(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Set up a test server. + srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + ssrv := srv.StorageLayer() + tsrv := srv.ApplicationLayer() + srv = nil // prevent direct access, use system or tenant as appropriate + + store, err := ssrv.GetStores().(*kvserver.Stores).GetStore(ssrv.GetFirstStoreID()) + require.NoError(t, err) + sender := kvDB.NonTransactionalSender() + + // We'll use /a to /z as our keyspace, and split off a range at /x. + prefix := tsrv.Codec().TenantPrefix() + _, _, err = ssrv.SplitRange(append(prefix, []byte("/x")...)) + require.NoError(t, err) + + // Send Barrier request with/without LeaseAppliedIndex, and within a single + // range or across multiple ranges. + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + testutils.RunTrueAndFalse(t, "crossRange", func(t *testing.T, crossRange bool) { + start := append(prefix, []byte("/a")...) + end := append(prefix, []byte("/b")...) + if crossRange { + end = append(prefix, []byte("/z")...) + } + repl := store.LookupReplica(roachpb.RKey(start)) + + tsBefore := tsrv.Clock().Now() + laiBefore := repl.GetLeaseAppliedIndex() + req := kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + } + respI, pErr := kv.SendWrapped(ctx, sender, &req) + + // WithLeaseAppliedIndex should return RangeKeyMismatchError when across + // multiple ranges. + if withLAI && crossRange { + require.Error(t, pErr.GoError()) + require.IsType(t, &kvpb.RangeKeyMismatchError{}, pErr.GoError()) + return + } + + require.NoError(t, pErr.GoError()) + resp, ok := respI.(*kvpb.BarrierResponse) + require.True(t, ok) + + // The timestamp must be after the request was sent. + require.True(t, tsBefore.LessEq(resp.Timestamp)) + + // If WithLeaseAppliedIndex is set, it also returns the LAI and range + // descriptor. + if withLAI { + require.GreaterOrEqual(t, resp.LeaseAppliedIndex, laiBefore) + require.GreaterOrEqual(t, repl.GetLeaseAppliedIndex(), resp.LeaseAppliedIndex) + require.Equal(t, *repl.Desc(), resp.RangeDesc) + } else { + require.Zero(t, resp.LeaseAppliedIndex) + require.Zero(t, resp.RangeDesc) + } + }) + }) +} + +// TestBarrierLatches tests Barrier latch interactions. Specifically, that it +// waits for in-flight requests to complete, but that it does not block later +// requests. +func TestBarrierLatches(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderDuress(t) // too slow, times out + + // Use a timeout, to prevent blocking indefinitely if something goes wrong. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // We'll do reads and writes to two separate keys, with a range split in + // between. These keys will be under the tenant prefix. + readSuffix := roachpb.Key("/read") + splitSuffix := roachpb.Key("/split") + writeSuffix := roachpb.Key("/write") + + // Set up a request evaluation filter which will block Gets to /read and Puts + // to /write. These will signal that they're blocked via blockedC, and unblock + // when unblockC is closed. + // + // Unfortunately, we can't use a magic context to specify which requests to + // block, since this does not work with external process tenants which may be + // randomly enabled. We therefore have to match the actual keys. + blockedC := make(chan struct{}, 10) + unblockC := make(chan struct{}) + + evalFilter := func(args kvserverbase.FilterArgs) *kvpb.Error { + var shouldBlock bool + if key, err := keys.StripTenantPrefix(args.Req.Header().Key); err == nil { + if args.Req.Method() == kvpb.Get && bytes.Equal(key, readSuffix) { + shouldBlock = true + } + if args.Req.Method() == kvpb.Put && bytes.Equal(key, writeSuffix) { + shouldBlock = true + } + } + if shouldBlock { + // Notify callers that we're blocking. + select { + case blockedC <- struct{}{}: + t.Logf("blocked %s", args.Req) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + // Wait to unblock. + select { + case <-unblockC: + t.Logf("unblocked %s", args.Req) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + } + return nil + } + + // Set up a test server. + srv := serverutils.StartServerOnly(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: evalFilter, + }, + }, + }, + }) + defer srv.Stopper().Stop(ctx) + ssrv := srv.StorageLayer() + tsrv := srv.ApplicationLayer() + srv = nil // prevent direct access, use system or tenant as appropriate + + db := tsrv.DB() + store, err := ssrv.GetStores().(*kvserver.Stores).GetStore(ssrv.GetFirstStoreID()) + require.NoError(t, err) + _ = store + + // Determine the tenant prefix and keys. + prefix := tsrv.Codec().TenantPrefix() + readKey := append(prefix, readSuffix...) + splitKey := append(prefix, splitSuffix...) + writeKey := append(prefix, writeSuffix...) + + // Set up helpers to run barriers, both sync and async. + barrier := func(ctx context.Context, start, end roachpb.Key, withLAI bool) (err error) { + if withLAI { + _, _, err = db.BarrierWithLAI(ctx, start, end) + } else { + _, err = db.Barrier(ctx, start, end) + } + return + } + + barrierAsync := func(ctx context.Context, start, end roachpb.Key, withLAI bool) <-chan error { + errC := make(chan error, 1) + go func() { + errC <- barrier(ctx, start, end, withLAI) + }() + return errC + } + + // Split off a range at /split, to test cross-range barriers. + _, _, err = ssrv.SplitRange(splitKey) + require.NoError(t, err) + + // Spawn read and write requests, and wait for them to block. + go func() { + _ = db.Put(ctx, writeKey, "value") + }() + go func() { + _, _ = db.Get(ctx, readKey) + }() + + for i := 0; i < 2; i++ { + select { + case <-blockedC: + case <-ctx.Done(): + require.NoError(t, ctx.Err()) + } + } + + // Barriers should not conflict outside of these keys. + require.NoError(t, barrier(ctx, readKey.Next(), splitKey, true /* withLAI */)) + require.NoError(t, barrier(ctx, splitKey, writeKey, true /* withLAI */)) + require.Error(t, barrier(ctx, readKey.Next(), writeKey, true /* withLAI */)) // can't span ranges + require.NoError(t, barrier(ctx, readKey.Next(), writeKey, false /* withLAI */)) + + // Barriers should not conflict with read requests. + require.NoError(t, barrier(ctx, readKey, readKey.Next(), true /* withLAI */)) + + // Barriers should conflict with write requests. We send off two barriers: one + // WithLAI in a single range, and another across ranges. Neither of these + // should return in a second. + withLAIC := barrierAsync(ctx, splitKey, writeKey.Next(), true /* withLAI */) + withoutLAIC := barrierAsync(ctx, readKey, writeKey.Next(), false /* withLAI */) + select { + case err := <-withLAIC: + t.Fatalf("WithLAI=true barrier returned prematurely: %v", err) + case err := <-withoutLAIC: + t.Fatalf("WithLAI=false barrier returned prematurely: %v", err) + case <-time.After(time.Second): + } + + // While the barriers are blocked, later overlapping requests should be able + // to proceed and evaluate below them. + require.NoError(t, db.Put(ctx, splitKey, "value")) + _, err = db.Get(ctx, splitKey) + require.NoError(t, err) + + // Unblock the requests. This should now unblock the barriers as well. + close(unblockC) + + select { + case err := <-withLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=true barrier did not return") + } + + select { + case err := <-withoutLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=false barrier did not return") + } +} diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 6a2136b03376..25fb2966d6f6 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -52,6 +52,9 @@ type LocalResult struct { // commit fails, or we may accidentally make uncommitted values // live. EndTxns []EndTxnIntents + // PopulateBarrierResponse will populate a BarrierResponse with the lease + // applied index and range descriptor when applied. + PopulateBarrierResponse bool // When set (in which case we better be the first range), call // GossipFirstRange if the Replica holds the lease. @@ -79,6 +82,7 @@ func (lResult *LocalResult) IsZero() bool { lResult.ResolvedLocks == nil && lResult.UpdatedTxns == nil && lResult.EndTxns == nil && + !lResult.PopulateBarrierResponse && !lResult.GossipFirstRange && !lResult.MaybeGossipSystemConfig && !lResult.MaybeGossipSystemConfigIfHaveFailure && @@ -93,13 +97,13 @@ func (lResult *LocalResult) String() string { return fmt.Sprintf("LocalResult (reply: %v, "+ "#encountered intents: %d, #acquired locks: %d, #resolved locks: %d"+ "#updated txns: %d #end txns: %d, "+ - "GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ + "PopulateBarrierResponse:%t GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ "MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+ "MaybeGossipNodeLiveness:%s ", lResult.Reply, len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks), len(lResult.UpdatedTxns), len(lResult.EndTxns), - lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, + lResult.PopulateBarrierResponse, lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue, lResult.MaybeGossipNodeLiveness) } @@ -147,6 +151,17 @@ func (lResult *LocalResult) DetachEndTxns(alwaysOnly bool) []EndTxnIntents { return r } +// DetachPopulateBarrierResponse returns (and removes) the +// PopulateBarrierResponse value from the local result. +func (lResult *LocalResult) DetachPopulateBarrierResponse() bool { + if lResult == nil { + return false + } + r := lResult.PopulateBarrierResponse + lResult.PopulateBarrierResponse = false + return r +} + // Result is the result of evaluating a KV request. That is, the // proposer (which holds the lease, at least in the case in which the command // will complete successfully) has evaluated the request and is holding on to: @@ -368,6 +383,14 @@ func (p *Result) MergeAndDestroy(q Result) error { } q.Local.EndTxns = nil + if !p.Local.PopulateBarrierResponse { + p.Local.PopulateBarrierResponse = q.Local.PopulateBarrierResponse + } else { + // PopulateBarrierResponse is only valid for a single Barrier response. + return errors.AssertionFailedf("multiple PopulateBarrierResponse results") + } + q.Local.PopulateBarrierResponse = false + if p.Local.MaybeGossipNodeLiveness == nil { p.Local.MaybeGossipNodeLiveness = q.Local.MaybeGossipNodeLiveness } else if q.Local.MaybeGossipNodeLiveness != nil { diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index e3ee7fef4f38..1e7534dca24b 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -262,6 +262,17 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { // in that case, it would seem prudent not to take advantage of that. In other // words, the line below this comment should be conditional on `pErr == nil`. cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil) + + // Populate BarrierResponse if requested. + if pErr == nil && cmd.proposal.Local.DetachPopulateBarrierResponse() { + if resp := cmd.response.Reply.Responses[0].GetBarrier(); resp != nil { + resp.LeaseAppliedIndex = cmd.LeaseIndex + resp.RangeDesc = *r.Desc() + } else { + log.Fatalf(ctx, "PopulateBarrierResponse for %T", cmd.response.Reply.Responses[0].GetInner()) + } + } + if pErr == nil { cmd.localResult = cmd.proposal.Local } else if cmd.localResult != nil { From 5544e5d954fb4a11a476c500ac1e51328bc9c711 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 19 Jan 2024 10:28:30 +0000 Subject: [PATCH 3/4] kvnemsis: add support for `Barrier` operations This only executes random `Barrier` requests, but does not verify that the barrier guarantees are actually satisfied (i.e. that all past and concurrent writes are applied before it returns). At least we get some execution coverage, and verify that it does not have negative interactions with other operations. Epic: none Release note: None --- pkg/kv/kvnemesis/applier.go | 21 ++++++++++++ pkg/kv/kvnemesis/generator.go | 33 +++++++++++++++++++ pkg/kv/kvnemesis/generator_test.go | 5 ++- pkg/kv/kvnemesis/operations.go | 14 ++++++++ pkg/kv/kvnemesis/operations.proto | 8 +++++ pkg/kv/kvnemesis/operations_test.go | 2 ++ .../kvnemesis/testdata/TestOperationsFormat/5 | 3 ++ .../kvnemesis/testdata/TestOperationsFormat/6 | 3 ++ pkg/kv/kvnemesis/validator.go | 14 ++++++++ 9 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 create mode 100644 pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go index 6e6902d67f82..0500e5a9f9f3 100644 --- a/pkg/kv/kvnemesis/applier.go +++ b/pkg/kv/kvnemesis/applier.go @@ -151,6 +151,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { case *ChangeZoneOperation: err := updateZoneConfigInEnv(ctx, env, o.Type) o.Result = resultInit(ctx, err) + case *BarrierOperation: + var err error + if o.WithLeaseAppliedIndex { + _, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey) + } else { + _, err = db.Barrier(ctx, o.Key, o.EndKey) + } + o.Result = resultInit(ctx, err) case *ClosureTxnOperation: // Use a backoff loop to avoid thrashing on txn aborts. Don't wait between // epochs of the same transaction to avoid waiting while holding locks. @@ -435,6 +443,17 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { return } o.Result.OptionalTimestamp = ts + case *BarrierOperation: + _, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) { + b.AddRawRequest(&kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: o.Key, + EndKey: o.EndKey, + }, + WithLeaseAppliedIndex: o.WithLeaseAppliedIndex, + }) + }) + o.Result = resultInit(ctx, err) case *BatchOperation: b := &kv.Batch{} applyBatchOp(ctx, b, db.Run, o) @@ -510,6 +529,8 @@ func applyBatchOp( setLastReqSeq(b, subO.Seq) case *AddSSTableOperation: panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`)) + case *BarrierOperation: + panic(errors.AssertionFailedf(`Barrier cannot be used in batches`)) default: panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO)) } diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index 4d21e139f57d..d6c6c159a0ca 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -261,6 +261,8 @@ type ClientOperationConfig struct { DeleteRangeUsingTombstone int // AddSSTable is an operations that ingests an SSTable with random KV pairs. AddSSTable int + // Barrier is an operation that waits for in-flight writes to complete. + Barrier int } // BatchOperationConfig configures the relative probability of generating a @@ -373,6 +375,7 @@ func newAllOperationsConfig() GeneratorConfig { DeleteRange: 1, DeleteRangeUsingTombstone: 1, AddSSTable: 1, + Barrier: 1, } batchOpConfig := BatchOperationConfig{ Batch: 4, @@ -491,6 +494,12 @@ func NewDefaultConfig() GeneratorConfig { config.Ops.ClosureTxn.CommitBatchOps.AddSSTable = 0 config.Ops.ClosureTxn.TxnClientOps.AddSSTable = 0 config.Ops.ClosureTxn.TxnBatchOps.Ops.AddSSTable = 0 + // Barrier cannot be used in batches, and we omit it in txns too because it + // can result in spurious RangeKeyMismatchErrors that fail the txn operation. + config.Ops.Batch.Ops.Barrier = 0 + config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0 + config.Ops.ClosureTxn.TxnClientOps.Barrier = 0 + config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0 return config } @@ -766,6 +775,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig addOpGen(allowed, randDelRange, c.DeleteRange) addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone) addOpGen(allowed, randAddSSTable, c.AddSSTable) + addOpGen(allowed, randBarrier, c.Barrier) } func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) { @@ -1056,6 +1066,21 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation { return addSSTable(f.Data(), sstSpan, sstTimestamp, seq, asWrites) } +func randBarrier(g *generator, rng *rand.Rand) Operation { + withLAI := rng.Float64() < 0.5 + var key, endKey string + if withLAI { + // Barriers requesting LAIs can't span multiple ranges, so we try to fit + // them within an existing range. This may race with a concurrent split, in + // which case the Barrier will fail, but that's ok -- most should still + // succeed. These errors are ignored by the validator. + key, endKey = randRangeSpan(rng, g.currentSplits) + } else { + key, endKey = randSpan(rng) + } + return barrier(key, endKey, withLAI) +} + func randScan(g *generator, rng *rand.Rand) Operation { key, endKey := randSpan(rng) return scan(key, endKey) @@ -1735,3 +1760,11 @@ func addSSTable( AsWrites: asWrites, }} } + +func barrier(key, endKey string, withLAI bool) Operation { + return Operation{Barrier: &BarrierOperation{ + Key: []byte(key), + EndKey: []byte(endKey), + WithLeaseAppliedIndex: withLAI, + }} +} diff --git a/pkg/kv/kvnemesis/generator_test.go b/pkg/kv/kvnemesis/generator_test.go index 2dfefae87c08..4d14a133fdd1 100644 --- a/pkg/kv/kvnemesis/generator_test.go +++ b/pkg/kv/kvnemesis/generator_test.go @@ -250,6 +250,8 @@ func TestRandStep(t *testing.T) { client.DeleteRangeUsingTombstone++ case *AddSSTableOperation: client.AddSSTable++ + case *BarrierOperation: + client.Barrier++ case *BatchOperation: batch.Batch++ countClientOps(&batch.Ops, nil, o.Ops...) @@ -270,7 +272,8 @@ func TestRandStep(t *testing.T) { *DeleteOperation, *DeleteRangeOperation, *DeleteRangeUsingTombstoneOperation, - *AddSSTableOperation: + *AddSSTableOperation, + *BarrierOperation: countClientOps(&counts.DB, &counts.Batch, step.Op) case *ClosureTxnOperation: countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...) diff --git a/pkg/kv/kvnemesis/operations.go b/pkg/kv/kvnemesis/operations.go index 036a59cbc86a..024c1df85049 100644 --- a/pkg/kv/kvnemesis/operations.go +++ b/pkg/kv/kvnemesis/operations.go @@ -41,6 +41,8 @@ func (op Operation) Result() *Result { return &o.Result case *AddSSTableOperation: return &o.Result + case *BarrierOperation: + return &o.Result case *SplitOperation: return &o.Result case *MergeOperation: @@ -131,6 +133,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) { o.format(w, fctx) case *AddSSTableOperation: o.format(w, fctx) + case *BarrierOperation: + o.format(w, fctx) case *SplitOperation: o.format(w, fctx) case *MergeOperation: @@ -339,6 +343,16 @@ func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) { } } +func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) { + if op.WithLeaseAppliedIndex { + fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`, + fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey)) + } else { + fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey)) + } + op.Result.format(w) +} + func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) { fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key)) op.Result.format(w) diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto index bb33a83627cf..a27f8ce6f799 100644 --- a/pkg/kv/kvnemesis/operations.proto +++ b/pkg/kv/kvnemesis/operations.proto @@ -95,6 +95,13 @@ message AddSSTableOperation { Result result = 6 [(gogoproto.nullable) = false]; } +message BarrierOperation { + bytes key = 1; + bytes end_key = 2; + bool with_lease_applied_index = 3; + Result result = 4 [(gogoproto.nullable) = false]; +} + message SplitOperation { bytes key = 1; Result result = 2 [(gogoproto.nullable) = false]; @@ -150,6 +157,7 @@ message Operation { TransferLeaseOperation transfer_lease = 16; ChangeZoneOperation change_zone = 17; AddSSTableOperation add_sstable = 18 [(gogoproto.customname) = "AddSSTable"]; + BarrierOperation barrier = 22; } enum ResultType { diff --git a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go index 5c24e2034477..95059b95b618 100644 --- a/pkg/kv/kvnemesis/operations_test.go +++ b/pkg/kv/kvnemesis/operations_test.go @@ -87,6 +87,8 @@ func TestOperationsFormat(t *testing.T) { { step: step(addSSTable(sstFile.Data(), sstSpan, sstTS, sstValueHeader.KVNemesisSeq.Get(), true)), }, + {step: step(barrier(k1, k2, false /* withLAI */))}, + {step: step(barrier(k3, k4, true /* withLAI */))}, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 new file mode 100644 index 000000000000..b848afebfc9b --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 @@ -0,0 +1,3 @@ +echo +---- +···db0.Barrier(ctx, tk(1), tk(2)) diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 new file mode 100644 index 000000000000..92c0790f55de --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 @@ -0,0 +1,3 @@ +echo +---- +···db0.BarrierWithLAI(ctx, tk(3), tk(4)) diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index ab101a207e21..03c766ec4f96 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -693,6 +693,20 @@ func (v *validator) processOp(op Operation) { if v.buffering == bufferingSingle { v.checkAtomic(`addSSTable`, t.Result) } + case *BarrierOperation: + execTimestampStrictlyOptional = true + if op.Barrier.WithLeaseAppliedIndex && + resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) { + // Barriers requesting LAIs can't span ranges. The generator will + // optimistically try to fit the barrier inside one of the current ranges, + // but this may race with a split, so we ignore the error in this case and + // try again later. + } else { + // Fail or retry on other errors, depending on type. + v.checkNonAmbError(op, t.Result, exceptUnhandledRetry) + } + // We don't yet actually check the barrier guarantees here, i.e. that all + // concurrent writes are applied by the time it completes. Maybe later. case *ScanOperation: if _, isErr := v.checkError(op, t.Result); isErr { break From 954f001b8f3ba0100f7b4e9686bb3798ece67be9 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 17 Jan 2024 13:31:38 +0000 Subject: [PATCH 4/4] kvserver: add `Replica.WaitForLeaseAppliedIndex()` This allows a caller to wait for a replica to reach a certain lease applied index. Similar functionality elsewhere is not migrated yet, out of caution. Epic: none Release note: None --- pkg/kv/kvserver/replica_command.go | 26 ++++++++ pkg/kv/kvserver/replica_command_test.go | 82 +++++++++++++++++++++++++ pkg/kv/kvserver/stores_server.go | 2 + 3 files changed, 110 insertions(+) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 11da90129c7c..1530604d0df1 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -893,6 +893,32 @@ func waitForReplicasInit( }) } +// WaitForLeaseAppliedIndex waits for the replica to reach the given lease +// applied index, or until the context is cancelled or the replica is destroyed. +// Note that the lease applied index may regress across restarts, since we don't +// sync state machine application to disk. +// +// TODO(erikgrinaker): it would be nice if we could be notified about LAI +// updates instead, but polling will do for now. +func (r *Replica) WaitForLeaseAppliedIndex( + ctx context.Context, lai kvpb.LeaseAppliedIndex, +) (kvpb.LeaseAppliedIndex, error) { + retryOpts := retry.Options{ + InitialBackoff: 10 * time.Millisecond, + Multiplier: 2, + MaxBackoff: time.Second, + } + for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { + if currentLAI := r.GetLeaseAppliedIndex(); currentLAI >= lai { + return currentLAI, nil + } + if _, err := r.IsDestroyed(); err != nil { + return 0, err + } + } + return 0, ctx.Err() +} + // ChangeReplicas atomically changes the replicas that are members of a range. // The change is performed in a distributed transaction and takes effect when // that transaction is committed. This transaction confirms that the supplied diff --git a/pkg/kv/kvserver/replica_command_test.go b/pkg/kv/kvserver/replica_command_test.go index 722db5b18f1a..8e68a9b386d3 100644 --- a/pkg/kv/kvserver/replica_command_test.go +++ b/pkg/kv/kvserver/replica_command_test.go @@ -13,7 +13,9 @@ package kvserver import ( "context" "encoding/binary" + math "math" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" @@ -23,6 +25,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -608,3 +613,80 @@ func TestSynthesizeTargetsByChangeType(t *testing.T) { }) } } + +func TestWaitForLeaseAppliedIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const maxLAI = math.MaxUint64 + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + tc := testContext{} + tc.Start(ctx, t, stopper) + db := tc.store.DB() + + // Submit a write and read it back to bump the initial LAI. + write := func(key, value string) { + require.NoError(t, db.Put(ctx, key, value)) + _, err := db.Get(ctx, key) + require.NoError(t, err) + } + write("foo", "bar") + + // Should return immediately when already at or past the LAI. + currentLAI := tc.repl.GetLeaseAppliedIndex() + require.NotZero(t, currentLAI) + resultLAI, err := tc.repl.WaitForLeaseAppliedIndex(ctx, currentLAI) + require.NoError(t, err) + require.GreaterOrEqual(t, resultLAI, currentLAI) + + // Should wait for a future LAI to be reached. + const numWrites = 10 + waitLAI := tc.repl.GetLeaseAppliedIndex() + numWrites + laiC := make(chan kvpb.LeaseAppliedIndex, 1) + go func() { + lai, err := tc.repl.WaitForLeaseAppliedIndex(ctx, waitLAI) + assert.NoError(t, err) // can't use require in goroutine + laiC <- lai + }() + + select { + case lai := <-laiC: + t.Fatalf("unexpected early LAI %d", lai) + case <-time.After(time.Second): + } + + for i := 0; i < numWrites; i++ { + write("foo", "bar") + } + + select { + case lai := <-laiC: + require.GreaterOrEqual(t, lai, waitLAI) + require.GreaterOrEqual(t, tc.repl.GetLeaseAppliedIndex(), lai) + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for LAI %d", waitLAI) + } + + // Should error on context cancellation. + cancelCtx, cancel := context.WithCancel(ctx) + cancel() + _, err = tc.repl.WaitForLeaseAppliedIndex(cancelCtx, maxLAI) + require.Error(t, err) + require.Equal(t, cancelCtx.Err(), err) + + // Should error on destroyed replicas. + stopper.Stop(ctx) + + destroyErr := errors.New("destroyed") + tc.repl.mu.Lock() + tc.repl.mu.destroyStatus.Set(destroyErr, destroyReasonRemoved) + tc.repl.mu.Unlock() + + _, err = tc.repl.WaitForLeaseAppliedIndex(ctx, maxLAI) + require.Error(t, err) + require.Equal(t, destroyErr, err) +} diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index b4040707b7bc..ea1af8f10e49 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -74,6 +74,8 @@ func (is Server) CollectChecksum( // // It is the caller's responsibility to cancel or set a timeout on the context. // If the context is never canceled, WaitForApplication will retry forever. +// +// TODO(erikgrinaker): consider using Replica.WaitForLeaseAppliedIndex(). func (is Server) WaitForApplication( ctx context.Context, req *WaitForApplicationRequest, ) (*WaitForApplicationResponse, error) {