kvcoord: account for the span overhead when condensing refresh spans #84230

Merged · 1 commit · Aug 1, 2022
15 changes: 10 additions & 5 deletions pkg/kv/kvclient/kvcoord/condensable_span_set.go
@@ -27,6 +27,13 @@ import (
 // "footprint" of the set to grow, so the set should be thought of as on
 // overestimate.
 type condensableSpanSet struct {
+  // TODO(nvanbenschoten): It feels like there is a lot that we could do with
+  // this data structure to 1) reduce the per span overhead, and 2) avoid the
+  // retention of many small keys for the duration of a transaction. For
+  // instance, we could allocate a single large block of memory and copy keys
+  // into it. We could also store key lengths inline to minimize the per-span
+  // overhead. Recognizing that many spans are actually point keys would also
+  // help.
   s []roachpb.Span
   bytes int64
@@ -238,9 +245,7 @@ func (s *condensableSpanSet) bytesSize() int64 {
 }
 
 func spanSize(sp roachpb.Span) int64 {
-  return int64(len(sp.Key) + len(sp.EndKey))
-}
-
-func keySize(k roachpb.Key) int64 {
-  return int64(len(k))
+  // Since the span is included into a []roachpb.Span, we also need to account
+  // for the overhead of storing it in that slice.
+  return roachpb.SpanOverhead + int64(cap(sp.Key)+cap(sp.EndKey))
 }
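
Aside: the roachpb.SpanOverhead constant that the new spanSize relies on is defined outside this diff. A minimal sketch of the idea, using simplified stand-in types rather than the actual roachpb definitions, assuming 64-bit slice headers:

    package main

    import (
        "fmt"
        "unsafe"
    )

    // Key and Span mirror the shape of roachpb.Key and roachpb.Span.
    type Key []byte

    type Span struct {
        Key    Key
        EndKey Key
    }

    // SpanOverhead is the fixed in-memory cost of holding one Span in a
    // []Span: two slice headers, i.e. 2*24 = 48 bytes on 64-bit platforms.
    const SpanOverhead = int64(unsafe.Sizeof(Span{}))

    // spanSize mirrors the accounting above: the fixed per-span overhead
    // plus the capacity (not length) of both backing key arrays.
    func spanSize(sp Span) int64 {
        return SpanOverhead + int64(cap(sp.Key)+cap(sp.EndKey))
    }

    func main() {
        fmt.Println(SpanOverhead)                                    // 48 on 64-bit platforms
        fmt.Println(spanSize(Span{Key: Key("a"), EndKey: Key("b")})) // 48 + 2 = 50
    }

Note the switch from len to cap in spanSize: a short key that retains a large backing array is now charged for the whole allocation, which matches the goal of accounting for real memory retention.
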
20 changes: 10 additions & 10 deletions pkg/kv/kvclient/kvcoord/condensable_span_set_test.go
@@ -27,9 +27,9 @@ func TestCondensableSpanSetMergeContiguousSpans(t *testing.T) {
   s := condensableSpanSet{}
   s.insert(roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")})
   s.insert(roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")})
-  require.Equal(t, int64(4), s.bytes)
+  require.Equal(t, 4+2*roachpb.SpanOverhead, s.bytes)
   s.mergeAndSort()
-  require.Equal(t, int64(2), s.bytes)
+  require.Equal(t, 2+roachpb.SpanOverhead, s.bytes)
 }
 
 func TestCondensableSpanSetEstimateSize(t *testing.T) {
@@ -51,32 +51,32 @@ func TestCondensableSpanSetEstimateSize(t *testing.T) {
       name: "new spans fit without merging",
       set: []roachpb.Span{ab, bc},
       newSpans: []roachpb.Span{ab},
-      mergeThreshold: 100,
-      expEstimate: 6,
+      mergeThreshold: 100 + 3*roachpb.SpanOverhead,
+      expEstimate: 6 + 3*roachpb.SpanOverhead,
     },
     {
       // The set gets merged, the new spans don't.
       name: "set needs merging",
       set: []roachpb.Span{ab, bc},
       newSpans: []roachpb.Span{ab},
-      mergeThreshold: 5,
-      expEstimate: 4,
+      mergeThreshold: 5 + 2*roachpb.SpanOverhead,
+      expEstimate: 4 + 2*roachpb.SpanOverhead,
     },
     {
       // The set gets merged, and then it gets merged again with the newSpans.
       name: "new spans fit without merging",
       set: []roachpb.Span{ab, bc},
       newSpans: []roachpb.Span{ab, bc},
-      mergeThreshold: 5,
-      expEstimate: 2,
+      mergeThreshold: 5 + 2*roachpb.SpanOverhead,
+      expEstimate: 2 + roachpb.SpanOverhead,
     },
     {
       // Everything gets merged, but it still doesn't fit.
       name: "new spans dont fit",
       set: []roachpb.Span{ab, bc},
       newSpans: []roachpb.Span{ab, bc, largeSpan},
-      mergeThreshold: 5,
-      expEstimate: 12,
+      mergeThreshold: 5 + 2*roachpb.SpanOverhead,
+      expEstimate: 12 + 2*roachpb.SpanOverhead,
     },
   }
   for _, tc := range tests {
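
To make the first test's arithmetic concrete, assuming the 48-byte SpanOverhead from the sketch above: the spans [a,b) and [b,c) cost 4 key bytes plus 2*48 bytes of overhead, 100 bytes in total, before merging; after mergeAndSort they coalesce into the single span [a,c), i.e. 2 key bytes plus one 48-byte overhead, 50 bytes.
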
26 changes: 15 additions & 11 deletions pkg/kv/kvclient/kvcoord/txn_intercepter_pipeliner_client_test.go
@@ -43,23 +43,25 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) {
   fTof0 := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("f0")}
   g := roachpb.Span{Key: roachpb.Key("g"), EndKey: roachpb.Key(nil)}
   g0Tog1 := roachpb.Span{Key: roachpb.Key("g0"), EndKey: roachpb.Key("g1")}
-  fTog1Closed := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("g1")}
+  fTog1 := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("g1")}
   testCases := []struct {
     span roachpb.Span
     expLocks []roachpb.Span
-    expLocksSize int64
+    expLocksSize int64 // doesn't include the span overhead
   }{
     {span: a, expLocks: []roachpb.Span{a}, expLocksSize: 1},
     {span: b, expLocks: []roachpb.Span{a, b}, expLocksSize: 2},
     {span: c, expLocks: []roachpb.Span{a, b, c}, expLocksSize: 3},
     {span: d, expLocks: []roachpb.Span{a, b, c, d}, expLocksSize: 10},
-    // Note that c-e condenses and then lists first.
-    {span: e, expLocks: []roachpb.Span{cToEClosed, a, b}, expLocksSize: 5},
-    {span: fTof0, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0}, expLocksSize: 8},
-    {span: g, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0, g}, expLocksSize: 9},
-    {span: g0Tog1, expLocks: []roachpb.Span{fTog1Closed, cToEClosed, aToBClosed}, expLocksSize: 9},
+    // Note that c-e condenses and then lists first, we proceed to condense
+    // a-b too to get under half of the threshold.
+    {span: e, expLocks: []roachpb.Span{cToEClosed, aToBClosed}, expLocksSize: 6},
+    {span: fTof0, expLocks: []roachpb.Span{cToEClosed, aToBClosed, fTof0}, expLocksSize: 9},
+    {span: g, expLocks: []roachpb.Span{cToEClosed, aToBClosed, fTof0, g}, expLocksSize: 10},
+    // f-g1 condenses and then aToBClosed gets reordered with cToEClosed.
+    {span: g0Tog1, expLocks: []roachpb.Span{fTog1, aToBClosed, cToEClosed}, expLocksSize: 9},
     // Add a key in the middle of a span, which will get merged on commit.
-    {span: c, expLocks: []roachpb.Span{fTog1Closed, cToEClosed, aToBClosed, c}, expLocksSize: 10},
+    {span: c, expLocks: []roachpb.Span{fTog1, aToBClosed, cToEClosed, c}, expLocksSize: 10},
   }
   splits := []roachpb.Span{
     {Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
@@ -78,12 +80,14 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) {
   descDB := kvcoord.TestingMockRangeDescriptorDBForDescs(descs...)
   s := createTestDB(t)
   st := s.Store.ClusterSettings()
-  kvcoord.TrackedWritesMaxSize.Override(ctx, &st.SV, 10) /* 10 bytes and it will condense */
+  // 10 bytes for the keys and 192 bytes for the span overhead, and then it
+  // will condense.
+  kvcoord.TrackedWritesMaxSize.Override(ctx, &st.SV, 10+4*roachpb.SpanOverhead)
   defer s.Stop()
 
   // Check end transaction locks, which should be condensed and split
   // at range boundaries.
-  expLocks := []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed}
+  expLocks := []roachpb.Span{aToBClosed, cToEClosed, fTog1}
   sendFn := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
     resp := ba.CreateReply()
     resp.Txn = ba.Txn
@@ -145,7 +149,7 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) {
   }
   locksSize := int64(0)
   for _, i := range locks {
-    locksSize += int64(len(i.Key) + len(i.EndKey))
+    locksSize += int64(len(i.Key) + len(i.EndKey)) // ignoring the span overhead
   }
   if a, e := locksSize, tc.expLocksSize; a != e {
     t.Errorf("%d: keys size expected %d; got %d", i, e, a)
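
Concretely, with a 48-byte SpanOverhead the override above sets the budget to 10 + 4*48 = 202 bytes: four tracked spans' worth of overhead plus ten bytes of key data before condensing kicks in. Per the updated comment in the test table, once the limit is hit the pipeliner keeps condensing until the footprint drops under half of the threshold, which is why a-b collapses into aToBClosed along with c-e.
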
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -1068,3 +1068,7 @@ func (a *inFlightWriteAlloc) clear() {
   }
   *a = (*a)[:0]
 }
+
+func keySize(k roachpb.Key) int64 {
+  return int64(len(k))
+}
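
The keySize helper (moved here from condensable_span_set.go) is used for in-flight writes, which, per the test comment later in this PR, are not charged the per-span overhead, while spans condensed into the lock footprint are. A rough sketch of the asymmetry, again with simplified stand-in types rather than the actual CockroachDB definitions:

    package main

    import "fmt"

    type Key []byte

    type Span struct {
        Key    Key
        EndKey Key
    }

    // SpanOverhead as in the earlier sketch: 48 bytes on 64-bit platforms.
    const SpanOverhead = 48

    // keySize charges only the key bytes; in-flight writes live outside
    // any []Span, so they carry no per-span overhead.
    func keySize(k Key) int64 { return int64(len(k)) }

    // spanSize charges the backing arrays plus the fixed per-span overhead.
    func spanSize(sp Span) int64 {
        return SpanOverhead + int64(cap(sp.Key)+cap(sp.EndKey))
    }

    func main() {
        k := Key("aaaa")
        fmt.Println(keySize(k))             // 4: charged as an in-flight write
        fmt.Println(spanSize(Span{Key: k})) // 52: charged in the lock footprint
    }

This asymmetry is what the "overlapping requests" comment in the test file below points at.
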
17 changes: 15 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -1493,7 +1493,7 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
   require.Empty(t, tp.ifWrites.len())
 }
 
-// TestTxnCoordSenderCondenseLockSpans2 verifies that lock spans are condensed
+// TestTxnPipelinerCondenseLockSpans2 verifies that lock spans are condensed
 // along range boundaries when they exceed the maximum intent bytes threshold.
 func TestTxnPipelinerCondenseLockSpans2(t *testing.T) {
   defer leaktest.AfterTest(t)()
@@ -1708,17 +1708,22 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
     // The 0-based index of the request that's expected to be rejected. -1 if no
     // request is expected to be rejected.
     expRejectIdx int
+    maxSize int64
   }{
     {name: "large request",
       reqs: []roachpb.BatchRequest{largeWrite},
       expRejectIdx: 0,
+      maxSize: int64(len(largeAs)) - 1 + roachpb.SpanOverhead,
     },
     {name: "requests that add up",
       reqs: []roachpb.BatchRequest{
         putBatchNoAsyncConsensus(roachpb.Key("aaaa"), nil),
         putBatchNoAsyncConsensus(roachpb.Key("bbbb"), nil),
         putBatchNoAsyncConsensus(roachpb.Key("cccc"), nil)},
       expRejectIdx: 2,
+      // maxSize is such that first two requests fit and the third one
+      // goes above the limit.
+      maxSize: 9 + 2*roachpb.SpanOverhead,
     },
     {name: "async requests that add up",
       // Like the previous test, but this time the requests run with async
@@ -1729,6 +1734,7 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
         putBatch(roachpb.Key("bbbb"), nil),
         putBatch(roachpb.Key("cccc"), nil)},
       expRejectIdx: 2,
+      maxSize: 10 + roachpb.SpanOverhead,
     },
     {
       name: "response goes over budget, next request rejected",
@@ -1737,6 +1743,7 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
       reqs: []roachpb.BatchRequest{delRange, putBatch(roachpb.Key("a"), nil)},
       resp: []*roachpb.BatchResponse{delRangeResp},
       expRejectIdx: 1,
+      maxSize: 10 + roachpb.SpanOverhead,
     },
     {
       name: "response goes over budget",
@@ -1746,12 +1753,18 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
       reqs: []roachpb.BatchRequest{delRange},
       resp: []*roachpb.BatchResponse{delRangeResp},
       expRejectIdx: -1,
+      maxSize: 10 + roachpb.SpanOverhead,
     },
     {
       // Request keys overlap, so they don't count twice.
       name: "overlapping requests",
       reqs: []roachpb.BatchRequest{mediumWrite, mediumWrite, mediumWrite},
       expRejectIdx: -1,
+      // Our estimation logic for rejecting requests based on size
+      // consults both the in-flight write set (which doesn't account for
+      // the span overhead) as well as the lock footprint (which accounts
+      // for the span overhead).
+      maxSize: 16 + roachpb.SpanOverhead,
     },
   }
   for _, tc := range testCases {
@@ -1761,7 +1774,7 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
     }
 
     tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
-    TrackedWritesMaxSize.Override(ctx, &tp.st.SV, 10) /* reject when exceeding 10 bytes */
+    TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
     rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
 
     txn := makeTxnProto()
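
A worked example for the "requests that add up" case, again assuming a 48-byte SpanOverhead: each synchronous put locks one 4-byte key, charged as a point span of 48 + 4 = 52 bytes, so two writes consume 104 bytes of the 9 + 2*48 = 105-byte budget, and the third would raise the footprint to 156 bytes, so it is rejected at index 2.
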
29 changes: 14 additions & 15 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
@@ -78,10 +78,10 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) {
   require.Nil(t, pErr)
   require.NotNil(t, br)
 
-  require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()},
-    tsr.refreshFootprint.asSlice())
+  expSpans := []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}
+  require.Equal(t, expSpans, tsr.refreshFootprint.asSlice())
   require.False(t, tsr.refreshInvalid)
-  require.Equal(t, int64(3), tsr.refreshFootprint.bytes)
+  require.Equal(t, 3+int64(len(expSpans))*roachpb.SpanOverhead, tsr.refreshFootprint.bytes)
   require.Zero(t, tsr.refreshedTimestamp)
 
   // Scan with limit. Only the scanned keys are added to the refresh spans.
@@ -104,11 +104,10 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) {
   require.Nil(t, pErr)
   require.NotNil(t, br)
 
-  require.Equal(t,
-    []roachpb.Span{getArgs.Span(), delRangeArgs.Span(), {Key: scanArgs.Key, EndKey: keyC}},
-    tsr.refreshFootprint.asSlice())
+  expSpans = []roachpb.Span{getArgs.Span(), delRangeArgs.Span(), {Key: scanArgs.Key, EndKey: keyC}}
+  require.Equal(t, expSpans, tsr.refreshFootprint.asSlice())
   require.False(t, tsr.refreshInvalid)
-  require.Equal(t, int64(5), tsr.refreshFootprint.bytes)
+  require.Equal(t, 5+int64(len(expSpans))*roachpb.SpanOverhead, tsr.refreshFootprint.bytes)
   require.Zero(t, tsr.refreshedTimestamp)
 }

@@ -863,8 +862,8 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
   keyC := roachpb.Key("c")
   keyD, keyE := roachpb.Key("d"), roachpb.Key("e")
 
-  // Set MaxTxnRefreshSpansBytes limit to 3 bytes.
-  MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3)
+  // Set MaxTxnRefreshSpansBytes limit to 3 bytes plus the span overhead.
+  MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3+roachpb.SpanOverhead)
 
   // Send a batch below the limit.
   var ba roachpb.BatchRequest
@@ -879,7 +878,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
   require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
   require.False(t, tsr.refreshInvalid)
   require.Zero(t, tsr.refreshedTimestamp)
-  require.Equal(t, int64(2), tsr.refreshFootprint.bytes)
+  require.Equal(t, 2+roachpb.SpanOverhead, tsr.refreshFootprint.bytes)
 
   // Send another batch that pushes us above the limit. The tracked spans are
   // adjacent so the spans will be merged, but not condensed.
@@ -893,7 +892,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
 
   require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyC}}, tsr.refreshFootprint.asSlice())
   require.False(t, tsr.refreshInvalid)
-  require.Equal(t, int64(2), tsr.refreshFootprint.bytes)
+  require.Equal(t, 2+roachpb.SpanOverhead, tsr.refreshFootprint.bytes)
   require.False(t, tsr.refreshFootprint.condensed)
   require.Equal(t, int64(0), tsr.refreshMemoryLimitExceeded.Count())
   require.Zero(t, tsr.refreshedTimestamp)
@@ -916,7 +915,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
   require.Equal(t, int64(0), tsr.refreshFailWithCondensedSpans.Count())
 
   // Return a transaction retry error and make sure the metric indicating that
-  // we did not retry due to the refresh span bytes in incremented.
+  // we did not retry due to the refresh span bytes is incremented.
   mockSender.MockSend(func(request roachpb.BatchRequest) (batchResponse *roachpb.BatchResponse, r *roachpb.Error) {
     return nil, roachpb.NewErrorWithTxn(
       roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, ""), ba.Txn)
@@ -1088,8 +1087,8 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
   keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
   keyC, keyD := roachpb.Key("c"), roachpb.Key("d")
 
-  // Set MaxTxnRefreshSpansBytes limit to 3 bytes.
-  MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3)
+  // Set MaxTxnRefreshSpansBytes limit to 3 bytes plus the span overhead.
+  MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3+roachpb.SpanOverhead)
 
   // Send a batch below the limit.
   var ba roachpb.BatchRequest
@@ -1103,7 +1102,7 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
 
   require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
   require.False(t, tsr.refreshInvalid)
-  require.Equal(t, int64(2), tsr.refreshFootprint.bytes)
+  require.Equal(t, 2+roachpb.SpanOverhead, tsr.refreshFootprint.bytes)
   require.Zero(t, tsr.refreshedTimestamp)
 
   // Incrementing the transaction epoch clears the spans.
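
The refresher tests follow the same arithmetic: assuming a 48-byte SpanOverhead, the limit of 3+roachpb.SpanOverhead comes to 51 bytes; a single tracked span [a,b) costs 2 key bytes plus one overhead, 50 bytes, and stays just under the limit, while merging the adjacent spans [a,b) and [b,c) into [a,c) keeps the footprint at 50 bytes because only one per-span overhead remains.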