Skip to content

Commit

Permalink
Ensure ECS compliant logging when enabled.
Browse files Browse the repository at this point in the history
If `logging.ecs` is set log data in ECS compliant way.

closes #3796
  • Loading branch information
simitt committed May 28, 2020
1 parent a93a1ef commit 30a0a7e
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 107 deletions.
4 changes: 2 additions & 2 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1056,8 +1056,8 @@ output.elasticsearch:
# Set to true to log messages in json format.
#logging.json: false

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Set to true, to log messages in Elastic Common Schema (ECS) compliant format.
# Recommended to use in combination with `logging.json=true`
# Defaults to false.
#logging.ecs: false

Expand Down
4 changes: 2 additions & 2 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1056,8 +1056,8 @@ output.elasticsearch:
# Set to true to log messages in json format.
#logging.json: false

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Set to true, to log messages in Elastic Common Schema (ECS) compliant format.
# Recommended to use in combination with `logging.json=true`
# Defaults to false.
#logging.ecs: false

Expand Down
4 changes: 2 additions & 2 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1056,8 +1056,8 @@ output.elasticsearch:
# Set to true to log messages in json format.
#logging.json: false

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Set to true, to log messages in Elastic Common Schema (ECS) compliant format.
# Recommended to use in combination with `logging.json=true`
# Defaults to false.
#logging.ecs: false

Expand Down
130 changes: 80 additions & 50 deletions beater/middleware/log_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,67 +36,97 @@ func LogMiddleware() Middleware {
return func(h request.Handler) (request.Handler, error) {

return func(c *request.Context) {
var reqID, transactionID, traceID string
tx := apm.TransactionFromContext(c.Request.Context())
if tx != nil {
// This request is being traced, grab its IDs to add to logs.
traceContext := tx.TraceContext()
transactionID = traceContext.Span.String()
traceID = traceContext.Trace.String()
reqID = transactionID
} else {
uuid, err := uuid.NewV4()
if err != nil {
id := request.IDResponseErrorsInternal
logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err)
c.Result.SetWithError(id, err)
c.Write()
return
}
reqID = uuid.String()
args, err := requestArgs(c, logger.ECSEnabled())
if err != nil {
id := request.IDResponseErrorsInternal
logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err)
c.Result.SetWithError(id, err)
c.Write()
return
}

reqLogger := logger.With(
"request_id", reqID,
"method", c.Request.Method,
"URL", c.Request.URL,
"content_length", c.Request.ContentLength,
"remote_address", utility.RemoteAddr(c.Request),
"user-agent", c.Request.Header.Get(headers.UserAgent))

if traceID != "" {
reqLogger = reqLogger.With(
"trace.id", traceID,
"transaction.id", transactionID,
)
}

c.Logger = reqLogger
c.Logger = logger.With(args...)
h(c)

if c.MultipleWriteAttempts() {
reqLogger.Warn("multiple write attempts")
c.Logger.Warn("multiple write attempts")
}

keyword := c.Result.Keyword
if keyword == "" {
keyword = "handled request"
}

keysAndValues := []interface{}{"response_code", c.Result.StatusCode}
if c.Result.Err != nil {
keysAndValues = append(keysAndValues, "error", c.Result.Err.Error())
}
if c.Result.Stacktrace != "" {
keysAndValues = append(keysAndValues, "stacktrace", c.Result.Stacktrace)
}

args = resultArgs(c, logger.ECSEnabled())
if c.Result.Failure() {
reqLogger.Errorw(keyword, keysAndValues...)
} else {
reqLogger.Infow(keyword, keysAndValues...)
c.Logger.Errorw(keyword, args...)
return
}

c.Logger.Infow(keyword, args...)
}, nil
}
}

