Skip to content

Commit

Permalink
Merge pull request #130613 from rharding6373/backport23.1-130204
Browse files Browse the repository at this point in the history
release-23.1: changefeedccl: fix memory leak in cloud storage sink with fast gzip
  • Loading branch information
rharding6373 authored Sep 20, 2024
2 parents 3357497 + c1716ac commit a3a11c3
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 75 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/cloudpb",
"//pkg/cloud/impl:cloudimpl",
"//pkg/internal/sqlsmith",
"//pkg/jobs",
Expand Down Expand Up @@ -299,6 +300,7 @@ go_test(
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/ioctx",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand All @@ -324,6 +326,7 @@ go_test(
"@com_github_gogo_protobuf//types",
"@com_github_golang_mock//gomock",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_klauspost_compress//gzip",
"@com_github_lib_pq//:pq",
"@com_github_shopify_sarama//:sarama",
"@com_github_stretchr_testify//assert",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func getSink(
}
return makeCloudStorageSink(
ctx, sinkURL{URL: u}, nodeID, serverCfg.Settings, encodingOpts,
timestampOracle, serverCfg.ExternalStorageFromURI, user, metricsBuilder,
timestampOracle, serverCfg.ExternalStorageFromURI, user, metricsBuilder, nil,
)
})
case u.Scheme == changefeedbase.SinkSchemeExperimentalSQL:
Expand Down
51 changes: 49 additions & 2 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ type cloudStorageSink struct {
asyncFlushCh chan flushRequest // channel for submitting flush requests.
asyncFlushTermCh chan struct{} // channel closed by async flusher to indicate an error
asyncFlushErr error // set by async flusher, prior to closing asyncFlushTermCh

// testingKnobs may be nil if no knobs are set.
testingKnobs *TestingKnobs
}

type flushRequest struct {
Expand Down Expand Up @@ -359,6 +362,7 @@ func makeCloudStorageSink(
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
user username.SQLUsername,
mb metricsRecorderBuilder,
testingKnobs *TestingKnobs,
) (Sink, error) {
var targetMaxFileSize int64 = 16 << 20 // 16MB
if fileSizeParam := u.consumeParam(changefeedbase.SinkParamFileSize); fileSizeParam != `` {
Expand Down Expand Up @@ -399,6 +403,7 @@ func makeCloudStorageSink(
flushGroup: ctxgroup.WithContext(ctx),
asyncFlushCh: make(chan flushRequest, flushQueueDepth),
asyncFlushTermCh: make(chan struct{}),
testingKnobs: testingKnobs,
}
s.flushGroup.GoCtx(s.asyncFlusher)

Expand Down Expand Up @@ -625,6 +630,27 @@ func (s *cloudStorageSink) flushTopicVersions(
}
return err == nil
})
if err != nil {
return err
}

// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

// Wait for the async flush to complete before clearing files.
// Note that if waitAsyncFlush returns an error some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}

// Files need to be cleared after the flush completes, otherwise file
// resources may be leaked.
for _, v := range toRemove {
s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
}
Expand All @@ -647,9 +673,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
if err != nil {
return err
}
s.files.Clear(true /* addNodesToFreeList */)
// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}
s.setDataFileTimestamp()
return s.waitAsyncFlush(ctx)

// Note that if waitAsyncFlush returns an error some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}
// Files need to be cleared after the flush completes, otherwise file resources
// may not be released properly when closing the sink.
s.files.Clear(true /* addNodesToFreeList */)
return nil
}

func (s *cloudStorageSink) setDataFileTimestamp() {
Expand Down Expand Up @@ -778,6 +819,12 @@ func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error {
continue
}

// Allow synchronization with the flushing routine to happen between getting
// the flush request from the channel and completing the flush.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

// flush file to storage.
flushDone := s.metrics.recordFlushRequestCallback()
err := req.file.flushToStorage(ctx, s.es, req.dest, s.metrics)
Expand Down
Loading

0 comments on commit a3a11c3

Please sign in to comment.