diff --git a/assignments/assignment.go b/assignments/assignment.go index 31e37be..23ae179 100644 --- a/assignments/assignment.go +++ b/assignments/assignment.go @@ -17,6 +17,7 @@ package assignments import ( "github.com/Graylog2/collector-sidecar/common" + "reflect" ) var ( @@ -59,7 +60,11 @@ func (as *assignmentStore) AssignedBackendIds() []string { return result } -func (as *assignmentStore) Update(assignments []ConfigurationAssignment) { +func (as *assignmentStore) Update(assignments []ConfigurationAssignment) bool { + beforeUpdate := make(map[string]string) + for k, v := range as.assignments { + beforeUpdate[k] = v + } if len(assignments) != 0 { var activeIds []string for _, assignment := range assignments { @@ -70,6 +75,7 @@ func (as *assignmentStore) Update(assignments []ConfigurationAssignment) { } else { Store.cleanup([]string{}) } + return !reflect.DeepEqual(beforeUpdate, as.assignments) } func (as *assignmentStore) cleanup(validBackendIds []string) { diff --git a/daemon/daemon.go b/daemon/daemon.go index 9d25168..ea5e275 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -111,7 +111,7 @@ func (dc *DaemonConfig) GetRunnerByBackendId(id string) Runner { return nil } -func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, context *context.Ctx) { +func (dc *DaemonConfig) SyncWithAssignments(context *context.Ctx) { if dc.Runner == nil { return } @@ -125,7 +125,6 @@ func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, c log.Infof("[%s] Updating process configuration", runner.Name()) runnerServiceType := runnerBackend.ServiceType runner.SetBackend(*backend) - configChecksums[backend.Id] = "" if backend.ServiceType != runnerServiceType { log.Infof("Changing process runner (%s -> %s) for: %s", runnerServiceType, backend.ServiceType, backend.Name) @@ -146,7 +145,6 @@ func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, c if backend == nil || assignments.Store.GetAll()[backend.Id] == "" { log.Info("Removing process runner: " + backend.Name) dc.DeleteRunner(id) - configChecksums[backend.Id] = "" } } assignedBackends := []*backends.Backend{} diff --git a/services/periodicals.go b/services/periodicals.go index 54e839b..693a336 100644 --- a/services/periodicals.go +++ b/services/periodicals.go @@ -45,12 +45,12 @@ func StartPeriodicals(context *context.Ctx) { for { time.Sleep(time.Duration(context.UserConfig.UpdateInterval) * time.Second) - // registration response contains configuration assignments - response, err := updateCollectorRegistration(httpClient, assignmentChecksum, context) + // registration regResponse contains configuration assignments + regResponse, err := updateCollectorRegistration(httpClient, assignmentChecksum, context) if err != nil { continue } - assignmentChecksum = response.Checksum + assignmentChecksum = regResponse.Checksum // backend list is needed before configuration assignments are updated backendResponse, err := fetchBackendList(httpClient, backendChecksum, context) if err != nil { @@ -58,10 +58,15 @@ func StartPeriodicals(context *context.Ctx) { } backendChecksum = backendResponse.Checksum - if !response.NotModified || !backendResponse.NotModified { - assignments.Store.Update(response.Assignments) + if !regResponse.NotModified || !backendResponse.NotModified { + modified := assignments.Store.Update(regResponse.Assignments) + // regResponse.NotModified is always false, because graylog does not implement caching yet. + // Thus we need to double check. + if modified || !backendResponse.NotModified { + configChecksums = make(map[string]string) + } // create process instances - daemon.Daemon.SyncWithAssignments(configChecksums, context) + daemon.Daemon.SyncWithAssignments(context) // test for new or updated configurations and start the corresponding collector if assignments.Store.Len() == 0 { if logOnce {