Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect process (beats) stdout and stderr #455

Merged
merged 10 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
- Add fleet-server to output of elastic-agent inspect output command (and diagnostic bundle). {pull}243[243]
- Update API calls that the agent makes to Kibana when running the container command. {pull}253[253]
- diagnostics collect log names are fixed on Windows machines, command will ignore failures. AgentID is included in diagnostics(and diagnostics collect) output. {issue}81[81] {issue}92[92] {issue}190[190] {pull}262[262]
- Collects stdout and stderr of applications run as a process and logs them. {issue}[88]

==== New features

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/control/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"encoding/json"
"fmt"

"sync"
"time"

Expand Down Expand Up @@ -44,7 +43,8 @@ type Version struct {
Snapshot bool
}

// ApplicationStatus is a status of an application inside of Elastic Agent.
// ApplicationStatus is a status of an application managed by the Elastic Agent.
// TODO(Anderson): Implement sort.Interface and sort it.
type ApplicationStatus struct {
ID string
Name string
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/core/plugin/process/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (a *Application) Configure(ctx context.Context, config map[string]interface
a.appLock.Unlock()
a.Stop()
err = a.Start(ctx, a.desc, config)
// lock back so it wont panic on deferred unlock
// lock back so it won't panic on deferred unlock
a.appLock.Lock()
}

Expand Down
6 changes: 5 additions & 1 deletion internal/pkg/core/plugin/process/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"io"
"os/exec"
"path/filepath"

"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -132,7 +133,10 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
a.processConfig,
a.uid,
a.gid,
spec.Args)
spec.Args, func(c *exec.Cmd) {
c.Stdout = newLoggerWriter(a.Name(), logStdOut, a.logger)
c.Stderr = newLoggerWriter(a.Name(), logStdErr, a.logger)
})
if err != nil {
return fmt.Errorf("%q failed to start %q: %w",
a.Name(), spec.BinaryPath, err)
Expand Down
52 changes: 52 additions & 0 deletions internal/pkg/core/plugin/process/stdlogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package process

import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

type logStd int

const (
agentConsoleName = "agent.console.name"
agentConsoleType = "agent.console.type"

logStdOut logStd = iota
logStdErr
)

func (l logStd) String() string {
switch l {
case logStdOut:
return "stdout"
case logStdErr:
return "stderr"
}

return "unknown"
}

type loggerWriter struct {
format string
logf func(format string, args ...interface{})
}

func newLoggerWriter(appName string, std logStd, log *logger.Logger) loggerWriter {
log = log.With(
agentConsoleName, appName,
agentConsoleType, std.String())

logf := log.Infof
if std == logStdErr {
logf = log.Errorf
}

return loggerWriter{
format: appName + " " + std.String() + ": %q",
logf: logf,
}
}

func (l loggerWriter) Write(p []byte) (n int, err error) {
l.logf(l.format, string(p))
return len(p), nil
}
66 changes: 66 additions & 0 deletions internal/pkg/core/plugin/process/stdlogger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package process

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"github.com/elastic/elastic-agent/pkg/core/logger"
)

func Test_loggerWriter(t *testing.T) {
tc := []struct {
name string
args struct {
appName string
logTo logStd
}
logMsg string
logLevel zapcore.Level
}{
{
name: "capture stdout",
args: struct {
appName string
logTo logStd
}{
appName: "somebeats",
logTo: logStdOut,
},
logMsg: "stdout log",
logLevel: zapcore.InfoLevel,
},
{
name: "capture stderr",
args: struct {
appName string
logTo logStd
}{
appName: "somebeats",
logTo: logStdErr,
},
logMsg: "stderr log",
logLevel: zapcore.ErrorLevel,
},
}

for _, tt := range tc {
logg, obs := logger.NewTesting("test-loggerWriter")
logg = logg.With("previous-field", "previous-value")

l := newLoggerWriter(tt.args.appName, tt.args.logTo, logg)
_, _ = l.Write([]byte(tt.logMsg))

logs := obs.All()
require.Equal(t, 1, len(logs))

log := logs[0]
assert.Equal(t, log.Level, tt.logLevel)
assert.Contains(t, log.Message, tt.logMsg)
assert.Equal(t, log.ContextMap()[agentConsoleName], tt.args.appName)
assert.Equal(t, log.ContextMap()[agentConsoleType], tt.args.logTo.String())
assert.Equal(t, log.ContextMap()["previous-field"], "previous-value")
}
}
11 changes: 0 additions & 11 deletions pkg/core/logger/logger_test.go

This file was deleted.

21 changes: 21 additions & 0 deletions pkg/core/logger/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package logger

import (
"github.com/elastic/elastic-agent-libs/logp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)

// NewTesting creates a testing logger that buffers the logs in memory and
// logs in debug level. Check observer.ObservedLogs for more details.
func NewTesting(name string) (*Logger, *observer.ObservedLogs) {
core, obs := observer.New(zapcore.DebugLevel)

logger := logp.NewLogger(
name,
zap.WrapCore(func(in zapcore.Core) zapcore.Core {
return zapcore.NewTee(in, core)
}))
return logger, obs
}