Skip to content

Commit

Permalink
Wipe configchecksums also on assignment changes (#353)
Browse files Browse the repository at this point in the history
If an assignemnt is changed to switch a backend from
one configuration to another, we would still send configuration requests
using the old checksum.
The server will respond with `NotModified` and we don't try to restart
the backend.

Instead of also resetting the checksum when we update the assignment
store, simply wipe the entire checksum map if either the assignment
or the backends have changed.

Fixes #352
  • Loading branch information
mpfz0r authored and Marius Sturm committed Mar 11, 2019
1 parent 0f71051 commit dca0b17
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
8 changes: 7 additions & 1 deletion assignments/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package assignments

import (
"github.com/Graylog2/collector-sidecar/common"
"reflect"
)

var (
Expand Down Expand Up @@ -59,7 +60,11 @@ func (as *assignmentStore) AssignedBackendIds() []string {
return result
}

func (as *assignmentStore) Update(assignments []ConfigurationAssignment) {
func (as *assignmentStore) Update(assignments []ConfigurationAssignment) bool {
beforeUpdate := make(map[string]string)
for k, v := range as.assignments {
beforeUpdate[k] = v
}
if len(assignments) != 0 {
var activeIds []string
for _, assignment := range assignments {
Expand All @@ -70,6 +75,7 @@ func (as *assignmentStore) Update(assignments []ConfigurationAssignment) {
} else {
Store.cleanup([]string{})
}
return !reflect.DeepEqual(beforeUpdate, as.assignments)
}

func (as *assignmentStore) cleanup(validBackendIds []string) {
Expand Down
4 changes: 1 addition & 3 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (dc *DaemonConfig) GetRunnerByBackendId(id string) Runner {
return nil
}

func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, context *context.Ctx) {
func (dc *DaemonConfig) SyncWithAssignments(context *context.Ctx) {
if dc.Runner == nil {
return
}
Expand All @@ -125,7 +125,6 @@ func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, c
log.Infof("[%s] Updating process configuration", runner.Name())
runnerServiceType := runnerBackend.ServiceType
runner.SetBackend(*backend)
configChecksums[backend.Id] = ""
if backend.ServiceType != runnerServiceType {
log.Infof("Changing process runner (%s -> %s) for: %s",
runnerServiceType, backend.ServiceType, backend.Name)
Expand All @@ -146,7 +145,6 @@ func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, c
if backend == nil || assignments.Store.GetAll()[backend.Id] == "" {
log.Info("Removing process runner: " + backend.Name)
dc.DeleteRunner(id)
configChecksums[backend.Id] = ""
}
}
assignedBackends := []*backends.Backend{}
Expand Down
17 changes: 11 additions & 6 deletions services/periodicals.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,28 @@ func StartPeriodicals(context *context.Ctx) {
for {
time.Sleep(time.Duration(context.UserConfig.UpdateInterval) * time.Second)

// registration response contains configuration assignments
response, err := updateCollectorRegistration(httpClient, assignmentChecksum, context)
// registration regResponse contains configuration assignments
regResponse, err := updateCollectorRegistration(httpClient, assignmentChecksum, context)
if err != nil {
continue
}
assignmentChecksum = response.Checksum
assignmentChecksum = regResponse.Checksum
// backend list is needed before configuration assignments are updated
backendResponse, err := fetchBackendList(httpClient, backendChecksum, context)
if err != nil {
continue
}
backendChecksum = backendResponse.Checksum

if !response.NotModified || !backendResponse.NotModified {
assignments.Store.Update(response.Assignments)
if !regResponse.NotModified || !backendResponse.NotModified {
modified := assignments.Store.Update(regResponse.Assignments)
// regResponse.NotModified is always false, because graylog does not implement caching yet.
// Thus we need to double check.
if modified || !backendResponse.NotModified {
configChecksums = make(map[string]string)
}
// create process instances
daemon.Daemon.SyncWithAssignments(configChecksums, context)
daemon.Daemon.SyncWithAssignments(context)
// test for new or updated configurations and start the corresponding collector
if assignments.Store.Len() == 0 {
if logOnce {
Expand Down

0 comments on commit dca0b17

Please sign in to comment.