Skip to content

Commit

Permalink
Merge pull request #242 from ibuildthecloud/legacy
Browse files Browse the repository at this point in the history
Add ratelimiting to objectset apply
  • Loading branch information
ibuildthecloud authored Jan 11, 2019
2 parents d9246af + de17bf5 commit ad8537c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/objectset/desiredset.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

type DesiredSet struct {
remove bool
setID string
objs *ObjectSet
codeVersion string
Expand Down
30 changes: 29 additions & 1 deletion pkg/objectset/desiredset_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"encoding/hex"
"fmt"
"sort"
"sync"

"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/util/flowcontrol"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -25,14 +27,35 @@ const (
)

var (
hashOrder = []string{
ErrObjectSetDelay = errors.New("delaying object set apply")
hashOrder = []string{
LabelID,
LabelGVK,
LabelName,
LabelNamespace,
}
rls = map[string]flowcontrol.RateLimiter{}
rlsLock sync.Mutex
)

func (o *DesiredSet) getRateLimit(inputID string) flowcontrol.RateLimiter {
var rl flowcontrol.RateLimiter

rlsLock.Lock()
defer rlsLock.Unlock()
if o.remove {
delete(rls, inputID)
} else {
rl = rls[inputID]
if rl == nil {
rl = flowcontrol.NewTokenBucketRateLimiter(1.0/60.0, 10)
rls[inputID] = rl
}
}

return rl
}

func (o *DesiredSet) Apply() error {
if err := o.Err(); err != nil {
return err
Expand All @@ -43,6 +66,11 @@ func (o *DesiredSet) Apply() error {
return o.err(err)
}

rl := o.getRateLimit(labelSet[LabelHash])
if rl != nil && !rl.TryAccept() {
return ErrObjectSetDelay
}

inputID := o.inputID(labelSet[LabelHash])

objList, err := o.injectLabelsAndAnnotations(labelSet, annotationSet)
Expand Down
3 changes: 3 additions & 0 deletions pkg/objectset/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ func (t Processor) Remove(owner runtime.Object) error {
}

func (t Processor) NewDesiredSet(owner runtime.Object, objs *ObjectSet) *DesiredSet {
remove := false
if objs == nil {
remove = true
objs = &ObjectSet{}
}
return &DesiredSet{
remove: remove,
objs: objs,
setID: t.setID,
codeVersion: t.codeVersion,
Expand Down

0 comments on commit ad8537c

Please sign in to comment.