Skip to content

Commit

Permalink
Improve configuration change detection (#2656)
Browse files Browse the repository at this point in the history
* Use information about the configuration configmap to determine changes

* Add hashstructure dependency

* Rename queue functions

* Add test for configmap checksum
  • Loading branch information
aledbf authored Jun 21, 2018
1 parent a6978a8 commit aec40c1
Show file tree
Hide file tree
Showing 15 changed files with 564 additions and 42 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/ingress/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ type Configuration struct {
// http://github.com/influxdata/nginx-influxdb-module/
// By default this is disabled
EnableInfluxDB bool `json:"enable-influxdb"`

// Checksum contains a checksum of the configmap configuration
Checksum string `json:"-"`
}

// NewDefault returns the default nginx configuration
Expand Down
23 changes: 4 additions & 19 deletions internal/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -155,14 +154,16 @@ func (n *NGINXController) syncIngress(interface{}) error {
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,

ConfigurationChecksum: n.store.GetBackendConfiguration().Checksum,
}

if !n.isForceReload() && n.runningConfig.Equal(&pcfg) {
if n.runningConfig.Equal(&pcfg) {
glog.V(3).Infof("No configuration change detected, skipping backend reload.")
return nil
}

if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(&pcfg) && !n.isForceReload() {
if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(&pcfg) {
glog.Infof("Changes handled by the dynamic configuration, skipping backend reload.")
} else {
glog.Infof("Configuration changes detected, backend reload required.")
Expand Down Expand Up @@ -200,7 +201,6 @@ func (n *NGINXController) syncIngress(interface{}) error {
}

n.runningConfig = &pcfg
n.SetForceReload(false)

return nil
}
Expand Down Expand Up @@ -1048,21 +1048,6 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
return servers
}

func (n *NGINXController) isForceReload() bool {
return atomic.LoadInt32(&n.forceReload) != 0
}

// SetForceReload sets whether the backend should be reloaded regardless of
// configuration changes.
func (n *NGINXController) SetForceReload(shouldReload bool) {
if shouldReload {
atomic.StoreInt32(&n.forceReload, 1)
n.syncQueue.Enqueue(&extensions.Ingress{})
} else {
atomic.StoreInt32(&n.forceReload, 0)
}
}

// extractTLSSecretName returns the name of the Secret containing a SSL
// certificate for the given host name, or an empty string.
func extractTLSSecretName(host string, ing *extensions.Ingress,
Expand Down
15 changes: 7 additions & 8 deletions internal/ingress/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -153,7 +152,7 @@ Error loading new template: %v

n.t = template
glog.Info("New NGINX configuration template loaded.")
n.SetForceReload(true)
n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
}

ngxTpl, err := ngx_template.NewTemplate(tmplPath, fs)
Expand Down Expand Up @@ -194,7 +193,7 @@ Error loading new template: %v
for _, f := range filesToWatch {
_, err = watch.NewFileWatcher(f, func() {
glog.Info("File %v changed. Reloading NGINX", f)
n.SetForceReload(true)
n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
})
if err != nil {
glog.Fatalf("Error creating file watcher for %v: %v", f, err)
Expand Down Expand Up @@ -232,8 +231,6 @@ type NGINXController struct {
// runningConfig contains the running configuration in the Backend
runningConfig *ingress.Configuration

forceReload int32

t *ngx_template.Template

resolver []net.IP
Expand Down Expand Up @@ -278,7 +275,7 @@ func (n *NGINXController) Start() {

go n.syncQueue.Run(time.Second, n.stopCh)
// force initial sync
n.syncQueue.Enqueue(&extensions.Ingress{})
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))

for {
select {
Expand Down Expand Up @@ -311,10 +308,12 @@ func (n *NGINXController) Start() {
if evt, ok := event.(store.Event); ok {
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
n.SetForceReload(true)
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}

n.syncQueue.Enqueue(evt.Obj)
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
glog.Warningf("Unexpected event type received %T", event)
}
Expand Down
10 changes: 10 additions & 0 deletions internal/ingress/controller/template/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/golang/glog"

"github.com/mitchellh/hashstructure"
"github.com/mitchellh/mapstructure"

"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -191,6 +192,15 @@ func ReadConfig(src map[string]string) config.Configuration {
glog.Warningf("unexpected error merging defaults: %v", err)
}

hash, err := hashstructure.Hash(to, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
glog.Warningf("unexpected error obtaining hash: %v", err)
}

to.Checksum = fmt.Sprintf("%v", hash)

return to
}

Expand Down
27 changes: 27 additions & 0 deletions internal/ingress/controller/template/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package template

import (
"fmt"
"reflect"
"testing"
"time"

"github.com/kylelemons/godebug/pretty"
"github.com/mitchellh/hashstructure"

"k8s.io/ingress-nginx/internal/ingress/controller/config"
)
Expand Down Expand Up @@ -88,6 +90,14 @@ func TestMergeConfigMapToStruct(t *testing.T) {
def.NginxStatusIpv6Whitelist = []string{"::1", "2001::/16"}
def.ProxyAddOriginalUriHeader = false

hash, err := hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)

to := ReadConfig(conf)
if diff := pretty.Compare(to, def); diff != "" {
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
Expand All @@ -107,13 +117,30 @@ func TestMergeConfigMapToStruct(t *testing.T) {
}

def = config.NewDefault()
hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)

to = ReadConfig(map[string]string{})
if diff := pretty.Compare(to, def); diff != "" {
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
}

def = config.NewDefault()
def.WhitelistSourceRange = []string{"1.1.1.1/32"}

hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)

to = ReadConfig(map[string]string{
"whitelist-source-range": "1.1.1.1/32",
})
Expand Down
3 changes: 3 additions & 0 deletions internal/ingress/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type Configuration struct {
// It contains information about the associated Server Name Indication (SNI).
// +optional
PassthroughBackends []*SSLPassthroughBackend `json:"passthroughBackends,omitempty"`

// ConfigurationChecksum contains the particular checksum of a Configuration object
ConfigurationChecksum string `json:"configurationChecksum,omitempty"`
}

// Backend describes one or more remote server/s (endpoints) associated with a service
Expand Down
4 changes: 4 additions & 0 deletions internal/ingress/types_equals.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool {
}
}

if c1.ConfigurationChecksum != c2.ConfigurationChecksum {
return false
}

return true
}

Expand Down
34 changes: 29 additions & 5 deletions internal/task/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/glog"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -50,23 +51,39 @@ type Queue struct {

// Element represents one item of the queue
type Element struct {
Key interface{}
Timestamp int64
Key interface{}
Timestamp int64
IsSkippable bool
}

// Run ...
// Run starts processing elements in the queue
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
wait.Until(t.worker, period, stopCh)
}

// Enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) Enqueue(obj interface{}) {
// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
t.enqueue(obj, false)
}

// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
t.enqueue(obj, true)
}

// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
if t.IsShuttingDown() {
glog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)
return
}

ts := time.Now().UnixNano()
if !skippable {
// make sure the timestamp is bigger than lastSync
ts = time.Now().Add(24 * time.Hour).UnixNano()
}
glog.V(3).Infof("queuing item %v", obj)
key, err := t.fn(obj)
if err != nil {
Expand Down Expand Up @@ -166,3 +183,10 @@ func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (in

return q
}

// GetDummyObject returns a valid object that can be used in the Queue
func GetDummyObject(name string) *metav1.ObjectMeta {
return &metav1.ObjectMeta{
Name: name,
}
}
18 changes: 9 additions & 9 deletions internal/task/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestEnqueueSuccess(t *testing.T) {
k: "testKey",
v: "testValue",
}
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
if atomic.LoadUint32(&sr) != 1 {
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestEnqueueFailed(t *testing.T) {
q.Shutdown()
// wait for shutdown
time.Sleep(time.Millisecond * 10)
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
// queue is shutdown, so mockSynFn should not be executed, so the result should be 0
Expand All @@ -121,7 +121,7 @@ func TestEnqueueKeyError(t *testing.T) {
v: "testValue",
}

q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
// key error, so the result should be 0
Expand All @@ -142,16 +142,16 @@ func TestSkipEnqueue(t *testing.T) {
k: "testKey",
v: "testValue",
}
q.Enqueue(mo)
q.Enqueue(mo)
q.Enqueue(mo)
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
q.EnqueueSkippableTask(mo)
q.EnqueueTask(mo)
q.EnqueueSkippableTask(mo)
// run queue
go q.Run(time.Second, stopCh)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
if atomic.LoadUint32(&sr) != 1 {
t.Errorf("sr should be 1, but is %d", sr)
if atomic.LoadUint32(&sr) != 2 {
t.Errorf("sr should be 2, but is %d", sr)
}

// shutdown queue before exit
Expand Down
2 changes: 2 additions & 0 deletions rootfs/etc/nginx/template/nginx.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
{{ $proxyHeaders := .ProxySetHeaders }}
{{ $addHeaders := .AddHeaders }}

# Configuration checksum: {{ $all.Cfg.Checksum }}

# setup custom paths that do not require root access
pid /tmp/nginx.pid;

Expand Down
21 changes: 21 additions & 0 deletions vendor/github.com/mitchellh/hashstructure/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit aec40c1

Please sign in to comment.