Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support multiple configurations per collector #441

Merged
merged 20 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions api/graylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ var (
configurationOverride = false
)

func GetServerVersion(httpClient *http.Client, ctx *context.Ctx) (*GraylogVersion, error) {
// In case of an error just assume 4.0.0
fallbackVersion, _ := NewGraylogVersion("4.0.0")

c := rest.NewClient(httpClient, ctx)
c.BaseURL = ctx.ServerUrl
r, err := c.NewRequest("GET", "/", nil, nil)
if err != nil {
log.Errorf("Cannot retrieve server version %v", err)
return fallbackVersion, err
}
versionResponse := graylog.ServerVersionResponse{}
resp, err := c.Do(r, &versionResponse)
if err != nil || resp == nil {
log.Errorf("Error fetching server version %v", err)
return fallbackVersion, err
}
return NewGraylogVersion(versionResponse.Version)
}

func RequestBackendList(httpClient *http.Client, checksum string, ctx *context.Ctx) (graylog.ResponseBackendList, error) {
c := rest.NewClient(httpClient, ctx)
c.BaseURL = ctx.ServerUrl
Expand Down Expand Up @@ -137,15 +157,14 @@ func RequestConfiguration(
return configurationResponse, nil
}

func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.Ctx, status *graylog.StatusRequest) (graylog.ResponseCollectorRegistration, error) {
func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.Ctx, serverVersion *GraylogVersion, status *graylog.StatusRequest) (graylog.ResponseCollectorRegistration, error) {
c := rest.NewClient(httpClient, ctx)
c.BaseURL = ctx.ServerUrl

registration := graylog.RegistrationRequest{}

registration.NodeName = ctx.UserConfig.NodeName
registration.NodeDetails.OperatingSystem = common.GetSystemName()
registration.NodeDetails.Tags = ctx.UserConfig.Tags

if ctx.UserConfig.SendStatus {
metrics := &graylog.MetricsRequest{
Expand Down Expand Up @@ -173,6 +192,10 @@ func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.C
}
}
}
if serverVersion.SupportsExtendedNodeDetails() {
registration.NodeDetails.CollectorConfigurationDirectory = ctx.UserConfig.CollectorConfigurationDirectory
registration.NodeDetails.Tags = ctx.UserConfig.Tags
}

