diff --git a/.travis.yml b/.travis.yml index 47f9c99..fa06212 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,8 @@ sudo: false +dist: xenial language: go go: - - 1.6 + - 1.11.x script: make deps test env: diff --git a/daemon/action_handler.go b/api/action_handler.go similarity index 52% rename from daemon/action_handler.go rename to api/action_handler.go index 1ddfdac..1eefa42 100644 --- a/daemon/action_handler.go +++ b/api/action_handler.go @@ -13,26 +13,50 @@ // You should have received a copy of the GNU General Public License // along with Graylog. If not, see . -package daemon +package api import ( "github.com/Graylog2/collector-sidecar/api/graylog" + "github.com/Graylog2/collector-sidecar/api/rest" + "github.com/Graylog2/collector-sidecar/backends" + "github.com/Graylog2/collector-sidecar/context" + "github.com/Graylog2/collector-sidecar/daemon" ) -func HandleCollectorActions(actions []graylog.ResponseCollectorAction) { +func HandleCollectorActions(actions []graylog.ResponseCollectorAction, ctx *context.Ctx) { for _, action := range actions { switch { case action.Properties["restart"] == true: restartAction(action) + case action.Properties["import"] == true: + configurationImportAction(action, ctx) } } } func restartAction(action graylog.ResponseCollectorAction) { - for name, runner := range Daemon.Runner { + for name, runner := range daemon.Daemon.Runner { if name == action.Backend { log.Infof("[%s] Executing requested collector restart", name) runner.Restart() } } +} + +func configurationImportAction(action graylog.ResponseCollectorAction, ctx *context.Ctx) { + for name := range daemon.Daemon.Runner { + if name == action.Backend { + log.Infof("[%s] Sending configuration to Graylog server", name) + backend := backends.Store.GetBackend(name) + renderedConfiguration := backend.RenderToString() + httpClient := rest.NewHTTPClient(GetTlsConfig(ctx)) + UploadConfiguration(httpClient, ctx, + &graylog.CollectorUpload{ + CollectorId: ctx.CollectorId, + NodeId: ctx.NodeId, + CollectorName: backend.Name(), + RenderedConfiguration: renderedConfiguration}) + + } + } } \ No newline at end of file diff --git a/api/graylog.go b/api/graylog.go index c4fc5bd..1825d74 100644 --- a/api/graylog.go +++ b/api/graylog.go @@ -146,7 +146,27 @@ func UpdateRegistration(httpClient *http.Client, ctx *context.Ctx, status *grayl // Run collector actions if provided if len(respBody.CollectorActions) != 0 { - daemon.HandleCollectorActions(respBody.CollectorActions) + HandleCollectorActions(respBody.CollectorActions, ctx) + } +} + +func UploadConfiguration(httpClient *http.Client, ctx *context.Ctx, payload *graylog.CollectorUpload) { + c := rest.NewClient(httpClient) + c.BaseURL = ctx.ServerUrl + + r, err := c.NewRequest("PUT", "/plugins/org.graylog.plugins.collector/collectors/"+ctx.CollectorId+"/configuration", nil, payload) + if err != nil { + log.Error("[UploadConfiguration] Can not initialize REST request") + return + } + + resp, err := c.Do(r, nil) + if resp != nil && resp.StatusCode == 404 { + log.Error("[UploadConfiguration] Can't upload rendered configuration, please update your Graylog server to the latest version.") + } else if resp != nil && resp.StatusCode != 202 { + log.Errorf("[UploadConfiguration] Bad response from Graylog server: %v", resp.Body) + } else if err != nil { + log.Error("[UploadConfiguration] Failed to upload collector configuration to server: ", err) } } diff --git a/api/graylog/requests.go b/api/graylog/requests.go index e8297a6..4ff02a0 100644 --- a/api/graylog/requests.go +++ b/api/graylog/requests.go @@ -45,3 +45,10 @@ type MetricsRequest struct { CpuIdle float64 `json:"cpu_idle"` Load1 float64 `json:"load_1"` } + +type CollectorUpload struct { + CollectorId string `json:"collector_id"` + NodeId string `json:"node_id"` + CollectorName string `json:"collector_name"` + RenderedConfiguration string `json:"rendered_configuration"` +} diff --git a/backends/beats/filebeat/render.go b/backends/beats/filebeat/render.go index 06064a4..0d2ac0b 100644 --- a/backends/beats/filebeat/render.go +++ b/backends/beats/filebeat/render.go @@ -60,7 +60,12 @@ func (fbc *FileBeatConfig) Render() bytes.Buffer { return result } -func (fbc *FileBeatConfig) RenderToFile() error { +func (fbc *FileBeatConfig) RenderToString() string { + buffer := fbc.Render() + return buffer.String() +} + +func (fbc *FileBeatConfig) RenderToFile() (error) { stringConfig := fbc.Render() err := common.CreatePathToFile(fbc.Beats.UserConfig.ConfigurationPath) if err != nil { @@ -70,7 +75,7 @@ func (fbc *FileBeatConfig) RenderToFile() error { return err } -func (fbc *FileBeatConfig) RenderOnChange(response graylog.ResponseCollectorConfiguration) bool { +func (fbc *FileBeatConfig) RenderOnChange(response graylog.ResponseCollectorConfiguration) (bool) { newConfig := NewCollectorConfig(fbc.Beats.Context) // holds file inputs @@ -191,7 +196,12 @@ func (fbc *FileBeatConfig) RenderOnChange(response graylog.ResponseCollectorConf if !fbc.Beats.Equals(newConfig.Beats) { log.Infof("[%s] Configuration change detected, rewriting configuration file.", fbc.Name()) fbc.Beats.Update(newConfig.Beats) - fbc.RenderToFile() + err := fbc.RenderToFile() + if err != nil { + msg := fmt.Sprintf("[%s] Failed to write configuration file: %s", fbc.Name(), err) + fbc.SetStatus(backends.StatusError, msg) + log.Errorf("[%s] %s", fbc.Name(), msg) + } return true } diff --git a/backends/beats/winlogbeat/render.go b/backends/beats/winlogbeat/render.go index 0a398a6..e4a3d0d 100644 --- a/backends/beats/winlogbeat/render.go +++ b/backends/beats/winlogbeat/render.go @@ -60,7 +60,12 @@ func (wlbc *WinLogBeatConfig) Render() bytes.Buffer { return result } -func (wlbc *WinLogBeatConfig) RenderToFile() error { +func (wlbc *WinLogBeatConfig) RenderToString() string { + buffer := wlbc.Render() + return buffer.String() +} + +func (wlbc *WinLogBeatConfig) RenderToFile() (error) { stringConfig := wlbc.Render() err := common.CreatePathToFile(wlbc.Beats.UserConfig.ConfigurationPath) if err != nil { @@ -70,7 +75,7 @@ func (wlbc *WinLogBeatConfig) RenderToFile() error { return err } -func (wlbc *WinLogBeatConfig) RenderOnChange(response graylog.ResponseCollectorConfiguration) bool { +func (wlbc *WinLogBeatConfig) RenderOnChange(response graylog.ResponseCollectorConfiguration) (bool) { newConfig := NewCollectorConfig(wlbc.Beats.Context) // holds event inputs @@ -148,7 +153,12 @@ func (wlbc *WinLogBeatConfig) RenderOnChange(response graylog.ResponseCollectorC if !wlbc.Beats.Equals(newConfig.Beats) { log.Infof("[%s] Configuration change detected, rewriting configuration file.", wlbc.Name()) wlbc.Beats.Update(newConfig.Beats) - wlbc.RenderToFile() + err := wlbc.RenderToFile() + if err != nil { + msg := fmt.Sprintf("[%s] Failed to write configuration file: %s", wlbc.Name(), err) + wlbc.SetStatus(backends.StatusError, msg) + log.Errorf("[%s] %s", wlbc.Name(), msg) + } return true } diff --git a/backends/nxlog/render.go b/backends/nxlog/render.go index f0bbf6d..c80920c 100644 --- a/backends/nxlog/render.go +++ b/backends/nxlog/render.go @@ -17,6 +17,8 @@ package nxlog import ( "bytes" + "fmt" + "github.com/Graylog2/collector-sidecar/backends" "io/ioutil" "os/exec" "strconv" @@ -389,7 +391,11 @@ func (nxc *NxConfig) Render() []byte { return common.ConvertLineBreak(result.Bytes()) } -func (nxc *NxConfig) RenderToFile() error { +func (nxc *NxConfig) RenderToString() string { + return string(nxc.Render()) +} + +func (nxc *NxConfig) RenderToFile() (error) { stringConfig := nxc.Render() err := common.CreatePathToFile(nxc.UserConfig.ConfigurationPath) if err != nil { @@ -399,7 +405,7 @@ func (nxc *NxConfig) RenderToFile() error { return err } -func (nxc *NxConfig) RenderOnChange(json graylog.ResponseCollectorConfiguration) bool { +func (nxc *NxConfig) RenderOnChange(json graylog.ResponseCollectorConfiguration) (bool) { jsonConfig := NewCollectorConfig(nxc.Context) for _, output := range json.Outputs { @@ -448,7 +454,12 @@ func (nxc *NxConfig) RenderOnChange(json graylog.ResponseCollectorConfiguration) if !nxc.Equals(jsonConfig) { log.Infof("[%s] Configuration change detected, rewriting configuration file.", nxc.Name()) nxc.Update(jsonConfig) - nxc.RenderToFile() + err := nxc.RenderToFile() + if err != nil { + msg := fmt.Sprintf("[%s] Failed to write configuration file: %s", nxc.Name(), err) + nxc.SetStatus(backends.StatusError, msg) + log.Errorf("[%s] %s", nxc.Name(), msg) + } return true } diff --git a/backends/registry.go b/backends/registry.go index b12c8f2..eb7c363 100644 --- a/backends/registry.go +++ b/backends/registry.go @@ -20,8 +20,8 @@ import ( "github.com/Graylog2/collector-sidecar/api/graylog" "github.com/Graylog2/collector-sidecar/context" - "github.com/Graylog2/collector-sidecar/system" "github.com/Graylog2/collector-sidecar/logger" + "github.com/Graylog2/collector-sidecar/system" ) var ( @@ -37,6 +37,7 @@ type Backend interface { ExecPath() string ConfigurationPath() string ExecArgs() []string + RenderToString() string RenderOnChange(graylog.ResponseCollectorConfiguration) bool ValidateConfigurationFile() bool ValidatePreconditions() bool diff --git a/services/periodicals.go b/services/periodicals.go index 132b08d..76677c3 100644 --- a/services/periodicals.go +++ b/services/periodicals.go @@ -70,7 +70,8 @@ func checkForUpdateAndRestart(httpClient *http.Client, checksum string, context for name, runner := range daemon.Daemon.Runner { backend := backends.Store.GetBackend(name) - if backend.RenderOnChange(jsonConfig) { + changed := backend.RenderOnChange(jsonConfig) + if changed { if !backend.ValidateConfigurationFile() { backends.SetStatusLogErrorf(name, "Collector configuration file is not valid, waiting for the next update.") continue @@ -81,7 +82,6 @@ func checkForUpdateAndRestart(httpClient *http.Client, checksum string, context backend.SetStatus(backends.StatusError, msg) log.Errorf("[%s] %s: %v", name, msg, err) } - } }