Skip to content

Commit

Permalink
server, cloud: plumb an ExternalStorage memory monitor
Browse files Browse the repository at this point in the history
This change adds a dedicated ExternalStorage memory monitor
as a child of the root memory monitor. This can be used to memory
monitor ExternalStorage operations.

As a start, we memory monitor the ChunkSize that every Azure, S3, and GCS
Writer will buffer when uploading file chunks to storage. During a backup
this could amount to num_nodes * ChunkSize of memory, which was previously
unmonitored.

Release note: None
  • Loading branch information
adityamaru committed Apr 29, 2022
1 parent c5abb54 commit b0c1217
Show file tree
Hide file tree
Showing 27 changed files with 279 additions and 155 deletions.
11 changes: 3 additions & 8 deletions pkg/ccl/backupccl/backup_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,9 @@ func TestMetadataSST(t *testing.T) {
func checkMetadata(
ctx context.Context, t *testing.T, tc *testcluster.TestCluster, backupLoc string,
) {
store, err := cloud.ExternalStorageFromURI(
ctx,
backupLoc,
base.ExternalIODirConfig{},
tc.Servers[0].ClusterSettings(),
blobs.TestEmptyBlobClientFactory,
security.RootUserName(),
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil)
store, err := cloud.ExternalStorageFromURI(ctx, backupLoc, base.ExternalIODirConfig{},
tc.Servers[0].ClusterSettings(), blobs.TestEmptyBlobClientFactory, security.RootUserName(),
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
18 changes: 6 additions & 12 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,9 @@ func TestBackupRestoreAppend(t *testing.T) {
// Find the backup times in the collection and try RESTORE'ing to each, and
// within each also check if we can restore to individual times captured with
// incremental backups that were appended to that backup.
store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///0",
base.ExternalIODirConfig{},
tc.Servers[0].ClusterSettings(),
blobs.TestEmptyBlobClientFactory,
security.RootUserName(),
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil)
store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///0", base.ExternalIODirConfig{},
tc.Servers[0].ClusterSettings(), blobs.TestEmptyBlobClientFactory, security.RootUserName(),
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil, nil)
require.NoError(t, err)
defer store.Close()
var files []string
Expand Down Expand Up @@ -8044,12 +8041,9 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) {
defer dirCleanupFn()

st := cluster.MakeTestingClusterSettings()
storage, err := cloud.ExternalStorageFromURI(ctx,
"nodelocal://0/test",
base.ExternalIODirConfig{},
st,
blobs.TestBlobServiceClient(dir),
security.RootUserName(), nil, nil, nil)
storage, err := cloud.ExternalStorageFromURI(ctx, "nodelocal://0/test",
base.ExternalIODirConfig{}, st, blobs.TestBlobServiceClient(dir),
security.RootUserName(), nil, nil, nil, nil)
require.NoError(t, err)

m := mon.NewMonitor("test-monitor", mon.MemoryResource, nil, nil, 0, 0, st)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
Cfg: &execinfra.ServerConfig{
DB: kvDB,
ExternalStorage: func(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) {
return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{},
s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil)
return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, s.ClusterSettings(),
blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil, nil)
},
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestCloudStorageSink(t *testing.T) {
externalStorageFromURI := func(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage,
error) {
return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings,
clientFactory, user, nil, nil, nil)
clientFactory, user, nil, nil, nil, nil)
}

user := security.RootUserName()
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ func externalStorageFromURIFactory(
) (cloud.ExternalStorage, error) {
defaultSettings := &cluster.Settings{}
defaultSettings.SV.Init(ctx, nil /* opaque */)
return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{},
defaultSettings, newBlobFactory, user, nil /*Internal Executor*/, nil /*kvDB*/, nil)
return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, defaultSettings,
newBlobFactory, user, nil, nil, nil, nil)
}

