From dca0b1739b9eacd5dabd15765ecf9a67c78700c1 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 11 Mar 2019 15:46:23 +0100 Subject: [PATCH] Wipe configchecksums also on assignment changes (#353) If an assignemnt is changed to switch a backend from one configuration to another, we would still send configuration requests using the old checksum. The server will respond with `NotModified` and we don't try to restart the backend. Instead of also resetting the checksum when we update the assignment store, simply wipe the entire checksum map if either the assignment or the backends have changed. Fixes #352 --- assignments/assignment.go | 8 +++++++- daemon/daemon.go | 4 +--- services/periodicals.go | 17 +++++++++++------ 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/assignments/assignment.go b/assignments/assignment.go index 31e37be..23ae179 100644 --- a/assignments/assignment.go +++ b/assignments/assignment.go @@ -17,6 +17,7 @@ package assignments import ( "github.com/Graylog2/collector-sidecar/common" + "reflect" ) var ( @@ -59,7 +60,11 @@ func (as *assignmentStore) AssignedBackendIds() []string { return result } -func (as *assignmentStore) Update(assignments []ConfigurationAssignment) { +func (as *assignmentStore) Update(assignments []ConfigurationAssignment) bool { + beforeUpdate := make(map[string]string) + for k, v := range as.assignments { + beforeUpdate[k] = v + } if len(assignments) != 0 { var activeIds []string for _, assignment := range assignments { @@ -70,6 +75,7 @@ func (as *assignmentStore) Update(assignments []ConfigurationAssignment) { } else { Store.cleanup([]string{}) } + return !reflect.DeepEqual(beforeUpdate, as.assignments) } func (as *assignmentStore) cleanup(validBackendIds []string) { diff --git a/daemon/daemon.go b/daemon/daemon.go index 9d25168..ea5e275 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -111,7 +111,7 @@ func (dc *DaemonConfig) GetRunnerByBackendId(id string) Runner { return nil } -func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, context *context.Ctx) { +func (dc *DaemonConfig) SyncWithAssignments(context *context.Ctx) { if dc.Runner == nil { return } @@ -125,7 +125,6 @@ func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, c log.Infof("[%s] Updating process configuration", runner.Name()) runnerServiceType := runnerBackend.ServiceType runner.SetBackend(*backend) - configChecksums[backend.Id] = "" if backend.ServiceType != runnerServiceType { log.Infof("Changing process runner (%s -> %s) for: %s", runnerServiceType, backend.ServiceType, backend.Name) @@ -146,7 +145,6 @@ func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, c if backend == nil || assignments.Store.GetAll()[backend.Id] == "" { log.Info("Removing process runner: " + backend.Name) dc.DeleteRunner(id) - configChecksums[backend.Id] = "" } } assignedBackends := []*backends.Backend{} diff --git a/services/periodicals.go b/services/periodicals.go index 54e839b..693a336 100644 --- a/services/periodicals.go +++ b/services/periodicals.go @@ -45,12 +45,12 @@ func StartPeriodicals(context *context.Ctx) { for { time.Sleep(time.Duration(context.UserConfig.UpdateInterval) * time.Second) - // registration response contains configuration assignments - response, err := updateCollectorRegistration(httpClient, assignmentChecksum, context) + // registration regResponse contains configuration assignments + regResponse, err := updateCollectorRegistration(httpClient, assignmentChecksum, context) if err != nil { continue } - assignmentChecksum = response.Checksum + assignmentChecksum = regResponse.Checksum // backend list is needed before configuration assignments are updated backendResponse, err := fetchBackendList(httpClient, backendChecksum, context) if err != nil { @@ -58,10 +58,15 @@ func StartPeriodicals(context *context.Ctx) { } backendChecksum = backendResponse.Checksum - if !response.NotModified || !backendResponse.NotModified { - assignments.Store.Update(response.Assignments) + if !regResponse.NotModified || !backendResponse.NotModified { + modified := assignments.Store.Update(regResponse.Assignments) + // regResponse.NotModified is always false, because graylog does not implement caching yet. + // Thus we need to double check. + if modified || !backendResponse.NotModified { + configChecksums = make(map[string]string) + } // create process instances - daemon.Daemon.SyncWithAssignments(configChecksums, context) + daemon.Daemon.SyncWithAssignments(context) // test for new or updated configurations and start the corresponding collector if assignments.Store.Len() == 0 { if logOnce {