Skip to content

Commit

Permalink
Merge pull request #42 from leakingtapan/topology
Browse files Browse the repository at this point in the history
Implement topology awareness support for dynamic provisioning
  • Loading branch information
k8s-ci-robot authored Oct 10, 2018
2 parents 17a60a2 + 7d48363 commit fa25e65
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 29 deletions.
57 changes: 50 additions & 7 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -80,26 +82,38 @@ var (
ErrAlreadyExists = errors.New("Resource already exists")
)

func init() {
rand.Seed(time.Now().UnixNano())
}

// Disk represents a EBS volume
type Disk struct {
VolumeID string
CapacityGiB int64
VolumeID string
CapacityGiB int64
AvailabilityZone string
}

// DiskOptions represents parameters to create an EBS volume
type DiskOptions struct {
CapacityBytes int64
Tags map[string]string
VolumeType string
IOPSPerGB int64
// the availability zone to create volume in
// if empty a random zone will be used
AvailabilityZone string
}

// EC2 abstracts aws.EC2 to facilitate its mocking.
// See https://docs.aws.amazon.com/sdk-for-go/api/service/ec2/ for details
type EC2 interface {
DescribeVolumesWithContext(ctx aws.Context, input *ec2.DescribeVolumesInput, opts ...request.Option) (*ec2.DescribeVolumesOutput, error)
CreateVolumeWithContext(ctx aws.Context, input *ec2.CreateVolumeInput, opts ...request.Option) (*ec2.Volume, error)
DeleteVolumeWithContext(ctx aws.Context, input *ec2.DeleteVolumeInput, opts ...request.Option) (*ec2.DeleteVolumeOutput, error)
DetachVolumeWithContext(ctx aws.Context, input *ec2.DetachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
AttachVolumeWithContext(ctx aws.Context, input *ec2.AttachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
DescribeInstancesWithContext(ctx aws.Context, input *ec2.DescribeInstancesInput, opts ...request.Option) (*ec2.DescribeInstancesOutput, error)
DescribeAvailabilityZonesWithContext(ctx aws.Context, input *ec2.DescribeAvailabilityZonesInput, opts ...request.Option) (*ec2.DescribeAvailabilityZonesOutput, error)
}

type Cloud interface {
Expand Down Expand Up @@ -158,8 +172,10 @@ func (c *cloud) GetMetadata() MetadataService {
}

func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (*Disk, error) {
var createType string
var iops int64
var (
createType string
iops int64
)
capacityGiB := util.BytesToGiB(diskOptions.CapacityBytes)

switch diskOptions.VolumeType {
Expand Down Expand Up @@ -189,9 +205,22 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
Tags: tags,
}

m := c.GetMetadata()
var (
zone string
err error
)
if diskOptions.AvailabilityZone == "" {
zone, err = c.pickRandomAvailabilityZone(ctx)
if err != nil {
return nil, err
}
glog.V(5).Infof("AZ is not provided. Choose random AZ [%s]", zone)
} else {
zone = diskOptions.AvailabilityZone
}

request := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(m.GetAvailabilityZone()),
AvailabilityZone: aws.String(zone),
Size: aws.Int64(capacityGiB),
VolumeType: aws.String(createType),
TagSpecifications: []*ec2.TagSpecification{&tagSpec},
Expand All @@ -215,7 +244,7 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return nil, fmt.Errorf("disk size was not returned by CreateVolume")
}

return &Disk{CapacityGiB: size, VolumeID: volumeID}, nil
return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone}, nil
}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
Expand Down Expand Up @@ -435,3 +464,17 @@ func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance,

return instances[0], nil
}

func (c *cloud) pickRandomAvailabilityZone(ctx context.Context) (string, error) {
output, err := c.ec2.DescribeAvailabilityZonesWithContext(ctx, &ec2.DescribeAvailabilityZonesInput{})
if err != nil {
return "", err
}

var zones []string
for _, zone := range output.AvailabilityZones {
zones = append(zones, *zone.ZoneName)
}

return zones[rand.Int()%len(zones)], nil
}
42 changes: 38 additions & 4 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,23 @@ func TestCreateDisk(t *testing.T) {
name: "success: normal",
volumeName: "vol-test-name",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: "",
},
expDisk: &Disk{
VolumeID: "vol-test",
CapacityGiB: 1,
},
expErr: nil,
},
{
name: "success: normal with provided zone",
volumeName: "vol-test-name",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: "us-west-2",
},
expDisk: &Disk{
VolumeID: "vol-test",
Expand All @@ -56,8 +71,9 @@ func TestCreateDisk(t *testing.T) {
name: "fail: CreateVolume returned an error",
volumeName: "vol-test-name-error",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: "",
},
expErr: fmt.Errorf("CreateVolume generic error"),
},
Expand All @@ -80,6 +96,24 @@ func TestCreateDisk(t *testing.T) {
ctx := context.Background()
mockEC2.EXPECT().CreateVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(vol, tc.expErr)

if tc.diskOptions.AvailabilityZone == "" {
describeAvailabilityZonesResp := &ec2.DescribeAvailabilityZonesOutput{
AvailabilityZones: []*ec2.AvailabilityZone{
&ec2.AvailabilityZone{
ZoneName: aws.String("us-west-2a"),
},
&ec2.AvailabilityZone{
ZoneName: aws.String("us-west-2b"),
},
&ec2.AvailabilityZone{
ZoneName: aws.String("us-west-2c"),
},
},
}

mockEC2.EXPECT().DescribeAvailabilityZonesWithContext(gomock.Eq(ctx), gomock.Any()).Return(describeAvailabilityZonesResp, nil)
}

disk, err := c.CreateDisk(ctx, tc.volumeName, tc.diskOptions)
if err != nil {
if tc.expErr == nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/cloud/mocks/mock_ec2.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 50 additions & 16 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,23 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}
}