func getManifestFromURI(ctx context.Context, path string) (backupccl.BackupManifest, error) {
Expand Down Expand Up @@ -580,7 +580,7 @@ func makeIters(
var err error
clusterSettings := cluster.MakeClusterSettings()
dirStorage[i], err = cloud.MakeExternalStorage(ctx, file.Dir, base.ExternalIODirConfig{},
clusterSettings, newBlobFactory, nil /*internal executor*/, nil /*kvDB*/, nil)
clusterSettings, newBlobFactory, nil, nil, nil, nil)
if err != nil {
return nil, nil, errors.Wrapf(err, "making external storage")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/workloadccl/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func GetStorage(ctx context.Context, cfg FixtureConfig) (cloud.ExternalStorage,
return nil, errors.AssertionFailedf("unsupported external storage provider; valid providers are gs, s3, and azure")
}

s, err := cloud.ExternalStorageFromURI(ctx, cfg.ObjectPathToURI(),
base.ExternalIODirConfig{}, clustersettings.MakeClusterSettings(),
nil, security.SQLUsername{}, nil, nil, nil)
s, err := cloud.ExternalStorageFromURI(ctx, cfg.ObjectPathToURI(), base.ExternalIODirConfig{},
clustersettings.MakeClusterSettings(), nil, security.SQLUsername{},
nil, nil, nil, nil)
if err != nil {
return nil, errors.Wrap(err, storageError)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/ioctx",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/retry",
"//pkg/util/sysutil",
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/amazon/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/util/contextutil",
"//pkg/util/ioctx",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"@com_github_aws_aws_sdk_go//aws",
Expand Down
57 changes: 48 additions & 9 deletions pkg/cloud/amazon/s3_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -79,6 +80,25 @@ type s3Storage struct {

opts s3ClientConfig
cached *s3Client
mon *mon.BytesMonitor
}

var _ cloud.ExternalStorage = &s3Storage{}

// s3StorageWriter wraps the underlying s3 cloud storage writer with a bound
// memory account that tracks the chunk buffer the uploader holds while
// uploading. The account is released when the writer is closed.
type s3StorageWriter struct {
	io.WriteCloser

	ctx    context.Context
	memAcc *mon.BoundAccount
}

// Close implements the WriteCloser interface. The underlying writer is
// closed first, since the buffered chunk remains in use until the final
// flush completes; only then is the memory reservation released.
func (w *s3StorageWriter) Close() error {
	// Flush and close the upload before releasing the account: the chunk
	// buffer accounted for by memAcc is still live during this call.
	err := w.WriteCloser.Close()
	if w.memAcc != nil {
		w.memAcc.Close(w.ctx)
	}
	return err
}

// s3Client wraps an SDK client and uploader for a given session.
Expand Down Expand Up @@ -125,8 +145,6 @@ var s3ClientCache struct {
client *s3Client
}

var _ cloud.ExternalStorage = &s3Storage{}

type serverSideEncMode string

const (
Expand Down Expand Up @@ -191,8 +209,8 @@ func parseS3URL(_ cloud.ExternalStorageURIContext, uri *url.URL) (roachpb.Extern
return conf, nil
}

// MakeS3Storage returns an instance of S3 ExternalStorage.
func MakeS3Storage(
// makeS3Storage returns an instance of S3 ExternalStorage.
func makeS3Storage(
ctx context.Context, args cloud.ExternalStorageContext, dest roachpb.ExternalStorage,
) (cloud.ExternalStorage, error) {
telemetry.Count("external-io.s3")
Expand Down Expand Up @@ -258,6 +276,7 @@ func MakeS3Storage(
prefix: conf.Prefix,
settings: args.Settings,
opts: clientConfig(conf),
mon: args.Mon,
}

reuse := reuseSession.Get(&args.Settings.SV)
Expand Down Expand Up @@ -402,14 +421,28 @@ func (s *s3Storage) Settings() *cluster.Settings {
}

func (s *s3Storage) Writer(ctx context.Context, basename string) (io.WriteCloser, error) {
ctx, sp := tracing.ChildSpan(ctx, "s3.Writer")
sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("s3.Writer: %s", path.Join(s.prefix, basename))})

// The PartSize determines how much the uploader will buffer while uploading
// chunks to storage. This buffering is to enable retrying uploads in the face
// of transient errors. We should account for this buffer size in our memory
// monitor.
var memAcc *mon.BoundAccount
if s.mon != nil {
acc := s.mon.MakeBoundAccount()
memAcc = &acc
if err := memAcc.Grow(ctx, cloud.WriteChunkSize.Get(&s.settings.SV)); err != nil {
return nil, err
}
}

uploader, err := s.getUploader(ctx)
if err != nil {
return nil, err
}

ctx, sp := tracing.ChildSpan(ctx, "s3.Writer")
sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("s3.Writer: %s", path.Join(s.prefix, basename))})
return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error {
w := cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error {
defer sp.Finish()
// Upload the file to S3.
// TODO(dt): test and tune the uploader parameters.
Expand All @@ -422,7 +455,13 @@ func (s *s3Storage) Writer(ctx context.Context, basename string) (io.WriteCloser
StorageClass: nilIfEmpty(s.conf.StorageClass),
})
return errors.Wrap(err, "upload failed")
}), nil
})

