Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace patch with update api to rewrite IPPool #114

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/whereabouts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"strings"

Expand Down Expand Up @@ -81,6 +82,10 @@ func cmdDel(args *skel.CmdArgs) error {
_, err = storage.IPManagement(types.Deallocate, *ipamConf, args.ContainerID, getPodRef(args.Args))
if err != nil {
logging.Verbosef("WARNING: Problem deallocating IP: %s", err)
// ok to return context deadline error. this makes kubelet/cni would retry for deallocate.
if err == context.DeadlineExceeded || strings.Contains(err.Error(), "context deadline exceeded") {
return err
}
// return fmt.Errorf("Error deallocating IP: %s", err)
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ func LoadIPAMConfig(bytes []byte, envArgs string) (*types.IPAMConfig, string, er
n.IPAM.Datastore = types.DatastoreETCD
}

if n.IPAM.AllocateRequestTimeout == 0 {
// default to 10s
n.IPAM.AllocateRequestTimeout = 10
}

if n.IPAM.DeAllocateRequestTimeout == 0 {
// default to 10s
n.IPAM.DeAllocateRequestTimeout = 10
}

if !strings.EqualFold(n.IPAM.BackOffRetryScheme, "exponential") {
if n.IPAM.BackoffLinearStep == 0 {
// set backoff step to 500 ms
n.IPAM.BackoffLinearStep = 500
}
}

var err error
storageError := "You have not configured the storage engine (looks like you're using an invalid `%s` parameter in your config)"
switch n.IPAM.Datastore {
Expand Down
46 changes: 3 additions & 43 deletions pkg/storage/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package storage

import (
"context"
"encoding/json"
"fmt"
"net"
"strconv"
Expand All @@ -12,7 +11,6 @@ import (
whereaboutsv1alpha1 "github.com/dougbtv/whereabouts/pkg/api/v1alpha1"
"github.com/dougbtv/whereabouts/pkg/logging"
whereaboutstypes "github.com/dougbtv/whereabouts/pkg/types"
jsonpatch "gomodules.xyz/jsonpatch/v2"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -165,49 +163,11 @@ func (p *KubernetesIPPool) Allocations() []whereaboutstypes.IPReservation {

// Update sets the pool allocated IP list to the given IP reservations
func (p *KubernetesIPPool) Update(ctx context.Context, reservations []whereaboutstypes.IPReservation) error {
// marshal the current pool to serve as the base for the patch creation
orig := p.pool.DeepCopy()
origBytes, err := json.Marshal(orig)
if err != nil {
return err
}

// update the pool before marshalling once again
// update the pool with new ip allocations
p.pool.Spec.Allocations = toAllocationMap(reservations, p.firstIP)
modBytes, err := json.Marshal(p.pool)
if err != nil {
return err
}

// create the patch
patch, err := jsonpatch.CreatePatch(origBytes, modBytes)
if err != nil {
return err
}

// add additional tests to the patch
ops := []jsonpatch.Operation{
// ensure patch is applied to appropriate resource version only
{Operation: "test", Path: "/metadata/resourceVersion", Value: orig.ObjectMeta.ResourceVersion},
}
for _, o := range patch {
// safeguard add ops -- "add" will update existing paths, this "test" ensures the path is empty
if o.Operation == "add" {
var m map[string]interface{}
ops = append(ops, jsonpatch.Operation{Operation: "test", Path: o.Path, Value: m})
}
}
ops = append(ops, patch...)
patchData, err := json.Marshal(ops)
if err != nil {
return err
}

// apply the patch
err = p.client.Patch(ctx, orig, client.ConstantPatch(types.JSONPatchType, patchData))
err := p.client.Update(ctx, p.pool)
if err != nil {
if errors.IsInvalid(err) {
// expect "invalid" errors if any of the jsonpatch "test" Operations fail
if errors.IsConflict(err) {
return &temporaryError{err}
}
return err
Expand Down
41 changes: 33 additions & 8 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package storage

import (
"context"
"crypto/rand"
"fmt"
"math/big"
"net"
"strings"
"time"

"github.com/dougbtv/whereabouts/pkg/allocate"
Expand All @@ -12,9 +15,6 @@ import (
)

var (
// RequestTimeout defines how long the context timesout in
RequestTimeout = 10 * time.Second

// DatastoreRetries defines how many retries are attempted when updating the Pool
DatastoreRetries = 100
)
Expand Down Expand Up @@ -60,21 +60,31 @@ func IPManagement(mode int, ipamConf types.IPAMConfig, containerID string, podRe
}
defer ipam.Close()

ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
var ctx context.Context
var cancel context.CancelFunc
switch mode {
case types.Allocate:
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(ipamConf.AllocateRequestTimeout)*time.Second)
defer cancel()
case types.Deallocate:
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(ipamConf.DeAllocateRequestTimeout)*time.Second)
defer cancel()
}

// Check our connectivity first
if err := ipam.Status(ctx); err != nil {
logging.Errorf("IPAM connectivity error: %v", err)
return newip, err
}

var step int
// handle the ip add/del until successful
RETRYLOOP:
for j := 0; j < DatastoreRetries; j++ {
for j := 1; j < DatastoreRetries+1; j++ {
select {
case <-ctx.Done():
return newip, nil
// return last available newip and context.DeadlineExceeded error
return newip, context.DeadlineExceeded
default:
// retry the IPAM loop if the context has not been cancelled
}
Expand All @@ -83,6 +93,13 @@ RETRYLOOP:
if err != nil {
logging.Errorf("IPAM error reading pool allocations (attempt: %d): %v", j, err)
if e, ok := err.(temporary); ok && e.Temporary() {
interval, _ := rand.Int(rand.Reader, big.NewInt(1000))
if strings.EqualFold(ipamConf.BackOffRetryScheme, "exponential") {
time.Sleep(time.Duration(int(interval.Int64())*(2^j)) * time.Millisecond)
} else {
time.Sleep(time.Duration(int(interval.Int64())+step) * time.Millisecond)
step += ipamConf.BackoffLinearStep
}
continue
}
return newip, err
Expand All @@ -107,8 +124,16 @@ RETRYLOOP:

err = pool.Update(ctx, updatedreservelist)
if err != nil {
logging.Errorf("IPAM error updating pool (attempt: %d): %v", j, err)
logging.Errorf("IPAM error updating pool %s (attempt: %d): %v", ipamConf.Range, j, err)
if e, ok := err.(temporary); ok && e.Temporary() {
logging.Errorf("IPAM error is temporary for pool %s: %v, retrying", ipamConf.Range, err)
interval, _ := rand.Int(rand.Reader, big.NewInt(1000))
if strings.EqualFold(ipamConf.BackOffRetryScheme, "exponential") {
time.Sleep(time.Duration(int(interval.Int64())*(2^j)) * time.Millisecond)
} else {
time.Sleep(time.Duration(int(interval.Int64())+step) * time.Millisecond)
step += ipamConf.BackoffLinearStep
}
continue
}
break RETRYLOOP
Expand Down
48 changes: 26 additions & 22 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,32 @@ type Net struct {

// IPAMConfig describes the expected json configuration for this plugin
type IPAMConfig struct {
Name string
Type string `json:"type"`
Routes []*cnitypes.Route `json:"routes"`
Datastore string `json:"datastore"`
Addresses []Address `json:"addresses,omitempty"`
OmitRanges []string `json:"exclude,omitempty"`
DNS cnitypes.DNS `json:"dns"`
Range string `json:"range"`
RangeStart net.IP `json:"range_start,omitempty"`
RangeEnd net.IP `json:"range_end,omitempty"`
GatewayStr string `json:"gateway"`
EtcdHost string `json:"etcd_host,omitempty"`
EtcdUsername string `json:"etcd_username,omitempty"`
EtcdPassword string `json:"etcd_password,omitempty"`
EtcdKeyFile string `json:"etcd_key_file,omitempty"`
EtcdCertFile string `json:"etcd_cert_file,omitempty"`
EtcdCACertFile string `json:"etcd_ca_cert_file,omitempty"`
LogFile string `json:"log_file"`
LogLevel string `json:"log_level"`
Gateway net.IP
Kubernetes KubernetesConfig `json:"kubernetes,omitempty"`
ConfigurationPath string `json:"configuration_path"`
Name string
Type string `json:"type"`
Routes []*cnitypes.Route `json:"routes"`
Datastore string `json:"datastore"`
Addresses []Address `json:"addresses,omitempty"`
OmitRanges []string `json:"exclude,omitempty"`
DNS cnitypes.DNS `json:"dns"`
Range string `json:"range"`
RangeStart net.IP `json:"range_start,omitempty"`
RangeEnd net.IP `json:"range_end,omitempty"`
GatewayStr string `json:"gateway"`
EtcdHost string `json:"etcd_host,omitempty"`
EtcdUsername string `json:"etcd_username,omitempty"`
EtcdPassword string `json:"etcd_password,omitempty"`
EtcdKeyFile string `json:"etcd_key_file,omitempty"`
EtcdCertFile string `json:"etcd_cert_file,omitempty"`
EtcdCACertFile string `json:"etcd_ca_cert_file,omitempty"`
LogFile string `json:"log_file"`
LogLevel string `json:"log_level"`
Gateway net.IP
Kubernetes KubernetesConfig `json:"kubernetes,omitempty"`
ConfigurationPath string `json:"configuration_path"`
AllocateRequestTimeout int `json:"allocate_request_timeout"`
DeAllocateRequestTimeout int `json:"deallocate_request_timeout"`
BackOffRetryScheme string `json:"backoff_scheme"`
BackoffLinearStep int `json:"linear_step"`
}

// IPAMEnvArgs are the environment vars we expect
Expand Down