kgo: fix three races
One of these is new and unreleased,
one was introduced in v0.6.2,
and one has apparently existed since last summer and never cropped up?!?

The unreleased one is fairly obvious: we modify bufferedFetch when
closing a consumer session, so our defer cannot touch it.
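
The pattern behind that fix can be sketched as follows. This is an illustrative stand-in, not the actual franz-go types: the point is that a deferred cleanup must not re-read shared state (`s.buffered`) that a concurrently closing consumer session may mutate, so the fix captures the decision in a function-local variable that only this goroutine can see.

```go
package main

import "fmt"

// Illustrative sketch of the race pattern; the type and field names
// are invented, not the real franz-go code.
type source struct {
	buffered []string // shared: a closing consumer session may clear this
}

// fetchFixed reports whether it released its offsets, mirroring the
// "if we did not buffer, make the used offsets usable again" cleanup.
func (s *source) fetchFixed(fetched []string) (releasedOffsets bool) {
	var buffered bool // local: immune to concurrent mutation of s.buffered
	defer func() {
		if !buffered { // safe: reads only the local flag
			releasedOffsets = true // stand-in for finishUsingAll
		}
	}()
	if len(fetched) > 0 {
		buffered = true
		s.buffered = fetched
	}
	return
}

func main() {
	s := &source{}
	fmt.Println(s.fetchFixed(nil))               // nothing buffered: offsets released
	fmt.Println(s.fetchFixed([]string{"topic"})) // buffered: offsets kept
}
```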

The v0.6.2 one was rather obvious as well. The AND logic caused us to
fall into a branch we should not have. We instead move the AND into a
block on its own.

The one since last summer does not make sense and was never seen in
races before. I also don't think this is a franz-go problem; it may
instead be the compress library hanging onto the input slice. Basically,
the race detector was showing that the basic logger was reusing a slice
that was currently in use in the zstd decompressor, which should not be
a thing since we get from and put back into the slice writers pool in
defined blocks. The decompressor is **done** with the input slice on
return!

But, as it turns out, resetting the reader was literally useless; we do
not need to do that at all, so we just don't.
twmb committed Feb 10, 2021
1 parent 1e5c11d commit 802bf74
Showing 3 changed files with 7 additions and 4 deletions.
1 change: 0 additions & 1 deletion pkg/kgo/compression.go
```diff
@@ -261,7 +261,6 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
 	case 4:
 		unzstd := d.unzstdPool.Get().(*zstdDecoder)
 		defer d.unzstdPool.Put(unzstd)
-		unzstd.inner.Reset(bytes.NewReader(src))
 		return unzstd.inner.DecodeAll(src, nil)
 	default:
 		return nil, errors.New("unknown compression codec")
```
6 changes: 4 additions & 2 deletions pkg/kgo/sink.go
```diff
@@ -1047,8 +1047,10 @@ func (recBuf *recBuf) bufferRecord(pr promisedRec, abortOnNewBatch bool) bool {
 	// Before we decide to keep this new batch, if this single record is too
 	// large for a batch, then we immediately fail it.
 	newBatchLength := newBatch.wireLength
-	if !produceVersionKnown && newBatch.v1wireLength > newBatchLength {
-		newBatchLength = newBatch.v1wireLength
+	if !produceVersionKnown {
+		if newBatch.v1wireLength > newBatchLength {
+			newBatchLength = newBatch.v1wireLength
+		}
 	} else {
 		switch recBuf.sink.produceVersion {
 		case 0, 1:
```
4 changes: 3 additions & 1 deletion pkg/kgo/source.go
```diff
@@ -434,8 +434,9 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 	// For all returns, if we do not buffer our fetch, then we want to
 	// ensure our used offsets are usable again.
 	var alreadySentToDoneFetch bool
+	var buffered bool
 	defer func() {
-		if len(s.buffered.fetch.Topics) == 0 {
+		if !buffered {
 			if req.numOffsets > 0 {
 				req.usedOffsets.finishUsingAll()
 			}
@@ -592,6 +593,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 	reloadOffsets.loadWithSessionNow(consumerSession)
 
 	if len(fetch.Topics) > 0 {
+		buffered = true
 		s.buffered = bufferedFetch{
 			fetch:     fetch,
 			doneFetch: doneFetch,
```
