From 05b50a1ae1397264b2a6776587236b6e4a33a0f9 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 23 Oct 2023 16:15:32 +0800 Subject: [PATCH] external: pre-alloc mem for writer, use offset to avoid golang slice (#47793) ref pingcap/tidb#47572 --- br/pkg/lightning/backend/external/BUILD.bazel | 4 +- br/pkg/lightning/backend/external/file.go | 44 ++------ .../lightning/backend/external/file_test.go | 12 +-- .../lightning/backend/external/iter_test.go | 21 ++-- br/pkg/lightning/backend/external/kv_buf.go | 75 +++++++++++++ .../lightning/backend/external/kv_buf_test.go | 77 ++++++++++++++ br/pkg/lightning/backend/external/merge.go | 2 + .../lightning/backend/external/split_test.go | 17 +-- br/pkg/lightning/backend/external/util.go | 3 +- .../lightning/backend/external/util_test.go | 12 ++- br/pkg/lightning/backend/external/writer.go | 100 +++++++++++++----- .../lightning/backend/external/writer_test.go | 11 +- pkg/ddl/backfilling_merge_sort.go | 1 + pkg/disttask/importinto/BUILD.bazel | 3 +- .../importinto/encode_and_sort_operator.go | 18 +++- .../encode_and_sort_operator_test.go | 7 ++ pkg/disttask/importinto/scheduler.go | 1 + 17 files changed, 315 insertions(+), 93 deletions(-) create mode 100644 br/pkg/lightning/backend/external/kv_buf.go create mode 100644 br/pkg/lightning/backend/external/kv_buf_test.go diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 34e04ec24137d..7bbfae8750dec 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "engine.go", "file.go", "iter.go", + "kv_buf.go", "kv_reader.go", "merge.go", "split.go", @@ -53,13 +54,14 @@ go_test( "engine_test.go", "file_test.go", "iter_test.go", + "kv_buf_test.go", "split_test.go", "util_test.go", "writer_test.go", ], embed = [":external"], flaky = True, - shard_count = 40, + shard_count = 41, deps = [ "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", diff --git a/br/pkg/lightning/backend/external/file.go b/br/pkg/lightning/backend/external/file.go index 78b7a637ba98d..fd66938a03a0a 100644 --- a/br/pkg/lightning/backend/external/file.go +++ b/br/pkg/lightning/backend/external/file.go @@ -21,6 +21,9 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" ) +// we use uint64 to store the length of key and value. +const lengthBytes = 8 + // KeyValueStore stores key-value pairs and maintains the range properties. type KeyValueStore struct { dataWriter storage.ExternalFileWriter @@ -46,51 +49,26 @@ func NewKeyValueStore( return kvStore, nil } -// AddKeyValue saves a key-value pair to the KeyValueStore. If the accumulated +// addEncodedData saves encoded key-value pairs to the KeyValueStore. +// data layout: keyLen + key + valueLen + value. If the accumulated // size or key count exceeds the given distance, a new range property will be // appended to the rangePropertiesCollector with current status. // `key` must be in strictly ascending order for invocations of a KeyValueStore. 
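
A minimal round-trip sketch of the record layout described above — an 8-byte big-endian key length, the key bytes, an 8-byte big-endian value length, then the value bytes. The helper names encodeRecord/decodeRecord are illustrative, not part of the patch; the patch's own encode-side helper is getEncodedData in iter_test.go below.

package main

import (
	"encoding/binary"
	"fmt"
)

const lengthBytes = 8 // same constant as file.go above

// encodeRecord packs a pair as keyLen + key + valueLen + value.
func encodeRecord(key, value []byte) []byte {
	buf := make([]byte, lengthBytes*2+len(key)+len(value))
	binary.BigEndian.PutUint64(buf, uint64(len(key)))
	copy(buf[lengthBytes:], key)
	binary.BigEndian.PutUint64(buf[lengthBytes+len(key):], uint64(len(value)))
	copy(buf[lengthBytes*2+len(key):], value)
	return buf
}

// decodeRecord reverses encodeRecord by reading the two length prefixes.
func decodeRecord(buf []byte) (key, value []byte) {
	keyLen := binary.BigEndian.Uint64(buf)
	key = buf[lengthBytes : lengthBytes+keyLen]
	valueStart := lengthBytes + keyLen
	valueLen := binary.BigEndian.Uint64(buf[valueStart:])
	value = buf[valueStart+lengthBytes : valueStart+lengthBytes+valueLen]
	return
}

func main() {
	k, v := decodeRecord(encodeRecord([]byte("key1"), []byte("value1")))
	fmt.Printf("%s=%s\n", k, v) // key1=value1
}
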
-func (s *KeyValueStore) AddKeyValue(key, value []byte) error { - var ( - b [8]byte - kvLen = 0 - ) - - // data layout: keyLen + key + valueLen + value - n, err := s.dataWriter.Write( - s.ctx, - binary.BigEndian.AppendUint64(b[:0], uint64(len(key))), - ) - if err != nil { - return err - } - kvLen += n - n, err = s.dataWriter.Write(s.ctx, key) - if err != nil { - return err - } - kvLen += n - n, err = s.dataWriter.Write( - s.ctx, - binary.BigEndian.AppendUint64(b[:0], uint64(len(value))), - ) - if err != nil { - return err - } - kvLen += n - n, err = s.dataWriter.Write(s.ctx, value) +func (s *KeyValueStore) addEncodedData(val []byte) error { + _, err := s.dataWriter.Write(s.ctx, val) if err != nil { return err } - kvLen += n + keyLen := binary.BigEndian.Uint64(val) + key := val[lengthBytes : lengthBytes+keyLen] if len(s.rc.currProp.firstKey) == 0 { s.rc.currProp.firstKey = key } s.rc.currProp.lastKey = key - s.offset += uint64(kvLen) - s.rc.currProp.size += uint64(len(key) + len(value)) + s.offset += uint64(len(val)) + s.rc.currProp.size += uint64(len(val) - lengthBytes*2) s.rc.currProp.keys++ if s.rc.currProp.size >= s.rc.propSizeDist || diff --git a/br/pkg/lightning/backend/external/file_test.go b/br/pkg/lightning/backend/external/file_test.go index 531b8a6e062dd..65ab999b17476 100644 --- a/br/pkg/lightning/backend/external/file_test.go +++ b/br/pkg/lightning/backend/external/file_test.go @@ -44,14 +44,14 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { require.Len(t, encoded, 0) k1, v1 := []byte("key1"), []byte("value1") - err = kvStore.AddKeyValue(k1, v1) + err = kvStore.addEncodedData(getEncodedData(k1, v1)) require.NoError(t, err) // when not accumulated enough data, no range property will be added. require.Equal(t, &initRC, rc) // propKeysDist = 2, so after adding 2 keys, a new range property will be added. k2, v2 := []byte("key2"), []byte("value2") - err = kvStore.AddKeyValue(k2, v2) + err = kvStore.addEncodedData(getEncodedData(k2, v2)) require.NoError(t, err) require.Len(t, rc.props, 1) expected := &rangeProperty{ @@ -67,7 +67,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { // when not accumulated enough data, no range property will be added. 
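
An aside on the size accounting these assertions depend on: addEncodedData receives the already-prefixed buffer, so it subtracts the two 8-byte length prefixes to keep currProp.size equal to the raw key+value size the old AddKeyValue tracked, while s.offset grows by the full encoded length. A quick check of that arithmetic (values illustrative):

package main

import "fmt"

const lengthBytes = 8 // same constant as file.go

func main() {
	key, value := []byte("key1"), []byte("value1")
	encoded := lengthBytes*2 + len(key) + len(value) // len(val) seen by addEncodedData; grows s.offset
	rawKV := encoded - lengthBytes*2                 // grows s.rc.currProp.size
	fmt.Println(encoded, rawKV)                      // 26 10
}
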
k3, v3 := []byte("key3"), []byte("value3") - err = kvStore.AddKeyValue(k3, v3) + err = kvStore.addEncodedData(getEncodedData(k3, v3)) require.NoError(t, err) require.Len(t, rc.props, 1) @@ -93,7 +93,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { rc.reset() kvStore, err = NewKeyValueStore(ctx, writer, rc) require.NoError(t, err) - err = kvStore.AddKeyValue(k1, v1) + err = kvStore.addEncodedData(getEncodedData(k1, v1)) require.NoError(t, err) require.Len(t, rc.props, 1) expected = &rangeProperty{ @@ -105,7 +105,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { } require.Equal(t, expected, rc.props[0]) - err = kvStore.AddKeyValue(k2, v2) + err = kvStore.addEncodedData(getEncodedData(k2, v2)) require.NoError(t, err) require.Len(t, rc.props, 2) expected = &rangeProperty{ @@ -149,7 +149,7 @@ func TestKVReadWrite(t *testing.T) { randLen = rand.Intn(10) + 1 values[i] = make([]byte, randLen) rand.Read(values[i]) - err = kvStore.AddKeyValue(keys[i], values[i]) + err = kvStore.addEncodedData(getEncodedData(keys[i], values[i])) require.NoError(t, err) } err = writer.Close(ctx) diff --git a/br/pkg/lightning/backend/external/iter_test.go b/br/pkg/lightning/backend/external/iter_test.go index c8fcbc06358a5..564f4508ff457 100644 --- a/br/pkg/lightning/backend/external/iter_test.go +++ b/br/pkg/lightning/backend/external/iter_test.go @@ -58,6 +58,15 @@ func (r *trackOpenFileReader) Close() error { return nil } +func getEncodedData(key, value []byte) []byte { + buf := make([]byte, 8*2+len(key)+len(value)) + binary.BigEndian.PutUint64(buf, uint64(len(key))) + copy(buf[8:], key) + binary.BigEndian.PutUint64(buf[8+len(key):], uint64(len(value))) + copy(buf[8*2+len(key):], value) + return buf +} + func TestMergeKVIter(t *testing.T) { ctx := context.Background() memStore := storage.NewMemStorage() @@ -78,7 +87,7 @@ func TestMergeKVIter(t *testing.T) { kvStore, err := NewKeyValueStore(ctx, writer, rc) require.NoError(t, err) for _, kv := range data[i] { - err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1])) + err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1]))) require.NoError(t, err) } err = writer.Close(ctx) @@ -130,7 +139,7 @@ func TestOneUpstream(t *testing.T) { kvStore, err := NewKeyValueStore(ctx, writer, rc) require.NoError(t, err) for _, kv := range data[i] { - err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1])) + err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1]))) require.NoError(t, err) } err = writer.Close(ctx) @@ -208,7 +217,7 @@ func TestCorruptContent(t *testing.T) { kvStore, err := NewKeyValueStore(ctx, writer, rc) require.NoError(t, err) for _, kv := range data[i] { - err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1])) + err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1]))) require.NoError(t, err) } if i == 0 { @@ -345,7 +354,7 @@ func TestHotspot(t *testing.T) { kvStore, err := NewKeyValueStore(ctx, writer, rc) require.NoError(t, err) for _, k := range keys[i] { - err = kvStore.AddKeyValue([]byte(k), value) + err = kvStore.addEncodedData(getEncodedData([]byte(k), value)) require.NoError(t, err) } err = writer.Close(ctx) @@ -446,13 +455,13 @@ func TestMemoryUsageWhenHotspotChange(t *testing.T) { for j := 0; j < checkHotspotPeriod; j++ { key := fmt.Sprintf("key%06d", cur) val := fmt.Sprintf("value%06d", cur) - err = kvStore.AddKeyValue([]byte(key), []byte(val)) + err = kvStore.addEncodedData(getEncodedData([]byte(key), []byte(val))) require.NoError(t, err) cur++ } for j := 0; j <= 12; j++ { key 
:= fmt.Sprintf("key999%06d", cur+j) - err = kvStore.AddKeyValue([]byte(key), largeChunk) + err = kvStore.addEncodedData(getEncodedData([]byte(key), largeChunk)) require.NoError(t, err) } err = writer.Close(ctx) diff --git a/br/pkg/lightning/backend/external/kv_buf.go b/br/pkg/lightning/backend/external/kv_buf.go new file mode 100644 index 0000000000000..5079c3f475b41 --- /dev/null +++ b/br/pkg/lightning/backend/external/kv_buf.go @@ -0,0 +1,75 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import "github.com/docker/go-units" + +// DefaultBlockSize is the default block size for preAllocKVBuf. +const DefaultBlockSize = 16 * units.MiB + +// preAllocKVBuf pre allocates a large buffer of limit memLimit to reduce memory +// allocation, all space in this buffer will be reused when reset. +type preAllocKVBuf struct { + blocks [][]byte + blockSize int + curBlock []byte + curBlockIdx int + curIdx int +} + +func newPreAllocKVBuf(memLimit uint64, blockSize int) *preAllocKVBuf { + blockCount := (memLimit + uint64(blockSize) - 1) / uint64(blockSize) + b := &preAllocKVBuf{ + blocks: make([][]byte, 0, blockCount), + blockSize: blockSize, + } + for i := 0; i < int(blockCount); i++ { + b.blocks = append(b.blocks, make([]byte, blockSize)) + } + b.reset() + return b +} + +func (b *preAllocKVBuf) Alloc(s int) (blockIdx int32, res []byte, offset int32, allocated bool) { + if s > b.blockSize { + return + } + if b.blockSize-b.curIdx < s { + if b.curBlockIdx+1 >= len(b.blocks) { + return + } + b.curBlockIdx++ + b.curBlock = b.blocks[b.curBlockIdx] + b.curIdx = 0 + } + blockIdx = int32(b.curBlockIdx) + res = b.curBlock[b.curIdx : b.curIdx+s] + offset = int32(b.curIdx) + allocated = true + + b.curIdx += s + return +} + +func (b *preAllocKVBuf) reset() { + b.curBlockIdx = 0 + b.curBlock = b.blocks[0] + b.curIdx = 0 +} + +func (b *preAllocKVBuf) destroy() { + b.blocks = nil + b.curBlock = nil +} diff --git a/br/pkg/lightning/backend/external/kv_buf_test.go b/br/pkg/lightning/backend/external/kv_buf_test.go new file mode 100644 index 0000000000000..fddb3230b417d --- /dev/null +++ b/br/pkg/lightning/backend/external/kv_buf_test.go @@ -0,0 +1,77 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
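
The buffer above is the "pre-alloc mem" half of this change; the "use offset to avoid golang slice" half appears in writer.go below as kvLocation. The per-KV saving is easy to quantify: on 64-bit Go each []byte header is 24 bytes (pointer, len, cap), so a common.KvPair carries 48 bytes of header overhead versus 12 for a packed location. A back-of-the-envelope check, with struct shapes copied from the patch and assuming a 64-bit platform:

package main

import (
	"fmt"
	"unsafe"
)

// kvPair mirrors the old representation: two slice headers per KV.
type kvPair struct {
	Key []byte
	Val []byte
}

// kvLocation mirrors the new representation: an index into pre-allocated blocks.
type kvLocation struct {
	blockIdx int32
	offset   int32
	length   int32
}

func main() {
	fmt.Println(unsafe.Sizeof(kvPair{}))     // 48 on 64-bit: 2 x (ptr + len + cap)
	fmt.Println(unsafe.Sizeof(kvLocation{})) // 12
}
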
+ +package external + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewPreAllocKVBuf(t *testing.T) { + cases := []struct { + memLimit uint64 + expectBlocks int + }{ + {1, 1}, + {15, 1}, + {16, 1}, + {17, 2}, + {31, 2}, + {32, 2}, + } + for _, c := range cases { + buf := newPreAllocKVBuf(c.memLimit, 16) + require.Equal(t, c.expectBlocks, len(buf.blocks)) + require.Equal(t, 16, buf.blockSize) + require.Equal(t, buf.blocks[0], buf.curBlock) + require.Equal(t, 0, buf.curBlockIdx) + require.Equal(t, 0, buf.curIdx) + } + + buf := newPreAllocKVBuf(16, 8) + // alloc larger than block size. + _, _, _, allocated := buf.Alloc(9) + require.False(t, allocated) + blockIdx, res, offset, allocated := buf.Alloc(8) + require.Equal(t, int32(0), blockIdx) + require.Equal(t, int32(0), offset) + require.True(t, allocated) + copy(res, "12345678") + blockIdx, res, offset, allocated = buf.Alloc(4) + require.Equal(t, int32(1), blockIdx) + require.Equal(t, int32(0), offset) + require.True(t, allocated) + copy(res, "aaaa") + blockIdx, res, offset, allocated = buf.Alloc(4) + require.Equal(t, int32(1), blockIdx) + require.Equal(t, int32(4), offset) + require.True(t, allocated) + copy(res, "bbbb") + _, _, _, allocated = buf.Alloc(4) + require.False(t, allocated) + + require.Equal(t, "12345678", string(buf.blocks[0])) + require.Equal(t, "aaaabbbb", string(buf.blocks[1])) + + buf.reset() + require.Equal(t, buf.blocks[0], buf.curBlock) + require.Equal(t, 0, buf.curBlockIdx) + require.Equal(t, 0, buf.curIdx) + + buf.destroy() + require.Nil(t, buf.blocks) + require.Nil(t, buf.curBlock) +} diff --git a/br/pkg/lightning/backend/external/merge.go b/br/pkg/lightning/backend/external/merge.go index 2e74159aac5b1..6ca5ac04127ce 100644 --- a/br/pkg/lightning/backend/external/merge.go +++ b/br/pkg/lightning/backend/external/merge.go @@ -16,6 +16,7 @@ func MergeOverlappingFiles( newFilePrefix string, writerID string, memSizeLimit uint64, + blockSize int, writeBatchCount uint64, propSizeDist uint64, propKeysDist uint64, @@ -30,6 +31,7 @@ func MergeOverlappingFiles( writer := NewWriterBuilder(). SetMemorySizeLimit(memSizeLimit). + SetBlockSize(blockSize). SetWriterBatchCount(writeBatchCount). SetPropKeysDistance(propKeysDist). SetPropSizeDistance(propSizeDist). diff --git a/br/pkg/lightning/backend/external/split_test.go b/br/pkg/lightning/backend/external/split_test.go index 056b360833b18..7259007629962 100644 --- a/br/pkg/lightning/backend/external/split_test.go +++ b/br/pkg/lightning/backend/external/split_test.go @@ -144,7 +144,7 @@ func TestSortedData(t *testing.T) { values := make([][]byte, kvNum) for i := range keys { keys[i] = []byte(fmt.Sprintf("key%03d", i)) - values[i] = []byte(fmt.Sprintf("value%03d", i)) + values[i] = []byte(fmt.Sprintf("val%03d", i)) } dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values) @@ -177,7 +177,8 @@ func TestRangeSplitterStrictCase(t *testing.T) { subDir := "/mock-test" writer1 := NewWriterBuilder(). - SetMemorySizeLimit(15). // slightly larger than len("key01") + len("value01") + SetMemorySizeLimit(2*(lengthBytes*2+10)). + SetBlockSize(2*(lengthBytes*2+10)). SetPropSizeDistance(1). SetPropKeysDistance(1). 
Build(memStore, subDir, "1") @@ -185,7 +186,7 @@ func TestRangeSplitterStrictCase(t *testing.T) { []byte("key01"), []byte("key11"), []byte("key21"), } values1 := [][]byte{ - []byte("value01"), []byte("value11"), []byte("value21"), + []byte("val01"), []byte("val11"), []byte("val21"), } dataFiles1, statFiles1, err := MockExternalEngineWithWriter(memStore, writer1, subDir, keys1, values1) require.NoError(t, err) @@ -193,7 +194,8 @@ func TestRangeSplitterStrictCase(t *testing.T) { require.Len(t, statFiles1, 2) writer2 := NewWriterBuilder(). - SetMemorySizeLimit(15). + SetMemorySizeLimit(2*(lengthBytes*2+10)). + SetBlockSize(2*(lengthBytes*2+10)). SetPropSizeDistance(1). SetPropKeysDistance(1). Build(memStore, subDir, "2") @@ -201,7 +203,7 @@ func TestRangeSplitterStrictCase(t *testing.T) { []byte("key02"), []byte("key12"), []byte("key22"), } values2 := [][]byte{ - []byte("value02"), []byte("value12"), []byte("value22"), + []byte("val02"), []byte("val12"), []byte("val22"), } dataFiles12, statFiles12, err := MockExternalEngineWithWriter(memStore, writer2, subDir, keys2, values2) require.NoError(t, err) @@ -209,7 +211,8 @@ func TestRangeSplitterStrictCase(t *testing.T) { require.Len(t, statFiles12, 4) writer3 := NewWriterBuilder(). - SetMemorySizeLimit(15). + SetMemorySizeLimit(2*(lengthBytes*2+10)). + SetBlockSize(2*(lengthBytes*2+10)). SetPropSizeDistance(1). SetPropKeysDistance(1). Build(memStore, subDir, "3") @@ -217,7 +220,7 @@ func TestRangeSplitterStrictCase(t *testing.T) { []byte("key03"), []byte("key13"), []byte("key23"), } values3 := [][]byte{ - []byte("value03"), []byte("value13"), []byte("value23"), + []byte("val03"), []byte("val13"), []byte("val23"), } dataFiles123, statFiles123, err := MockExternalEngineWithWriter(memStore, writer3, subDir, keys3, values3) require.NoError(t, err) diff --git a/br/pkg/lightning/backend/external/util.go b/br/pkg/lightning/backend/external/util.go index c5e9f6df49d3b..83f3a8fd176d7 100644 --- a/br/pkg/lightning/backend/external/util.go +++ b/br/pkg/lightning/backend/external/util.go @@ -128,7 +128,8 @@ func MockExternalEngine( ) (dataFiles []string, statsFiles []string, err error) { subDir := "/mock-test" writer := NewWriterBuilder(). - SetMemorySizeLimit(128). + SetMemorySizeLimit(10*(lengthBytes*2+10)). + SetBlockSize(10*(lengthBytes*2+10)). SetPropSizeDistance(32). SetPropKeysDistance(4). Build(storage, "/mock-test", "0") diff --git a/br/pkg/lightning/backend/external/util_test.go b/br/pkg/lightning/backend/external/util_test.go index 62a53d8cfb97b..360dfd23c7977 100644 --- a/br/pkg/lightning/backend/external/util_test.go +++ b/br/pkg/lightning/backend/external/util_test.go @@ -107,7 +107,8 @@ func TestGetAllFileNames(t *testing.T) { ctx := context.Background() store := storage.NewMemStorage() w := NewWriterBuilder(). - SetMemorySizeLimit(20). + SetMemorySizeLimit(10*(lengthBytes*2+2)). + SetBlockSize(10*(lengthBytes*2+2)). SetPropSizeDistance(5). SetPropKeysDistance(3). Build(store, "/subtask", "0") @@ -127,7 +128,8 @@ func TestGetAllFileNames(t *testing.T) { require.NoError(t, err) w2 := NewWriterBuilder(). - SetMemorySizeLimit(20). + SetMemorySizeLimit(10*(lengthBytes*2+2)). + SetBlockSize(10*(lengthBytes*2+2)). SetPropSizeDistance(5). SetPropKeysDistance(3). Build(store, "/subtask", "3") @@ -140,7 +142,8 @@ func TestGetAllFileNames(t *testing.T) { require.NoError(t, err) w3 := NewWriterBuilder(). - SetMemorySizeLimit(20). + SetMemorySizeLimit(10*(lengthBytes*2+2)). + SetBlockSize(10*(lengthBytes*2+2)). SetPropSizeDistance(5). 
SetPropKeysDistance(3). Build(store, "/subtask", "12") @@ -169,7 +172,8 @@ func TestCleanUpFiles(t *testing.T) { ctx := context.Background() store := storage.NewMemStorage() w := NewWriterBuilder(). - SetMemorySizeLimit(20). + SetMemorySizeLimit(10*(lengthBytes*2+2)). + SetBlockSize(10*(lengthBytes*2+2)). SetPropSizeDistance(5). SetPropKeysDistance(3). Build(store, "/subtask", "0") diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index 9612e8a3d11f5..f60814338e57c 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -17,6 +17,7 @@ package external import ( "bytes" "context" + "encoding/binary" "encoding/hex" "path/filepath" "slices" @@ -99,6 +100,7 @@ func dummyOnCloseFunc(*WriterSummary) {} // WriterBuilder builds a new Writer. type WriterBuilder struct { memSizeLimit uint64 + blockSize int writeBatchCount uint64 propSizeDist uint64 propKeysDist uint64 @@ -114,6 +116,7 @@ type WriterBuilder struct { func NewWriterBuilder() *WriterBuilder { return &WriterBuilder{ memSizeLimit: DefaultMemSizeLimit, + blockSize: DefaultBlockSize, writeBatchCount: 8 * 1024, propSizeDist: 1 * size.MB, propKeysDist: 8 * 1024, @@ -174,6 +177,12 @@ func (b *WriterBuilder) SetMutex(mu *sync.Mutex) *WriterBuilder { return b } +// SetBlockSize sets the block size of pre-allocated buf in the writer. +func (b *WriterBuilder) SetBlockSize(blockSize int) *WriterBuilder { + b.blockSize = blockSize + return b +} + // Build builds a new Writer. The files writer will create are under the prefix // of "{prefix}/{writerID}". func (b *WriterBuilder) Build( @@ -199,8 +208,7 @@ func (b *WriterBuilder) Build( }, memSizeLimit: b.memSizeLimit, store: store, - kvBuffer: bp.NewBuffer(), - writeBatch: make([]common.KvPair, 0, b.writeBatchCount), + kvBuffer: newPreAllocKVBuf(b.memSizeLimit, b.blockSize), currentSeq: 0, filenamePrefix: filenamePrefix, keyAdapter: keyAdapter, @@ -266,6 +274,12 @@ func GetMaxOverlappingTotal(stats []MultipleFilesStat) int64 { return GetMaxOverlapping(points) } +type kvLocation struct { + blockIdx int32 + offset int32 + length int32 +} + // Writer is used to write data into external storage. type Writer struct { store storage.ExternalStorage @@ -279,8 +293,9 @@ type Writer struct { memSizeLimit uint64 - kvBuffer *membuf.Buffer - writeBatch []common.KvPair + kvBuffer *preAllocKVBuf + kvLocations []kvLocation + kvSize int64 onClose OnCloseFunc closed bool @@ -304,22 +319,36 @@ type Writer struct { // WriteRow implements ingest.Writer. 
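
With the builder changes above, callers now size the writer in two dimensions: SetMemorySizeLimit fixes the total pre-allocation and SetBlockSize the block granularity, which is also the largest single encoded KV the writer accepts. A hypothetical configuration sketch — import paths follow this commit's tree, and the prefix and writer ID are placeholders:

package main

import (
	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
	"github.com/pingcap/tidb/br/pkg/storage"
	"github.com/pingcap/tidb/pkg/util/size"
)

func buildWriter(store storage.ExternalStorage) *external.Writer {
	return external.NewWriterBuilder().
		SetMemorySizeLimit(256 * size.MB).       // total buffer, pre-allocated up front
		SetBlockSize(external.DefaultBlockSize). // 16 MiB blocks; one encoded KV must fit in one block
		Build(store, "/some-task-prefix", "writer-0")
}

func main() {
	w := buildWriter(storage.NewMemStorage())
	_ = w
}
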
func (w *Writer) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error { keyAdapter := w.keyAdapter - w.batchSize += uint64(len(idxKey) + len(idxVal)) var rowID []byte if handle != nil { rowID = handle.Encoded() } - buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(idxKey, rowID)) - key := keyAdapter.Encode(buf[:0], idxKey, rowID) - val := w.kvBuffer.AddBytes(idxVal) - - w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val}) - if w.batchSize >= w.memSizeLimit { + encodedKeyLen := keyAdapter.EncodedLen(idxKey, rowID) + length := encodedKeyLen + len(idxVal) + lengthBytes*2 + blockIdx, dataBuf, off, allocated := w.kvBuffer.Alloc(length) + if !allocated { if err := w.flushKVs(ctx, false); err != nil { return err } + blockIdx, dataBuf, off, allocated = w.kvBuffer.Alloc(length) + // we now don't support KV larger than blockSize + if !allocated { + return errors.Errorf("failed to allocate kv buffer: %d", length) + } } + binary.BigEndian.AppendUint64(dataBuf[:0], uint64(encodedKeyLen)) + keyAdapter.Encode(dataBuf[lengthBytes:lengthBytes:lengthBytes+encodedKeyLen], idxKey, rowID) + binary.BigEndian.AppendUint64(dataBuf[lengthBytes+encodedKeyLen:lengthBytes+encodedKeyLen], uint64(len(idxVal))) + copy(dataBuf[lengthBytes*2+encodedKeyLen:], idxVal) + + w.kvLocations = append(w.kvLocations, kvLocation{ + blockIdx: blockIdx, + offset: off, + length: int32(length)}, + ) + w.kvSize += int64(encodedKeyLen + len(idxVal)) + w.batchSize += uint64(length) return nil } @@ -336,7 +365,7 @@ func (w *Writer) Close(ctx context.Context) error { return errors.Errorf("writer %s has been closed", w.writerID) } w.closed = true - defer w.kvBuffer.Destroy() + defer w.kvBuffer.destroy() err := w.flushKVs(ctx, true) if err != nil { return err @@ -346,10 +375,11 @@ func (w *Writer) Close(ctx context.Context) error { logutil.Logger(ctx).Info("close writer", zap.String("writerID", w.writerID), + zap.Int("kv-cnt-cap", cap(w.kvLocations)), zap.String("minKey", hex.EncodeToString(w.minKey)), zap.String("maxKey", hex.EncodeToString(w.maxKey))) - w.writeBatch = nil + w.kvLocations = nil w.onClose(&WriterSummary{ WriterID: w.writerID, @@ -373,7 +403,7 @@ func (w *Writer) recordMinMax(newMin, newMax tidbkv.Key, size uint64) { } func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { - if len(w.writeBatch) == 0 { + if len(w.kvLocations) == 0 { return nil } if w.shareMu != nil { @@ -405,7 +435,7 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { } return units.HumanSize(float64(n) / dur) } - kvCnt := len(w.writeBatch) + kvCnt := len(w.kvLocations) defer func() { w.currentSeq++ err1, err2 := dataWriter.Close(ctx), statWriter.Close(ctx) @@ -442,18 +472,19 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { sortStart := time.Now() if w.shareMu != nil { sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter())) - sorty.Sort(len(w.writeBatch), func(i, j, r, s int) bool { - if bytes.Compare(w.writeBatch[i].Key, w.writeBatch[j].Key) < 0 { + sorty.Sort(len(w.kvLocations), func(i, j, r, s int) bool { + posi, posj := w.kvLocations[i], w.kvLocations[j] + if bytes.Compare(w.getKeyByLoc(posi), w.getKeyByLoc(posj)) < 0 { if r != s { - w.writeBatch[r], w.writeBatch[s] = w.writeBatch[s], w.writeBatch[r] + w.kvLocations[r], w.kvLocations[s] = w.kvLocations[s], w.kvLocations[r] } return true } return false }) } else { - slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) int { - return bytes.Compare(i.Key, j.Key) + 
slices.SortFunc(w.kvLocations, func(i, j kvLocation) int { + return bytes.Compare(w.getKeyByLoc(i), w.getKeyByLoc(j)) }) } sortDuration = time.Since(sortStart) @@ -466,13 +497,11 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { return err } - var kvSize uint64 - for _, pair := range w.writeBatch { - err = w.kvStore.AddKeyValue(pair.Key, pair.Val) + for _, pair := range w.kvLocations { + err = w.kvStore.addEncodedData(w.getEncodedKVData(pair)) if err != nil { return err } - kvSize += uint64(len(pair.Key)) + uint64(len(pair.Val)) } w.kvStore.Close() @@ -483,7 +512,8 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { return err } - w.recordMinMax(w.writeBatch[0].Key, w.writeBatch[len(w.writeBatch)-1].Key, kvSize) + minKey, maxKey := w.getKeyByLoc(w.kvLocations[0]), w.getKeyByLoc(w.kvLocations[len(w.kvLocations)-1]) + w.recordMinMax(minKey, maxKey, uint64(w.kvSize)) // maintain 500-batch statistics @@ -491,8 +521,8 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { w.multiFileStats[l-1].Filenames = append(w.multiFileStats[l-1].Filenames, [2]string{dataFile, statFile}, ) - w.fileMinKeys = append(w.fileMinKeys, tidbkv.Key(w.writeBatch[0].Key).Clone()) - w.fileMaxKeys = append(w.fileMaxKeys, tidbkv.Key(w.writeBatch[len(w.writeBatch)-1].Key).Clone()) + w.fileMinKeys = append(w.fileMinKeys, tidbkv.Key(minKey).Clone()) + w.fileMaxKeys = append(w.fileMaxKeys, tidbkv.Key(maxKey).Clone()) if fromClose || len(w.multiFileStats[l-1].Filenames) == multiFileStatNum { w.multiFileStats[l-1].build(w.fileMinKeys, w.fileMaxKeys) w.multiFileStats = append(w.multiFileStats, MultipleFilesStat{ @@ -502,13 +532,25 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { w.fileMaxKeys = w.fileMaxKeys[:0] } - w.writeBatch = w.writeBatch[:0] + w.kvLocations = w.kvLocations[:0] + w.kvSize = 0 + w.kvBuffer.reset() w.rc.reset() - w.kvBuffer.Reset() w.batchSize = 0 return nil } +func (w *Writer) getEncodedKVData(pos kvLocation) []byte { + block := w.kvBuffer.blocks[pos.blockIdx] + return block[pos.offset : pos.offset+pos.length] +} + +func (w *Writer) getKeyByLoc(pos kvLocation) []byte { + block := w.kvBuffer.blocks[pos.blockIdx] + keyLen := binary.BigEndian.Uint64(block[pos.offset : pos.offset+lengthBytes]) + return block[pos.offset+lengthBytes : uint64(pos.offset)+lengthBytes+keyLen] +} + func (w *Writer) createStorageWriter(ctx context.Context) ( dataFile, statFile string, data, stats storage.ExternalFileWriter, diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go index b3ea9d97887e9..f392651473fc9 100644 --- a/br/pkg/lightning/backend/external/writer_test.go +++ b/br/pkg/lightning/backend/external/writer_test.go @@ -110,9 +110,11 @@ func TestWriterFlushMultiFileNames(t *testing.T) { writer := NewWriterBuilder(). SetPropKeysDistance(2). - SetMemorySizeLimit(60). + SetMemorySizeLimit(3*(lengthBytes*2+20)). + SetBlockSize(3*(lengthBytes*2+20)). Build(memStore, "/test", "0") + require.Equal(t, 3*(lengthBytes*2+20), writer.kvBuffer.blockSize) // 200 bytes key values. kvCnt := 10 kvs := make([]common.KvPair, kvCnt) @@ -181,6 +183,7 @@ func TestWriterDuplicateDetect(t *testing.T) { "/test2", "mergeID", 1000, + 1000, 8*1024, 1*size.MB, 2, @@ -269,7 +272,8 @@ func TestWriterMultiFileStat(t *testing.T) { writer := NewWriterBuilder(). SetPropKeysDistance(2). - SetMemorySizeLimit(20). // 2 KV pair will trigger flush + SetMemorySizeLimit(52). + SetBlockSize(52). 
// 2 KV pair will trigger flush SetOnCloseFunc(closeFn). Build(memStore, "/test", "0") @@ -375,7 +379,8 @@ func TestWriterMultiFileStat(t *testing.T) { 100, "/test2", "mergeID", - 20, + 52, + 52, 8*1024, 1*size.MB, 2, diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index c51314f9e351f..729c09c95836e 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -110,6 +110,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta prefix, writerID, 256*size.MB, + external.DefaultBlockSize, 8*1024, 1*size.MB, 8*1024, diff --git a/pkg/disttask/importinto/BUILD.bazel b/pkg/disttask/importinto/BUILD.bazel index 88e9d12db7f35..0ddf6147ebaeb 100644 --- a/pkg/disttask/importinto/BUILD.bazel +++ b/pkg/disttask/importinto/BUILD.bazel @@ -92,7 +92,7 @@ go_test( embed = [":importinto"], flaky = True, race = "on", - shard_count = 14, + shard_count = 15, deps = [ "//br/pkg/lightning/backend", "//br/pkg/lightning/backend/external", @@ -116,6 +116,7 @@ go_test( "//pkg/util/logutil", "//pkg/util/mock", "//pkg/util/sqlexec", + "@com_github_docker_go_units//:go-units", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/pkg/disttask/importinto/encode_and_sort_operator.go b/pkg/disttask/importinto/encode_and_sort_operator.go index 2a74a45821920..651dfd0ac04a6 100644 --- a/pkg/disttask/importinto/encode_and_sort_operator.go +++ b/pkg/disttask/importinto/encode_and_sort_operator.go @@ -20,6 +20,7 @@ import ( "strconv" "time" + "github.com/docker/go-units" "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend/external" @@ -43,6 +44,9 @@ const ( // Note: this size is the memory taken by KV, not the size of taken by golang, // each KV has additional 24*2 bytes overhead for golang slice. indexKVTotalBufSize = size.GB - external.DefaultMemSizeLimit + // we use a larger block size for data KV group to support larger row. + // TODO: make it configurable? + dataKVGroupBlockSize = 32 * units.MiB ) // encodeAndSortOperator is an operator that encodes and sorts data. @@ -158,7 +162,9 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemoryS builder := external.NewWriterBuilder(). SetOnCloseFunc(func(summary *external.WriterSummary) { op.sharedVars.mergeIndexSummary(indexID, summary) - }).SetMemorySizeLimit(indexMemorySizeLimit) + }). + SetMemorySizeLimit(indexMemorySizeLimit). + SetBlockSize(getKVGroupBlockSize("")) prefix := subtaskPrefix(op.taskID, op.subtaskID) // writer id for index: index/{indexID}/{workerID} writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID) @@ -168,7 +174,8 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemoryS // sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID} builder := external.NewWriterBuilder(). - SetOnCloseFunc(op.sharedVars.mergeDataSummary) + SetOnCloseFunc(op.sharedVars.mergeDataSummary). 
+ SetBlockSize(getKVGroupBlockSize(dataKVGroup)) prefix := subtaskPrefix(op.taskID, op.subtaskID) // writer id for data: data/{workerID} writerID := path.Join("data", workerUUID) @@ -250,3 +257,10 @@ func getNumOfIndexGenKV(tblInfo *model.TableInfo) int { } return count } + +func getKVGroupBlockSize(group string) int { + if group == dataKVGroup { + return dataKVGroupBlockSize + } + return external.DefaultBlockSize +} diff --git a/pkg/disttask/importinto/encode_and_sort_operator_test.go b/pkg/disttask/importinto/encode_and_sort_operator_test.go index 89b30de5ab48e..6a737c5d25de5 100644 --- a/pkg/disttask/importinto/encode_and_sort_operator_test.go +++ b/pkg/disttask/importinto/encode_and_sort_operator_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/external" @@ -201,3 +202,9 @@ func TestGetWriterMemorySizeLimit(t *testing.T) { }), c.createSQL) } } + +func TestGetKVGroupBlockSize(t *testing.T) { + require.Equal(t, 32*units.MiB, getKVGroupBlockSize(dataKVGroup)) + require.Equal(t, 16*units.MiB, getKVGroupBlockSize("")) + require.Equal(t, 16*units.MiB, getKVGroupBlockSize("1")) +} diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go index dd219c6f09764..04d516b9e6d37 100644 --- a/pkg/disttask/importinto/scheduler.go +++ b/pkg/disttask/importinto/scheduler.go @@ -315,6 +315,7 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S prefix, writerID, 256*size.MB, + getKVGroupBlockSize(sm.KVGroup), 8*1024, 1*size.MB, 8*1024,
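
To close the loop on the sort path: flushKVs now sorts the 12-byte kvLocation entries and reads keys in place via getKeyByLoc, so swaps move small structs instead of slice-header pairs. A standalone sketch of that comparator — in the real writer the blocks are fixed-size and pre-allocated, so a single growable slice stands in for them here:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"slices"
)

const lengthBytes = 8

type kvLocation struct {
	blockIdx int32
	offset   int32
	length   int32
}

// encode appends one keyLen+key+valLen+value record and returns its location.
func encode(block *[]byte, key, val []byte) kvLocation {
	off := int32(len(*block))
	*block = binary.BigEndian.AppendUint64(*block, uint64(len(key)))
	*block = append(*block, key...)
	*block = binary.BigEndian.AppendUint64(*block, uint64(len(val)))
	*block = append(*block, val...)
	return kvLocation{blockIdx: 0, offset: off, length: int32(len(*block)) - off}
}

func main() {
	var block []byte
	locs := []kvLocation{
		encode(&block, []byte("key2"), []byte("v2")),
		encode(&block, []byte("key1"), []byte("v1")),
	}
	// keyOf plays the role of Writer.getKeyByLoc: slice the block at the
	// stored offset and read the big-endian key length to find the key.
	keyOf := func(loc kvLocation) []byte {
		keyLen := binary.BigEndian.Uint64(block[loc.offset:])
		return block[uint64(loc.offset)+lengthBytes : uint64(loc.offset)+lengthBytes+keyLen]
	}
	// Same shape as the slices.SortFunc call in flushKVs above.
	slices.SortFunc(locs, func(a, b kvLocation) int {
		return bytes.Compare(keyOf(a), keyOf(b))
	})
	for _, loc := range locs {
		fmt.Printf("%s ", keyOf(loc)) // key1 key2
	}
	fmt.Println()
}
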