From 9816f82ea4beb991080853f02f2a88a2f4d02c6b Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Tue, 16 Jan 2024 14:30:01 -0800 Subject: [PATCH] skip buffer dedupe for seq buffer. to fix https://github.com/etcd-io/etcd/issues/17247. Signed-off-by: Siyuan Zhang --- server/storage/backend/batch_tx_test.go | 57 ++++++++++++++++++------- server/storage/backend/tx_buffer.go | 5 ++- 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/server/storage/backend/batch_tx_test.go b/server/storage/backend/batch_tx_test.go index d0f6dcdc055e..20eaaa817115 100644 --- a/server/storage/backend/batch_tx_test.go +++ b/server/storage/backend/batch_tx_test.go @@ -241,18 +241,18 @@ func TestRangeAfterDeleteMatch(t *testing.T) { tx.Unlock() tx.Commit() - checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0) checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")}) tx.Lock() tx.UnsafeDelete(schema.Test, []byte("foo")) tx.Unlock() - checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0) checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil) } -func TestRangeAfterOverwriteMatch(t *testing.T) { +func TestRangeAfterUnorderedKeyWriteMatch(t *testing.T) { b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, b) @@ -260,15 +260,40 @@ func TestRangeAfterOverwriteMatch(t *testing.T) { tx.Lock() tx.UnsafeCreateBucket(schema.Test) - tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2")) - tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0")) - tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10")) - tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1")) - tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11")) + tx.UnsafePut(schema.Test, []byte("foo5"), []byte("bar5")) + tx.UnsafePut(schema.Test, []byte("foo2"), []byte("bar2")) + tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar1")) + tx.UnsafePut(schema.Test, []byte("foo3"), []byte("bar3")) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) + tx.UnsafePut(schema.Test, []byte("foo4"), []byte("bar4")) tx.Unlock() - checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), []byte("foo3"), 1) - checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")}) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo6"), 1) +} + +func TestRangeAfterAlternatingBucketWriteMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Key) + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafeSeqPut(schema.Key, []byte("key1"), []byte("val1")) + tx.Unlock() + + tx.Lock() + tx.UnsafeSeqPut(schema.Key, []byte("key2"), []byte("val2")) + tx.Unlock() + tx.Commit() + tx.Commit() + + tx.Lock() + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) + tx.Unlock() + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1) } func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) { @@ -286,7 +311,7 @@ func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) { tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11")) tx.Unlock() - checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0) checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")}) tx.Lock() @@ -294,18 +319,18 @@ func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) { tx.UnsafeDelete(schema.Test, []byte("foo1")) tx.Unlock() - checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) - checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo1"), nil, 0) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0) + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo1"), nil, 0) checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar3")}) } -func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) { +func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, bucket backend.Bucket, key, endKey []byte, limit int64) { tx.Lock() - ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit) + ks1, vs1 := tx.UnsafeRange(bucket, key, endKey, limit) tx.Unlock() rtx.RLock() - ks2, vs2 := rtx.UnsafeRange(schema.Test, key, endKey, limit) + ks2, vs2 := rtx.UnsafeRange(bucket, key, endKey, limit) rtx.RUnlock() if diff := cmp.Diff(ks1, ks2); diff != "" { diff --git a/server/storage/backend/tx_buffer.go b/server/storage/backend/tx_buffer.go index 590cf2af67ab..3a31df715c90 100644 --- a/server/storage/backend/tx_buffer.go +++ b/server/storage/backend/tx_buffer.go @@ -82,8 +82,8 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { rb, ok := txr.buckets[k] if !ok { delete(txw.buckets, k) - txr.buckets[k] = wb wb.dedupe() + txr.buckets[k] = wb continue } if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 { @@ -209,6 +209,9 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) { // dedupe removes duplicates, using only newest update func (bb *bucketBuffer) dedupe() { + if bb.used <= 1 { + return + } sort.Stable(bb) widx := 0 for ridx := 1; ridx < bb.used; ridx++ {