Skip to content

Commit

Permalink
external: pre-alloc mem for writer, use offset to avoid golang slice (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Oct 23, 2023
1 parent b0c01fa commit 05b50a1
Show file tree
Hide file tree
Showing 17 changed files with 315 additions and 93 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"engine.go",
"file.go",
"iter.go",
"kv_buf.go",
"kv_reader.go",
"merge.go",
"split.go",
Expand Down Expand Up @@ -53,13 +54,14 @@ go_test(
"engine_test.go",
"file_test.go",
"iter_test.go",
"kv_buf_test.go",
"split_test.go",
"util_test.go",
"writer_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 40,
shard_count = 41,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand Down
44 changes: 11 additions & 33 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
)

// we use uint64 to store the length of key and value.
const lengthBytes = 8

// KeyValueStore stores key-value pairs and maintains the range properties.
type KeyValueStore struct {
dataWriter storage.ExternalFileWriter
Expand All @@ -46,51 +49,26 @@ func NewKeyValueStore(
return kvStore, nil
}

// AddKeyValue saves a key-value pair to the KeyValueStore. If the accumulated
// addEncodedData saves encoded key-value pairs to the KeyValueStore.
// data layout: keyLen + key + valueLen + value. If the accumulated
// size or key count exceeds the given distance, a new range property will be
// appended to the rangePropertiesCollector with current status.
// `key` must be in strictly ascending order for invocations of a KeyValueStore.
func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
var (
b [8]byte
kvLen = 0
)

// data layout: keyLen + key + valueLen + value
n, err := s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:0], uint64(len(key))),
)
if err != nil {
return err
}
kvLen += n
n, err = s.dataWriter.Write(s.ctx, key)
if err != nil {
return err
}
kvLen += n
n, err = s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:0], uint64(len(value))),
)
if err != nil {
return err
}
kvLen += n
n, err = s.dataWriter.Write(s.ctx, value)
func (s *KeyValueStore) addEncodedData(val []byte) error {
_, err := s.dataWriter.Write(s.ctx, val)
if err != nil {
return err
}
kvLen += n

keyLen := binary.BigEndian.Uint64(val)
key := val[lengthBytes : lengthBytes+keyLen]
if len(s.rc.currProp.firstKey) == 0 {
s.rc.currProp.firstKey = key
}
s.rc.currProp.lastKey = key

s.offset += uint64(kvLen)
s.rc.currProp.size += uint64(len(key) + len(value))
s.offset += uint64(len(val))
s.rc.currProp.size += uint64(len(val) - lengthBytes*2)
s.rc.currProp.keys++

if s.rc.currProp.size >= s.rc.propSizeDist ||
Expand Down
12 changes: 6 additions & 6 deletions br/pkg/lightning/backend/external/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.Len(t, encoded, 0)

k1, v1 := []byte("key1"), []byte("value1")
err = kvStore.AddKeyValue(k1, v1)
err = kvStore.addEncodedData(getEncodedData(k1, v1))
require.NoError(t, err)
// when not accumulated enough data, no range property will be added.
require.Equal(t, &initRC, rc)

// propKeysDist = 2, so after adding 2 keys, a new range property will be added.
k2, v2 := []byte("key2"), []byte("value2")
err = kvStore.AddKeyValue(k2, v2)
err = kvStore.addEncodedData(getEncodedData(k2, v2))
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected := &rangeProperty{
Expand All @@ -67,7 +67,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {

// when not accumulated enough data, no range property will be added.
k3, v3 := []byte("key3"), []byte("value3")
err = kvStore.AddKeyValue(k3, v3)
err = kvStore.addEncodedData(getEncodedData(k3, v3))
require.NoError(t, err)
require.Len(t, rc.props, 1)

Expand All @@ -93,7 +93,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
rc.reset()
kvStore, err = NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
err = kvStore.AddKeyValue(k1, v1)
err = kvStore.addEncodedData(getEncodedData(k1, v1))
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected = &rangeProperty{
Expand All @@ -105,7 +105,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
}
require.Equal(t, expected, rc.props[0])

err = kvStore.AddKeyValue(k2, v2)
err = kvStore.addEncodedData(getEncodedData(k2, v2))
require.NoError(t, err)
require.Len(t, rc.props, 2)
expected = &rangeProperty{
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestKVReadWrite(t *testing.T) {
randLen = rand.Intn(10) + 1
values[i] = make([]byte, randLen)
rand.Read(values[i])
err = kvStore.AddKeyValue(keys[i], values[i])
err = kvStore.addEncodedData(getEncodedData(keys[i], values[i]))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down
21 changes: 15 additions & 6 deletions br/pkg/lightning/backend/external/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ func (r *trackOpenFileReader) Close() error {
return nil
}

func getEncodedData(key, value []byte) []byte {
buf := make([]byte, 8*2+len(key)+len(value))
binary.BigEndian.PutUint64(buf, uint64(len(key)))
copy(buf[8:], key)
binary.BigEndian.PutUint64(buf[8+len(key):], uint64(len(value)))
copy(buf[8*2+len(key):], value)
return buf
}

func TestMergeKVIter(t *testing.T) {
ctx := context.Background()
memStore := storage.NewMemStorage()
Expand All @@ -78,7 +87,7 @@ func TestMergeKVIter(t *testing.T) {
kvStore, err := NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
for _, kv := range data[i] {
err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1]))
err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1])))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down Expand Up @@ -130,7 +139,7 @@ func TestOneUpstream(t *testing.T) {
kvStore, err := NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
for _, kv := range data[i] {
err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1]))
err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1])))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down Expand Up @@ -208,7 +217,7 @@ func TestCorruptContent(t *testing.T) {
kvStore, err := NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
for _, kv := range data[i] {
err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1]))
err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1])))
require.NoError(t, err)
}
if i == 0 {
Expand Down Expand Up @@ -345,7 +354,7 @@ func TestHotspot(t *testing.T) {
kvStore, err := NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
for _, k := range keys[i] {
err = kvStore.AddKeyValue([]byte(k), value)
err = kvStore.addEncodedData(getEncodedData([]byte(k), value))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down Expand Up @@ -446,13 +455,13 @@ func TestMemoryUsageWhenHotspotChange(t *testing.T) {
for j := 0; j < checkHotspotPeriod; j++ {
key := fmt.Sprintf("key%06d", cur)
val := fmt.Sprintf("value%06d", cur)
err = kvStore.AddKeyValue([]byte(key), []byte(val))
err = kvStore.addEncodedData(getEncodedData([]byte(key), []byte(val)))
require.NoError(t, err)
cur++
}
for j := 0; j <= 12; j++ {
key := fmt.Sprintf("key999%06d", cur+j)
err = kvStore.AddKeyValue([]byte(key), largeChunk)
err = kvStore.addEncodedData(getEncodedData([]byte(key), largeChunk))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down
75 changes: 75 additions & 0 deletions br/pkg/lightning/backend/external/kv_buf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import "github.com/docker/go-units"

// DefaultBlockSize is the default block size for preAllocKVBuf.
const DefaultBlockSize = 16 * units.MiB

// preAllocKVBuf pre allocates a large buffer of limit memLimit to reduce memory
// allocation, all space in this buffer will be reused when reset.
type preAllocKVBuf struct {
blocks [][]byte
blockSize int
curBlock []byte
curBlockIdx int
curIdx int
}

func newPreAllocKVBuf(memLimit uint64, blockSize int) *preAllocKVBuf {
blockCount := (memLimit + uint64(blockSize) - 1) / uint64(blockSize)
b := &preAllocKVBuf{
blocks: make([][]byte, 0, blockCount),
blockSize: blockSize,
}
for i := 0; i < int(blockCount); i++ {
b.blocks = append(b.blocks, make([]byte, blockSize))
}
b.reset()
return b
}

func (b *preAllocKVBuf) Alloc(s int) (blockIdx int32, res []byte, offset int32, allocated bool) {
if s > b.blockSize {
return
}
if b.blockSize-b.curIdx < s {
if b.curBlockIdx+1 >= len(b.blocks) {
return
}
b.curBlockIdx++
b.curBlock = b.blocks[b.curBlockIdx]
b.curIdx = 0
}
blockIdx = int32(b.curBlockIdx)
res = b.curBlock[b.curIdx : b.curIdx+s]
offset = int32(b.curIdx)
allocated = true

b.curIdx += s
return
}

func (b *preAllocKVBuf) reset() {
b.curBlockIdx = 0
b.curBlock = b.blocks[0]
b.curIdx = 0
}

func (b *preAllocKVBuf) destroy() {
b.blocks = nil
b.curBlock = nil
}
77 changes: 77 additions & 0 deletions br/pkg/lightning/backend/external/kv_buf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewPreAllocKVBuf(t *testing.T) {
cases := []struct {
memLimit uint64
expectBlocks int
}{
{1, 1},
{15, 1},
{16, 1},
{17, 2},
{31, 2},
{32, 2},
}
for _, c := range cases {
buf := newPreAllocKVBuf(c.memLimit, 16)
require.Equal(t, c.expectBlocks, len(buf.blocks))
require.Equal(t, 16, buf.blockSize)
require.Equal(t, buf.blocks[0], buf.curBlock)
require.Equal(t, 0, buf.curBlockIdx)
require.Equal(t, 0, buf.curIdx)
}

buf := newPreAllocKVBuf(16, 8)
// alloc larger than block size.
_, _, _, allocated := buf.Alloc(9)
require.False(t, allocated)
blockIdx, res, offset, allocated := buf.Alloc(8)
require.Equal(t, int32(0), blockIdx)
require.Equal(t, int32(0), offset)
require.True(t, allocated)
copy(res, "12345678")
blockIdx, res, offset, allocated = buf.Alloc(4)
require.Equal(t, int32(1), blockIdx)
require.Equal(t, int32(0), offset)
require.True(t, allocated)
copy(res, "aaaa")
blockIdx, res, offset, allocated = buf.Alloc(4)
require.Equal(t, int32(1), blockIdx)
require.Equal(t, int32(4), offset)
require.True(t, allocated)
copy(res, "bbbb")
_, _, _, allocated = buf.Alloc(4)
require.False(t, allocated)

require.Equal(t, "12345678", string(buf.blocks[0]))
require.Equal(t, "aaaabbbb", string(buf.blocks[1]))

buf.reset()
require.Equal(t, buf.blocks[0], buf.curBlock)
require.Equal(t, 0, buf.curBlockIdx)
require.Equal(t, 0, buf.curIdx)

buf.destroy()
require.Nil(t, buf.blocks)
require.Nil(t, buf.curBlock)
}
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func MergeOverlappingFiles(
newFilePrefix string,
writerID string,
memSizeLimit uint64,
blockSize int,
writeBatchCount uint64,
propSizeDist uint64,
propKeysDist uint64,
Expand All @@ -30,6 +31,7 @@ func MergeOverlappingFiles(

writer := NewWriterBuilder().
SetMemorySizeLimit(memSizeLimit).
SetBlockSize(blockSize).
SetWriterBatchCount(writeBatchCount).
SetPropKeysDistance(propKeysDist).
SetPropSizeDistance(propSizeDist).
Expand Down
Loading

0 comments on commit 05b50a1

Please sign in to comment.