Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Handle large binlog compressed transactions more efficiently #16328

Merged
merged 26 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
65a082d
WiP
mattlord Jul 3, 2024
73a3454
Merge remote-tracking branch 'origin/main' into uncompressed_trx_to_file
mattlord Jul 3, 2024
a2e135e
Add comment about Iter
mattlord Jul 4, 2024
79dda45
Testing
mattlord Jul 5, 2024
f03017f
Minor improvements
mattlord Jul 5, 2024
ba37be1
Add stats and test
mattlord Jul 6, 2024
32f488f
Changes from self review
mattlord Jul 6, 2024
153463f
Move to exp/mmap
mattlord Jul 7, 2024
564bd04
Improve memory allocation for in-mem case
mattlord Jul 7, 2024
26a605b
Use zst.Decoder stream directly
mattlord Jul 7, 2024
4551297
Do fewer type conversions
mattlord Jul 8, 2024
624d790
Refactor TransactionPayload
mattlord Jul 8, 2024
22d407d
Mr nitty came back from nitland
mattlord Jul 8, 2024
1e27dab
Address review comments
mattlord Jul 8, 2024
2bdd36a
One last nit
mattlord Jul 8, 2024
be10b48
Effectively revert the error text change
mattlord Jul 8, 2024
9c795d9
Use stateful decoder pool
mattlord Jul 9, 2024
83196e9
Add finalizer; don't need IOReadCloser anymore.
mattlord Jul 9, 2024
bd8b094
Cleanup
mattlord Jul 9, 2024
b01c6ca
Revert sync.Pool related code
mattlord Jul 9, 2024
8c48208
Knitting
mattlord Jul 9, 2024
7b52a36
Remove LeakChecker as it catches unrelated MySQL sig handlers
mattlord Jul 9, 2024
62d3b90
Re-apply stateful decoder pool
mattlord Jul 10, 2024
2a08009
Adjust function comment
mattlord Jul 10, 2024
d5cbf05
Address review comments
mattlord Jul 10, 2024
4e6206a
This was unexported and moved to binlog_event_compression.go
mattlord Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,12 @@ type BinlogEvent interface {
// IsWriteRows(), IsUpdateRows(), or IsDeleteRows() returns
// true.
Rows(BinlogFormat, *TableMap) (Rows, error)
// TransactionPayload returns a list of BinlogEvents contained
// within the compressed transaction.
TransactionPayload(BinlogFormat) ([]BinlogEvent, error)
// TransactionPayload returns a TransactionPayload type which provides
// a GetNextEvent() method to iterate over the events contained within
// the uncompressed payload. You must call Close() when you are done
// with the TransactionPayload to ensure that the underlying resources
// used are cleaned up.
TransactionPayload(BinlogFormat) (*TransactionPayload, error)
// NextLogFile returns the name of the next binary log file & pos.
// This is only valid if IsRotate() returns true
NextLogFile(BinlogFormat) (string, uint64, error)
Expand Down
270 changes: 178 additions & 92 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package mysql
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"sync"

"github.com/klauspost/compress/zstd"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// This file contains code related to handling compression related
Expand All @@ -41,35 +44,63 @@ const (
payloadUncompressedSizeField
)

// Compression algorithms that are supported (only zstd today
// in MySQL 8.0):
// https://dev.mysql.com/doc/refman/8.0/en/binary-log-transaction-compression.html
const (
// Compression algorithms that are supported (only zstd today
// in MySQL 8.0):
// https://dev.mysql.com/doc/refman/8.0/en/binary-log-transaction-compression.html
TransactionPayloadCompressionZstd = 0
TransactionPayloadCompressionNone = 255
)

var TransactionPayloadCompressionTypes = map[uint64]string{
TransactionPayloadCompressionZstd: "ZSTD",
TransactionPayloadCompressionNone: "NONE",
}
// Length of the binlog event header in the transaction payload.
headerLen = BinlogEventLenOffset + 4
mattlord marked this conversation as resolved.
Show resolved Hide resolved

// Create a reader that caches decompressors. This is used for
// smaller events that we want to handle entirely using in-memory
// buffers.
var zstdDecoder, _ = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
// At what size should we switch from the in-memory buffer
// decoding to streaming mode which is much slower, but does
// not require everything be done in memory.
zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
)

// At what size should we switch from the in-memory buffer
// decoding to streaming mode -- which is slower, but does not
// require everything be done in memory.
const zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
var (
TransactionPayloadCompressionTypes = map[uint64]string{
TransactionPayloadCompressionZstd: "ZSTD",
TransactionPayloadCompressionNone: "NONE",
}

// Metrics.
compressedTrxPayloadsInMem = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory")
compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream")

// Create a concurrent stateless decoder that caches decompressors.
// This is used for smaller events that we want to handle entirely
// using in-memory buffers via DecodeAll.
statelessDecoder, _ = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
mattlord marked this conversation as resolved.
Show resolved Hide resolved

// Use a pool of stateful decoders for larger payloads that we want
// to stream. The number of large (> zstdInMemoryDecompressorMaxSize)
// payloads should typically be relatively low, but there may be times
// where there are many of them -- and users like vstreamer may have
// N concurrent streams per vttablet which could lead to a lot of
// allocations and GC overhead so this pool allows us to handle
// concurrent cases better while still scaling to 0 when there's no
// usage.
statefulDecoderPool = sync.Pool{
New: func() any {
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
if err != nil { // Should only happen e.g. due to ENOMEM
log.Errorf("Error creating stateful decoder: %v", err)
}
return d
},
}
)

type TransactionPayload struct {
Size uint64
CompressionType uint64
UncompressedSize uint64
Payload []byte
Events []BinlogEvent
size uint64
compressionType uint64
uncompressedSize uint64
payload []byte
reader io.Reader
iterator func() (BinlogEvent, error)
}

// IsTransactionPayload returns true if a compressed transaction
Expand All @@ -78,8 +109,12 @@ func (ev binlogEvent) IsTransactionPayload() bool {
return ev.Type() == eTransactionPayloadEvent
}