func requestArgs(c *request.Context, ecsEnabled bool) ([]interface{}, error) {
var reqID, transactionID, traceID string
tx := apm.TransactionFromContext(c.Request.Context())
if tx != nil {
// This request is being traced, grab its IDs to add to logs.
traceContext := tx.TraceContext()
transactionID = traceContext.Span.String()
traceID = traceContext.Trace.String()
reqID = transactionID
} else {
uuid, err := uuid.NewV4()
if err != nil {
return nil, err
}
reqID = uuid.String()
}

args := []interface{}{
"http", map[string]interface{}{
"request": map[string]interface{}{
"id": reqID, //not defined in ECS but fits here best
"method": c.Request.Method,
"body": map[string]interface{}{"bytes": c.Request.ContentLength}}},
"source", map[string]interface{}{"address": utility.RemoteAddr(c.Request)},
"user_agent", map[string]interface{}{"original": c.Request.Header.Get(headers.UserAgent)},
}
if traceID != "" {
args = append(args,
"trace", map[string]interface{}{"id": traceID},
"transaction", map[string]interface{}{"id": transactionID})
}
// avoid conflicts on existing log keys
if ecsEnabled {
return append(args, "url", map[string]string{"original": c.Request.URL.String()}), nil
}
return append(args, "URL", c.Request.URL), nil
}

func resultArgs(c *request.Context, ecsEnabled bool) []interface{} {
args := []interface{}{
// http key will be duplicated at this point
"http", map[string]interface{}{
"response": map[string]interface{}{
"status_code": c.Result.StatusCode}}}
if c.Result.Err == nil && c.Result.Stacktrace == "" {
return args
}

if ecsEnabled {
err := map[string]interface{}{}
if c.Result.Err != nil {
err["message"] = c.Result.Err.Error()
}
if c.Result.Stacktrace != "" {
err["stacktrace"] = c.Result.Stacktrace
}
return append(args, "error", err)
}
if c.Result.Err != nil {
args = append(args, "error", c.Result.Err)
}
if c.Result.Stacktrace != "" {
args = append(args, "stacktrace", c.Result.Stacktrace)
}
return args
}
104 changes: 55 additions & 49 deletions beater/middleware/log_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@
package middleware

