Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Digital Ocean] Handle logic for kops edit/update cluster #9116

Merged
merged 6 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/commands/status_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,9 @@ func (s *CloudDiscoveryStatusStore) FindClusterStatus(cluster *kops.Cluster) (*k
if osCloud, ok := cloud.(openstack.OpenstackCloud); ok {
return osCloud.FindClusterStatus(cluster)
}

if doCloud, ok := cloud.(*digitalocean.Cloud); ok {
return doCloud.FindClusterStatus(cluster)
}
return nil, fmt.Errorf("etcd Status not implemented for %T", cloud)
}
1 change: 1 addition & 0 deletions pkg/resources/digitalocean/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/cloudinstances:go_default_library",
"//pkg/resources:go_default_library",
"//pkg/resources/digitalocean/dns:go_default_library",
"//protokube/pkg/etcd:go_default_library",
"//upup/pkg/fi:go_default_library",
"//vendor/github.com/digitalocean/godo:go_default_library",
"//vendor/golang.org/x/oauth2:go_default_library",
Expand Down
103 changes: 103 additions & 0 deletions pkg/resources/digitalocean/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ import (
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/resources/digitalocean/dns"
"k8s.io/kops/protokube/pkg/etcd"
"k8s.io/kops/upup/pkg/fi"
)

const TagKubernetesClusterIndex = "k8s-index"
const TagKubernetesClusterNamePrefix = "KubernetesCluster"

// TokenSource implements oauth2.TokenSource
type TokenSource struct {
AccessToken string
Expand Down Expand Up @@ -139,6 +143,10 @@ func (c *Cloud) LoadBalancers() godo.LoadBalancersService {
return c.Client.LoadBalancers
}

func (c *Cloud) GetAllLoadBalancers() ([]godo.LoadBalancer, error) {
return getAllLoadBalancers(c)
}

// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface
func (c *Cloud) FindVPCInfo(id string) (*fi.VPCInfo, error) {
return nil, errors.New("not implemented")
Expand Down Expand Up @@ -175,3 +183,98 @@ func (c *Cloud) GetApiIngressStatus(cluster *kops.Cluster) ([]kops.ApiIngressSta

return nil, nil
}

// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes
func (c *Cloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) {
etcdStatus, err := findEtcdStatus(c, cluster)
if err != nil {
return nil, err
}
status := &kops.ClusterStatus{
EtcdClusters: etcdStatus,
}
klog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status))
return status, nil
}

// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes
func findEtcdStatus(c *Cloud, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) {
statusMap := make(map[string]*kops.EtcdClusterStatus)
volumes, err := getAllVolumesByRegion(c, c.RegionName)

if err != nil {
return nil, fmt.Errorf("failed to get all volumes by region from %s: %v", c.RegionName, err)
}

for _, volume := range volumes {
volumeID := volume.ID

etcdClusterName := ""
var etcdClusterSpec *etcd.EtcdClusterSpec

for _, myTag := range volume.Tags {
klog.V(8).Infof("findEtcdStatus status (from cloud): checking if volume with tag %q belongs to cluster", myTag)
// check if volume belongs to this cluster.
// tag will be in the format "KubernetesCluster:dev5-k8s-local" (where clusterName is dev5.k8s.local)
clusterName := strings.Replace(cluster.Name, ".", "-", -1)
if strings.Contains(myTag, fmt.Sprintf("%s:%s", TagKubernetesClusterNamePrefix, clusterName)) {
klog.V(10).Infof("findEtcdStatus cluster comparison matched for tag: %v", myTag)
Copy link

@timoreimann timoreimann May 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if 10 could be too high. AFAIK, level 8 is where Kubernetes tends to log very specific details at the HTTP request level. I'm not too familiar with how kops handles log levels in general, so not a super strong opinion here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will keep it with 10 for now. There are lot places I had seen 10 before, and that's how I continued to use that. I'll keep a note to change this to 8 for all DO work as a separate PR at a later point.

// this volume belongs to our cluster, add this to our etcdClusterSpec.
// loop through the tags again and
for _, volumeTag := range volume.Tags {
if strings.Contains(volumeTag, TagKubernetesClusterIndex) {
volumeTagParts := strings.Split(volumeTag, ":")
if len(volumeTagParts) < 2 {
return nil, fmt.Errorf("volume tag split failed, too few components for tag %q on volume %q", volumeTag, volume)
}
dropletIndex := volumeTagParts[1]
etcdClusterSpec, err = c.getEtcdClusterSpec(volume.Name, dropletIndex)
if err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error should be handled before we log the spec in the line right above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", volumeTag, volumeID, err)
}

klog.V(10).Infof("findEtcdStatus etcdClusterSpec: %v", fi.DebugAsJsonString(etcdClusterSpec))
etcdClusterName = etcdClusterSpec.ClusterKey
status := statusMap[etcdClusterName]
if status == nil {
status = &kops.EtcdClusterStatus{
Name: etcdClusterName,
}
statusMap[etcdClusterName] = status
}

memberName := etcdClusterSpec.NodeName
status.Members = append(status.Members, &kops.EtcdMemberStatus{
Name: memberName,
VolumeId: volume.ID,
})
}
}
}
}
}

