Skip to content

Commit

Permalink
Add snapshots support
Browse files Browse the repository at this point in the history
  • Loading branch information
tsmetana committed Nov 30, 2018
1 parent da7e111 commit 38721a1
Show file tree
Hide file tree
Showing 7 changed files with 706 additions and 9 deletions.
182 changes: 181 additions & 1 deletion pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ const (
const (
// VolumeNameTagKey is the key value that refers to the volume's name.
VolumeNameTagKey = "com.amazon.aws.csi.volume"
// SnapshotNameTagKey is the key value that refers to the snapshot's name.
SnapshotNameTagKey = "com.amazon.aws.csi.snapshot"
)

var (
Expand Down Expand Up @@ -105,7 +107,21 @@ type DiskOptions struct {
Encrypted bool
// KmsKeyID represents a fully qualified resource name to the key to use for encryption.
// example: arn:aws:kms:us-east-1:012345678910:key/abcd1234-a123-456a-a12b-a123b4cd56ef
KmsKeyID string
KmsKeyID string
SnapshotID string
}

// Snapshot represents an EBS volume snapshot
type Snapshot struct {
SnapshotID string
SourceVolumeID string
Size int64
CreationTime time.Time
}

// SnapshotOptions represents parameters to create an EBS volume
type SnapshotOptions struct {
Tags map[string]string
}

// EC2 abstracts aws.EC2 to facilitate its mocking.
Expand All @@ -117,6 +133,9 @@ type EC2 interface {
DetachVolumeWithContext(ctx aws.Context, input *ec2.DetachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
AttachVolumeWithContext(ctx aws.Context, input *ec2.AttachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
DescribeInstancesWithContext(ctx aws.Context, input *ec2.DescribeInstancesInput, opts ...request.Option) (*ec2.DescribeInstancesOutput, error)
CreateSnapshotWithContext(ctx aws.Context, input *ec2.CreateSnapshotInput, opts ...request.Option) (*ec2.Snapshot, error)
DeleteSnapshotWithContext(ctx aws.Context, input *ec2.DeleteSnapshotInput, opts ...request.Option) (*ec2.DeleteSnapshotOutput, error)
DescribeSnapshotsWithContext(ctx aws.Context, input *ec2.DescribeSnapshotsInput, opts ...request.Option) (*ec2.DescribeSnapshotsOutput, error)
}

type Cloud interface {
Expand All @@ -128,6 +147,9 @@ type Cloud interface {
GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error)
IsExistInstance(ctx context.Context, nodeID string) (success bool)
CreateSnapshot(ctx context.Context, volumeID string, snapshotOptions *SnapshotOptions) (snapshot *Snapshot, err error)
DeleteSnapshot(ctx context.Context, snapshotID string) (success bool, err error)
GetSnapshotByName(ctx context.Context, name string) (snapshot *Snapshot, err error)
}

type cloud struct {
Expand Down Expand Up @@ -226,6 +248,10 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
if iops > 0 {
request.Iops = aws.Int64(iops)
}
snapshotID := diskOptions.SnapshotID
if len(snapshotID) > 0 {
request.SnapshotId = aws.String(snapshotID)
}

response, err := c.ec2.CreateVolumeWithContext(ctx, request)
if err != nil {
Expand All @@ -249,6 +275,11 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return nil, fmt.Errorf("failed to create encrypted volume: the volume disappeared after creation, most likely due to inaccessible KMS encryption key")
}
}
} else if len(diskOptions.SnapshotID) > 0 {
err := c.waitForCreate(ctx, volumeID)
if err != nil {
return nil, fmt.Errorf("failed to restore snapshot %s: %v", diskOptions.SnapshotID, err)
}
}

return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone}, nil
Expand Down Expand Up @@ -396,6 +427,81 @@ func (c *cloud) IsExistInstance(ctx context.Context, nodeID string) bool {
return true
}

func (c *cloud) CreateSnapshot(ctx context.Context, volumeID string, snapshotOptions *SnapshotOptions) (snapshot *Snapshot, err error) {
descriptions := "Created by AWS CSI driver for volume " + volumeID

var tags []*ec2.Tag
for key, value := range snapshotOptions.Tags {
tags = append(tags, &ec2.Tag{Key: &key, Value: &value})
}
tagSpec := ec2.TagSpecification{
ResourceType: aws.String("snapshot"),
Tags: tags,
}
request := &ec2.CreateSnapshotInput{
VolumeId: aws.String(volumeID),
DryRun: aws.Bool(false),
TagSpecifications: []*ec2.TagSpecification{&tagSpec},
Description: aws.String(descriptions),
}

res, err := c.ec2.CreateSnapshotWithContext(ctx, request)
if err != nil {
return nil, fmt.Errorf("error creating snapshot of volume %s: %v", volumeID, err)
}
if res == nil {
return nil, fmt.Errorf("nil CreateSnapshotResponse")
}
err = c.waitForSnapshotCreate(ctx, res.SnapshotId)
if err != nil {
return nil, err
}

return c.ec2SnapshotResponseToStruct(res), nil
}

func (c *cloud) DeleteSnapshot(ctx context.Context, snapshotID string) (success bool, err error) {
request := &ec2.DeleteSnapshotInput{}
request.SnapshotId = aws.String(snapshotID)
request.DryRun = aws.Bool(false)
if _, err := c.ec2.DeleteSnapshotWithContext(ctx, request); err != nil {
if isAWSErrorSnapshotNotFound(err) {
return false, ErrNotFound
}
return false, fmt.Errorf("DeleteSnapshot could not delete volume: %v", err)
}
return true, nil
}

func (c *cloud) GetSnapshotByName(ctx context.Context, name string) (snapshot *Snapshot, err error) {
request := &ec2.DescribeSnapshotsInput{
Filters: []*ec2.Filter{
{
Name: aws.String("tag:" + SnapshotNameTagKey),
Values: []*string{aws.String(name)},
},
},
}

ec2snapshot, err := c.getSnapshot(ctx, request)
if err != nil {
return nil, err
}

return c.ec2SnapshotResponseToStruct(ec2snapshot), nil
}

// Helper method conerting EC2 snapshot type to the internal struct
func (c *cloud) ec2SnapshotResponseToStruct(ec2Snapshot *ec2.Snapshot) *Snapshot {
snapshotSize := util.GiBToBytes(aws.Int64Value(ec2Snapshot.VolumeSize))
return &Snapshot{
SnapshotID: aws.StringValue(ec2Snapshot.SnapshotId),
SourceVolumeID: aws.StringValue(ec2Snapshot.VolumeId),
Size: snapshotSize,
CreationTime: aws.TimeValue(ec2Snapshot.StartTime),
}
}

func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
var volumes []*ec2.Volume
var nextToken *string
Expand Down Expand Up @@ -455,6 +561,34 @@ func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance,
return instances[0], nil
}

func (c *cloud) getSnapshot(ctx context.Context, request *ec2.DescribeSnapshotsInput) (*ec2.Snapshot, error) {
var snapshots []*ec2.Snapshot
var nextToken *string

for {
response, err := c.ec2.DescribeSnapshotsWithContext(ctx, request)
if err != nil {
return nil, err
}
for _, snapshot := range response.Snapshots {
snapshots = append(snapshots, snapshot)
}
nextToken = response.NextToken
if aws.StringValue(nextToken) == "" {
break
}
request.NextToken = nextToken
}

if l := len(snapshots); l > 1 {
return nil, errors.New("Multiple snapshots with the same name found")
} else if l < 1 {
return nil, ErrNotFound
}

return snapshots[0], nil
}

// waitForAttachmentStatus polls until the attachment status is the expected value.
func (c *cloud) waitForAttachmentState(ctx context.Context, volumeID, state string) error {
// Most attach/detach operations on AWS finish within 1-4 seconds.
Expand Down Expand Up @@ -549,3 +683,49 @@ func isAWSErrorVolumeNotFound(err error) bool {

return false
}

// Helper function for describeSnapshot callers. Tries to retype given error to AWS error
// and returns true in case the AWS error is "InvalidSnapshot.NotFound", false otherwise
func isAWSErrorSnapshotNotFound(err error) bool {
if awsError, ok := err.(awserr.Error); ok {
// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
if awsError.Code() == "InvalidSnapshot.NotFound" {
return true
}
}

return false
}

func (c *cloud) waitForSnapshotCreate(ctx context.Context, snapshotID *string) error {
var (
checkInterval = 1 * time.Second
checkTimeout = 30 * time.Second
)

request := &ec2.DescribeSnapshotsInput{
SnapshotIds: []*string{
snapshotID,
},
}

err := wait.Poll(checkInterval, checkTimeout, func() (done bool, err error) {
snapshot, err := c.getSnapshot(ctx, request)
if err != nil {
return true, err
}
if snapshot.State != nil {
switch *snapshot.State {
case "completed":
return true, nil
case "pending":
return false, nil
default:
return true, fmt.Errorf("unexpected State of newly created AWS EBS snapshot %v: %q", snapshotID, *snapshot.State)
}
}
return false, nil
})

return err
}
Loading

0 comments on commit 38721a1

Please sign in to comment.