import (
"fmt"
"net/http"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/logp/configure"

"github.com/elastic/apm-server/beater/beatertest"
"github.com/elastic/apm-server/beater/headers"
Expand All @@ -38,31 +40,32 @@ import (
)

func TestLogMiddleware(t *testing.T) {
err := logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err)

testCases := []struct {
name, message string
level zapcore.Level
handler request.Handler
code int
error error
stacktrace bool
traced bool
keys, ecsKeys []string
}{
{
name: "Accepted",
message: "request accepted",
level: zapcore.InfoLevel,
handler: beatertest.Handler202,
code: http.StatusAccepted,
keys: []string{"URL"},
ecsKeys: []string{"url.original"},
},
{
name: "Traced",
message: "request accepted",
level: zapcore.InfoLevel,
handler: beatertest.Handler202,
code: http.StatusAccepted,
keys: []string{"URL", "trace.id", "transaction.id"},
ecsKeys: []string{"url.original", "trace.id", "transaction.id"},
traced: true,
},
{
Expand All @@ -71,16 +74,17 @@ func TestLogMiddleware(t *testing.T) {
level: zapcore.ErrorLevel,
handler: beatertest.Handler403,
code: http.StatusForbidden,
error: errors.New("forbidden request"),
keys: []string{"URL", "error"},
ecsKeys: []string{"url.original", "error.message"},
},
{
name: "Panic",
message: "internal error",
level: zapcore.ErrorLevel,
handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic),
code: http.StatusInternalServerError,
error: errors.New("panic on Handle"),
stacktrace: true,
name: "Panic",
message: "internal error",
level: zapcore.ErrorLevel,
handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic),
code: http.StatusInternalServerError,
keys: []string{"URL", "error", "stacktrace"},
ecsKeys: []string{"url.original", "error.message", "error.stacktrace"},
},
{
name: "Error without keyword",
Expand All @@ -90,52 +94,54 @@ func TestLogMiddleware(t *testing.T) {
c.Result.StatusCode = http.StatusForbidden
c.Write()
},
code: http.StatusForbidden,
code: http.StatusForbidden,
keys: []string{"URL"},
ecsKeys: []string{"url.original"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c, rec := beatertest.DefaultContextWithResponseRecorder()
c.Request.Header.Set(headers.UserAgent, tc.name)
if tc.traced {
tx := apmtest.DiscardTracer.StartTransaction("name", "type")
c.Request = c.Request.WithContext(apm.ContextWithTransaction(c.Request.Context(), tx))
defer tx.End()
}
Apply(LogMiddleware(), tc.handler)(c)
for _, withECS := range []bool{true, false} {
name := fmt.Sprintf("%sWithECS%v", tc.name, withECS)
t.Run(name, func(t *testing.T) {
if withECS {
configure.Logging("APM Server test",
common.MustNewConfigFrom(`{"ecs":true}`))
}
err := logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err)
c, rec := beatertest.DefaultContextWithResponseRecorder()
c.Request.Header.Set(headers.UserAgent, tc.name)
if tc.traced {
tx := apmtest.DiscardTracer.StartTransaction("name", "type")
c.Request = c.Request.WithContext(apm.ContextWithTransaction(c.Request.Context(), tx))
defer tx.End()
}
Apply(LogMiddleware(), tc.handler)(c)

assert.Equal(t, tc.code, rec.Code)
for i, entry := range logp.ObserverLogs().TakeAll() {
// expect only one log entry per request
assert.Equal(t, i, 0)
assert.Equal(t, tc.code, rec.Code)
entries := logp.ObserverLogs().TakeAll()
require.Equal(t, 1, len(entries))
entry := entries[0]
assert.Equal(t, logs.Request, entry.LoggerName)
assert.Equal(t, tc.level, entry.Level)
assert.Equal(t, tc.message, entry.Message)

ec := entry.ContextMap()
assert.NotEmpty(t, ec["request_id"])
assert.NotEmpty(t, ec["method"])
assert.Equal(t, c.Request.URL.String(), ec["URL"])
assert.NotEmpty(t, ec["remote_address"])
assert.Equal(t, c.Request.Header.Get(headers.UserAgent), ec["user-agent"])
// zap encoded type
assert.Equal(t, tc.code, int(ec["response_code"].(int64)))
if tc.error != nil {
assert.Equal(t, tc.error.Error(), ec["error"])
}
if tc.stacktrace {
assert.NotZero(t, ec["stacktrace"])
encoder := zapcore.NewMapObjectEncoder()
ec := common.MapStr{}
for _, f := range entry.Context {
f.AddTo(encoder)
ec.DeepUpdate(encoder.Fields)
}
if tc.traced {
assert.NotEmpty(t, ec, "trace.id")
assert.NotEmpty(t, ec, "transaction.id")
assert.Equal(t, ec["request_id"], ec["transaction.id"])
} else {
assert.NotContains(t, ec, "trace.id")
assert.NotContains(t, ec, "transaction.id")
keys := []string{"http", "http.request.id", "http.request.method", "http.request.body",
"source.address", "user_agent.original", "http.response.status_code"}
keys = append(keys, tc.keys...)
for _, key := range keys {
ok, _ := ec.HasKey(key)
assert.True(t, ok, key)
}
}
})
})

}
}
}
3 changes: 1 addition & 2 deletions docs/copied-from-beats/docs/loggingconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ When true, logs messages in JSON format. The default is false.
[float]
==== `logging.ecs`

When true, logs messages with minimal required Elastic Common Schema (ECS)
information.
When true, logs messages in Elastic Common Schema (ECS) compliant format.

ifndef::serverless[]
[float]
Expand Down

0 comments on commit 30a0a7e

Please sign in to comment.