Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve configuration change detection #2656

Merged
merged 4 commits into from
Jun 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/ingress/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ type Configuration struct {
// http://github.com/influxdata/nginx-influxdb-module/
// By default this is disabled
EnableInfluxDB bool `json:"enable-influxdb"`

// Checksum contains a checksum of the configmap configuration
Checksum string `json:"-"`
}

// NewDefault returns the default nginx configuration
Expand Down
23 changes: 4 additions & 19 deletions internal/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -155,14 +154,16 @@ func (n *NGINXController) syncIngress(interface{}) error {
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,

ConfigurationChecksum: n.store.GetBackendConfiguration().Checksum,
}

if !n.isForceReload() && n.runningConfig.Equal(&pcfg) {
if n.runningConfig.Equal(&pcfg) {
glog.V(3).Infof("No configuration change detected, skipping backend reload.")
return nil
}

if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(&pcfg) && !n.isForceReload() {
if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(&pcfg) {
glog.Infof("Changes handled by the dynamic configuration, skipping backend reload.")
} else {
glog.Infof("Configuration changes detected, backend reload required.")
Expand Down Expand Up @@ -200,7 +201,6 @@ func (n *NGINXController) syncIngress(interface{}) error {
}

n.runningConfig = &pcfg
n.SetForceReload(false)

return nil
}
Expand Down Expand Up @@ -1048,21 +1048,6 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
return servers
}

func (n *NGINXController) isForceReload() bool {
return atomic.LoadInt32(&n.forceReload) != 0
}

// SetForceReload sets whether the backend should be reloaded regardless of
// configuration changes.
func (n *NGINXController) SetForceReload(shouldReload bool) {
if shouldReload {
atomic.StoreInt32(&n.forceReload, 1)
n.syncQueue.Enqueue(&extensions.Ingress{})
} else {
atomic.StoreInt32(&n.forceReload, 0)
}
}

// extractTLSSecretName returns the name of the Secret containing a SSL
// certificate for the given host name, or an empty string.
func extractTLSSecretName(host string, ing *extensions.Ingress,
Expand Down
15 changes: 7 additions & 8 deletions internal/ingress/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -153,7 +152,7 @@ Error loading new template: %v

n.t = template
glog.Info("New NGINX configuration template loaded.")
n.SetForceReload(true)
n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
}

ngxTpl, err := ngx_template.NewTemplate(tmplPath, fs)
Expand Down Expand Up @@ -194,7 +193,7 @@ Error loading new template: %v
for _, f := range filesToWatch {
_, err = watch.NewFileWatcher(f, func() {
glog.Info("File %v changed. Reloading NGINX", f)
n.SetForceReload(true)
n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
})
if err != nil {
glog.Fatalf("Error creating file watcher for %v: %v", f, err)
Expand Down Expand Up @@ -232,8 +231,6 @@ type NGINXController struct {
// runningConfig contains the running configuration in the Backend
runningConfig *ingress.Configuration

forceReload int32

t *ngx_template.Template

resolver []net.IP
Expand Down Expand Up @@ -278,7 +275,7 @@ func (n *NGINXController) Start() {

go n.syncQueue.Run(time.Second, n.stopCh)
// force initial sync
n.syncQueue.Enqueue(&extensions.Ingress{})
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))

for {
select {
Expand Down Expand Up @@ -311,10 +308,12 @@ func (n *NGINXController) Start() {
if evt, ok := event.(store.Event); ok {
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's special with this event type? Why can we not simply enqueue a skippable task?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because event could be en endpoint, service, ingress, secret or configmap but only on a change in a configmap should escape the enqueue skippable logic

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but only on a change in a configmap should escape the enqueue skippable logic

This is the part I don't understand completely. My current understanding why we have to force reload in this case is because in syncIngress function the model we use currently does not include this configmap data, and therefore when the change is only about configmap it does not regenerate the Nginx configuration.

But in this PR you are adding a new field to that model (ConfigurationChecksum) which to my understanding means we don't need this special case anymore when event type is ConfigurationEvent.

n.SetForceReload(true)
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}

n.syncQueue.Enqueue(evt.Obj)
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
glog.Warningf("Unexpected event type received %T", event)
}
Expand Down
10 changes: 10 additions & 0 deletions internal/ingress/controller/template/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/golang/glog"

"github.com/mitchellh/hashstructure"
"github.com/mitchellh/mapstructure"

"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -191,6 +192,15 @@ func ReadConfig(src map[string]string) config.Configuration {
glog.Warningf("unexpected error merging defaults: %v", err)
}

hash, err := hashstructure.Hash(to, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
glog.Warningf("unexpected error obtaining hash: %v", err)
}

to.Checksum = fmt.Sprintf("%v", hash)

return to
}

Expand Down
27 changes: 27 additions & 0 deletions internal/ingress/controller/template/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package template

import (
"fmt"
"reflect"
"testing"
"time"

"github.com/kylelemons/godebug/pretty"
"github.com/mitchellh/hashstructure"

"k8s.io/ingress-nginx/internal/ingress/controller/config"
)
Expand Down Expand Up @@ -88,6 +90,14 @@ func TestMergeConfigMapToStruct(t *testing.T) {
def.NginxStatusIpv6Whitelist = []string{"::1", "2001::/16"}
def.ProxyAddOriginalUriHeader = false

hash, err := hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)

to := ReadConfig(conf)
if diff := pretty.Compare(to, def); diff != "" {
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
Expand All @@ -107,13 +117,30 @@ func TestMergeConfigMapToStruct(t *testing.T) {
}

def = config.NewDefault()
hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)

to = ReadConfig(map[string]string{})
if diff := pretty.Compare(to, def); diff != "" {
t.Errorf("unexpected diff: (-got +want)\n%s", diff)
}

def = config.NewDefault()
def.WhitelistSourceRange = []string{"1.1.1.1/32"}

hash, err = hashstructure.Hash(def, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
t.Fatalf("unexpected error obtaining hash: %v", err)
}
def.Checksum = fmt.Sprintf("%v", hash)

to = ReadConfig(map[string]string{
"whitelist-source-range": "1.1.1.1/32",
})
Expand Down
3 changes: 3 additions & 0 deletions internal/ingress/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type Configuration struct {
// It contains information about the associated Server Name Indication (SNI).
// +optional
PassthroughBackends []*SSLPassthroughBackend `json:"passthroughBackends,omitempty"`

// ConfigurationChecksum contains the particular checksum of a Configuration object
ConfigurationChecksum string `json:"configurationChecksum,omitempty"`
}

// Backend describes one or more remote server/s (endpoints) associated with a service
Expand Down
4 changes: 4 additions & 0 deletions internal/ingress/types_equals.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool {
}
}

if c1.ConfigurationChecksum != c2.ConfigurationChecksum {
return false
}

return true
}

Expand Down
34 changes: 29 additions & 5 deletions internal/task/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/glog"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -50,23 +51,39 @@ type Queue struct {

// Element represents one item of the queue
type Element struct {
Key interface{}
Timestamp int64
Key interface{}
Timestamp int64
IsSkippable bool
}

// Run ...
// Run starts processing elements in the queue
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
wait.Until(t.worker, period, stopCh)
}

// Enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) Enqueue(obj interface{}) {
// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
t.enqueue(obj, false)
}

// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
t.enqueue(obj, true)
}

// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
if t.IsShuttingDown() {
glog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)
return
}

ts := time.Now().UnixNano()
if !skippable {
// make sure the timestamp is bigger than lastSync
ts = time.Now().Add(24 * time.Hour).UnixNano()
}
glog.V(3).Infof("queuing item %v", obj)
key, err := t.fn(obj)
if err != nil {
Expand Down Expand Up @@ -166,3 +183,10 @@ func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (in

return q
}

// GetDummyObject returns a valid object that can be used in the Queue
func GetDummyObject(name string) *metav1.ObjectMeta {
return &metav1.ObjectMeta{
Name: name,
}
}
18 changes: 9 additions & 9 deletions internal/task/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestEnqueueSuccess(t *testing.T) {
k: "testKey",
v: "testValue",
}
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
if atomic.LoadUint32(&sr) != 1 {
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestEnqueueFailed(t *testing.T) {
q.Shutdown()
// wait for shutdown
time.Sleep(time.Millisecond * 10)
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
// queue is shutdown, so mockSynFn should not be executed, so the result should be 0
Expand All @@ -121,7 +121,7 @@ func TestEnqueueKeyError(t *testing.T) {
v: "testValue",
}

q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
// key error, so the result should be 0
Expand All @@ -142,16 +142,16 @@ func TestSkipEnqueue(t *testing.T) {
k: "testKey",
v: "testValue",
}
q.Enqueue(mo)
q.Enqueue(mo)
q.Enqueue(mo)
q.Enqueue(mo)
q.EnqueueSkippableTask(mo)
q.EnqueueSkippableTask(mo)
q.EnqueueTask(mo)
q.EnqueueSkippableTask(mo)
// run queue
go q.Run(time.Second, stopCh)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
if atomic.LoadUint32(&sr) != 1 {
t.Errorf("sr should be 1, but is %d", sr)
if atomic.LoadUint32(&sr) != 2 {
t.Errorf("sr should be 2, but is %d", sr)
}

// shutdown queue before exit
Expand Down
2 changes: 2 additions & 0 deletions rootfs/etc/nginx/template/nginx.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
{{ $proxyHeaders := .ProxySetHeaders }}
{{ $addHeaders := .AddHeaders }}

# Configuration checksum: {{ $all.Cfg.Checksum }}

# setup custom paths that do not require root access
pid /tmp/nginx.pid;

Expand Down
21 changes: 21 additions & 0 deletions vendor/github.com/mitchellh/hashstructure/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading