Skip to content

Commit

Permalink
ebs br: make sure fsr credit is full filled (#48627) (#48743)
Browse files Browse the repository at this point in the history
close #48629
  • Loading branch information
ti-chi-bot authored Dec 11, 2023
1 parent 3a55235 commit c870a52
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 9 deletions.
1 change: 1 addition & 0 deletions br/pkg/aws/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"@com_github_aws_aws_sdk_go//aws",
"@com_github_aws_aws_sdk_go//aws/awserr",
"@com_github_aws_aws_sdk_go//aws/session",
"@com_github_aws_aws_sdk_go//service/cloudwatch",
"@com_github_aws_aws_sdk_go//service/ec2",
"@com_github_aws_aws_sdk_go//service/ec2/ec2iface",
"@com_github_pingcap_errors//:errors",
Expand Down
126 changes: 119 additions & 7 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ package aws
import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/pingcap/errors"
Expand All @@ -31,7 +33,8 @@ const (
)

type EC2Session struct {
ec2 ec2iface.EC2API
ec2 ec2iface.EC2API
cloudwatchClient *cloudwatch.CloudWatch
// aws operation concurrency
concurrency uint
}
Expand All @@ -51,7 +54,8 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
return nil, errors.Trace(err)
}
ec2Session := ec2.New(sess)
return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil
cloudwatchClient := cloudwatch.New(sess)
return &EC2Session{ec2: ec2Session, cloudwatchClient: cloudwatchClient, concurrency: concurrency}, nil
}

// CreateSnapshots is the mainly steps to control the data volume snapshots.
Expand Down Expand Up @@ -325,8 +329,63 @@ func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string)
return snapshotsIDsMap, eg.Wait()
}

// waitDataFSREnabled waits FSR for data volume snapshots are all enabled
// waitDataFSREnabled waits FSR for data volume snapshots are all enabled and also have enough credit balance
func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error {
// Record current time
start := time.Now()

// get the maximum size of volumes, in GiB
var maxVolumeSize int64 = 0
resp, err := e.ec2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{SnapshotIds: snapShotIDs})
if err != nil {
return errors.Trace(err)
}
if len(resp.Snapshots) <= 0 {
return errors.Errorf("specified snapshot [%s] is not found", *snapShotIDs[0])
}

for _, s := range resp.Snapshots {
if *s.VolumeSize > maxVolumeSize {
maxVolumeSize = *s.VolumeSize
}
}

// Calculate the time in minutes to fill 1.0 credit according to
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-fast-snapshot-restore.html#volume-creation-credits
// 5 minutes more is just for safe
fillElapsedTime := 60.0/(math.Min(10, 1024.0/(float64)(maxVolumeSize))) + 5

// We have to sleep for at least fillElapsedTime minutes in order to make credits are filled to 1.0
// Let's heartbeat every 5 minutes
for time.Since(start) <= time.Duration(fillElapsedTime)*time.Minute {
log.Info("FSR enablement is ongoing, going to sleep for 5 minutes...")
time.Sleep(5 * time.Minute)
}

// Wait that all snapshot has enough fsr credit balance, it's very likely true since we have wait for long enough
log.Info("Start check and wait all snapshots have enough fsr credit balance")

startIdx := 0
retryCount := 0
for startIdx < len(snapShotIDs) {
creditBalance, _ := e.getFSRCreditBalance(snapShotIDs[startIdx], targetAZ)
if creditBalance != nil && *creditBalance >= 1.0 {
startIdx++
retryCount = 0
} else {
if creditBalance == nil {
// For invalid calling, retry 3 times
if retryCount >= 3 {
return errors.Errorf("cloudwatch metrics for %s operation failed after retrying", *snapShotIDs[startIdx])
}
retryCount++
}
// Retry for both invalid calling and not enough fsr credit
// Cloudwatch by default flushes every 5 seconds. So, 20 seconds wait should be enough
time.Sleep(20 * time.Second)
}
}

// Create a map to store the strings as keys
pendingSnapshots := make(map[string]struct{})

Expand Down Expand Up @@ -379,6 +438,51 @@ func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string)
}
}