status := make([]kops.EtcdClusterStatus, 0, len(statusMap))
for _, v := range statusMap {
status = append(status, *v)
}

return status, nil
}

func (c *Cloud) getEtcdClusterSpec(volumeName string, dropletName string) (*etcd.EtcdClusterSpec, error) {
var clusterKey string
if strings.Contains(volumeName, "etcd-main") {
clusterKey = "main"
} else if strings.Contains(volumeName, "etcd-events") {
clusterKey = "events"
} else {
return nil, fmt.Errorf("could not determine etcd cluster type for volume: %s", volumeName)
}

return &etcd.EtcdClusterSpec{
ClusterKey: clusterKey,
NodeName: dropletName,
NodeNames: []string{dropletName},
}, nil
}
63 changes: 52 additions & 11 deletions upup/pkg/fi/cloudup/dotasks/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net"
"strings"
"time"

"github.com/digitalocean/godo"
Expand Down Expand Up @@ -49,6 +50,7 @@ func (lb *LoadBalancer) CompareWithID() *string {
}

func (lb *LoadBalancer) Find(c *fi.Context) (*LoadBalancer, error) {
klog.V(10).Infof("load balancer FIND - ID=%s, name=%s", fi.StringValue(lb.ID), fi.StringValue(lb.Name))
if fi.StringValue(lb.ID) == "" {
// Loadbalancer = nil if not found
return nil, nil
Expand Down Expand Up @@ -97,7 +99,6 @@ func (_ *LoadBalancer) CheckChanges(a, e, changes *LoadBalancer) error {
}

func (_ *LoadBalancer) RenderDO(t *do.DOAPITarget, a, e, changes *LoadBalancer) error {

Rules := []godo.ForwardingRule{
{
EntryProtocol: "https",
Expand All @@ -124,6 +125,24 @@ func (_ *LoadBalancer) RenderDO(t *do.DOAPITarget, a, e, changes *LoadBalancer)
HealthyThreshold: 5,
}

// check if load balancer exist.
loadBalancers, err := t.Cloud.GetAllLoadBalancers()

if err != nil {
return fmt.Errorf("LoadBalancers.List returned error: %v", err)
}

for _, loadbalancer := range loadBalancers {
klog.V(10).Infof("load balancer retrieved=%s, e.Name=%s", loadbalancer.Name, fi.StringValue(e.Name))
if strings.Contains(loadbalancer.Name, fi.StringValue(e.Name)) {
// load balancer already exists.
e.ID = fi.String(loadbalancer.ID)
e.IPAddress = fi.String(loadbalancer.IP) // This will be empty on create, but will be filled later on FindIPAddress invokation.
return nil
}
}

// load balancer doesn't exist. Create one.
klog.V(10).Infof("Creating load balancer for DO")

loadBalancerService := t.Cloud.LoadBalancers()
Expand All @@ -150,18 +169,40 @@ func (lb *LoadBalancer) FindIPAddress(c *fi.Context) (*string, error) {
cloud := c.Cloud.(*digitalocean.Cloud)
loadBalancerService := cloud.LoadBalancers()

klog.V(10).Infof("Find IP address for load balancer ID=%s", fi.StringValue(lb.ID))
loadBalancer, _, err := loadBalancerService.Get(context.TODO(), fi.StringValue(lb.ID))
if err != nil {
klog.Errorf("Error fetching load balancer with Name=%s", fi.StringValue(lb.Name))
return nil, err
}
if len(fi.StringValue(lb.ID)) > 0 {
// able to retrieve ID.
klog.V(10).Infof("Find IP address for load balancer ID=%s", fi.StringValue(lb.ID))
loadBalancer, _, err := loadBalancerService.Get(context.TODO(), fi.StringValue(lb.ID))
if err != nil {
klog.Errorf("Error fetching load balancer with Name=%s", fi.StringValue(lb.Name))
return nil, err
}

address := loadBalancer.IP

if isIPv4(address) {
klog.V(10).Infof("load balancer address=%s", address)
return &address, nil
}
} else {
// check with the name.
// check if load balancer exist.
loadBalancers, err := cloud.GetAllLoadBalancers()

address := loadBalancer.IP
if err != nil {
return nil, fmt.Errorf("LoadBalancers.List returned error: %v", err)
}

if isIPv4(address) {
klog.V(10).Infof("load balancer address=%s", address)
return &address, nil
for _, loadbalancer := range loadBalancers {
if strings.Contains(loadbalancer.Name, fi.StringValue(lb.Name)) {
// load balancer already exists.
address := loadbalancer.IP
if isIPv4(address) {
klog.V(10).Infof("load balancer address=%s", address)
return &address, nil
}
}
}
}

const lbWaitTime = 10 * time.Second
Expand Down