Skip to content

Commit

Permalink
changefeedccl: Release compression codec as soon as error occurs
Browse files Browse the repository at this point in the history
Proactively close compression codecs if an error occurs
when writing to cloud storage file.

While we always call Close on the sink object, this PR
ensures that even if this doesn't happen, the resources used
by compression code are released as soon as an error (during write)
occurs.

Informs #106774

Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Jul 14, 2023
1 parent 2fb2855 commit 388bb8b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 4 deletions.
30 changes: 26 additions & 4 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,11 +517,28 @@ func (s *cloudStorageSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
) error {
) (retErr error) {
if s.files == nil {
return errors.New(`cannot EmitRow on a closed sink`)
}

defer func() {
if !s.compression.enabled() {
return
}
if retErr == nil {
retErr = ctx.Err()
}
if retErr != nil {
// If we are returning an error, immediately close all compression
// codecs to release resources. This step is also done in the
// Close() method, but doing this clean-up as soon as we know
// an error has occurred, ensures that we do not leak resources,
// even if the Close() method is not called.
retErr = errors.CombineErrors(retErr, s.closeAllCodecs())
}
}()

s.metrics.recordMessageSize(int64(len(key) + len(value)))
file, err := s.getOrCreateFile(topic, mvcc)
if err != nil {
Expand Down Expand Up @@ -802,8 +819,7 @@ func (f *cloudStorageSinkFile) flushToStorage(
return nil
}

// Close implements the Sink interface.
func (s *cloudStorageSink) Close() (err error) {
func (s *cloudStorageSink) closeAllCodecs() (err error) {
// Close any codecs we might have in use and collect the first error if any
// (other errors are ignored because they are likely going to be the same ones,
// though based on the current compression implementation, the close method
Expand All @@ -815,14 +831,20 @@ func (s *cloudStorageSink) Close() (err error) {
f := i.(*cloudStorageSinkFile)
if f.codec != nil {
cErr := f.codec.Close()
f.codec = nil
if err == nil {
err = cErr
}
}
return true
})
s.files = nil
return err
}

// Close implements the Sink interface.
func (s *cloudStorageSink) Close() error {
err := s.closeAllCodecs()
s.files = nil
err = errors.CombineErrors(err, s.waitAsyncFlush(context.Background()))
close(s.asyncFlushCh) // signal flusher to exit.
err = errors.CombineErrors(err, s.flushGroup.Wait())
Expand Down
50 changes: 50 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,56 @@ func TestCloudStorageSink(t *testing.T) {
})
}
})

// Verify no goroutines leaked when using compression with context cancellation.
testWithAndWithoutAsyncFlushing(t, `no goroutine leaks when context canceled`, func(t *testing.T) {
before := opts.Compression
// Compression codecs include buffering that interferes with other tests,
// e.g. the bucketing test that configures very small flush sizes.
defer func() {
opts.Compression = before
}()

topic := makeTopic(`t1`)

for _, compression := range []string{"gzip", "zstd"} {
opts.Compression = compression
t.Run("compress="+stringOrDefault(compression, "none"), func(t *testing.T) {
timestampOracle := explicitTimestampOracle(ts(1))
s, err := makeCloudStorageSink(
ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts,
timestampOracle, externalStorageFromURI, user, nil,
)
require.NoError(t, err)
defer func() {
require.NoError(t, s.Close())
}()

// We need to run the following code inside separate
// closure so that we capture the set of goroutines started
// while writing the data (and ignore goroutines started by the sink
// itself).
func() {
defer leaktest.AfterTest(t)()

rng, _ := randutil.NewPseudoRand()
data := randutil.RandBytes(rng, 1024)
// Write few megs worth of data.
for n := 0; n < 20; n++ {
eventTS := ts(int64(n + 1))
require.NoError(t, s.EmitRow(ctx, topic, noKey, data, eventTS, eventTS, zeroAlloc))
}
cancledCtx, cancel := context.WithCancel(ctx)
cancel()

// Write 1 more piece of data. We want to make sure that when error happens
// (context cancellation in this case) that any resources used by compression
// codec are released (this is checked by leaktest).
require.Equal(t, context.Canceled, s.EmitRow(cancledCtx, topic, noKey, data, ts(1), ts(1), zeroAlloc))
}()
})
}
})
}

type explicitTimestampOracle hlc.Timestamp
Expand Down

0 comments on commit 388bb8b

Please sign in to comment.