Skip to content

Commit

Permalink
skip buffer dedupe for seq buffer.
Browse files Browse the repository at this point in the history
to fix etcd-io#17247.

Signed-off-by: Siyuan Zhang <[email protected]>
  • Loading branch information
siyuanfoundation committed Jan 17, 2024
1 parent e3c70c8 commit 085dbd2
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 17 deletions.
91 changes: 75 additions & 16 deletions server/storage/backend/batch_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,34 +241,93 @@ func TestRangeAfterDeleteMatch(t *testing.T) {
tx.Unlock()
tx.Commit()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})

tx.Lock()
tx.UnsafeDelete(schema.Test, []byte("foo"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
}

func TestRangeAfterOverwriteMatch(t *testing.T) {
func TestRangeAfterUnorderedKeyWriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11"))
tx.UnsafePut(schema.Test, []byte("foo5"), []byte("bar5"))
tx.UnsafePut(schema.Test, []byte("foo2"), []byte("bar2"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar1"))
tx.UnsafePut(schema.Test, []byte("foo3"), []byte("bar3"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.UnsafePut(schema.Test, []byte("foo4"), []byte("bar4"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), []byte("foo3"), 1)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")})
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo6"), 1)
}

func TestRangeAfterAlternatingBucketWriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Key)
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafeSeqPut(schema.Key, []byte("key1"), []byte("val1"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1)

tx.Lock()
tx.UnsafeSeqPut(schema.Key, []byte("key2"), []byte("val2"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1)
tx.Commit()

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar1"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1)

tx.Lock()
tx.UnsafeSeqPut(schema.Key, []byte("key3"), []byte("val3"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1)

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo2"), []byte("bar2"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1)
tx.Commit()

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo3"), []byte("bar3"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1)
tx.Commit()

tx.Lock()
tx.UnsafeSeqPut(schema.Key, []byte("key4"), []byte("val4"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1)

}

func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) {
Expand All @@ -286,26 +345,26 @@ func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) {
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")})

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar3"))
tx.UnsafeDelete(schema.Test, []byte("foo1"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo1"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo1"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar3")})
}

func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) {
func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, bucket backend.Bucket, key, endKey []byte, limit int64) {
tx.Lock()
ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit)
ks1, vs1 := tx.UnsafeRange(bucket, key, endKey, limit)
tx.Unlock()

rtx.RLock()
ks2, vs2 := rtx.UnsafeRange(schema.Test, key, endKey, limit)
ks2, vs2 := rtx.UnsafeRange(bucket, key, endKey, limit)
rtx.RUnlock()

if diff := cmp.Diff(ks1, ks2); diff != "" {
Expand Down
5 changes: 4 additions & 1 deletion server/storage/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
rb, ok := txr.buckets[k]
if !ok {
delete(txw.buckets, k)
txr.buckets[k] = wb
wb.dedupe()
txr.buckets[k] = wb
continue
}
if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
Expand Down Expand Up @@ -209,6 +209,9 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {

// dedupe removes duplicates, using only newest update
func (bb *bucketBuffer) dedupe() {
if bb.used <= 1 {
return
}
sort.Stable(bb)
widx := 0
for ridx := 1; ridx < bb.used; ridx++ {
Expand Down

0 comments on commit 085dbd2

Please sign in to comment.