r, err := c.NewRequest("PUT", "/sidecars/"+ctx.NodeId, nil, registration)
if checksum != "" {
Expand Down Expand Up @@ -255,12 +278,15 @@ func GetTlsConfig(ctx *context.Ctx) *tls.Config {
return tlsConfig
}

func NewStatusRequest() graylog.StatusRequest {
func NewStatusRequest(serverVersion *GraylogVersion) graylog.StatusRequest {
statusRequest := graylog.StatusRequest{Backends: make([]graylog.StatusRequestBackend, 0)}
combinedStatus := backends.StatusUnknown
runningCount, stoppedCount, errorCount := 0, 0, 0

for id, runner := range daemon.Daemon.Runner {
if !serverVersion.SupportsMultipleBackends() {
id = strings.Split(id, "-")[0]
}
mpfz0r marked this conversation as resolved.
Show resolved Hide resolved
backendStatus := runner.GetBackend().Status()
statusRequest.Backends = append(statusRequest.Backends, graylog.StatusRequestBackend{
Id: id,
Expand Down
13 changes: 7 additions & 6 deletions api/graylog/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ type RegistrationRequest struct {
}

type NodeDetailsRequest struct {
OperatingSystem string `json:"operating_system"`
IP string `json:"ip,omitempty"`
LogFileList []common.File `json:"log_file_list,omitempty"`
Metrics *MetricsRequest `json:"metrics,omitempty"`
Status *StatusRequest `json:"status,omitempty"`
Tags []string `json:"tags,omitempty"`
OperatingSystem string `json:"operating_system"`
IP string `json:"ip,omitempty"`
LogFileList []common.File `json:"log_file_list,omitempty"`
Metrics *MetricsRequest `json:"metrics,omitempty"`
Status *StatusRequest `json:"status,omitempty"`
CollectorConfigurationDirectory string `json:"collector_configuration_directory,omitempty"`
Tags []string `json:"tags,omitempty"`
}

type StatusRequestBackend struct {
Expand Down
21 changes: 13 additions & 8 deletions api/graylog/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ type ResponseBackendList struct {
NotModified bool
}

type ServerVersionResponse struct {
ClusterId string `json:"cluster_id"`
NodeId string `json:"node_id"`
Version string `json:"version"`
}

type ResponseCollectorBackend struct {
Id string `json:"id"`
Name string `json:"name"`
ServiceType string `json:"service_type"`
OperatingSystem string `json:"node_operating_system"`
ExecutablePath string `json:"executable_path"`
ConfigurationFileName string `json:"configuration_file_name"`
ExecuteParameters string `json:"execute_parameters"`
ValidationParameters string `json:"validation_parameters"`
Id string `json:"id"`
Name string `json:"name"`
ServiceType string `json:"service_type"`
OperatingSystem string `json:"node_operating_system"`
ExecutablePath string `json:"executable_path"`
ExecuteParameters string `json:"execute_parameters"`
ValidationParameters string `json:"validation_parameters"`
}

type ResponseCollectorConfiguration struct {
Expand Down
25 changes: 25 additions & 0 deletions api/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package api

import "github.com/hashicorp/go-version"

type GraylogVersion struct {
*version.Version
}

func NewGraylogVersion(v string) (*GraylogVersion, error) {
newVersion, err := version.NewVersion(v)
if err != nil {
return nil, err
}
return &GraylogVersion{newVersion}, nil
}

func (v *GraylogVersion) SupportsMultipleBackends() bool {
// cannot use version.Constraints because of a bug in comparing pre-releases
return v.Version.Segments()[0] >= 4 && v.Version.Segments()[1] >= 4
}

func (v *GraylogVersion) SupportsExtendedNodeDetails() bool {
// cannot use version.Constraints because of a bug in comparing pre-releases
return v.Version.Segments()[0] >= 4 && v.Version.Segments()[1] >= 4
}
28 changes: 20 additions & 8 deletions assignments/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

var (
// global store of configuration assignments, [backendId]ConfigurationId
// global store of configuration assignments, [backendId-configurationID]ConfigurationId
Store = &assignmentStore{make(map[string]string)}
)

Expand All @@ -34,9 +34,9 @@ type ConfigurationAssignment struct {
ConfigurationId string `json:"configuration_id"`
}

func (as *assignmentStore) SetAssignment(assignment *ConfigurationAssignment) {
if as.assignments[assignment.BackendId] != assignment.ConfigurationId {
as.assignments[assignment.BackendId] = assignment.ConfigurationId
func (as *assignmentStore) SetAssignment(backendId string, configId string) {
if as.assignments[backendId] != configId {
as.assignments[backendId] = configId
}
}

Expand All @@ -60,16 +60,28 @@ func (as *assignmentStore) AssignedBackendIds() []string {
return result
}

func expandAssignments(assignments []ConfigurationAssignment) map[string]string {
expandedAssignments := make(map[string]string)

for _, assignment := range assignments {
configId := assignment.ConfigurationId
expandedAssignments[assignment.BackendId+"-"+configId] = configId
}
return expandedAssignments
}

func (as *assignmentStore) Update(assignments []ConfigurationAssignment) bool {
expandedAssignments := expandAssignments(assignments)

beforeUpdate := make(map[string]string)
for k, v := range as.assignments {
beforeUpdate[k] = v
}
if len(assignments) != 0 {
if len(expandedAssignments) != 0 {
var activeIds []string
for _, assignment := range assignments {
Store.SetAssignment(&assignment)
activeIds = append(activeIds, assignment.BackendId)
for backendId, assignment := range expandedAssignments {
Store.SetAssignment(backendId, assignment)
activeIds = append(activeIds, backendId)
}
Store.cleanup(activeIds)
} else {
Expand Down
18 changes: 8 additions & 10 deletions backends/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
type Backend struct {
Enabled *bool
Id string
ConfigId string
Name string
ServiceType string
OperatingSystem string
Expand All @@ -46,27 +47,24 @@ type Backend struct {
backendStatus system.VerboseStatus
}

func BackendFromResponse(response graylog.ResponseCollectorBackend, ctx *context.Ctx) *Backend {
func BackendFromResponse(response graylog.ResponseCollectorBackend, configId string, ctx *context.Ctx) *Backend {
return &Backend{
Enabled: common.NewTrue(),
Id: response.Id,
Name: response.Name,
Id: response.Id + "-" + configId,
ConfigId: configId,
Name: response.Name + "-" + configId,
ServiceType: response.ServiceType,
OperatingSystem: response.OperatingSystem,
ExecutablePath: response.ExecutablePath,
ConfigurationPath: BuildConfigurationPath(response, ctx),
ConfigurationPath: BuildConfigurationPath(response, configId, ctx),
ExecuteParameters: response.ExecuteParameters,
ValidationParameters: response.ValidationParameters,
backendStatus: system.VerboseStatus{},
}
}

func BuildConfigurationPath(response graylog.ResponseCollectorBackend, ctx *context.Ctx) string {
if response.ConfigurationFileName != "" {
return filepath.Join(ctx.UserConfig.CollectorConfigurationDirectory, response.ConfigurationFileName)
} else {
return filepath.Join(ctx.UserConfig.CollectorConfigurationDirectory, response.Name+".conf")
}
func BuildConfigurationPath(response graylog.ResponseCollectorBackend, configId string, ctx *context.Ctx) string {
return filepath.Join(ctx.UserConfig.CollectorConfigurationDirectory, configId, response.Name+".conf")
}

func (b *Backend) Equals(a *Backend) bool {
Expand Down
9 changes: 0 additions & 9 deletions backends/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,6 @@ func (bs *backendStore) GetBackend(id string) *Backend {
return bs.backends[id]
}

func (bs *backendStore) GetBackendById(id string) *Backend {
for _, backend := range bs.backends {
if backend.Id == id {
return backend
}
}
return nil
}

func (bs *backendStore) Update(backends []Backend) {
if len(backends) != 0 {
var activeIds []string
Expand Down
3 changes: 1 addition & 2 deletions backends/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package backends
import (
"bytes"
"fmt"
"github.com/Graylog2/collector-sidecar/common"
"github.com/Graylog2/collector-sidecar/context"
"io/ioutil"

"github.com/Graylog2/collector-sidecar/common"
)

func (b *Backend) render() []byte {
Expand Down
5 changes: 3 additions & 2 deletions benchmarks/bench-rest-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ func startHeartbeat(ctx *context.Ctx, done chan bool, metrics chan time.Duration
return
default:
time.Sleep(time.Duration(ctx.UserConfig.UpdateInterval) * time.Second)
statusRequest := api.NewStatusRequest()
version, _ := api.NewGraylogVersion("4.0.0")
statusRequest := api.NewStatusRequest(version)
t := time.Now()
response, err := api.UpdateRegistration(httpClient, "nochecksum", ctx, &statusRequest)
response, err := api.UpdateRegistration(httpClient, "nochecksum", ctx, version, &statusRequest)
if err != nil {
fmt.Printf("[%s] can't register sidecar: %v\n", ctx.UserConfig.NodeId, err)
return
Expand Down
2 changes: 1 addition & 1 deletion daemon/action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

func HandleCollectorActions(actions []graylog.ResponseCollectorAction) {
for _, action := range actions {
backend := backends.Store.GetBackendById(action.BackendId)
backend := backends.Store.GetBackend(action.BackendId)
if backend == nil {
log.Errorf("Got action for non-existing collector: %s", action.BackendId)
continue
Expand Down
6 changes: 3 additions & 3 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ func (dc *DaemonConfig) SyncWithAssignments(context *context.Ctx) {
}

// cleanup backends that should not run anymore
if backend == nil || assignments.Store.GetAll()[backend.Id] == "" {
log.Info("Removing process runner: " + backend.Name)
if backend == nil || assignments.Store.GetAssignment(backend.Id) == "" {
log.Info("Removing process runner: " + id)
dc.DeleteRunner(id)
}
}
assignedBackends := []*backends.Backend{}
for backendId := range assignments.Store.GetAll() {
backend := backends.Store.GetBackendById(backendId)
backend := backends.Store.GetBackend(backendId)
if backend != nil {
assignedBackends = append(assignedBackends, backend)
}
Expand Down
20 changes: 12 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
module github.com/Graylog2/collector-sidecar

go 1.14
go 1.19

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Sirupsen/logrus v0.11.0
github.com/StackExchange/wmi v0.0.0-20160811214555-e54cbda6595d // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/go-units v0.3.3
github.com/elastic/go-ucfg v0.7.0
github.com/elastic/gosigar v0.0.0-20160829190344-2716c1fe855e
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78 // indirect
github.com/hashicorp/go-version v1.6.0
github.com/kardianos/service v1.2.1
github.com/kr/text v0.2.0 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pborman/uuid v0.0.0-20160216163710-c55201b03606
github.com/rifflock/lfshook v0.0.0-20161216150210-24f7833daaff
github.com/stretchr/testify v1.6.1 // indirect
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
)

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/StackExchange/wmi v0.0.0-20160811214555-e54cbda6595d // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/stretchr/testify v1.6.1 // indirect
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BMXYYRWT
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:rZfgFAXFS/z/lEd6LJmf9HVZ1LkgYiHx5pHhV5DR16M=
github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78 h1:0afyVEbxVeRz0ioQYn+9oDaeveMBXJi7juWqz2newuY=
github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/kardianos/service v1.2.1 h1:AYndMsehS+ywIS6RB9KOlcXzteWUzxgMgBymJD7+BYk=
github.com/kardianos/service v1.2.1/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
2 changes: 1 addition & 1 deletion jenkins.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pipeline

tools
{
go 'Go'
go 'Go 1.19'
}

environment
Expand Down
Loading