Skip to content

Commit

Permalink
[Elastic Agent] Send Agent logs to elasticsearch (elastic#19811)
Browse files Browse the repository at this point in the history
* Work on logging twice.

* Work on agent logging to fleet.

* Commit example index strategy.

* More work on logging to ES.

* Revert change to release/version.go

* Fix indexes for metricbeat sidecars.

* Add to changelog.

* Fix fmt.

* Don't expose zapLevel, add ConfigureWithOutputs.

* Update comment.

* Update comment.
blakerouse authored and melchiormoulin committed Oct 14, 2020

Verified

This commit was signed with the committer’s verified signature. The key has expired.
phi-gamma Philipp Gesang
1 parent 98464c9 commit b77c9b1
Showing 10 changed files with 230 additions and 75 deletions.
17 changes: 17 additions & 0 deletions libbeat/logp/configure/logging.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,8 @@ import (
"fmt"
"strings"

"go.uber.org/zap/zapcore"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)
@@ -58,6 +60,21 @@ func Logging(beatName string, cfg *common.Config) error {
return logp.Configure(config)
}

// Logging builds a logp.Config based on the given common.Config and the specified
// CLI flags along with the given outputs.
func LoggingWithOutputs(beatName string, cfg *common.Config, outputs ...zapcore.Core) error {
config := logp.DefaultConfig(environment)
config.Beat = beatName
if cfg != nil {
if err := cfg.Unpack(&config); err != nil {
return err
}
}

applyFlags(&config)
return logp.ConfigureWithOutputs(config, outputs...)
}

func applyFlags(cfg *logp.Config) {
if toStderr {
cfg.ToStderr = true
69 changes: 69 additions & 0 deletions libbeat/logp/core.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,8 @@ import (
"sync/atomic"
"unsafe"

"github.com/hashicorp/go-multierror"

"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -62,6 +64,13 @@ type coreLogger struct {

// Configure configures the logp package.
func Configure(cfg Config) error {
return ConfigureWithOutputs(cfg)
}

// XXX: ConfigureWithOutputs is used by elastic-agent only (See file: x-pack/elastic-agent/pkg/core/logger/logger.go).
// The agent requires that the output specified in the config object is configured and merged with the
// logging outputs given.
func ConfigureWithOutputs(cfg Config, outputs ...zapcore.Core) error {
var (
sink zapcore.Core
observedLogs *observer.ObservedLogs
@@ -105,6 +114,7 @@ func Configure(cfg Config) error {
sink = selectiveWrapper(sink, selectors)
}

sink = newMultiCore(append(outputs, sink)...)
root := zap.New(sink, makeOptions(cfg)...)
storeLogger(&coreLogger{
selectors: selectors,
@@ -262,3 +272,62 @@ func storeLogger(l *coreLogger) {
}
atomic.StorePointer(&_log, unsafe.Pointer(l))
}

// newMultiCore creates a sink that sends to multiple cores.
func newMultiCore(cores ...zapcore.Core) zapcore.Core {
return &multiCore{cores}
}

// multiCore allows multiple cores to be used for logging.
type multiCore struct {
cores []zapcore.Core
}

// Enabled returns true if the level is enabled in any one of the cores.
func (m multiCore) Enabled(level zapcore.Level) bool {
for _, core := range m.cores {
if core.Enabled(level) {
return true
}
}
return false
}

// With creates a new multiCore with each core set with the given fields.
func (m multiCore) With(fields []zapcore.Field) zapcore.Core {
cores := make([]zapcore.Core, len(m.cores))
for i, core := range m.cores {
cores[i] = core.With(fields)
}
return &multiCore{cores}
}

// Check will place each core that checks for that entry.
func (m multiCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
for _, core := range m.cores {
checked = core.Check(entry, checked)
}
return checked
}

// Write writes the entry to each core.
func (m multiCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
var errs error
for _, core := range m.cores {
if err := core.Write(entry, fields); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs
}

// Sync syncs each core.
func (m multiCore) Sync() error {
var errs error
for _, core := range m.cores {
if err := core.Sync(); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs
}
14 changes: 7 additions & 7 deletions libbeat/logp/encoding.go
Original file line number Diff line number Diff line change
@@ -44,13 +44,13 @@ func buildEncoder(cfg Config) zapcore.Encoder {
var encCfg zapcore.EncoderConfig
var encCreator encoderCreator
if cfg.JSON {
encCfg = jsonEncoderConfig()
encCfg = JSONEncoderConfig()
encCreator = zapcore.NewJSONEncoder
} else if cfg.ToSyslog {
encCfg = syslogEncoderConfig()
encCfg = SyslogEncoderConfig()
encCreator = zapcore.NewConsoleEncoder
} else {
encCfg = consoleEncoderConfig()
encCfg = ConsoleEncoderConfig()
encCreator = zapcore.NewConsoleEncoder
}

@@ -60,19 +60,19 @@ func buildEncoder(cfg Config) zapcore.Encoder {
return encCreator(encCfg)
}

func jsonEncoderConfig() zapcore.EncoderConfig {
func JSONEncoderConfig() zapcore.EncoderConfig {
return baseEncodingConfig
}

func consoleEncoderConfig() zapcore.EncoderConfig {
func ConsoleEncoderConfig() zapcore.EncoderConfig {
c := baseEncodingConfig
c.EncodeLevel = zapcore.CapitalLevelEncoder
c.EncodeName = bracketedNameEncoder
return c
}

func syslogEncoderConfig() zapcore.EncoderConfig {
c := consoleEncoderConfig()
func SyslogEncoderConfig() zapcore.EncoderConfig {
c := ConsoleEncoderConfig()
// Time is generally added by syslog.
// But when logging with ECS the empty TimeKey will be
// ignored and @timestamp is still added to log line
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
@@ -82,3 +82,4 @@
- Refuse invalid stream values in configuration {pull}19587[19587]
- Agent now load balances across multiple Kibana instances {pull}19628[19628]
- Configuration cleanup {pull}19848[19848]
- Agent now sends its own logs to elasticsearch {pull}19811[19811]
10 changes: 8 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/paths/paths.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import (
var (
homePath string
dataPath string
logsPath string
)

func init() {
@@ -21,6 +22,7 @@ func init() {
fs := flag.CommandLine
fs.StringVar(&homePath, "path.home", exePath, "Agent root path")
fs.StringVar(&dataPath, "path.data", filepath.Join(exePath, "data"), "Data path contains Agent managed binaries")
fs.StringVar(&logsPath, "path.logs", exePath, "Logs path contains Agent log output")
}

// Home returns a directory where binary lives
@@ -29,13 +31,17 @@ func Home() string {
return homePath
}

// Data returns a home directory of current user
// Data returns the data directory for Agent
func Data() string {
return dataPath
}

func retrieveExecutablePath() string {
// Logs returns a the log directory for Agent
func Logs() string {
return logsPath
}

func retrieveExecutablePath() string {
execPath, err := os.Executable()
if err != nil {
panic(err)
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/common.go
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command {

cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.home"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.data"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.logs"))

cmd.PersistentFlags().StringVarP(&flags.PathConfigFile, "", "c", defaultConfig, fmt.Sprintf(`Configuration file, relative to path.config (default "%s")`, defaultConfig))
cmd.PersistentFlags().StringVarP(&flags.PathConfig, "path.config", "", "${path.home}", "Configuration path")
143 changes: 85 additions & 58 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
@@ -6,22 +6,23 @@ package operation

import (
"fmt"
"path/filepath"

"github.com/hashicorp/go-multierror"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/app"
)

const (
monitoringName = "FLEET_MONITORING"
outputKey = "output"
monitoringEnabledSubkey = "enabled"
logsProcessName = "filebeat"
metricsProcessName = "metricbeat"
artifactPrefix = "beats"
monitoringName = "FLEET_MONITORING"
outputKey = "output"
logsProcessName = "filebeat"
metricsProcessName = "metricbeat"
artifactPrefix = "beats"
)

func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
@@ -174,37 +175,62 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [
}

func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]interface{}, bool) {
paths := o.getLogFilePaths()
if len(paths) == 0 {
return nil, false
}

result := map[string]interface{}{
"filebeat": map[string]interface{}{
"inputs": []interface{}{
map[string]interface{}{
"type": "log",
"multiline": map[string]interface{}{
"pattern": "^[0-9]{4}",
"negate": true,
"match": "after",
inputs := []interface{}{
map[string]interface{}{
"type": "log",
"json": map[string]interface{}{
"keys_under_root": true,
"overwrite_keys": true,
"message_key": "message",
},
"paths": []string{
filepath.Join(paths.Data(), "logs", "elastic-agent-json.log"),
},
"index": "logs-elastic.agent-default",
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "dataset",
"fields": map[string]interface{}{
"type": "logs",
"name": "elastic.agent",
"namespace": "default",
},
},
"paths": paths,
"index": "logs-agent-default",
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "dataset",
"fields": map[string]interface{}{
"type": "logs",
"name": "agent",
"namespace": "default",
},
},
},
},
}
logPaths := o.getLogFilePaths()
if len(logPaths) > 0 {
for name, paths := range logPaths {
inputs = append(inputs, map[string]interface{}{
"type": "log",
"json": map[string]interface{}{
"keys_under_root": true,
"overwrite_keys": true,
"message_key": "message",
},
"paths": paths,
"index": fmt.Sprintf("logs-elastic.agent.%s-default", name),
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "dataset",
"fields": map[string]interface{}{
"type": "logs",
"name": fmt.Sprintf("elastic.agent.%s", name),
"namespace": "default",
},
},
},
},
},
})
}
}
result := map[string]interface{}{
"filebeat": map[string]interface{}{
"inputs": inputs,
},
"output": map[string]interface{}{
"elasticsearch": output,
@@ -221,30 +247,31 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
if len(hosts) == 0 {
return nil, false
}

result := map[string]interface{}{
"metricbeat": map[string]interface{}{
"modules": []interface{}{
map[string]interface{}{
"module": "beat",
"metricsets": []string{"stats", "state"},
"period": "10s",
"hosts": hosts,
"index": "metrics-agent-default",
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "dataset",
"fields": map[string]interface{}{
"type": "metrics",
"name": "agent",
"namespace": "default",
},
},
var modules []interface{}
for name, endpoints := range hosts {
modules = append(modules, map[string]interface{}{
"module": "beat",
"metricsets": []string{"stats", "state"},
"period": "10s",
"hosts": endpoints,
"index": fmt.Sprintf("metrics-elastic.agent.%s-default", name),
"processors": []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"target": "dataset",
"fields": map[string]interface{}{
"type": "metrics",
"name": fmt.Sprintf("elastic.agent.%s", name),
"namespace": "default",
},
},
},
},
})
}
result := map[string]interface{}{
"metricbeat": map[string]interface{}{
"modules": modules,
},
"output": map[string]interface{}{
"elasticsearch": output,
@@ -256,32 +283,32 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
return result, true
}

func (o *Operator) getLogFilePaths() []string {
var paths []string
func (o *Operator) getLogFilePaths() map[string][]string {
paths := map[string][]string{}

o.appsLock.Lock()
defer o.appsLock.Unlock()

for _, a := range o.apps {
logPath := a.Monitor().LogPath(a.Name(), o.pipelineID)
if logPath != "" {
paths = append(paths, logPath)
paths[a.Name()] = append(paths[a.Name()], logPath)
}
}

return paths
}

func (o *Operator) getMetricbeatEndpoints() []string {
var endpoints []string
func (o *Operator) getMetricbeatEndpoints() map[string][]string {
endpoints := map[string][]string{}

o.appsLock.Lock()
defer o.appsLock.Unlock()

for _, a := range o.apps {
metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Name(), o.pipelineID)
if metricEndpoint != "" {
endpoints = append(endpoints, metricEndpoint)
endpoints[a.Name()] = append(endpoints[a.Name()], metricEndpoint)
}
}

42 changes: 36 additions & 6 deletions x-pack/elastic-agent/pkg/core/logger/logger.go
Original file line number Diff line number Diff line change
@@ -6,11 +6,15 @@ package logger

import (
"fmt"
"os"
"path/filepath"

"go.elastic.co/ecszap"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/logp/configure"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
@@ -50,11 +54,13 @@ func new(name string, cfg *Config) (*Logger, error) {
if err != nil {
return nil, err
}

if err := configure.Logging("", commonCfg); err != nil {
internal, err := makeInternalFileOutput()
if err != nil {
return nil, err
}
if err := configure.LoggingWithOutputs("", commonCfg, internal); err != nil {
return nil, fmt.Errorf("error initializing logging: %v", err)
}

return logp.NewLogger(name), nil
}

@@ -80,10 +86,34 @@ func toCommonConfig(cfg *Config) (*common.Config, error) {
func DefaultLoggingConfig() *Config {
cfg := logp.DefaultConfig(logp.DefaultEnvironment)
cfg.Beat = agentName
cfg.ECSEnabled = true
cfg.Level = logp.DebugLevel
cfg.Files.Path = filepath.Join(paths.Home(), "data", "logs")
cfg.Files.Name = agentName
cfg.Files.Path = paths.Logs()
cfg.Files.Name = fmt.Sprintf("%s.log", agentName)

return &cfg
}

// makeInternalFileOutput creates a zapcore.Core logger that cannot be changed with configuration.
//
// This is the logger that the spawned filebeat expects to read the log file from and ship to ES.
func makeInternalFileOutput() (zapcore.Core, error) {
// defaultCfg is used to set the defaults for the file rotation of the internal logging
// these settings cannot be changed by a user configuration
defaultCfg := logp.DefaultConfig(logp.DefaultEnvironment)
filename := filepath.Join(paths.Data(), "logs", fmt.Sprintf("%s-json.log", agentName))

rotator, err := file.NewFileRotator(filename,
file.MaxSizeBytes(defaultCfg.Files.MaxSize),
file.MaxBackups(defaultCfg.Files.MaxBackups),
file.Permissions(os.FileMode(defaultCfg.Files.Permissions)),
file.Interval(defaultCfg.Files.Interval),
file.RotateOnStartup(defaultCfg.Files.RotateOnStartup),
file.RedirectStderr(defaultCfg.Files.RedirectStderr),
)
if err != nil {
return nil, errors.New("failed to create internal file rotator")
}

encoder := zapcore.NewJSONEncoder(ecszap.ECSCompatibleEncoderConfig(logp.JSONEncoderConfig()))
return ecszap.WrapCore(zapcore.NewCore(encoder, rotator, zapcore.DebugLevel)), nil
}
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
package beats

import (
"fmt"
"net/url"
"os"
"path/filepath"
@@ -109,7 +110,10 @@ func (b *Monitor) EnrichArgs(process, pipelineID string, args []string, isSideca
if isSidecar {
logFile += "_monitor"
}
logFile = fmt.Sprintf("%s-json.log", logFile)
appendix = append(appendix,
"-E", "logging.json=true",
"-E", "logging.ecs=true",
"-E", "logging.files.path="+loggingPath,
"-E", "logging.files.name="+logFile,
"-E", "logging.files.keepfiles=7",
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go
Original file line number Diff line number Diff line change
@@ -12,9 +12,9 @@ import (

const (
// args: data path, pipeline name, application name
logFileFormat = "%s/logs/%s/%s"
logFileFormat = "%s/logs/%s/%s-json.log"
// args: data path, install path, pipeline name, application name
logFileFormatWin = "%s\\logs\\%s\\%s"
logFileFormatWin = "%s\\logs\\%s\\%s-json.log"

// args: pipeline name, application name
mbEndpointFileFormat = "unix:///tmp/elastic-agent/%s/%s/%s.sock"

0 comments on commit b77c9b1

Please sign in to comment.