diff --git a/pkg/ccl/backupccl/backup_metadata_test.go b/pkg/ccl/backupccl/backup_metadata_test.go index dcaf1aa45cbd..2f79c84fa70b 100644 --- a/pkg/ccl/backupccl/backup_metadata_test.go +++ b/pkg/ccl/backupccl/backup_metadata_test.go @@ -68,14 +68,9 @@ func TestMetadataSST(t *testing.T) { func checkMetadata( ctx context.Context, t *testing.T, tc *testcluster.TestCluster, backupLoc string, ) { - store, err := cloud.ExternalStorageFromURI( - ctx, - backupLoc, - base.ExternalIODirConfig{}, - tc.Servers[0].ClusterSettings(), - blobs.TestEmptyBlobClientFactory, - security.RootUserName(), - tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil) + store, err := cloud.ExternalStorageFromURI(ctx, backupLoc, base.ExternalIODirConfig{}, + tc.Servers[0].ClusterSettings(), blobs.TestEmptyBlobClientFactory, security.RootUserName(), + tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 5c96263bdec7..0263dbce192c 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -565,12 +565,9 @@ func TestBackupRestoreAppend(t *testing.T) { // Find the backup times in the collection and try RESTORE'ing to each, and // within each also check if we can restore to individual times captured with // incremental backups that were appended to that backup. - store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///0", - base.ExternalIODirConfig{}, - tc.Servers[0].ClusterSettings(), - blobs.TestEmptyBlobClientFactory, - security.RootUserName(), - tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil) + store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///0", base.ExternalIODirConfig{}, + tc.Servers[0].ClusterSettings(), blobs.TestEmptyBlobClientFactory, security.RootUserName(), + tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil, nil) require.NoError(t, err) defer store.Close() var files []string @@ -8044,12 +8041,9 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) { defer dirCleanupFn() st := cluster.MakeTestingClusterSettings() - storage, err := cloud.ExternalStorageFromURI(ctx, - "nodelocal://0/test", - base.ExternalIODirConfig{}, - st, - blobs.TestBlobServiceClient(dir), - security.RootUserName(), nil, nil, nil) + storage, err := cloud.ExternalStorageFromURI(ctx, "nodelocal://0/test", + base.ExternalIODirConfig{}, st, blobs.TestBlobServiceClient(dir), + security.RootUserName(), nil, nil, nil, nil) require.NoError(t, err) m := mon.NewMonitor("test-monitor", mon.MemoryResource, nil, nil, 0, 0, st) diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 28a521bf1f2e..156a537ff33a 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -237,8 +237,8 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { Cfg: &execinfra.ServerConfig{ DB: kvDB, ExternalStorage: func(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) { - return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, - s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil) + return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, s.ClusterSettings(), + blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil, nil) }, Settings: s.ClusterSettings(), Codec: keys.SystemSQLCodec, diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 7cfc4261f5f1..e9fc0b314170 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -159,7 +159,7 @@ func TestCloudStorageSink(t *testing.T) { externalStorageFromURI := func(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage, error) { return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings, - clientFactory, user, nil, nil, nil) + clientFactory, user, nil, nil, nil, nil) } user := security.RootUserName() diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 988f79ea3960..14c7a49043bb 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -283,8 +283,8 @@ func externalStorageFromURIFactory( ) (cloud.ExternalStorage, error) { defaultSettings := &cluster.Settings{} defaultSettings.SV.Init(ctx, nil /* opaque */) - return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, - defaultSettings, newBlobFactory, user, nil /*Internal Executor*/, nil /*kvDB*/, nil) + return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, defaultSettings, + newBlobFactory, user, nil, nil, nil, nil) } func getManifestFromURI(ctx context.Context, path string) (backupccl.BackupManifest, error) { @@ -580,7 +580,7 @@ func makeIters( var err error clusterSettings := cluster.MakeClusterSettings() dirStorage[i], err = cloud.MakeExternalStorage(ctx, file.Dir, base.ExternalIODirConfig{}, - clusterSettings, newBlobFactory, nil /*internal executor*/, nil /*kvDB*/, nil) + clusterSettings, newBlobFactory, nil, nil, nil, nil) if err != nil { return nil, nil, errors.Wrapf(err, "making external storage") } diff --git a/pkg/ccl/workloadccl/storage.go b/pkg/ccl/workloadccl/storage.go index fe8395f1b354..93fb724fd564 100644 --- a/pkg/ccl/workloadccl/storage.go +++ b/pkg/ccl/workloadccl/storage.go @@ -35,9 +35,9 @@ func GetStorage(ctx context.Context, cfg FixtureConfig) (cloud.ExternalStorage, return nil, errors.AssertionFailedf("unsupported external storage provider; valid providers are gs, s3, and azure") } - s, err := cloud.ExternalStorageFromURI(ctx, cfg.ObjectPathToURI(), - base.ExternalIODirConfig{}, clustersettings.MakeClusterSettings(), - nil, security.SQLUsername{}, nil, nil, nil) + s, err := cloud.ExternalStorageFromURI(ctx, cfg.ObjectPathToURI(), base.ExternalIODirConfig{}, + clustersettings.MakeClusterSettings(), nil, security.SQLUsername{}, + nil, nil, nil, nil) if err != nil { return nil, errors.Wrap(err, storageError) } diff --git a/pkg/cloud/BUILD.bazel b/pkg/cloud/BUILD.bazel index 5efeb18747e3..efc861768a2d 100644 --- a/pkg/cloud/BUILD.bazel +++ b/pkg/cloud/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/util/ctxgroup", "//pkg/util/ioctx", "//pkg/util/log", + "//pkg/util/mon", "//pkg/util/quotapool", "//pkg/util/retry", "//pkg/util/sysutil", diff --git a/pkg/cloud/amazon/BUILD.bazel b/pkg/cloud/amazon/BUILD.bazel index 15616cabf1b7..a3653b76f559 100644 --- a/pkg/cloud/amazon/BUILD.bazel +++ b/pkg/cloud/amazon/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/util/contextutil", "//pkg/util/ioctx", "//pkg/util/log", + "//pkg/util/mon", "//pkg/util/syncutil", "//pkg/util/tracing", "@com_github_aws_aws_sdk_go//aws", diff --git a/pkg/cloud/amazon/s3_storage.go b/pkg/cloud/amazon/s3_storage.go index 696cb4f82f70..7cd09566f782 100644 --- a/pkg/cloud/amazon/s3_storage.go +++ b/pkg/cloud/amazon/s3_storage.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -79,6 +80,25 @@ type s3Storage struct { opts s3ClientConfig cached *s3Client + mon *mon.BytesMonitor +} + +var _ cloud.ExternalStorage = &s3Storage{} + +// s3StorageWriter wraps the underlying s3 cloud storage writer. +type s3StorageWriter struct { + io.WriteCloser + + ctx context.Context + memAcc *mon.BoundAccount +} + +// Close implements the WriteCloser interface. +func (w *s3StorageWriter) Close() error { + if w.memAcc != nil { + w.memAcc.Close(w.ctx) + } + return w.WriteCloser.Close() } // s3Client wraps an SDK client and uploader for a given session. @@ -125,8 +145,6 @@ var s3ClientCache struct { client *s3Client } -var _ cloud.ExternalStorage = &s3Storage{} - type serverSideEncMode string const ( @@ -191,8 +209,8 @@ func parseS3URL(_ cloud.ExternalStorageURIContext, uri *url.URL) (roachpb.Extern return conf, nil } -// MakeS3Storage returns an instance of S3 ExternalStorage. -func MakeS3Storage( +// makeS3Storage returns an instance of S3 ExternalStorage. +func makeS3Storage( ctx context.Context, args cloud.ExternalStorageContext, dest roachpb.ExternalStorage, ) (cloud.ExternalStorage, error) { telemetry.Count("external-io.s3") @@ -258,6 +276,7 @@ func MakeS3Storage( prefix: conf.Prefix, settings: args.Settings, opts: clientConfig(conf), + mon: args.Mon, } reuse := reuseSession.Get(&args.Settings.SV) @@ -402,14 +421,28 @@ func (s *s3Storage) Settings() *cluster.Settings { } func (s *s3Storage) Writer(ctx context.Context, basename string) (io.WriteCloser, error) { + ctx, sp := tracing.ChildSpan(ctx, "s3.Writer") + sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("s3.Writer: %s", path.Join(s.prefix, basename))}) + + // The PartSize determines how much the uploader will buffer while uploading + // chunks to storage. This buffering is to enable retrying uploads in the face + // of transient errors. We should account for this buffer size in our memory + // monitor. + var memAcc *mon.BoundAccount + if s.mon != nil { + acc := s.mon.MakeBoundAccount() + memAcc = &acc + if err := memAcc.Grow(ctx, cloud.WriteChunkSize.Get(&s.settings.SV)); err != nil { + return nil, err + } + } + uploader, err := s.getUploader(ctx) if err != nil { return nil, err } - ctx, sp := tracing.ChildSpan(ctx, "s3.Writer") - sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("s3.Writer: %s", path.Join(s.prefix, basename))}) - return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + w := cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { defer sp.Finish() // Upload the file to S3. // TODO(dt): test and tune the uploader parameters. @@ -422,7 +455,13 @@ func (s *s3Storage) Writer(ctx context.Context, basename string) (io.WriteCloser StorageClass: nilIfEmpty(s.conf.StorageClass), }) return errors.Wrap(err, "upload failed") - }), nil + }) + + return &s3StorageWriter{ + WriteCloser: w, + ctx: ctx, + memAcc: memAcc, + }, nil } func (s *s3Storage) openStreamAt( @@ -610,5 +649,5 @@ func s3ErrDelay(err error) time.Duration { func init() { cloud.RegisterExternalStorageProvider(roachpb.ExternalStorageProvider_s3, - parseS3URL, MakeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), "s3") + parseS3URL, makeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), "s3") } diff --git a/pkg/cloud/amazon/s3_storage_test.go b/pkg/cloud/amazon/s3_storage_test.go index 33843cf4a533..33eef718c2ad 100644 --- a/pkg/cloud/amazon/s3_storage_test.go +++ b/pkg/cloud/amazon/s3_storage_test.go @@ -34,7 +34,7 @@ import ( "github.com/stretchr/testify/require" ) -func makeS3Storage( +func testingMakeS3Storage( ctx context.Context, uri string, user security.SQLUsername, ) (cloud.ExternalStorage, error) { conf, err := cloud.ExternalStorageConfFromURI(uri, user) @@ -46,7 +46,7 @@ func makeS3Storage( // Setup a sink for the given args. clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, - clientFactory, nil, nil, nil) + clientFactory, nil, nil, nil, nil) if err != nil { return nil, err } @@ -75,8 +75,8 @@ func TestPutS3(t *testing.T) { user := security.RootUserName() t.Run("auth-empty-no-cred", func(t *testing.T) { _, err := cloud.ExternalStorageFromURI(ctx, fmt.Sprintf("s3://%s/%s", bucket, - "backup-test-default"), base.ExternalIODirConfig{}, testSettings, - blobs.TestEmptyBlobClientFactory, user, nil, nil, nil) + "backup-test-default"), base.ExternalIODirConfig{}, testSettings, blobs.TestEmptyBlobClientFactory, + user, nil, nil, nil, nil) require.EqualError(t, err, fmt.Sprintf( `%s is set to '%s', but %s is not set`, cloud.AuthParam, @@ -162,7 +162,7 @@ func TestPutS3(t *testing.T) { cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode, "unsupported-algorithm") - _, err = makeS3Storage(ctx, invalidSSEModeURI, user) + _, err = testingMakeS3Storage(ctx, invalidSSEModeURI, user) require.True(t, testutils.IsError(err, "unsupported server encryption mode unsupported-algorithm. Supported values are `aws:kms` and `AES256")) // Specify aws:kms encryption mode but don't specify kms ID. @@ -171,7 +171,7 @@ func TestPutS3(t *testing.T) { bucket, "backup-test-sse-256", cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode, "aws:kms") - _, err = makeS3Storage(ctx, invalidKMSURI, user) + _, err = testingMakeS3Storage(ctx, invalidKMSURI, user) require.True(t, testutils.IsError(err, "AWS_SERVER_KMS_ID param must be set when using aws:kms server side encryption mode.")) }) } @@ -215,7 +215,7 @@ func TestPutS3Endpoint(t *testing.T) { func TestS3DisallowCustomEndpoints(t *testing.T) { defer leaktest.AfterTest(t)() dest := roachpb.ExternalStorage{S3Config: &roachpb.ExternalStorage_S3{Endpoint: "http://do.not.go.there/"}} - s3, err := MakeS3Storage(context.Background(), + s3, err := makeS3Storage(context.Background(), cloud.ExternalStorageContext{ IOConf: base.ExternalIODirConfig{DisableHTTP: true}, }, @@ -231,7 +231,7 @@ func TestS3DisallowImplicitCredentials(t *testing.T) { testSettings := cluster.MakeTestingClusterSettings() - s3, err := MakeS3Storage(context.Background(), + s3, err := makeS3Storage(context.Background(), cloud.ExternalStorageContext{ IOConf: base.ExternalIODirConfig{DisableImplicitCredentials: true}, Settings: testSettings, @@ -284,8 +284,8 @@ func TestS3BucketDoesNotExist(t *testing.T) { // Setup a sink for the given args. clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) - s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, - clientFactory, nil, nil, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, + testSettings, clientFactory, nil, nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/cloud/azure/BUILD.bazel b/pkg/cloud/azure/BUILD.bazel index a1339dca3678..e03cb067d477 100644 --- a/pkg/cloud/azure/BUILD.bazel +++ b/pkg/cloud/azure/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/settings/cluster", "//pkg/util/contextutil", "//pkg/util/ioctx", + "//pkg/util/mon", "//pkg/util/tracing", "@com_github_azure_azure_storage_blob_go//azblob", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/cloud/azure/azure_storage.go b/pkg/cloud/azure/azure_storage.go index bc659d8ba134..5d3d28207789 100644 --- a/pkg/cloud/azure/azure_storage.go +++ b/pkg/cloud/azure/azure_storage.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/gogo/protobuf/types" @@ -65,10 +66,27 @@ type azureStorage struct { container azblob.ContainerURL prefix string settings *cluster.Settings + mon *mon.BytesMonitor } var _ cloud.ExternalStorage = &azureStorage{} +// azureStorageWriter wraps the underlying azure cloud storage writer. +type azureStorageWriter struct { + io.WriteCloser + + ctx context.Context + memAcc *mon.BoundAccount +} + +// Close implements the WriteCloser interface. +func (w *azureStorageWriter) Close() error { + if w.memAcc != nil { + w.memAcc.Close(w.ctx) + } + return w.WriteCloser.Close() +} + func makeAzureStorage( _ context.Context, args cloud.ExternalStorageContext, dest roachpb.ExternalStorage, ) (cloud.ExternalStorage, error) { @@ -121,15 +139,36 @@ func (s *azureStorage) Writer(ctx context.Context, basename string) (io.WriteClo sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("azure.Writer: %s", path.Join(s.prefix, basename))}) blob := s.getBlob(basename) - return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + + // The BufferSize determines how much the client will buffer while uploading + // chunks to storage. This buffering is to enable retrying uploads in the face + // of transient errors. We should account for this buffer size in our memory + // monitor. + var memAcc *mon.BoundAccount + bufferSize := int(cloud.WriteChunkSize.Get(&s.settings.SV)) + if s.mon != nil { + acc := s.mon.MakeBoundAccount() + memAcc = &acc + if err := memAcc.Grow(ctx, int64(bufferSize)); err != nil { + return nil, err + } + } + + w := cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { defer sp.Finish() _, err := azblob.UploadStreamToBlockBlob( ctx, r, blob, azblob.UploadStreamToBlockBlobOptions{ - BufferSize: int(cloud.WriteChunkSize.Get(&s.settings.SV)), + BufferSize: bufferSize, }, ) return err - }), nil + }) + + return &azureStorageWriter{ + WriteCloser: w, + ctx: ctx, + memAcc: memAcc, + }, nil } // ReadFile is shorthand for ReadFileAt with offset 0. diff --git a/pkg/cloud/cloudtestutils/cloud_test_helpers.go b/pkg/cloud/cloudtestutils/cloud_test_helpers.go index 4aff98ba5832..6b0d1e93615d 100644 --- a/pkg/cloud/cloudtestutils/cloud_test_helpers.go +++ b/pkg/cloud/cloudtestutils/cloud_test_helpers.go @@ -117,7 +117,7 @@ func storeFromURI( } // Setup a sink for the given args. s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, - clientFactory, ie, kvDB, nil) + clientFactory, ie, kvDB, nil, nil) if err != nil { t.Fatal(err) } @@ -144,7 +144,8 @@ func CheckExportStore( // Setup a sink for the given args. clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) - s, err := cloud.MakeExternalStorage(ctx, conf, ioConf, testSettings, clientFactory, ie, kvDB, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, ioConf, testSettings, clientFactory, ie, kvDB, + nil, nil) if err != nil { t.Fatal(err) } @@ -493,9 +494,8 @@ func uploadData( data := randutil.RandBytes(rnd, 16<<20) ctx := context.Background() - s, err := cloud.MakeExternalStorage( - ctx, dest, base.ExternalIODirConfig{}, testSettings, - nil, nil, nil, nil) + s, err := cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, testSettings, + nil, nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, cloud.WriteFile(ctx, s, basename, bytes.NewReader(data))) return data, func() { @@ -536,7 +536,7 @@ func CheckAntagonisticRead( ctx := context.Background() s, err := cloud.MakeExternalStorage( ctx, conf, base.ExternalIODirConfig{}, testSettings, - nil, nil, nil, nil) + nil, nil, nil, nil, nil) require.NoError(t, err) defer s.Close() diff --git a/pkg/cloud/external_storage.go b/pkg/cloud/external_storage.go index 654fb17a76d0..70fac84be787 100644 --- a/pkg/cloud/external_storage.go +++ b/pkg/cloud/external_storage.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/errors" ) @@ -149,6 +150,7 @@ type ExternalStorageContext struct { BlobClientFactory blobs.BlobClientFactory InternalExecutor sqlutil.InternalExecutor DB *kv.DB + Mon *mon.BytesMonitor } // ExternalStorageConstructor is a function registered to create instances diff --git a/pkg/cloud/gcp/BUILD.bazel b/pkg/cloud/gcp/BUILD.bazel index bfec0a2e091b..7c9c648e59b6 100644 --- a/pkg/cloud/gcp/BUILD.bazel +++ b/pkg/cloud/gcp/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/settings/cluster", "//pkg/util/contextutil", "//pkg/util/ioctx", + "//pkg/util/mon", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//types", diff --git a/pkg/cloud/gcp/gcs_storage.go b/pkg/cloud/gcp/gcs_storage.go index 3ce1d6c9c83f..c034683609e6 100644 --- a/pkg/cloud/gcp/gcs_storage.go +++ b/pkg/cloud/gcp/gcs_storage.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/gogo/protobuf/types" @@ -75,10 +76,27 @@ type gcsStorage struct { ioConf base.ExternalIODirConfig prefix string settings *cluster.Settings + mon *mon.BytesMonitor } var _ cloud.ExternalStorage = &gcsStorage{} +// gcsStorageWriter wraps the underlying google cloud storage writer. +type gcsStorageWriter struct { + io.WriteCloser + + ctx context.Context + memAcc *mon.BoundAccount +} + +// Close implements the WriteCloser interface. +func (w *gcsStorageWriter) Close() error { + if w.memAcc != nil { + w.memAcc.Close(w.ctx) + } + return w.WriteCloser.Close() +} + func (g *gcsStorage) Conf() roachpb.ExternalStorage { return roachpb.ExternalStorage{ Provider: roachpb.ExternalStorageProvider_gs, @@ -152,6 +170,7 @@ func makeGCSStorage( ioConf: args.IOConf, prefix: conf.Prefix, settings: args.Settings, + mon: args.Mon, }, nil } @@ -166,7 +185,25 @@ func (g *gcsStorage) Writer(ctx context.Context, basename string) (io.WriteClose if !gcsChunkingEnabled.Get(&g.settings.SV) { w.ChunkSize = 0 } - return w, nil + + // The ChunkSize determines how much the client will buffer while uploading + // chunks to storage. This buffering is to enable retrying uploads in the face + // of transient errors. We should account for this buffer size in our memory + // monitor. + var memAcc *mon.BoundAccount + if g.mon != nil { + acc := g.mon.MakeBoundAccount() + memAcc = &acc + if err := memAcc.Grow(ctx, int64(w.ChunkSize)); err != nil { + return nil, err + } + } + + return &gcsStorageWriter{ + WriteCloser: w, + ctx: ctx, + memAcc: memAcc, + }, nil } // ReadFile is shorthand for ReadFileAt with offset 0. diff --git a/pkg/cloud/gcp/gcs_storage_test.go b/pkg/cloud/gcp/gcs_storage_test.go index 94b8bd07ac3d..c1eadaa5fad0 100644 --- a/pkg/cloud/gcp/gcs_storage_test.go +++ b/pkg/cloud/gcp/gcs_storage_test.go @@ -128,9 +128,8 @@ func TestFileDoesNotExist(t *testing.T) { conf, err := cloud.ExternalStorageConfFromURI(gsFile, user) require.NoError(t, err) - s, err := cloud.MakeExternalStorage( - context.Background(), conf, base.ExternalIODirConfig{}, testSettings, - nil, nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), conf, base.ExternalIODirConfig{}, + testSettings, nil, nil, nil, nil, nil) require.NoError(t, err) _, err = s.ReadFile(context.Background(), "") require.Error(t, err, "") @@ -143,9 +142,8 @@ func TestFileDoesNotExist(t *testing.T) { conf, err := cloud.ExternalStorageConfFromURI(gsFile, user) require.NoError(t, err) - s, err := cloud.MakeExternalStorage( - context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil, - nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), conf, base.ExternalIODirConfig{}, + testSettings, nil, nil, nil, nil, nil) require.NoError(t, err) _, err = s.ReadFile(context.Background(), "") require.Error(t, err, "") @@ -173,9 +171,11 @@ func TestCompressedGCS(t *testing.T) { conf2, err := cloud.ExternalStorageConfFromURI(gsFile2, user) require.NoError(t, err) - s1, err := cloud.MakeExternalStorage(ctx, conf1, base.ExternalIODirConfig{}, testSettings, nil, nil, nil, nil) + s1, err := cloud.MakeExternalStorage(ctx, conf1, base.ExternalIODirConfig{}, testSettings, + nil, nil, nil, nil, nil) require.NoError(t, err) - s2, err := cloud.MakeExternalStorage(ctx, conf2, base.ExternalIODirConfig{}, testSettings, nil, nil, nil, nil) + s2, err := cloud.MakeExternalStorage(ctx, conf2, base.ExternalIODirConfig{}, testSettings, + nil, nil, nil, nil, nil) require.NoError(t, err) reader1, err := s1.ReadFile(context.Background(), "") diff --git a/pkg/cloud/httpsink/http_storage_test.go b/pkg/cloud/httpsink/http_storage_test.go index 7a86f83b640e..237044c33417 100644 --- a/pkg/cloud/httpsink/http_storage_test.go +++ b/pkg/cloud/httpsink/http_storage_test.go @@ -161,8 +161,8 @@ func TestPutHttp(t *testing.T) { if err != nil { t.Fatal(err) } - s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, - testSettings, blobs.TestEmptyBlobClientFactory, nil, nil, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, + blobs.TestEmptyBlobClientFactory, nil, nil, nil, nil) if err != nil { t.Fatal(err) } @@ -315,10 +315,9 @@ func TestCanDisableHttp(t *testing.T) { } testSettings := cluster.MakeTestingClusterSettings() - s, err := cloud.MakeExternalStorage( - context.Background(), - roachpb.ExternalStorage{Provider: roachpb.ExternalStorageProvider_http}, - conf, testSettings, blobs.TestEmptyBlobClientFactory, nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), + roachpb.ExternalStorage{Provider: roachpb.ExternalStorageProvider_http}, conf, + testSettings, blobs.TestEmptyBlobClientFactory, nil, nil, nil, nil) require.Nil(t, s) require.Error(t, err) } @@ -336,10 +335,8 @@ func TestCanDisableOutbound(t *testing.T) { roachpb.ExternalStorageProvider_gs, roachpb.ExternalStorageProvider_nodelocal, } { - s, err := cloud.MakeExternalStorage( - context.Background(), - roachpb.ExternalStorage{Provider: provider}, - conf, testSettings, blobs.TestEmptyBlobClientFactory, nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), roachpb.ExternalStorage{Provider: provider}, + conf, testSettings, blobs.TestEmptyBlobClientFactory, nil, nil, nil, nil) require.Nil(t, s) require.Error(t, err) } @@ -368,9 +365,8 @@ func TestExternalStorageCanUseHTTPProxy(t *testing.T) { conf, err := cloud.ExternalStorageConfFromURI("http://my-server", security.RootUserName()) require.NoError(t, err) - s, err := cloud.MakeExternalStorage( - context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil, - nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), conf, base.ExternalIODirConfig{}, + testSettings, nil, nil, nil, nil, nil) require.NoError(t, err) stream, err := s.ReadFile(context.Background(), "file") require.NoError(t, err) diff --git a/pkg/cloud/impl_registry.go b/pkg/cloud/impl_registry.go index 5145ececad42..761add0c887b 100644 --- a/pkg/cloud/impl_registry.go +++ b/pkg/cloud/impl_registry.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/errors" ) @@ -141,12 +142,14 @@ func ExternalStorageFromURI( ie sqlutil.InternalExecutor, kvDB *kv.DB, limiters Limiters, + mon *mon.BytesMonitor, ) (ExternalStorage, error) { conf, err := ExternalStorageConfFromURI(uri, user) if err != nil { return nil, err } - return MakeExternalStorage(ctx, conf, externalConfig, settings, blobClientFactory, ie, kvDB, limiters) + return MakeExternalStorage(ctx, conf, externalConfig, settings, blobClientFactory, ie, kvDB, + limiters, mon) } // SanitizeExternalStorageURI returns the external storage URI with with some @@ -193,6 +196,7 @@ func MakeExternalStorage( ie sqlutil.InternalExecutor, kvDB *kv.DB, limiters Limiters, + mon *mon.BytesMonitor, ) (ExternalStorage, error) { args := ExternalStorageContext{ IOConf: conf, @@ -200,6 +204,7 @@ func MakeExternalStorage( BlobClientFactory: blobClientFactory, InternalExecutor: ie, DB: kvDB, + Mon: mon, } if conf.DisableOutbound && dest.Provider != roachpb.ExternalStorageProvider_userfile { return nil, errors.New("external network access is disabled") diff --git a/pkg/cloud/nodelocal/nodelocal_storage.go b/pkg/cloud/nodelocal/nodelocal_storage.go index 01ac575f0dd8..68fddbaf86a6 100644 --- a/pkg/cloud/nodelocal/nodelocal_storage.go +++ b/pkg/cloud/nodelocal/nodelocal_storage.go @@ -73,18 +73,6 @@ func MakeLocalStorageURI(path string) string { return fmt.Sprintf("nodelocal://0/%s", path) } -// TestingMakeLocalStorage is used by tests. -func TestingMakeLocalStorage( - ctx context.Context, - cfg roachpb.ExternalStorage_LocalFilePath, - settings *cluster.Settings, - blobClientFactory blobs.BlobClientFactory, - ioConf base.ExternalIODirConfig, -) (cloud.ExternalStorage, error) { - args := cloud.ExternalStorageContext{IOConf: ioConf, BlobClientFactory: blobClientFactory, Settings: settings} - return makeLocalStorage(ctx, args, roachpb.ExternalStorage{LocalFile: cfg}) -} - func makeLocalStorage( ctx context.Context, args cloud.ExternalStorageContext, dest roachpb.ExternalStorage, ) (cloud.ExternalStorage, error) { diff --git a/pkg/cloud/nullsink/nullsink_storage_test.go b/pkg/cloud/nullsink/nullsink_storage_test.go index 6f6a261ad71a..84607c4f334d 100644 --- a/pkg/cloud/nullsink/nullsink_storage_test.go +++ b/pkg/cloud/nullsink/nullsink_storage_test.go @@ -36,7 +36,7 @@ func TestNullSinkReadAndWrite(t *testing.T) { t.Fatal(err) } - s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, nil, nil, nil, nil, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, nil, nil, nil, nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/cloud/userfile/file_table_storage_test.go b/pkg/cloud/userfile/file_table_storage_test.go index 50eec5d18fe9..91610db75bbf 100644 --- a/pkg/cloud/userfile/file_table_storage_test.go +++ b/pkg/cloud/userfile/file_table_storage_test.go @@ -69,7 +69,7 @@ func TestPutUserFileTable(t *testing.T) { store, err := cloud.ExternalStorageFromURI(ctx, userfileURL.String()+"/", base.ExternalIODirConfig{}, cluster.NoSettings, blobs.TestEmptyBlobClientFactory, - security.RootUserName(), ie, kvDB, nil) + security.RootUserName(), ie, kvDB, nil, nil) require.NoError(t, err) defer store.Close() @@ -116,13 +116,13 @@ func TestUserScoping(t *testing.T) { // Write file as user1. fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user1, ie, kvDB, nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user1, ie, kvDB, nil, nil) require.NoError(t, err) require.NoError(t, cloud.WriteFile(ctx, fileTableSystem1, filename, bytes.NewReader([]byte("aaa")))) // Attempt to read/write file as user2 and expect to fail. fileTableSystem2, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user2, ie, kvDB, nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user2, ie, kvDB, nil, nil) require.NoError(t, err) _, err = fileTableSystem2.ReadFile(ctx, filename) require.Error(t, err) @@ -130,7 +130,8 @@ func TestUserScoping(t *testing.T) { // Read file as root and expect to succeed. fileTableSystem3, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, security.RootUserName(), ie, kvDB, nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, security.RootUserName(), ie, kvDB, + nil, nil) require.NoError(t, err) _, err = fileTableSystem3.ReadFile(ctx, filename) require.NoError(t, err) diff --git a/pkg/server/external_storage_builder.go b/pkg/server/external_storage_builder.go index aee8638db460..ccc73a7c21fb 100644 --- a/pkg/server/external_storage_builder.go +++ b/pkg/server/external_storage_builder.go @@ -22,14 +22,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/errors" ) // externalStorageBuilder is a wrapper around the ExternalStorage factory // methods. It allows us to separate the creation and initialization of the // builder between NewServer() and Start() respectively. -// TODO(adityamaru): Consider moving this to pkg/cloud/impl at a future -// stage of the ongoing refactor. +// +// TODO(adityamaru): Consider moving this to pkg/cloud/impl. type externalStorageBuilder struct { conf base.ExternalIODirConfig settings *cluster.Settings @@ -38,33 +39,39 @@ type externalStorageBuilder struct { ie *sql.InternalExecutor db *kv.DB limiters cloud.Limiters + mon *mon.BytesMonitor +} + +// externalStorageBuilderConfig contains the information needed to initialize an +// externalStorageBuilder. +type externalStorageBuilderConfig struct { + conf base.ExternalIODirConfig + settings *cluster.Settings + nodeIDContainer *base.NodeIDContainer + nodeDialer *nodedialer.Dialer + ie *sql.InternalExecutor + db *kv.DB + mon *mon.BytesMonitor + knobs base.TestingKnobs } -func (e *externalStorageBuilder) init( - ctx context.Context, - conf base.ExternalIODirConfig, - settings *cluster.Settings, - nodeIDContainer *base.NodeIDContainer, - nodeDialer *nodedialer.Dialer, - testingKnobs base.TestingKnobs, - ie *sql.InternalExecutor, - db *kv.DB, -) { +func (e *externalStorageBuilder) init(ctx context.Context, cfg *externalStorageBuilderConfig) { var blobClientFactory blobs.BlobClientFactory - if p, ok := testingKnobs.Server.(*TestingKnobs); ok && p.BlobClientFactory != nil { + if p, ok := cfg.knobs.Server.(*TestingKnobs); ok && p.BlobClientFactory != nil { blobClientFactory = p.BlobClientFactory } if blobClientFactory == nil { - blobClientFactory = blobs.NewBlobClientFactory(nodeIDContainer, nodeDialer, settings.ExternalIODir) + blobClientFactory = blobs.NewBlobClientFactory(cfg.nodeIDContainer, + cfg.nodeDialer, cfg.settings.ExternalIODir) } - e.conf = conf - e.settings = settings + e.conf = cfg.conf + e.settings = cfg.settings e.blobClientFactory = blobClientFactory e.initCalled = true - e.ie = ie - e.db = db - e.limiters = cloud.MakeLimiters(ctx, &settings.SV) - + e.ie = cfg.ie + e.db = cfg.db + e.limiters = cloud.MakeLimiters(ctx, &cfg.settings.SV) + e.mon = cfg.mon } func (e *externalStorageBuilder) makeExternalStorage( @@ -74,7 +81,7 @@ func (e *externalStorageBuilder) makeExternalStorage( return nil, errors.New("cannot create external storage before init") } return cloud.MakeExternalStorage(ctx, dest, e.conf, e.settings, e.blobClientFactory, e.ie, - e.db, e.limiters) + e.db, e.limiters, e.mon) } func (e *externalStorageBuilder) makeExternalStorageFromURI( @@ -83,5 +90,6 @@ func (e *externalStorageBuilder) makeExternalStorageFromURI( if !e.initCalled { return nil, errors.New("cannot create external storage before init") } - return cloud.ExternalStorageFromURI(ctx, uri, e.conf, e.settings, e.blobClientFactory, user, e.ie, e.db, e.limiters) + return cloud.ExternalStorageFromURI(ctx, uri, e.conf, e.settings, e.blobClientFactory, user, + e.ie, e.db, e.limiters, e.mon) } diff --git a/pkg/server/server.go b/pkg/server/server.go index c37d2c1e162b..65d4eb0c2709 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -978,16 +978,24 @@ func (s *Server) PreStart(ctx context.Context) error { // Initialize the external storage builders configuration params now that the // engines have been created. The object can be used to create ExternalStorage // objects hereafter. - fileTableInternalExecutor := sql.MakeInternalExecutor(ctx, s.PGServer().SQLServer, sql.MemoryMetrics{}, s.st) - s.externalStorageBuilder.init( - ctx, - s.cfg.ExternalIODirConfig, - s.st, - s.nodeIDContainer, - s.nodeDialer, - s.cfg.TestingKnobs, - &fileTableInternalExecutor, - s.db) + fileTableInternalExecutor := sql.MakeInternalExecutor(ctx, + s.PGServer().SQLServer, sql.MemoryMetrics{}, s.st) + externalStorageMemoryMonitor := mon.NewMonitorInheritWithLimit( + "external-storage-mem", 0 /* limit */, s.sqlServer.execCfg.RootMemoryMonitor) + externalStorageMemoryMonitor.Start(ctx, s.sqlServer.execCfg.RootMemoryMonitor, mon.BoundAccount{}) + s.externalStorageBuilder.init(ctx, &externalStorageBuilderConfig{ + conf: s.cfg.ExternalIODirConfig, + settings: s.st, + nodeIDContainer: s.nodeIDContainer, + nodeDialer: s.nodeDialer, + ie: &fileTableInternalExecutor, + db: s.db, + mon: externalStorageMemoryMonitor, + knobs: s.cfg.TestingKnobs, + }) + s.stopper.AddCloser(stop.CloserFn(func() { + externalStorageMemoryMonitor.Stop(ctx) + })) // Filter out self from the gossip bootstrap addresses. filtered := s.cfg.FilterGossipBootstrapAddresses(ctx) diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index a93c304625e4..e5200384be00 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -143,6 +144,32 @@ func startTenantInternal( histogramWindowInterval: args.HistogramWindowInterval(), settings: args.Settings, }) + + // Setup the factories that can be used to initialize an ExternalStorage + // object to interact with ExternalStorage sinks. + esb := &externalStorageBuilder{} + args.externalStorage = esb.makeExternalStorage + args.externalStorageFromURI = esb.makeExternalStorageFromURI + + externalStorageMemoryMonitor := mon.NewMonitorInheritWithLimit( + "external-storage-mem", 0 /* limit */, args.monitorAndMetrics.rootSQLMemoryMonitor) + externalStorageMemoryMonitor.Start(ctx, args.monitorAndMetrics.rootSQLMemoryMonitor, mon.BoundAccount{}) + esb.init( + ctx, + &externalStorageBuilderConfig{ + conf: sqlCfg.ExternalIODirConfig, + settings: baseCfg.Settings, + nodeIDContainer: baseCfg.IDContainer, + nodeDialer: args.nodeDialer, + ie: args.circularInternalExecutor, + db: args.db, + mon: externalStorageMemoryMonitor, + knobs: baseCfg.TestingKnobs, + }) + stopper.AddCloser(stop.CloserFn(func() { + externalStorageMemoryMonitor.Stop(ctx) + })) + closedSessionCache := sql.NewClosedSessionCache( baseCfg.Settings, args.monitorAndMetrics.rootSQLMemoryMonitor, time.Now) args.closedSessionCache = closedSessionCache @@ -509,21 +536,6 @@ func makeTenantSQLServerArgs( runtime := status.NewRuntimeStatSampler(startupCtx, clock) registry.AddMetricStruct(runtime) - esb := &externalStorageBuilder{} - externalStorage := esb.makeExternalStorage - externalStorageFromURI := esb.makeExternalStorageFromURI - - esb.init( - startupCtx, - sqlCfg.ExternalIODirConfig, - baseCfg.Settings, - baseCfg.IDContainer, - nodeDialer, - baseCfg.TestingKnobs, - circularInternalExecutor, - db, - ) - grpcServer := newGRPCServer(rpcContext) // In a SQL-only server, there is no separate node initialization // phase. Start RPC immediately in the operational state. @@ -540,8 +552,6 @@ func makeTenantSQLServerArgs( isMeta1Leaseholder: func(_ context.Context, _ hlc.ClockTimestamp) (bool, error) { return false, errors.New("isMeta1Leaseholder is not available to secondary tenants") }, - externalStorage: externalStorage, - externalStorageFromURI: externalStorageFromURI, // Set instance ID to 0 and node ID to nil to indicate // that the instance ID will be bound later during preStart. nodeIDContainer: instanceIDContainer, diff --git a/pkg/sql/importer/import_processor_test.go b/pkg/sql/importer/import_processor_test.go index ea2044b2cfad..0e1ed1f0293d 100644 --- a/pkg/sql/importer/import_processor_test.go +++ b/pkg/sql/importer/import_processor_test.go @@ -887,8 +887,7 @@ func externalStorageFactory( if err != nil { return nil, err } - return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, - nil, blobs.TestBlobServiceClient(workdir), nil, nil, nil) + return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, nil, blobs.TestBlobServiceClient(workdir), nil, nil, nil, nil) } // Helper to create and initialize testSpec. diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 7aa17eee3fd7..7b20c45b0127 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -2549,7 +2549,8 @@ func TestImportObjectLevelRBAC(t *testing.T) { // Write to userfile storage now that testuser has CREATE privileges. ie := tc.Server(0).InternalExecutor().(*sql.InternalExecutor) fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, security.TestUserName(), ie, tc.Server(0).DB(), nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, security.TestUserName(), ie, + tc.Server(0).DB(), nil, nil) require.NoError(t, err) require.NoError(t, cloud.WriteFile(ctx, fileTableSystem1, filename, bytes.NewReader([]byte(data)))) } @@ -5725,12 +5726,10 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { sqlDB.CheckQueryResults(t, "SELECT * FROM foo", [][]string{{"1"}, {"2"}, {"3"}}) // Read the unsupported log and verify its contents. - store, err := cloud.ExternalStorageFromURI(ctx, ignoredLog, - base.ExternalIODirConfig{}, - tc.Server(0).ClusterSettings(), - blobs.TestEmptyBlobClientFactory, - security.RootUserName(), - tc.Server(0).InternalExecutor().(*sql.InternalExecutor), tc.Server(0).DB(), nil) + store, err := cloud.ExternalStorageFromURI(ctx, ignoredLog, base.ExternalIODirConfig{}, + tc.Server(0).ClusterSettings(), blobs.TestEmptyBlobClientFactory, security.RootUserName(), + tc.Server(0).InternalExecutor().(*sql.InternalExecutor), tc.Server(0).DB(), + nil, nil) require.NoError(t, err) defer store.Close()