Skip to content

Commit

Permalink
Configuration upload (#315)
Browse files Browse the repository at this point in the history
* Add periodical for uploading rendered configurations back to the server

* Make use of collector upload api call

* Implement configuration upload as user action

* Update Travis Go version

* Use Xenial as base system

* Fix render tests

* Notifu user about failed configuration uploads

* Clean up change history
  • Loading branch information
Marius Sturm authored and mpfz0r committed Dec 21, 2018
1 parent 56a5e25 commit 602a7cb
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 17 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
sudo: false
dist: xenial
language: go
go:
- 1.6
- 1.11.x

script: make deps test
env:
Expand Down
30 changes: 27 additions & 3 deletions daemon/action_handler.go → api/action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,50 @@
// You should have received a copy of the GNU General Public License
// along with Graylog. If not, see <http://www.gnu.org/licenses/>.

package daemon
package api

import (
"github.com/Graylog2/collector-sidecar/api/graylog"
"github.com/Graylog2/collector-sidecar/api/rest"
"github.com/Graylog2/collector-sidecar/backends"
"github.com/Graylog2/collector-sidecar/context"
"github.com/Graylog2/collector-sidecar/daemon"
)

func HandleCollectorActions(actions []graylog.ResponseCollectorAction) {
func HandleCollectorActions(actions []graylog.ResponseCollectorAction, ctx *context.Ctx) {
for _, action := range actions {
switch {
case action.Properties["restart"] == true:
restartAction(action)
case action.Properties["import"] == true:
configurationImportAction(action, ctx)
}
}
}

func restartAction(action graylog.ResponseCollectorAction) {
for name, runner := range Daemon.Runner {
for name, runner := range daemon.Daemon.Runner {
if name == action.Backend {
log.Infof("[%s] Executing requested collector restart", name)
runner.Restart()
}
}
}

func configurationImportAction(action graylog.ResponseCollectorAction, ctx *context.Ctx) {
for name := range daemon.Daemon.Runner {
if name == action.Backend {
log.Infof("[%s] Sending configuration to Graylog server", name)
backend := backends.Store.GetBackend(name)
renderedConfiguration := backend.RenderToString()
httpClient := rest.NewHTTPClient(GetTlsConfig(ctx))
UploadConfiguration(httpClient, ctx,
&graylog.CollectorUpload{
CollectorId: ctx.CollectorId,
NodeId: ctx.NodeId,
CollectorName: backend.Name(),
RenderedConfiguration: renderedConfiguration})

}
}
}
22 changes: 21 additions & 1 deletion api/graylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,27 @@ func UpdateRegistration(httpClient *http.Client, ctx *context.Ctx, status *grayl

// Run collector actions if provided
if len(respBody.CollectorActions) != 0 {
daemon.HandleCollectorActions(respBody.CollectorActions)
HandleCollectorActions(respBody.CollectorActions, ctx)
}
}

func UploadConfiguration(httpClient *http.Client, ctx *context.Ctx, payload *graylog.CollectorUpload) {
c := rest.NewClient(httpClient)
c.BaseURL = ctx.ServerUrl

r, err := c.NewRequest("PUT", "/plugins/org.graylog.plugins.collector/collectors/"+ctx.CollectorId+"/configuration", nil, payload)
if err != nil {
log.Error("[UploadConfiguration] Can not initialize REST request")
return
}

resp, err := c.Do(r, nil)
if resp != nil && resp.StatusCode == 404 {
log.Error("[UploadConfiguration] Can't upload rendered configuration, please update your Graylog server to the latest version.")
} else if resp != nil && resp.StatusCode != 202 {
log.Errorf("[UploadConfiguration] Bad response from Graylog server: %v", resp.Body)
} else if err != nil {
log.Error("[UploadConfiguration] Failed to upload collector configuration to server: ", err)
}
}

Expand Down
7 changes: 7 additions & 0 deletions api/graylog/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,10 @@ type MetricsRequest struct {
CpuIdle float64 `json:"cpu_idle"`
Load1 float64 `json:"load_1"`
}

type CollectorUpload struct {
CollectorId string `json:"collector_id"`
NodeId string `json:"node_id"`
CollectorName string `json:"collector_name"`
RenderedConfiguration string `json:"rendered_configuration"`
}
16 changes: 13 additions & 3 deletions backends/beats/filebeat/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ func (fbc *FileBeatConfig) Render() bytes.Buffer {
return result
}

func (fbc *FileBeatConfig) RenderToFile() error {
func (fbc *FileBeatConfig) RenderToString() string {
buffer := fbc.Render()
return buffer.String()
}

func (fbc *FileBeatConfig) RenderToFile() (error) {
stringConfig := fbc.Render()
err := common.CreatePathToFile(fbc.Beats.UserConfig.ConfigurationPath)
if err != nil {
Expand All @@ -70,7 +75,7 @@ func (fbc *FileBeatConfig) RenderToFile() error {
return err
}

func (fbc *FileBeatConfig) RenderOnChange(response graylog.ResponseCollectorConfiguration) bool {
func (fbc *FileBeatConfig) RenderOnChange(response graylog.ResponseCollectorConfiguration) (bool) {
newConfig := NewCollectorConfig(fbc.Beats.Context)

// holds file inputs
Expand Down Expand Up @@ -191,7 +196,12 @@ func (fbc *FileBeatConfig) RenderOnChange(response graylog.ResponseCollectorConf
if !fbc.Beats.Equals(newConfig.Beats) {
log.Infof("[%s] Configuration change detected, rewriting configuration file.", fbc.Name())
fbc.Beats.Update(newConfig.Beats)
fbc.RenderToFile()
err := fbc.RenderToFile()
if err != nil {
msg := fmt.Sprintf("[%s] Failed to write configuration file: %s", fbc.Name(), err)
fbc.SetStatus(backends.StatusError, msg)
log.Errorf("[%s] %s", fbc.Name(), msg)
}
return true
}

Expand Down
16 changes: 13 additions & 3 deletions backends/beats/winlogbeat/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ func (wlbc *WinLogBeatConfig) Render() bytes.Buffer {
return result
}

func (wlbc *WinLogBeatConfig) RenderToFile() error {
func (wlbc *WinLogBeatConfig) RenderToString() string {
buffer := wlbc.Render()
return buffer.String()
}

func (wlbc *WinLogBeatConfig) RenderToFile() (error) {
stringConfig := wlbc.Render()
err := common.CreatePathToFile(wlbc.Beats.UserConfig.ConfigurationPath)
if err != nil {
Expand All @@ -70,7 +75,7 @@ func (wlbc *WinLogBeatConfig) RenderToFile() error {
return err
}

func (wlbc *WinLogBeatConfig) RenderOnChange(response graylog.ResponseCollectorConfiguration) bool {
func (wlbc *WinLogBeatConfig) RenderOnChange(response graylog.ResponseCollectorConfiguration) (bool) {
newConfig := NewCollectorConfig(wlbc.Beats.Context)

// holds event inputs
Expand Down Expand Up @@ -148,7 +153,12 @@ func (wlbc *WinLogBeatConfig) RenderOnChange(response graylog.ResponseCollectorC
if !wlbc.Beats.Equals(newConfig.Beats) {
log.Infof("[%s] Configuration change detected, rewriting configuration file.", wlbc.Name())
wlbc.Beats.Update(newConfig.Beats)
wlbc.RenderToFile()
err := wlbc.RenderToFile()
if err != nil {
msg := fmt.Sprintf("[%s] Failed to write configuration file: %s", wlbc.Name(), err)
wlbc.SetStatus(backends.StatusError, msg)
log.Errorf("[%s] %s", wlbc.Name(), msg)
}
return true
}

Expand Down
17 changes: 14 additions & 3 deletions backends/nxlog/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package nxlog

import (
"bytes"
"fmt"
"github.com/Graylog2/collector-sidecar/backends"
"io/ioutil"
"os/exec"
"strconv"
Expand Down Expand Up @@ -389,7 +391,11 @@ func (nxc *NxConfig) Render() []byte {
return common.ConvertLineBreak(result.Bytes())
}

func (nxc *NxConfig) RenderToFile() error {
func (nxc *NxConfig) RenderToString() string {
return string(nxc.Render())
}

func (nxc *NxConfig) RenderToFile() (error) {
stringConfig := nxc.Render()
err := common.CreatePathToFile(nxc.UserConfig.ConfigurationPath)
if err != nil {
Expand All @@ -399,7 +405,7 @@ func (nxc *NxConfig) RenderToFile() error {
return err
}

func (nxc *NxConfig) RenderOnChange(json graylog.ResponseCollectorConfiguration) bool {
func (nxc *NxConfig) RenderOnChange(json graylog.ResponseCollectorConfiguration) (bool) {
jsonConfig := NewCollectorConfig(nxc.Context)

for _, output := range json.Outputs {
Expand Down Expand Up @@ -448,7 +454,12 @@ func (nxc *NxConfig) RenderOnChange(json graylog.ResponseCollectorConfiguration)
if !nxc.Equals(jsonConfig) {
log.Infof("[%s] Configuration change detected, rewriting configuration file.", nxc.Name())
nxc.Update(jsonConfig)
nxc.RenderToFile()
err := nxc.RenderToFile()
if err != nil {
msg := fmt.Sprintf("[%s] Failed to write configuration file: %s", nxc.Name(), err)
nxc.SetStatus(backends.StatusError, msg)
log.Errorf("[%s] %s", nxc.Name(), msg)
}
return true
}

Expand Down
3 changes: 2 additions & 1 deletion backends/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (

"github.com/Graylog2/collector-sidecar/api/graylog"
"github.com/Graylog2/collector-sidecar/context"
"github.com/Graylog2/collector-sidecar/system"
"github.com/Graylog2/collector-sidecar/logger"
"github.com/Graylog2/collector-sidecar/system"
)

var (
Expand All @@ -37,6 +37,7 @@ type Backend interface {
ExecPath() string
ConfigurationPath() string
ExecArgs() []string
RenderToString() string
RenderOnChange(graylog.ResponseCollectorConfiguration) bool
ValidateConfigurationFile() bool
ValidatePreconditions() bool
Expand Down
4 changes: 2 additions & 2 deletions services/periodicals.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func checkForUpdateAndRestart(httpClient *http.Client, checksum string, context

for name, runner := range daemon.Daemon.Runner {
backend := backends.Store.GetBackend(name)
if backend.RenderOnChange(jsonConfig) {
changed := backend.RenderOnChange(jsonConfig)
if changed {
if !backend.ValidateConfigurationFile() {
backends.SetStatusLogErrorf(name, "Collector configuration file is not valid, waiting for the next update.")
continue
Expand All @@ -81,7 +82,6 @@ func checkForUpdateAndRestart(httpClient *http.Client, checksum string, context
backend.SetStatus(backends.StatusError, msg)
log.Errorf("[%s] %s: %v", name, msg, err)
}

}
}

Expand Down

0 comments on commit 602a7cb

Please sign in to comment.