Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement topology awareness support for dynamic provisioning #42

Merged
merged 3 commits into from
Oct 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 50 additions & 7 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -80,26 +82,38 @@ var (
ErrAlreadyExists = errors.New("Resource already exists")
)

func init() {
rand.Seed(time.Now().UnixNano())
}
leakingtapan marked this conversation as resolved.
Show resolved Hide resolved

// Disk represents a EBS volume
type Disk struct {
VolumeID string
CapacityGiB int64
VolumeID string
CapacityGiB int64
AvailabilityZone string
}

// DiskOptions represents parameters to create an EBS volume
type DiskOptions struct {
CapacityBytes int64
Tags map[string]string
VolumeType string
IOPSPerGB int64
// the availability zone to create volume in
// if empty a random zone will be used
AvailabilityZone string
}

// EC2 abstracts aws.EC2 to facilitate its mocking.
// See https://docs.aws.amazon.com/sdk-for-go/api/service/ec2/ for details
type EC2 interface {
DescribeVolumesWithContext(ctx aws.Context, input *ec2.DescribeVolumesInput, opts ...request.Option) (*ec2.DescribeVolumesOutput, error)
CreateVolumeWithContext(ctx aws.Context, input *ec2.CreateVolumeInput, opts ...request.Option) (*ec2.Volume, error)
DeleteVolumeWithContext(ctx aws.Context, input *ec2.DeleteVolumeInput, opts ...request.Option) (*ec2.DeleteVolumeOutput, error)
DetachVolumeWithContext(ctx aws.Context, input *ec2.DetachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
AttachVolumeWithContext(ctx aws.Context, input *ec2.AttachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
DescribeInstancesWithContext(ctx aws.Context, input *ec2.DescribeInstancesInput, opts ...request.Option) (*ec2.DescribeInstancesOutput, error)
DescribeAvailabilityZonesWithContext(ctx aws.Context, input *ec2.DescribeAvailabilityZonesInput, opts ...request.Option) (*ec2.DescribeAvailabilityZonesOutput, error)
}

type Cloud interface {
Expand Down Expand Up @@ -158,8 +172,10 @@ func (c *cloud) GetMetadata() MetadataService {
}

func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (*Disk, error) {
var createType string
var iops int64
var (
createType string
iops int64
)
capacityGiB := util.BytesToGiB(diskOptions.CapacityBytes)

switch diskOptions.VolumeType {
Expand Down Expand Up @@ -189,9 +205,22 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
Tags: tags,
}

m := c.GetMetadata()
var (
zone string
err error
)
if diskOptions.AvailabilityZone == "" {
zone, err = c.pickRandomAvailabilityZone(ctx)
if err != nil {
return nil, err
}
glog.V(5).Infof("AZ is not provided. Choose random AZ [%s]", zone)
} else {
zone = diskOptions.AvailabilityZone
}
leakingtapan marked this conversation as resolved.
Show resolved Hide resolved

request := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(m.GetAvailabilityZone()),
AvailabilityZone: aws.String(zone),
leakingtapan marked this conversation as resolved.
Show resolved Hide resolved
Size: aws.Int64(capacityGiB),
VolumeType: aws.String(createType),
TagSpecifications: []*ec2.TagSpecification{&tagSpec},
Expand All @@ -215,7 +244,7 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return nil, fmt.Errorf("disk size was not returned by CreateVolume")
}

return &Disk{CapacityGiB: size, VolumeID: volumeID}, nil
return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone}, nil
}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
Expand Down Expand Up @@ -435,3 +464,17 @@ func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance,

return instances[0], nil
}

func (c *cloud) pickRandomAvailabilityZone(ctx context.Context) (string, error) {
output, err := c.ec2.DescribeAvailabilityZonesWithContext(ctx, &ec2.DescribeAvailabilityZonesInput{})
if err != nil {
return "", err
}

var zones []string
for _, zone := range output.AvailabilityZones {
zones = append(zones, *zone.ZoneName)
}

return zones[rand.Int()%len(zones)], nil
}
42 changes: 38 additions & 4 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,23 @@ func TestCreateDisk(t *testing.T) {
name: "success: normal",
volumeName: "vol-test-name",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: "",
},
expDisk: &Disk{
VolumeID: "vol-test",
CapacityGiB: 1,
},
expErr: nil,
},
{
name: "success: normal with provided zone",
volumeName: "vol-test-name",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: "us-west-2",
},
expDisk: &Disk{
VolumeID: "vol-test",
Expand All @@ -56,8 +71,9 @@ func TestCreateDisk(t *testing.T) {
name: "fail: CreateVolume returned an error",
volumeName: "vol-test-name-error",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: "",
},
expErr: fmt.Errorf("CreateVolume generic error"),
},
Expand All @@ -80,6 +96,24 @@ func TestCreateDisk(t *testing.T) {
ctx := context.Background()
mockEC2.EXPECT().CreateVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(vol, tc.expErr)

if tc.diskOptions.AvailabilityZone == "" {
describeAvailabilityZonesResp := &ec2.DescribeAvailabilityZonesOutput{
AvailabilityZones: []*ec2.AvailabilityZone{
&ec2.AvailabilityZone{
ZoneName: aws.String("us-west-2a"),
},
&ec2.AvailabilityZone{
ZoneName: aws.String("us-west-2b"),
},
&ec2.AvailabilityZone{
ZoneName: aws.String("us-west-2c"),
},
},
}

mockEC2.EXPECT().DescribeAvailabilityZonesWithContext(gomock.Eq(ctx), gomock.Any()).Return(describeAvailabilityZonesResp, nil)
}

disk, err := c.CreateDisk(ctx, tc.volumeName, tc.diskOptions)
if err != nil {
if tc.expErr == nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/cloud/mocks/mock_ec2.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 50 additions & 16 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,23 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}
}

