diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 494d765f2a9..b8350a731d4 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -399,8 +399,8 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac // Save initial state so we can restore. replicaStartRequired := false sourceIsPrimary := false - superReadOnly := true //nolint - readOnly := true //nolint + superReadOnly := true // nolint + readOnly := true // nolint var replicationPosition replication.Position semiSyncSource, semiSyncReplica := params.Mysqld.SemiSyncEnabled(ctx) @@ -793,7 +793,11 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger // backupFile backs up an individual file. func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) { ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + if finalErr != nil { + cancel() + } + }() // Open the source file for reading. openSourceAt := time.Now() source, err := fe.open(params.Cnf, true) diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index 1af1362ae30..b3a8117aafa 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -44,6 +44,7 @@ import ( "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + transport "github.com/aws/smithy-go/endpoints" "github.com/aws/smithy-go/middleware" "github.com/spf13/pflag" @@ -110,6 +111,23 @@ var logNameMap logNameToLogLevel const sseCustomerPrefix = "sse_c:" +type endpointResolver struct { + r s3.EndpointResolverV2 + endpoint *string +} + +func (er *endpointResolver) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (transport.Endpoint, error) { + params.Endpoint = er.endpoint + return er.r.ResolveEndpoint(ctx, params) +} + +func newEndpointResolver() *endpointResolver { + return &endpointResolver{ + r: s3.NewDefaultEndpointResolverV2(), + endpoint: &endpoint, + } +} + type iClient interface { manager.UploadAPIClient manager.DownloadAPIClient @@ -182,8 +200,7 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize }) object := objName(bh.dir, bh.name, filename) sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send")) - // Using UploadWithContext breaks uploading to Minio and Ceph https://github.com/vitessio/vitess/issues/14188 - _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{ + _, err := uploader.Upload(ctx, &s3.PutObjectInput{ Bucket: &bucket, Key: &object, Body: reader, @@ -494,7 +511,7 @@ func (bs *S3BackupStorage) client() (*s3.Client, error) { o.RetryMaxAttempts = retryCount o.Retryer = &ClosedConnectionRetryer{} } - }) + }, s3.WithEndpointResolverV2(newEndpointResolver())) if len(bucket) == 0 { return nil, fmt.Errorf("--s3_backup_storage_bucket required") diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 784d718af26..daba8bb1ec3 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -316,8 +316,15 @@ func (be *XtrabackupEngine) backupFiles( // would impose a timeout that starts counting right now, so it would // include the time spent uploading the file content. We only want to impose // a timeout on the final Close() step. + // This context also allows us to immediately abort AddFiles if we encountered + // an error in this function. addFilesCtx, cancelAddFiles := context.WithCancel(ctx) - defer cancelAddFiles() + defer func() { + if finalErr != nil { + cancelAddFiles() + } + }() + destFiles, err := addStripeFiles(addFilesCtx, params, bh, backupFileName, numStripes) if err != nil { return replicationPosition, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName)