Skip to content

Commit

Permalink
changefeedccl: Cleanup resources when closing file
Browse files Browse the repository at this point in the history
Ensure resources acquired by cloud storage files are released
when the sink is closed.

As of cockroachdb#88635,
cloud storage uses faster implementation of gzip compression
algorithm (along with zstd).  This new implementation
is sufficiently different from the standard gzip implementation
in that it requires the compression codec to be closed, even
when the caller is terminating.  Failure to do so results in the
memory as well as the goroutine leakage.

This resource leakage may become sufficiently noticable
if the changefeed experiences many repeated errors.

This PR modifies Close() call to make sure that the underlying
compression codecs are also closed (Note: we rely on the high level
logic in distSQL to ensure that the processor gets orderly shut down,
and the shutdown code calls Close() method; However, there is still
exists a possiblity that the shutdown might not be orderly, and in those
cases resource leakage may still occur.  This possiblity will need
to be revisited in the follow on PR).

Fixes cockroachdb#106774

Release note (enterprise change): Fix an issue where the changefeeds
emitting to cloud sink with compression may experience resource leakage
(memory and go routines) when experiencing transient errors.
  • Loading branch information
Yevgeniy Miretskiy committed Jul 13, 2023
1 parent ff5f220 commit 8d0a804
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
22 changes: 20 additions & 2 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,9 +816,27 @@ func (f *cloudStorageSinkFile) flushToStorage(
}

// Close implements the Sink interface.
func (s *cloudStorageSink) Close() error {
func (s *cloudStorageSink) Close() (err error) {
// Close any codecs we might have in use and collect the first error if any
// (other errors are ignored because they are likely going to be the same ones,
// though based on the current compression implementation, the close method
// should not return an error).
// Codecs need to be closed because of the klauspost compression library implementation
// details where it spins up go routines to perform compression in parallel.
// Those go routines are cleaned up when the compression codec is closed.
s.files.Ascend(func(i btree.Item) (wantMore bool) {
f := i.(*cloudStorageSinkFile)
if f.codec != nil {
cErr := f.codec.Close()
if err == nil {
err = cErr
}
}
return true
})
s.files = nil
err := s.waitAsyncFlush(context.Background())

err = errors.CombineErrors(err, s.waitAsyncFlush(context.Background()))
close(s.asyncFlushCh) // signal flusher to exit.
err = errors.CombineErrors(err, s.flushGroup.Wait())
return errors.CombineErrors(err, s.es.Close())
Expand Down
43 changes: 43 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -747,4 +748,46 @@ func TestCloudStorageSink(t *testing.T) {
"w1\n",
}, slurpDir(t))
})

// Verify no goroutines leaked when using compression.
testWithAndWithoutAsyncFlushing(t, `no goroutine leaks with compression`, func(t *testing.T) {
before := opts.Compression
// Compression codecs include buffering that interferes with other tests,
// e.g. the bucketing test that configures very small flush sizes.
defer func() {
opts.Compression = before
}()

topic := makeTopic(`t1`)

for _, compression := range []string{"gzip", "zstd"} {
opts.Compression = compression
t.Run("compress="+stringOrDefault(compression, "none"), func(t *testing.T) {
timestampOracle := explicitTimestampOracle(ts(1))
s, err := makeCloudStorageSink(
ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts,
timestampOracle, externalStorageFromURI, user, nil, nil,
)
require.NoError(t, err)

rng, _ := randutil.NewPseudoRand()
data := randutil.RandBytes(rng, 1024)
// Write few megs worth of data.
for n := 0; n < 20; n++ {
eventTS := ts(int64(n + 1))
require.NoError(t, s.EmitRow(ctx, topic, noKey, data, eventTS, eventTS, zeroAlloc))
}

// Close the sink. That's it -- we rely on leaktest detector to determine
// if the underlying compressor leaked go routines.
require.NoError(t, s.Close())
})
}
})
}

type explicitTimestampOracle hlc.Timestamp

func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp {
return hlc.Timestamp(o)
}

0 comments on commit 8d0a804

Please sign in to comment.