// getFSRCreditBalance is used to get maximum fsr credit balance of snapshot for last 5 minutes
func (e *EC2Session) getFSRCreditBalance(snapshotID *string, targetAZ string) (*float64, error) {
// Set the time range to query for metrics
startTime := time.Now().Add(-5 * time.Minute)
endTime := time.Now()

// Prepare the input for the GetMetricStatisticsWithContext API call
input := &cloudwatch.GetMetricStatisticsInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
Namespace: aws.String("AWS/EBS"),
MetricName: aws.String("FastSnapshotRestoreCreditsBalance"),
Dimensions: []*cloudwatch.Dimension{
{
Name: aws.String("SnapshotId"),
Value: snapshotID,
},
{
Name: aws.String("AvailabilityZone"),
Value: aws.String(targetAZ),
},
},
Period: aws.Int64(300),
Statistics: []*string{aws.String("Maximum")},
}

log.Info("metrics input", zap.Any("input", input))

// Call cloudwatchClient API to retrieve the FastSnapshotRestoreCreditsBalance metric data
resp, err := e.cloudwatchClient.GetMetricStatisticsWithContext(context.Background(), input)
if err != nil {
log.Error("GetMetricStatisticsWithContext failed", zap.Error(err))
return nil, errors.Trace(err)
}

// parse the response
if len(resp.Datapoints) == 0 {
log.Warn("No result for metric FastSnapshotRestoreCreditsBalance returned", zap.Stringp("snapshot", snapshotID))
return nil, nil
}
result := resp.Datapoints[0]
log.Info("credit balance", zap.Stringp("snapshot", snapshotID), zap.Float64p("credit", result.Maximum))
return result.Maximum, nil
}

// DisableDataFSR disables FSR for data volume snapshots
func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error {
if len(snapshotsIDsMap) == 0 {
Expand Down Expand Up @@ -530,7 +634,7 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
return newVolumeIDMap, eg.Wait()
}

func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress) (int64, error) {
func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress, fsrEnabledRequired bool) (int64, error) {
pendingVolumes := make([]*string, 0, len(volumeIDMap))
for oldVolID := range volumeIDMap {
newVolumeID := volumeIDMap[oldVolID]
Expand All @@ -550,7 +654,11 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
return 0, errors.Trace(err)
}

createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(resp)
createdVolumeSize, unfinishedVolumes, err := e.HandleDescribeVolumesResponse(resp, fsrEnabledRequired)
if err != nil {
return 0, errors.Trace(err)
}

progress.IncBy(int64(len(pendingVolumes) - len(unfinishedVolumes)))
totalVolumeSize += createdVolumeSize
pendingVolumes = unfinishedVolumes
Expand Down Expand Up @@ -593,12 +701,16 @@ func ec2Tag(key, val string) *ec2.Tag {
return &ec2.Tag{Key: &key, Value: &val}
}

func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput) (int64, []*string) {
func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput, fsrEnabledRequired bool) (int64, []*string, error) {
totalVolumeSize := int64(0)

var unfinishedVolumes []*string
for _, volume := range resp.Volumes {
if *volume.State == ec2.VolumeStateAvailable {
if fsrEnabledRequired && volume.FastRestored != nil && !*volume.FastRestored {
log.Error("snapshot fsr is not enabled for the volume", zap.String("volume", *volume.SnapshotId))
return 0, nil, errors.Errorf("Snapshot [%s] of volume [%s] is not fsr enabled", *volume.SnapshotId, *volume.VolumeId)
}
log.Info("volume is available", zap.String("id", *volume.VolumeId))
totalVolumeSize += *volume.Size
} else {
Expand All @@ -607,5 +719,5 @@ func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutp
}
}

return totalVolumeSize, unfinishedVolumes
return totalVolumeSize, unfinishedVolumes, nil
}
2 changes: 1 addition & 1 deletion br/pkg/aws/ebs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestHandleDescribeVolumesResponse(t *testing.T) {
}

e := &EC2Session{}
createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(curentVolumesStates)
createdVolumeSize, unfinishedVolumes, _ := e.HandleDescribeVolumesResponse(curentVolumesStates, false)
require.Equal(t, int64(4), createdVolumeSize)
require.Equal(t, 1, len(unfinishedVolumes))
}
2 changes: 1 addition & 1 deletion br/pkg/task/restore_ebs_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
if err != nil {
return nil, 0, errors.Trace(err)
}
totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress)
totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress, h.cfg.UseFSR)
if err != nil {
return nil, 0, errors.Trace(err)
}
Expand Down

0 comments on commit c870a52

Please sign in to comment.