Skip to content

Commit

Permalink
Add comment about Iter
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jul 4, 2024
1 parent 73a3454 commit a2e135e
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ func (tp *TransactionPayload) decode() error {
if decompressedFile != nil {
fstat, err := decompressedFile.Stat()
if err != nil {
return vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("error getting stats on temporary file used to store uncompressed transaction payload: %v", err))
return vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("error getting stats on temporary file %s used to store the uncompressed transaction payload: %v",
decompressedFile.Name(), err))
}
log.Errorf("DEBUG: Decompressed transaction payload to temporary file %s of size %d MiB", decompressedFile.Name(), fstat.Size()/1024/1024)
// Read from the temporary file.
Expand All @@ -238,7 +239,7 @@ func (tp *TransactionPayload) decode() error {
eventData := make([]byte, eventLen)
// The event includes the header, so we move back to the start of the event.
if _, err := decompressedFile.Seek(-headerLen, 1); err != nil {
return nil, vterrors.Wrapf(err, "error seeking to the beginning of the event in the %s file", decompressedFile.Name())
return nil, vterrors.Wrapf(err, "error seeking to the beginning of the %s file", decompressedFile.Name())
}
i, err = decompressedFile.Read(eventData)
if err != nil && err != io.EOF {
Expand Down Expand Up @@ -296,7 +297,7 @@ func (tp *TransactionPayload) decompress() ([]byte, *os.File, error) {
return nil, nil, vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("error creating temporary file to store uncompressed transaction payload: %v", err))
}
// Delete the file path on the FS. It will then be fully removed when we
// close our open file descriptor.
// close our open file descriptor after trying to read it in decode().
defer os.Remove(tmpFile.Name())
in := bytes.NewReader(tp.payload)
streamDecoder, err := zstd.NewReader(in)
Expand All @@ -319,9 +320,19 @@ func (tp *TransactionPayload) decompress() ([]byte, *os.File, error) {
}

if uint64(len(decompressedBytes)) != tp.uncompressedSize {
return []byte{}, nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT,
return nil, nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT,
fmt.Sprintf("decompressed size %d does not match expected size %d", len(decompressedBytes), tp.uncompressedSize))
}

return decompressedBytes, nil, nil
}

// Events returns an iterator over the internal binlog events that
// were contained within the compressed transaction payload/event.
// It returns a single-use iterator.
// TODO: come back to this when main is on go 1.23. See:
// - https://tip.golang.org/wiki/RangefuncExperiment
// - https://github.com/golang/go/blob/release-branch.go1.23/src/iter/iter.go
//func (tp *TransactionPayload) Events() iter.Seq[BinlogEvent] {
// return tp.iterator
//}

0 comments on commit a2e135e

Please sign in to comment.