// TransactionPayload returns the BinlogEvents contained within
// the compressed transaction.
// TransactionPayload processes the payload and provides a GetNextEvent()
// method which should be used in a loop to read BinlogEvents one by one
// that were within the compressed transaction. That function will return
// io.EOF when there are no more events left in the payload. You must
// call Close() when you are done with the TransactionPayload to ensure
// that the underlying reader and related resources are cleaned up.
// The following event types are compressed as part of the
// transaction payload:
//
Expand Down Expand Up @@ -129,16 +164,18 @@ func (ev binlogEvent) IsTransactionPayload() bool {
// We need to extract the compressed transaction payload from the GTID
// event, decompress it with zstd, and then process the internal events
// (e.g. Query and Row events) that make up the transaction.
func (ev binlogEvent) TransactionPayload(format BinlogFormat) ([]BinlogEvent, error) {
func (ev binlogEvent) TransactionPayload(format BinlogFormat) (*TransactionPayload, error) {
tp := &TransactionPayload{}
if err := tp.Decode(ev.Bytes()[format.HeaderLength:]); err != nil {
return nil, vterrors.Wrapf(err, "error decoding transaction payload event")
if err := tp.process(ev.Bytes()[format.HeaderLength:]); err != nil {
return nil, vterrors.Wrap(err, "error decoding transaction payload event")
}
return tp.Events, nil
return tp, nil
}

// Decode decodes and decompresses the payload.
func (tp *TransactionPayload) Decode(data []byte) error {
// process reads and decompresses the payload, setting up the iterator
// that can then be used in GetNextEvent() to read the binlog events
// from the uncompressed payload one at a time.
func (tp *TransactionPayload) process(data []byte) error {
if err := tp.read(data); err != nil {
return err
}
Expand All @@ -147,7 +184,8 @@ func (tp *TransactionPayload) Decode(data []byte) error {

// read unmarshalls the transaction payload event into the
// TransactionPayload struct. The compressed payload itself will still
// need to be decoded -- meaning decompressing it and extracting the
// need to be decoded -- meaning decompressing it and setting up the
// iterator that can then be used by GetNextEvent() to extract the
// internal events.
func (tp *TransactionPayload) read(data []byte) error {
pos := uint64(0)
Expand All @@ -160,7 +198,7 @@ func (tp *TransactionPayload) read(data []byte) error {
pos++

if fieldType == payloadHeaderEndMark {
tp.Payload = data[pos:]
tp.payload = data[pos:]
return nil // we're done
}

Expand All @@ -172,17 +210,17 @@ func (tp *TransactionPayload) read(data []byte) error {

switch fieldType {
case payloadSizeField:
tp.Size, ok = readFixedLenUint64(data[pos : pos+fieldLen])
tp.size, ok = readFixedLenUint64(data[pos : pos+fieldLen])
if !ok {
return vterrors.New(vtrpcpb.Code_INTERNAL, "error reading payload size")
}
case payloadCompressionTypeField:
tp.CompressionType, ok = readFixedLenUint64(data[pos : pos+fieldLen])
tp.compressionType, ok = readFixedLenUint64(data[pos : pos+fieldLen])
if !ok {
return vterrors.New(vtrpcpb.Code_INTERNAL, "error reading compression type")
}
case payloadUncompressedSizeField:
tp.UncompressedSize, ok = readFixedLenUint64(data[pos : pos+fieldLen])
tp.uncompressedSize, ok = readFixedLenUint64(data[pos : pos+fieldLen])
if !ok {
return vterrors.New(vtrpcpb.Code_INTERNAL, "error reading uncompressed payload size")
}
Expand All @@ -192,78 +230,126 @@ func (tp *TransactionPayload) read(data []byte) error {
}
}

// decode decompresses the payload and extracts the internal binlog
// events.
// decode decompresses the payload and assigns the iterator to a
// function that can then be used to retrieve the events from the
// uncompressed transaction one at a time.
func (tp *TransactionPayload) decode() error {
if tp.CompressionType != TransactionPayloadCompressionZstd {
return vterrors.New(vtrpcpb.Code_INTERNAL,
fmt.Sprintf("TransactionPayload has unsupported compression type of %d", tp.CompressionType))
if tp.compressionType != TransactionPayloadCompressionZstd {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL,
"TransactionPayload has unsupported compression type of %d", tp.compressionType)
}

decompressedPayload, err := tp.decompress()
decompressedPayloadLen := uint64(len(decompressedPayload))
if err != nil {
return vterrors.Wrapf(err, "error decompressing transaction payload")
err := tp.decompress()
if err != nil || tp.reader == nil {
return vterrors.Wrap(err, "error decompressing transaction payload")
}

pos := uint64(0)

for {
eventLenPosEnd := pos + BinlogEventLenOffset + 4
if eventLenPosEnd > decompressedPayloadLen { // No more events in the payload
break
header := make([]byte, headerLen)
tp.iterator = func() (ble BinlogEvent, err error) {
bytesRead, err := io.ReadFull(tp.reader, header)
if err != nil {
if err == io.EOF {
return nil, io.EOF
}
return nil, vterrors.Wrap(err, "error reading event header from uncompressed transaction payload")
}
eventLen := uint64(binary.LittleEndian.Uint32(decompressedPayload[pos+BinlogEventLenOffset : eventLenPosEnd]))
if pos+eventLen > decompressedPayloadLen {
return vterrors.New(vtrpcpb.Code_INTERNAL,
fmt.Sprintf("[BUG] event length of %d at pos %d in decompressed transaction payload is beyond the expected payload length of %d",
eventLen, pos, decompressedPayloadLen))
if bytesRead != headerLen {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected header length of %d but only read %d bytes",
headerLen, bytesRead)
}
eventData := decompressedPayload[pos : pos+eventLen]
ble := NewMysql56BinlogEvent(eventData)
tp.Events = append(tp.Events, ble)

pos += eventLen
eventLen := int64(binary.LittleEndian.Uint32(header[BinlogEventLenOffset:headerLen]))
eventData := make([]byte, eventLen)
copy(eventData, header) // The event includes the header
bytesRead, err = io.ReadFull(tp.reader, eventData[headerLen:])
if err != nil && err != io.EOF {
return nil, vterrors.Wrap(err, "error reading binlog event data from uncompressed transaction payload")
}
if int64(bytesRead+headerLen) != eventLen {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected binlog event length of %d but only read %d bytes",
eventLen, bytesRead)
}
return NewMysql56BinlogEvent(eventData), nil
}

return nil
}

// Decompress the payload.
func (tp *TransactionPayload) decompress() ([]byte, error) {
if len(tp.Payload) == 0 {
return []byte{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "cannot decompress empty payload")
// decompress decompresses the payload. If the payload is larger than
// zstdInMemoryDecompressorMaxSize then we stream the decompression via
// the package's pool of zstd.Decoders, otherwise we use in-memory
// buffers with the package's concurrent statelessDecoder.
// In either case, we setup the reader that can be used within the
// iterator to read the events one at a time from the decompressed
// payload in GetNextEvent().
func (tp *TransactionPayload) decompress() error {
if len(tp.payload) == 0 {
return vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "cannot decompress empty compressed transaction payload")
}
var (
decompressedBytes []byte
err error
)

// Switch to slower but less memory intensive stream mode for larger payloads.
if tp.UncompressedSize > zstdInMemoryDecompressorMaxSize {
in := bytes.NewReader(tp.Payload)
streamDecoder, err := zstd.NewReader(in)
if err != nil {
return nil, err
}
defer streamDecoder.Close()
out := io.Writer(&bytes.Buffer{})
_, err = io.Copy(out, streamDecoder)
if err != nil {
return nil, err

// Switch to slower but less memory intensive stream mode for
// larger payloads.
if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
in := bytes.NewReader(tp.payload)
streamDecoder, ok := statefulDecoderPool.Get().(*zstd.Decoder)
if streamDecoder == nil || !ok {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
return vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder")
}
decompressedBytes = out.(*bytes.Buffer).Bytes()
} else { // Process smaller payloads using in-memory buffers.
decompressedBytes, err = zstdDecoder.DecodeAll(tp.Payload, nil)
if err != nil {
return nil, err
if err := streamDecoder.Reset(in); err != nil {
return vterrors.Wrap(err, "error resetting stateful stream decoder")
}
compressedTrxPayloadsUsingStream.Add(1)
tp.reader = streamDecoder
return nil
}

if uint64(len(decompressedBytes)) != tp.UncompressedSize {
return []byte{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT,
fmt.Sprintf("decompressed size %d does not match expected size %d", len(decompressedBytes), tp.UncompressedSize))
// Process smaller payloads using only in-memory buffers.
if statelessDecoder == nil { // Should never happen
return vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateless decoder")
}
decompressedBytes := make([]byte, 0, tp.uncompressedSize) // Perform a single pre-allocation
decompressedBytes, err := statelessDecoder.DecodeAll(tp.payload, decompressedBytes[:0])
if err != nil {
return err
}
if uint64(len(decompressedBytes)) != tp.uncompressedSize {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"uncompressed transaction payload size %d does not match expected size %d", len(decompressedBytes), tp.uncompressedSize)
}
compressedTrxPayloadsInMem.Add(1)
tp.reader = bytes.NewReader(decompressedBytes)
return nil
}

return decompressedBytes, nil
// Close should be called in a defer where the TransactionPayload is
// used to ensure that the underlying reader and related resources
// used are cleaned up.
func (tp *TransactionPayload) Close() {
	if reader, ok := tp.reader.(*zstd.Decoder); ok {
		// Return the stateful stream decoder to the package pool so it
		// can be reused, but only when it resets cleanly (a nil reset
		// drains its state; io.EOF is the expected benign result).
		// NOTE: the pool declared in this file is statefulDecoderPool;
		// the previous readersPool reference was undefined here.
		if err := reader.Reset(nil); err == nil || err == io.EOF {
			statefulDecoderPool.Put(reader)
		}
	}
	// Clear the struct fields (not a local copy) so the decompressed
	// payload can be garbage collected and any further GetNextEvent()
	// calls fail cleanly.
	tp.reader = nil
	tp.iterator = nil
}

// GetNextEvent returns the next binlog event that was contained within
// the compressed transaction payload. Once every event in the payload
// has been consumed it returns io.EOF.
func (tp *TransactionPayload) GetNextEvent() (BinlogEvent, error) {
	if tp != nil && tp.iterator != nil {
		return tp.iterator()
	}
	return nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "TransactionPayload has been closed")
}

// Events returns an iterator over the internal binlog events that
// were contained within the compressed transaction payload/event.
// It returns a single-use iterator.
// TODO(mattlord): implement this when main is on go 1.23. See:
// - https://tip.golang.org/wiki/RangefuncExperiment
// - https://github.com/golang/go/blob/release-branch.go1.23/src/iter/iter.go
//func (tp *TransactionPayload) Events() iter.Seq[BinlogEvent] {
// return tp.iterator
//}
4 changes: 2 additions & 2 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ func (ev filePosFakeEvent) Rows(BinlogFormat, *TableMap) (Rows, error) {
return Rows{}, nil
}

func (ev filePosFakeEvent) TransactionPayload(BinlogFormat) ([]BinlogEvent, error) {
return []BinlogEvent{}, nil
func (ev filePosFakeEvent) TransactionPayload(BinlogFormat) (*TransactionPayload, error) {
return &TransactionPayload{}, nil
}

func (ev filePosFakeEvent) NextLogFile(BinlogFormat) (string, uint64, error) {
Expand Down
Loading
Loading