diff --git a/api/graylog.go b/api/graylog.go index c2ded77c..a9826c83 100644 --- a/api/graylog.go +++ b/api/graylog.go @@ -40,6 +40,26 @@ var ( configurationOverride = false ) +func GetServerVersion(httpClient *http.Client, ctx *context.Ctx) (*GraylogVersion, error) { + // In case of an error just assume 4.0.0 + fallbackVersion, _ := NewGraylogVersion("4.0.0") + + c := rest.NewClient(httpClient, ctx) + c.BaseURL = ctx.ServerUrl + r, err := c.NewRequest("GET", "/", nil, nil) + if err != nil { + log.Errorf("Cannot retrieve server version %v", err) + return fallbackVersion, err + } + versionResponse := graylog.ServerVersionResponse{} + resp, err := c.Do(r, &versionResponse) + if err != nil || resp == nil { + log.Errorf("Error fetching server version %v", err) + return fallbackVersion, err + } + return NewGraylogVersion(versionResponse.Version) +} + func RequestBackendList(httpClient *http.Client, checksum string, ctx *context.Ctx) (graylog.ResponseBackendList, error) { c := rest.NewClient(httpClient, ctx) c.BaseURL = ctx.ServerUrl @@ -137,7 +157,7 @@ func RequestConfiguration( return configurationResponse, nil } -func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.Ctx, status *graylog.StatusRequest) (graylog.ResponseCollectorRegistration, error) { +func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.Ctx, serverVersion *GraylogVersion, status *graylog.StatusRequest) (graylog.ResponseCollectorRegistration, error) { c := rest.NewClient(httpClient, ctx) c.BaseURL = ctx.ServerUrl @@ -145,7 +165,6 @@ func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.C registration.NodeName = ctx.UserConfig.NodeName registration.NodeDetails.OperatingSystem = common.GetSystemName() - registration.NodeDetails.Tags = ctx.UserConfig.Tags if ctx.UserConfig.SendStatus { metrics := &graylog.MetricsRequest{ @@ -173,6 +192,10 @@ func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.C } } } + if serverVersion.SupportsExtendedNodeDetails() { + registration.NodeDetails.CollectorConfigurationDirectory = ctx.UserConfig.CollectorConfigurationDirectory + registration.NodeDetails.Tags = ctx.UserConfig.Tags + } r, err := c.NewRequest("PUT", "/sidecars/"+ctx.NodeId, nil, registration) if checksum != "" { @@ -255,18 +278,24 @@ func GetTlsConfig(ctx *context.Ctx) *tls.Config { return tlsConfig } -func NewStatusRequest() graylog.StatusRequest { +func NewStatusRequest(serverVersion *GraylogVersion) graylog.StatusRequest { statusRequest := graylog.StatusRequest{Backends: make([]graylog.StatusRequestBackend, 0)} combinedStatus := backends.StatusUnknown runningCount, stoppedCount, errorCount := 0, 0, 0 for id, runner := range daemon.Daemon.Runner { + collectorId := strings.Split(id, "-")[0] + configurationId := "" + if serverVersion.SupportsMultipleBackends() { + configurationId = strings.Split(id, "-")[1] + } backendStatus := runner.GetBackend().Status() statusRequest.Backends = append(statusRequest.Backends, graylog.StatusRequestBackend{ - Id: id, - Status: backendStatus.Status, - Message: backendStatus.Message, - VerboseMessage: backendStatus.VerboseMessage, + CollectorId: collectorId, + ConfigurationId: configurationId, + Status: backendStatus.Status, + Message: backendStatus.Message, + VerboseMessage: backendStatus.VerboseMessage, }) switch backendStatus.Status { case backends.StatusRunning: diff --git a/api/graylog/requests.go b/api/graylog/requests.go index 5a65407b..d732a7c5 100644 --- a/api/graylog/requests.go +++ b/api/graylog/requests.go @@ -25,19 +25,21 @@ type RegistrationRequest struct { } type NodeDetailsRequest struct { - OperatingSystem string `json:"operating_system"` - IP string `json:"ip,omitempty"` - LogFileList []common.File `json:"log_file_list,omitempty"` - Metrics *MetricsRequest `json:"metrics,omitempty"` - Status *StatusRequest `json:"status,omitempty"` - Tags []string `json:"tags,omitempty"` + OperatingSystem string `json:"operating_system"` + IP string `json:"ip,omitempty"` + LogFileList []common.File `json:"log_file_list,omitempty"` + Metrics *MetricsRequest `json:"metrics,omitempty"` + Status *StatusRequest `json:"status,omitempty"` + CollectorConfigurationDirectory string `json:"collector_configuration_directory,omitempty"` + Tags []string `json:"tags,omitempty"` } type StatusRequestBackend struct { - Id string `json:"collector_id"` - Status int `json:"status"` - Message string `json:"message"` - VerboseMessage string `json:"verbose_message"` + CollectorId string `json:"collector_id"` + ConfigurationId string `json:"configuration_id,omitempty"` + Status int `json:"status"` + Message string `json:"message"` + VerboseMessage string `json:"verbose_message"` } type StatusRequest struct { diff --git a/api/graylog/responses.go b/api/graylog/responses.go index 91a62432..1705b1d9 100644 --- a/api/graylog/responses.go +++ b/api/graylog/responses.go @@ -42,15 +42,20 @@ type ResponseBackendList struct { NotModified bool } +type ServerVersionResponse struct { + ClusterId string `json:"cluster_id"` + NodeId string `json:"node_id"` + Version string `json:"version"` +} + type ResponseCollectorBackend struct { - Id string `json:"id"` - Name string `json:"name"` - ServiceType string `json:"service_type"` - OperatingSystem string `json:"node_operating_system"` - ExecutablePath string `json:"executable_path"` - ConfigurationFileName string `json:"configuration_file_name"` - ExecuteParameters string `json:"execute_parameters"` - ValidationParameters string `json:"validation_parameters"` + Id string `json:"id"` + Name string `json:"name"` + ServiceType string `json:"service_type"` + OperatingSystem string `json:"node_operating_system"` + ExecutablePath string `json:"executable_path"` + ExecuteParameters string `json:"execute_parameters"` + ValidationParameters string `json:"validation_parameters"` } type ResponseCollectorConfiguration struct { diff --git a/api/version.go b/api/version.go new file mode 100644 index 00000000..4038e823 --- /dev/null +++ b/api/version.go @@ -0,0 +1,25 @@ +package api + +import "github.com/hashicorp/go-version" + +type GraylogVersion struct { + *version.Version +} + +func NewGraylogVersion(v string) (*GraylogVersion, error) { + newVersion, err := version.NewVersion(v) + if err != nil { + return nil, err + } + return &GraylogVersion{newVersion}, nil +} + +func (v *GraylogVersion) SupportsMultipleBackends() bool { + // cannot use version.Constraints because of a bug in comparing pre-releases + return v.Version.Segments()[0] >= 4 && v.Version.Segments()[1] >= 4 +} + +func (v *GraylogVersion) SupportsExtendedNodeDetails() bool { + // cannot use version.Constraints because of a bug in comparing pre-releases + return v.Version.Segments()[0] >= 4 && v.Version.Segments()[1] >= 4 +} diff --git a/assignments/assignment.go b/assignments/assignment.go index 25741f03..5a0b9719 100644 --- a/assignments/assignment.go +++ b/assignments/assignment.go @@ -21,7 +21,7 @@ import ( ) var ( - // global store of configuration assignments, [backendId]ConfigurationId + // global store of configuration assignments, [backendId-configurationID]ConfigurationId Store = &assignmentStore{make(map[string]string)} ) @@ -34,9 +34,9 @@ type ConfigurationAssignment struct { ConfigurationId string `json:"configuration_id"` } -func (as *assignmentStore) SetAssignment(assignment *ConfigurationAssignment) { - if as.assignments[assignment.BackendId] != assignment.ConfigurationId { - as.assignments[assignment.BackendId] = assignment.ConfigurationId +func (as *assignmentStore) SetAssignment(backendId string, configId string) { + if as.assignments[backendId] != configId { + as.assignments[backendId] = configId } } @@ -60,16 +60,28 @@ func (as *assignmentStore) AssignedBackendIds() []string { return result } +func expandAssignments(assignments []ConfigurationAssignment) map[string]string { + expandedAssignments := make(map[string]string) + + for _, assignment := range assignments { + configId := assignment.ConfigurationId + expandedAssignments[assignment.BackendId+"-"+configId] = configId + } + return expandedAssignments +} + func (as *assignmentStore) Update(assignments []ConfigurationAssignment) bool { + expandedAssignments := expandAssignments(assignments) + beforeUpdate := make(map[string]string) for k, v := range as.assignments { beforeUpdate[k] = v } - if len(assignments) != 0 { + if len(expandedAssignments) != 0 { var activeIds []string - for _, assignment := range assignments { - Store.SetAssignment(&assignment) - activeIds = append(activeIds, assignment.BackendId) + for backendId, assignment := range expandedAssignments { + Store.SetAssignment(backendId, assignment) + activeIds = append(activeIds, backendId) } Store.cleanup(activeIds) } else { diff --git a/backends/backend.go b/backends/backend.go index 2c897ee8..bcf9c587 100644 --- a/backends/backend.go +++ b/backends/backend.go @@ -35,6 +35,7 @@ import ( type Backend struct { Enabled *bool Id string + ConfigId string Name string ServiceType string OperatingSystem string @@ -46,27 +47,24 @@ type Backend struct { backendStatus system.VerboseStatus } -func BackendFromResponse(response graylog.ResponseCollectorBackend, ctx *context.Ctx) *Backend { +func BackendFromResponse(response graylog.ResponseCollectorBackend, configId string, ctx *context.Ctx) *Backend { return &Backend{ Enabled: common.NewTrue(), - Id: response.Id, - Name: response.Name, + Id: response.Id + "-" + configId, + ConfigId: configId, + Name: response.Name + "-" + configId, ServiceType: response.ServiceType, OperatingSystem: response.OperatingSystem, ExecutablePath: response.ExecutablePath, - ConfigurationPath: BuildConfigurationPath(response, ctx), + ConfigurationPath: BuildConfigurationPath(response, configId, ctx), ExecuteParameters: response.ExecuteParameters, ValidationParameters: response.ValidationParameters, backendStatus: system.VerboseStatus{}, } } -func BuildConfigurationPath(response graylog.ResponseCollectorBackend, ctx *context.Ctx) string { - if response.ConfigurationFileName != "" { - return filepath.Join(ctx.UserConfig.CollectorConfigurationDirectory, response.ConfigurationFileName) - } else { - return filepath.Join(ctx.UserConfig.CollectorConfigurationDirectory, response.Name+".conf") - } +func BuildConfigurationPath(response graylog.ResponseCollectorBackend, configId string, ctx *context.Ctx) string { + return filepath.Join(ctx.UserConfig.CollectorConfigurationDirectory, configId, response.Name+".conf") } func (b *Backend) Equals(a *Backend) bool { @@ -84,6 +82,7 @@ func (b *Backend) EqualSettings(a *Backend) bool { aBackend := &Backend{ Enabled: b.Enabled, Id: a.Id, + ConfigId: a.ConfigId, Name: a.Name, ServiceType: a.ServiceType, OperatingSystem: a.OperatingSystem, diff --git a/backends/registry.go b/backends/registry.go index e157ef9c..5145a7a2 100644 --- a/backends/registry.go +++ b/backends/registry.go @@ -50,15 +50,6 @@ func (bs *backendStore) GetBackend(id string) *Backend { return bs.backends[id] } -func (bs *backendStore) GetBackendById(id string) *Backend { - for _, backend := range bs.backends { - if backend.Id == id { - return backend - } - } - return nil -} - func (bs *backendStore) Update(backends []Backend) { if len(backends) != 0 { var activeIds []string diff --git a/backends/render.go b/backends/render.go index 1e8db9ff..973405e5 100644 --- a/backends/render.go +++ b/backends/render.go @@ -18,10 +18,9 @@ package backends import ( "bytes" "fmt" + "github.com/Graylog2/collector-sidecar/common" "github.com/Graylog2/collector-sidecar/context" "io/ioutil" - - "github.com/Graylog2/collector-sidecar/common" ) func (b *Backend) render() []byte { diff --git a/benchmarks/bench-rest-api.go b/benchmarks/bench-rest-api.go index 286d69b4..a171a297 100644 --- a/benchmarks/bench-rest-api.go +++ b/benchmarks/bench-rest-api.go @@ -69,9 +69,10 @@ func startHeartbeat(ctx *context.Ctx, done chan bool, metrics chan time.Duration return default: time.Sleep(time.Duration(ctx.UserConfig.UpdateInterval) * time.Second) - statusRequest := api.NewStatusRequest() + version, _ := api.NewGraylogVersion("4.0.0") + statusRequest := api.NewStatusRequest(version) t := time.Now() - response, err := api.UpdateRegistration(httpClient, "nochecksum", ctx, &statusRequest) + response, err := api.UpdateRegistration(httpClient, "nochecksum", ctx, version, &statusRequest) if err != nil { fmt.Printf("[%s] can't register sidecar: %v\n", ctx.UserConfig.NodeId, err) return diff --git a/daemon/action_handler.go b/daemon/action_handler.go index bd24c973..d7365510 100644 --- a/daemon/action_handler.go +++ b/daemon/action_handler.go @@ -23,7 +23,7 @@ import ( func HandleCollectorActions(actions []graylog.ResponseCollectorAction) { for _, action := range actions { - backend := backends.Store.GetBackendById(action.BackendId) + backend := backends.Store.GetBackend(action.BackendId) if backend == nil { log.Errorf("Got action for non-existing collector: %s", action.BackendId) continue diff --git a/daemon/daemon.go b/daemon/daemon.go index 8963d085..c998dc52 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -142,14 +142,14 @@ func (dc *DaemonConfig) SyncWithAssignments(context *context.Ctx) { } // cleanup backends that should not run anymore - if backend == nil || assignments.Store.GetAll()[backend.Id] == "" { - log.Info("Removing process runner: " + backend.Name) + if backend == nil || assignments.Store.GetAssignment(backend.Id) == "" { + log.Info("Removing process runner: " + id) dc.DeleteRunner(id) } } assignedBackends := []*backends.Backend{} for backendId := range assignments.Store.GetAll() { - backend := backends.Store.GetBackendById(backendId) + backend := backends.Store.GetBackend(backendId) if backend != nil { assignedBackends = append(assignedBackends, backend) } diff --git a/go.mod b/go.mod index edd607c5..6450471b 100644 --- a/go.mod +++ b/go.mod @@ -1,25 +1,29 @@ module github.com/Graylog2/collector-sidecar -go 1.14 +go 1.19 require ( - github.com/BurntSushi/toml v0.3.1 // indirect github.com/Sirupsen/logrus v0.11.0 - github.com/StackExchange/wmi v0.0.0-20160811214555-e54cbda6595d // indirect - github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/go-units v0.3.3 github.com/elastic/go-ucfg v0.7.0 github.com/elastic/gosigar v0.0.0-20160829190344-2716c1fe855e github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 - github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78 // indirect + github.com/hashicorp/go-version v1.6.0 github.com/kardianos/service v1.2.1 - github.com/kr/text v0.2.0 // indirect github.com/natefinch/lumberjack v2.0.0+incompatible - github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/pborman/uuid v0.0.0-20160216163710-c55201b03606 github.com/rifflock/lfshook v0.0.0-20161216150210-24f7833daaff - github.com/stretchr/testify v1.6.1 // indirect golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 +) + +require ( + github.com/BurntSushi/toml v0.3.1 // indirect + github.com/StackExchange/wmi v0.0.0-20160811214555-e54cbda6595d // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/stretchr/testify v1.6.1 // indirect gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index a8f1e2f7..e32c70d3 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BMXYYRWT github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:rZfgFAXFS/z/lEd6LJmf9HVZ1LkgYiHx5pHhV5DR16M= github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78 h1:0afyVEbxVeRz0ioQYn+9oDaeveMBXJi7juWqz2newuY= github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/kardianos/service v1.2.1 h1:AYndMsehS+ywIS6RB9KOlcXzteWUzxgMgBymJD7+BYk= github.com/kardianos/service v1.2.1/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/jenkins.groovy b/jenkins.groovy index be69ca96..eba1877f 100644 --- a/jenkins.groovy +++ b/jenkins.groovy @@ -11,7 +11,7 @@ pipeline tools { - go 'Go' + go 'Go 1.19' } environment diff --git a/services/periodicals.go b/services/periodicals.go index 406e36d5..a630cf6f 100644 --- a/services/periodicals.go +++ b/services/periodicals.go @@ -39,29 +39,53 @@ func StartPeriodicals(context *context.Ctx) { go func() { configChecksums := make(map[string]string) - backendChecksum := "" - assignmentChecksum := "" + var lastBackendResponse graylog.ResponseBackendList + var lastRegResponse graylog.ResponseCollectorRegistration logOnce := true + firstRun := true for { - time.Sleep(time.Duration(context.UserConfig.UpdateInterval) * time.Second) + if !firstRun { + time.Sleep(time.Duration(context.UserConfig.UpdateInterval) * time.Second) + } + firstRun = false + + serverVersion, _ := api.GetServerVersion(httpClient, context) // registration regResponse contains configuration assignments - regResponse, err := updateCollectorRegistration(httpClient, assignmentChecksum, context) + regResponse, err := updateCollectorRegistration(httpClient, lastRegResponse.Checksum, context, serverVersion) if err != nil { continue } - assignmentChecksum = regResponse.Checksum + if !regResponse.NotModified { + lastRegResponse = regResponse + } + // backend list is needed before configuration assignments are updated - backendResponse, err := fetchBackendList(httpClient, backendChecksum, context) + backendResponse, err := fetchBackendList(httpClient, lastBackendResponse.Checksum, context) if err != nil { continue } - backendChecksum = backendResponse.Checksum + if !backendResponse.NotModified { + lastBackendResponse = backendResponse + } if !regResponse.NotModified || !backendResponse.NotModified { - modified := assignments.Store.Update(regResponse.Assignments) + modified := assignments.Store.Update(lastRegResponse.Assignments) + + backendList := []backends.Backend{} + // TODO this is inefficient + for _, assignment := range lastRegResponse.Assignments { + configId := assignment.ConfigurationId + for _, backend := range lastBackendResponse.Backends { + if backend.Id == assignment.BackendId { + backendList = append(backendList, *backends.BackendFromResponse(backend, configId, context)) + } + } + } + backends.Store.Update(backendList) + // regResponse.NotModified is always false, because graylog does not implement caching yet. - // Thus we need to double check. + // Thus, we need to double-check. if modified || !backendResponse.NotModified { configChecksums = make(map[string]string) } @@ -78,15 +102,18 @@ func StartPeriodicals(context *context.Ctx) { logOnce = true } } + log.Debugf("backend store %v", *backends.Store) + log.Debugf("assignments store %v", assignments.Store.GetAll()) + log.Debugf("runner store %v", daemon.Daemon.Runner) checkForUpdateAndRestart(httpClient, configChecksums, context) } }() } // report collector status to Graylog server and receive assignments -func updateCollectorRegistration(httpClient *http.Client, checksum string, context *context.Ctx) (graylog.ResponseCollectorRegistration, error) { - statusRequest := api.NewStatusRequest() - return api.UpdateRegistration(httpClient, checksum, context, &statusRequest) +func updateCollectorRegistration(httpClient *http.Client, checksum string, context *context.Ctx, serverVersion *api.GraylogVersion) (graylog.ResponseCollectorRegistration, error) { + statusRequest := api.NewStatusRequest(serverVersion) + return api.UpdateRegistration(httpClient, checksum, context, serverVersion, &statusRequest) } func fetchBackendList(httpClient *http.Client, checksum string, ctx *context.Ctx) (graylog.ResponseBackendList, error) { @@ -95,17 +122,6 @@ func fetchBackendList(httpClient *http.Client, checksum string, ctx *context.Ctx log.Error("Can't fetch collector list from Graylog API: ", err) return response, err } - if response.NotModified { - // etag match, skipping all other actions - return response, nil - } - - backendList := []backends.Backend{} - for _, backendEntry := range response.Backends { - backendList = append(backendList, *backends.BackendFromResponse(backendEntry, ctx)) - } - backends.Store.Update(backendList) - return response, nil }