return &s3StorageWriter{
WriteCloser: w,
ctx: ctx,
memAcc: memAcc,
}, nil
}

func (s *s3Storage) openStreamAt(
Expand Down Expand Up @@ -610,5 +649,5 @@ func s3ErrDelay(err error) time.Duration {

func init() {
cloud.RegisterExternalStorageProvider(roachpb.ExternalStorageProvider_s3,
parseS3URL, MakeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), "s3")
parseS3URL, makeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), "s3")
}
20 changes: 10 additions & 10 deletions pkg/cloud/amazon/s3_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/stretchr/testify/require"
)

func makeS3Storage(
func testingMakeS3Storage(
ctx context.Context, uri string, user security.SQLUsername,
) (cloud.ExternalStorage, error) {
conf, err := cloud.ExternalStorageConfFromURI(uri, user)
Expand All @@ -46,7 +46,7 @@ func makeS3Storage(
// Setup a sink for the given args.
clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir)
s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings,
clientFactory, nil, nil, nil)
clientFactory, nil, nil, nil, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -75,8 +75,8 @@ func TestPutS3(t *testing.T) {
user := security.RootUserName()
t.Run("auth-empty-no-cred", func(t *testing.T) {
_, err := cloud.ExternalStorageFromURI(ctx, fmt.Sprintf("s3://%s/%s", bucket,
"backup-test-default"), base.ExternalIODirConfig{}, testSettings,
blobs.TestEmptyBlobClientFactory, user, nil, nil, nil)
"backup-test-default"), base.ExternalIODirConfig{}, testSettings, blobs.TestEmptyBlobClientFactory,
user, nil, nil, nil, nil)
require.EqualError(t, err, fmt.Sprintf(
`%s is set to '%s', but %s is not set`,
cloud.AuthParam,
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestPutS3(t *testing.T) {
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"unsupported-algorithm")

_, err = makeS3Storage(ctx, invalidSSEModeURI, user)
_, err = testingMakeS3Storage(ctx, invalidSSEModeURI, user)
require.True(t, testutils.IsError(err, "unsupported server encryption mode unsupported-algorithm. Supported values are `aws:kms` and `AES256"))

// Specify aws:kms encryption mode but don't specify kms ID.
Expand All @@ -171,7 +171,7 @@ func TestPutS3(t *testing.T) {
bucket, "backup-test-sse-256",
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"aws:kms")
_, err = makeS3Storage(ctx, invalidKMSURI, user)
_, err = testingMakeS3Storage(ctx, invalidKMSURI, user)
require.True(t, testutils.IsError(err, "AWS_SERVER_KMS_ID param must be set when using aws:kms server side encryption mode."))
})
}
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestPutS3Endpoint(t *testing.T) {
func TestS3DisallowCustomEndpoints(t *testing.T) {
defer leaktest.AfterTest(t)()
dest := roachpb.ExternalStorage{S3Config: &roachpb.ExternalStorage_S3{Endpoint: "http://do.not.go.there/"}}
s3, err := MakeS3Storage(context.Background(),
s3, err := makeS3Storage(context.Background(),
cloud.ExternalStorageContext{
IOConf: base.ExternalIODirConfig{DisableHTTP: true},
},
Expand All @@ -231,7 +231,7 @@ func TestS3DisallowImplicitCredentials(t *testing.T) {

testSettings := cluster.MakeTestingClusterSettings()

s3, err := MakeS3Storage(context.Background(),
s3, err := makeS3Storage(context.Background(),
cloud.ExternalStorageContext{
IOConf: base.ExternalIODirConfig{DisableImplicitCredentials: true},
Settings: testSettings,
Expand Down Expand Up @@ -284,8 +284,8 @@ func TestS3BucketDoesNotExist(t *testing.T) {

// Setup a sink for the given args.
clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir)
s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings,
clientFactory, nil, nil, nil)
s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{},
testSettings, clientFactory, nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/azure/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/util/contextutil",
"//pkg/util/ioctx",
"//pkg/util/mon",
"//pkg/util/tracing",
"@com_github_azure_azure_storage_blob_go//azblob",
"@com_github_cockroachdb_errors//:errors",
Expand Down
45 changes: 42 additions & 3 deletions pkg/cloud/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -65,10 +66,27 @@ type azureStorage struct {
container azblob.ContainerURL
prefix string
settings *cluster.Settings
mon *mon.BytesMonitor
}

var _ cloud.ExternalStorage = &azureStorage{}

// azureStorageWriter wraps the underlying azure cloud storage writer with a
// bound memory account that tracks the client's upload buffer. The account
// is released when the writer is closed.
type azureStorageWriter struct {
	io.WriteCloser

	ctx    context.Context
	memAcc *mon.BoundAccount
}

// Close implements the WriteCloser interface. The underlying writer is
// closed first, since the upload buffer remains in use until the final
// flush completes; only then is the memory reservation released.
func (w *azureStorageWriter) Close() error {
	// Flush and close the upload before releasing the account: the buffer
	// accounted for by memAcc is still live during this call.
	err := w.WriteCloser.Close()
	if w.memAcc != nil {
		w.memAcc.Close(w.ctx)
	}
	return err
}

func makeAzureStorage(
_ context.Context, args cloud.ExternalStorageContext, dest roachpb.ExternalStorage,
) (cloud.ExternalStorage, error) {
Expand Down Expand Up @@ -121,15 +139,36 @@ func (s *azureStorage) Writer(ctx context.Context, basename string) (io.WriteClo
sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("azure.Writer: %s",
path.Join(s.prefix, basename))})
blob := s.getBlob(basename)
return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error {

// The BufferSize determines how much the client will buffer while uploading
// chunks to storage. This buffering is to enable retrying uploads in the face
// of transient errors. We should account for this buffer size in our memory
// monitor.
var memAcc *mon.BoundAccount
bufferSize := int(cloud.WriteChunkSize.Get(&s.settings.SV))
if s.mon != nil {
acc := s.mon.MakeBoundAccount()
memAcc = &acc
if err := memAcc.Grow(ctx, int64(bufferSize)); err != nil {
return nil, err
}
}

w := cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error {
defer sp.Finish()
_, err := azblob.UploadStreamToBlockBlob(
ctx, r, blob, azblob.UploadStreamToBlockBlobOptions{
BufferSize: int(cloud.WriteChunkSize.Get(&s.settings.SV)),
BufferSize: bufferSize,
},
)
return err
}), nil
})

return &azureStorageWriter{
WriteCloser: w,
ctx: ctx,
memAcc: memAcc,
}, nil
}

// ReadFile is shorthand for ReadFileAt with offset 0.
Expand Down
Loading

0 comments on commit b0c1217

Please sign in to comment.