if disk == nil {
opts := &cloud.DiskOptions{
CapacityBytes: volSizeBytes,
Tags: map[string]string{cloud.VolumeNameTagKey: volName},
}
newDisk, err := d.cloud.CreateDisk(ctx, volName, opts)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
disk = newDisk
// volume exists already
if disk != nil {
return newCreateVolumeResponse(disk), nil
}

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Id: disk.VolumeID,
CapacityBytes: util.GiBToBytes(disk.CapacityGiB),
},
}, nil
// create a new volume
zone := pickAvailabilityZone(req.GetAccessibilityRequirements())
opts := &cloud.DiskOptions{
CapacityBytes: volSizeBytes,
AvailabilityZone: zone,
Tags: map[string]string{cloud.VolumeNameTagKey: volName},
}
disk, err = d.cloud.CreateDisk(ctx, volName, opts)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
return newCreateVolumeResponse(disk), nil
}

func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
Expand Down Expand Up @@ -253,3 +252,38 @@ func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequ
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

// pickAvailabilityZone selects 1 zone given topology requirement.
// if not found, empty string is returned.
func pickAvailabilityZone(requirement *csi.TopologyRequirement) string {
if requirement == nil {
return ""
}
for _, topology := range requirement.GetPreferred() {
zone, exists := topology.GetSegments()[topologyKey]
if exists {
return zone
}
}
for _, topology := range requirement.GetRequisite() {
zone, exists := topology.GetSegments()[topologyKey]
if exists {
return zone
}
}
return ""
}

func newCreateVolumeResponse(disk *cloud.Disk) *csi.CreateVolumeResponse {
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Id: disk.VolumeID,
CapacityBytes: util.GiBToBytes(disk.CapacityGiB),
AccessibleTopology: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: disk.AvailabilityZone},
},
},
},
}
}
62 changes: 61 additions & 1 deletion pkg/driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestCreateVolume(t *testing.T) {
t.Fatalf("Could not get error status code from error: %v", srvErr)
}
if srvErr.Code() != tc.expErrCode {
t.Fatalf("Expected error code %d, got %d", tc.expErrCode, srvErr.Code())
t.Fatalf("Expected error code %d, got %d message %s", tc.expErrCode, srvErr.Code(), srvErr.Message())
}
continue
}
Expand Down Expand Up @@ -235,3 +235,63 @@ func TestDeleteVolume(t *testing.T) {
}
}
}

func TestPickAvailabilityZone(t *testing.T) {
expZone := "us-west-2b"
testCases := []struct {
name string
requirement *csi.TopologyRequirement
expZone string
}{
{
name: "Pick from preferred",
requirement: &csi.TopologyRequirement{
Requisite: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: expZone},
},
},
Preferred: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: expZone},
},
},
},
expZone: expZone,
},
{
name: "Pick from requisite",
requirement: &csi.TopologyRequirement{
Requisite: []*csi.Topology{
&csi.Topology{
Segments: map[string]string{topologyKey: expZone},
},
},
},
expZone: expZone,
},
{
name: "Pick from empty topology",
requirement: &csi.TopologyRequirement{
Preferred: []*csi.Topology{&csi.Topology{}},
Requisite: []*csi.Topology{&csi.Topology{}},
},
expZone: "",
},
{
name: "Topology Requirement is nil",
requirement: nil,
expZone: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := pickAvailabilityZone(tc.requirement)
if actual != tc.expZone {
t.Fatalf("Expected zone %v, got zone: %v", tc.expZone, actual)
}
})
}

}
1 change: 1 addition & 0 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
const (
driverName = "com.amazon.aws.csi.ebs"
vendorVersion = "0.0.1" // FIXME
topologyKey = driverName + "/zone"
)

type Driver struct {
Expand Down
8 changes: 7 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,14 @@ func (d *Driver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabi
func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
glog.V(4).Infof("NodeGetInfo: called with args %#v", req)
m := d.cloud.GetMetadata()

topology := &csi.Topology{
Segments: map[string]string{topologyKey: m.GetAvailabilityZone()},
}

return &csi.NodeGetInfoResponse{
NodeId: m.GetInstanceID(),
NodeId: m.GetInstanceID(),
AccessibleTopology: topology,
}, nil
}

Expand Down

0 comments on commit fa25e65

Please sign in to comment.