Skip to content

Commit

Permalink
[Elastic Agent] Send Agent logs to elasticsearch (#19811)
Browse files Browse the repository at this point in the history
* Work on logging twice.

* Work on agent logging to fleet.

* Commit example index strategy.

* More work on logging to ES.

* Revert change to release/version.go

* Fix indexes for metricbeat sidecars.

* Add to changelog.

* Fix fmt.

* Don't expose zapLevel, add ConfigureWithOutputs.

* Update comment.

* Update comment.
  • Loading branch information
blakerouse authored Jul 14, 2020
1 parent 7514680 commit 2297636
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 75 deletions.
17 changes: 17 additions & 0 deletions libbeat/logp/configure/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"strings"

"go.uber.org/zap/zapcore"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand Down Expand Up @@ -58,6 +60,21 @@ func Logging(beatName string, cfg *common.Config) error {
return logp.Configure(config)
}

// Logging builds a logp.Config based on the given common.Config and the specified
// CLI flags along with the given outputs.
func LoggingWithOutputs(beatName string, cfg *common.Config, outputs ...zapcore.Core) error {
config := logp.DefaultConfig(environment)
config.Beat = beatName
if cfg != nil {
if err := cfg.Unpack(&config); err != nil {
return err
}
}

applyFlags(&config)
return logp.ConfigureWithOutputs(config, outputs...)
}

func applyFlags(cfg *logp.Config) {
if toStderr {
cfg.ToStderr = true
Expand Down
69 changes: 69 additions & 0 deletions libbeat/logp/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"sync/atomic"
"unsafe"

"github.com/hashicorp/go-multierror"

"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -62,6 +64,13 @@ type coreLogger struct {

// Configure configures the logp package.
func Configure(cfg Config) error {
return ConfigureWithOutputs(cfg)
}

// XXX: ConfigureWithOutputs is used by elastic-agent only (See file: x-pack/elastic-agent/pkg/core/logger/logger.go).
// The agent requires that the output specified in the config object is configured and merged with the
// logging outputs given.
func ConfigureWithOutputs(cfg Config, outputs ...zapcore.Core) error {
var (
sink zapcore.Core
observedLogs *observer.ObservedLogs
Expand Down Expand Up @@ -105,6 +114,7 @@ func Configure(cfg Config) error {
sink = selectiveWrapper(sink, selectors)
}

sink = newMultiCore(append(outputs, sink)...)
root := zap.New(sink, makeOptions(cfg)...)
storeLogger(&coreLogger{
selectors: selectors,
Expand Down Expand Up @@ -262,3 +272,62 @@ func storeLogger(l *coreLogger) {
}
atomic.StorePointer(&_log, unsafe.Pointer(l))
}

// newMultiCore creates a sink that sends to multiple cores.
func newMultiCore(cores ...zapcore.Core) zapcore.Core {
return &multiCore{cores}
}

// multiCore allows multiple cores to be used for logging.
type multiCore struct {
cores []zapcore.Core
}

// Enabled returns true if the level is enabled in any one of the cores.
func (m multiCore) Enabled(level zapcore.Level) bool {
for _, core := range m.cores {
if core.Enabled(level) {
return true
}
}
return false
}

// With creates a new multiCore with each core set with the given fields.
func (m multiCore) With(fields []zapcore.Field) zapcore.Core {
cores := make([]zapcore.Core, len(m.cores))
for i, core := range m.cores {
cores[i] = core.With(fields)
}
return &multiCore{cores}
}

// Check will place each core that checks for that entry.
func (m multiCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
for _, core := range m.cores {
checked = core.Check(entry, checked)
}
return checked
}

// Write writes the entry to each core.
func (m multiCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
var errs error
for _, core := range m.cores {
if err := core.Write(entry, fields); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs
}

// Sync syncs each core.
func (m multiCore) Sync() error {
var errs error
for _, core := range m.cores {
if err := core.Sync(); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs
}
14 changes: 7 additions & 7 deletions libbeat/logp/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func buildEncoder(cfg Config) zapcore.Encoder {
var encCfg zapcore.EncoderConfig
var encCreator encoderCreator
if cfg.JSON {
encCfg = jsonEncoderConfig()
encCfg = JSONEncoderConfig()
encCreator = zapcore.NewJSONEncoder
} else if cfg.ToSyslog {
encCfg = syslogEncoderConfig()
encCfg = SyslogEncoderConfig()
encCreator = zapcore.NewConsoleEncoder
} else {
encCfg = consoleEncoderConfig()
encCfg = ConsoleEncoderConfig()
encCreator = zapcore.NewConsoleEncoder
}

Expand All @@ -60,19 +60,19 @@ func buildEncoder(cfg Config) zapcore.Encoder {
return encCreator(encCfg)
}

func jsonEncoderConfig() zapcore.EncoderConfig {
func JSONEncoderConfig() zapcore.EncoderConfig {
return baseEncodingConfig
}

func consoleEncoderConfig() zapcore.EncoderConfig {
func ConsoleEncoderConfig() zapcore.EncoderConfig {
c := baseEncodingConfig
c.EncodeLevel = zapcore.CapitalLevelEncoder
c.EncodeName = bracketedNameEncoder
return c
}

func syslogEncoderConfig() zapcore.EncoderConfig {
c := consoleEncoderConfig()
func SyslogEncoderConfig() zapcore.EncoderConfig {
c := ConsoleEncoderConfig()
// Time is generally added by syslog.
// But when logging with ECS the empty TimeKey will be
// ignored and @timestamp is still added to log line
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@
- Refuse invalid stream values in configuration {pull}19587[19587]
- Agent now load balances across multiple Kibana instances {pull}19628[19628]
- Configuration cleanup {pull}19848[19848]
- Agent now sends its own logs to elasticsearch {pull}19811[19811]
10 changes: 8 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/paths/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
var (
homePath string
dataPath string
logsPath string
)

func init() {
Expand All @@ -21,6 +22,7 @@ func init() {
fs := flag.CommandLine
fs.StringVar(&homePath, "path.home", exePath, "Agent root path")
fs.StringVar(&dataPath, "path.data", filepath.Join(exePath, "data"), "Data path contains Agent managed binaries")
fs.StringVar(&logsPath, "path.logs", exePath, "Logs path contains Agent log output")
}

// Home returns a directory where binary lives
Expand All @@ -29,13 +31,17 @@ func Home() string {
return homePath
}

// Data returns a home directory of current user
// Data returns the data directory for Agent
func Data() string {
return dataPath
}

func retrieveExecutablePath() string {
// Logs returns a the log directory for Agent
func Logs() string {
return logsPath
}

func retrieveExecutablePath() string {
execPath, err := os.Executable()
if err != nil {
panic(err)
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command {

cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.home"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.data"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.logs"))

cmd.PersistentFlags().StringVarP(&flags.PathConfigFile, "", "c", defaultConfig, fmt.Sprintf(`Configuration file, relative to path.config (default "%s")`, defaultConfig))
cmd.PersistentFlags().StringVarP(&flags.PathConfig, "path.config", "", "${path.home}", "Configuration path")
Expand Down
Loading

0 comments on commit 2297636

Please sign in to comment.