if disk == nil {
opts := &cloud.DiskOptions{
CapacityBytes: volSizeBytes,
Tags: map[string]string{cloud.VolumeNameTagKey: volName},
}
newDisk, err := d.cloud.CreateDisk(ctx, volName, opts)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
disk = newDisk
// volume exists already
if disk != nil {
return newCreateVolumeResponse(disk), nil
}

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Id: disk.VolumeID,
CapacityBytes: util.GiBToBytes(disk.CapacityGiB),
},
}, nil
// create a new volume
zone := pickAvailabilityZone(req.GetAccessibilityRequirements())
opts := &cloud.DiskOptions{
CapacityBytes: volSizeBytes,
AvailabilityZone: zone,
Tags: map[string]string{cloud.VolumeNameTagKey: volName},
}
disk, err = d.cloud.CreateDisk(ctx, volName, opts)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
return newCreateVolumeResponse(disk), nil
}

func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
Expand Down Expand Up @@ -253,3 +252,38 @@ func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequ
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

// pickAvailabilityZone selects 1 zone given topology requirement.
// if not found, empty string is returned.
func pickAvailabilityZone(requirement *csi.TopologyRequirement) string {
if requirement == nil {
return ""
}
for _, topology := range requirement.GetPreferred() {
zone, exists := topology.GetSegments()[topologyKey]
if exists {
return zone
}
}
for _, topology := range requirement.GetRequisite() {
zone, exists := topology.GetSegments()[topologyKey]
if exists {
return zone
}
}
return ""
}

func newCreateVolumeResponse(disk *cloud.Disk) *csi.CreateVolumeResponse {
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Id: disk.VolumeID,
CapacityBytes: util.GiBToBytes(disk.CapacityGiB),
AccessibleTopology: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: disk.AvailabilityZone},
},
},
},
}
}
62 changes: 61 additions & 1 deletion pkg/driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestCreateVolume(t *testing.T) {
t.Fatalf("Could not get error status code from error: %v", srvErr)
}
if srvErr.Code() != tc.expErrCode {
t.Fatalf("Expected error code %d, got %d", tc.expErrCode, srvErr.Code())
t.Fatalf("Expected error code %d, got %d message %s", tc.expErrCode, srvErr.Code(), srvErr.Message())
}
continue
}
Expand Down Expand Up @@ -235,3 +235,63 @@ func TestDeleteVolume(t *testing.T) {
}
}
}

func TestPickAvailabilityZone(t *testing.T) {
expZone := "us-west-2b"
testCases := []struct {
name string
requirement *csi.TopologyRequirement
expZone string
}{
{
name: "Pick from preferred",
requirement: &csi.TopologyRequirement{
Requisite: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: expZone},
},
},
Preferred: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: expZone},
},
},
},
expZone: expZone,
},
{
name: "Pick from requisite",
requirement: &csi.TopologyRequirement{
Requisite: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: expZone},
},
},
},
expZone: expZone,
},
{
name: "Pick from empty topology",
requirement: &csi.TopologyRequirement{
Preferred: []*csi.Topology{&csi.Topology{}},
Requisite: []*csi.Topology{&csi.Topology{}},
},
expZone: "",
},
{
name: "Topology Requirement is nil",
requirement: nil,
expZone: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := pickAvailabilityZone(tc.requirement)
if actual != tc.expZone {
t.Fatalf("Expected zone %v, got zone: %v", tc.expZone, actual)
}
})
}

}
1 change: 1 addition & 0 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
const (
driverName = "com.amazon.aws.csi.ebs"
vendorVersion = "0.0.1" // FIXME
topologyKey = driverName + "/zone"
)

type Driver struct {
Expand Down
8 changes: 7 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,14 @@ func (d *Driver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabi
func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
glog.V(4).Infof("NodeGetInfo: called with args %#v", req)
m := d.cloud.GetMetadata()

topology := &csi.Topology{
Segments: map[string]string{topologyKey: m.GetAvailabilityZone()},
}

return &csi.NodeGetInfoResponse{
NodeId: m.GetInstanceID(),
NodeId: m.GetInstanceID(),
AccessibleTopology: topology,
}, nil
}

Expand Down