Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure ECS compliant logging when enabled. #3829

Merged
merged 19 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,8 @@ output.elasticsearch:

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Defaults to false.
#logging.ecs: false
# Defaults to true.
#logging.ecs: true

#=============================== HTTP Endpoint ===============================

Expand Down
4 changes: 2 additions & 2 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,8 @@ output.elasticsearch:

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Defaults to false.
#logging.ecs: false
# Defaults to true.
#logging.ecs: true

#=============================== HTTP Endpoint ===============================

Expand Down
4 changes: 2 additions & 2 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,8 @@ output.elasticsearch:

# Set to true, to log messages with minimal required Elastic Common Schema (ECS)
# information. Recommended to use in combination with `logging.json=true`
# Defaults to false.
#logging.ecs: false
# Defaults to true.
#logging.ecs: true

#=============================== HTTP Endpoint ===============================

Expand Down
109 changes: 56 additions & 53 deletions beater/middleware/log_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,71 +32,74 @@ import (

// LogMiddleware returns a middleware taking care of logging processing a request in the middleware and the request handler
func LogMiddleware() Middleware {
logger := logp.NewLogger(logs.Request)
return func(h request.Handler) (request.Handler, error) {

return func(c *request.Context) {
var reqID, transactionID, traceID string
tx := apm.TransactionFromContext(c.Request.Context())
if tx != nil {
// This request is being traced, grab its IDs to add to logs.
traceContext := tx.TraceContext()
transactionID = traceContext.Span.String()
traceID = traceContext.Trace.String()
reqID = transactionID
} else {
uuid, err := uuid.NewV4()
if err != nil {
id := request.IDResponseErrorsInternal
logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err)
c.Result.SetWithError(id, err)
c.Write()
return
}
reqID = uuid.String()
}

reqLogger := logger.With(
"request_id", reqID,
"method", c.Request.Method,
"URL", c.Request.URL,
"content_length", c.Request.ContentLength,
"remote_address", utility.RemoteAddr(c.Request),
"user-agent", c.Request.Header.Get(headers.UserAgent))

if traceID != "" {
reqLogger = reqLogger.With(
"trace.id", traceID,
"transaction.id", transactionID,
)
c.Logger = loggerWithContext(c)
var err error
if c.Logger, err = loggerWithTraceContext(c); err != nil {
id := request.IDResponseErrorsInternal
c.Logger.Error(request.MapResultIDToStatus[id].Keyword, logp.Error(err))
c.Result.SetWithError(id, err)
c.Write()
return
}

c.Logger = reqLogger
h(c)

if c.MultipleWriteAttempts() {
reqLogger.Warn("multiple write attempts")
c.Logger.Warn("multiple write attempts")
}

keyword := c.Result.Keyword
if keyword == "" {
keyword = "handled request"
}

keysAndValues := []interface{}{"response_code", c.Result.StatusCode}
if c.Result.Err != nil {
keysAndValues = append(keysAndValues, "error", c.Result.Err.Error())
}
if c.Result.Stacktrace != "" {
keysAndValues = append(keysAndValues, "stacktrace", c.Result.Stacktrace)
}

c.Logger = loggerWithResult(c)
if c.Result.Failure() {
reqLogger.Errorw(keyword, keysAndValues...)
} else {
reqLogger.Infow(keyword, keysAndValues...)
c.Logger.Error(keyword)
return
}

c.Logger.Info(keyword)
}, nil
}
}

func loggerWithContext(c *request.Context) *logp.Logger {
simitt marked this conversation as resolved.
Show resolved Hide resolved
return logp.NewLogger(logs.Request).With(
"http.request.method", c.Request.Method,
"http.request.body.bytes", c.Request.ContentLength,
simitt marked this conversation as resolved.
Show resolved Hide resolved
"source.address", utility.RemoteAddr(c.Request),
"user_agent.original", c.Request.Header.Get(headers.UserAgent),
axw marked this conversation as resolved.
Show resolved Hide resolved
"url.original", c.Request.URL.String())
}

func loggerWithTraceContext(c *request.Context) (*logp.Logger, error) {
tx := apm.TransactionFromContext(c.Request.Context())
if tx == nil {
uuid, err := uuid.NewV4()
if err != nil {
return c.Logger, err
}
return c.Logger.With("http.request.id", uuid.String()), nil
}
// This request is being traced, grab its IDs to add to logs.
traceContext := tx.TraceContext()
transactionID := traceContext.Span.String()
return c.Logger.With(
"trace.id", traceContext.Trace.String(),
"transaction.id", transactionID,
"http.request.id", transactionID,
), nil
}

func loggerWithResult(c *request.Context) *logp.Logger {
logger := c.Logger.With(
"http.response.status_code", c.Result.StatusCode)
if c.Result.Err == nil && c.Result.Stacktrace == "" {
return logger
}
simitt marked this conversation as resolved.
Show resolved Hide resolved
if c.Result.Err != nil {
logger = logger.With("error.message", c.Result.Err.Error())
}
if c.Result.Stacktrace != "" {
logger = logger.With("error.stacktrace", c.Result.Stacktrace)
simitt marked this conversation as resolved.
Show resolved Hide resolved
axw marked this conversation as resolved.
Show resolved Hide resolved
}
return logger
}
81 changes: 39 additions & 42 deletions beater/middleware/log_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ import (
"net/http"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/logp/configure"

"github.com/elastic/apm-server/beater/beatertest"
"github.com/elastic/apm-server/beater/headers"
Expand All @@ -38,31 +39,30 @@ import (
)

func TestLogMiddleware(t *testing.T) {
err := logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err)

testCases := []struct {
name, message string
level zapcore.Level
handler request.Handler
code int
error error
stacktrace bool
traced bool
ecsKeys []string
}{
{
name: "Accepted",
message: "request accepted",
level: zapcore.InfoLevel,
handler: beatertest.Handler202,
code: http.StatusAccepted,
ecsKeys: []string{"url.original"},
},
{
name: "Traced",
message: "request accepted",
level: zapcore.InfoLevel,
handler: beatertest.Handler202,
code: http.StatusAccepted,
ecsKeys: []string{"url.original", "trace.id", "transaction.id"},
traced: true,
},
{
Expand All @@ -71,16 +71,15 @@ func TestLogMiddleware(t *testing.T) {
level: zapcore.ErrorLevel,
handler: beatertest.Handler403,
code: http.StatusForbidden,
error: errors.New("forbidden request"),
ecsKeys: []string{"url.original", "error.message"},
},
{
name: "Panic",
message: "internal error",
level: zapcore.ErrorLevel,
handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic),
code: http.StatusInternalServerError,
error: errors.New("panic on Handle"),
stacktrace: true,
name: "Panic",
message: "internal error",
level: zapcore.ErrorLevel,
handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic),
code: http.StatusInternalServerError,
ecsKeys: []string{"url.original", "error.message", "error.stacktrace"},
},
{
name: "Error without keyword",
Expand All @@ -90,12 +89,19 @@ func TestLogMiddleware(t *testing.T) {
c.Result.StatusCode = http.StatusForbidden
c.Write()
},
code: http.StatusForbidden,
code: http.StatusForbidden,
ecsKeys: []string{"url.original"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// log setup
configure.Logging("APM Server test",
common.MustNewConfigFrom(`{"ecs":true}`))
require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput()))

// prepare and record request
c, rec := beatertest.DefaultContextWithResponseRecorder()
c.Request.Header.Set(headers.UserAgent, tc.name)
if tc.traced {
Expand All @@ -105,36 +111,27 @@ func TestLogMiddleware(t *testing.T) {
}
Apply(LogMiddleware(), tc.handler)(c)

// check log lines
assert.Equal(t, tc.code, rec.Code)
for i, entry := range logp.ObserverLogs().TakeAll() {
// expect only one log entry per request
assert.Equal(t, i, 0)
assert.Equal(t, logs.Request, entry.LoggerName)
assert.Equal(t, tc.level, entry.Level)
assert.Equal(t, tc.message, entry.Message)
entries := logp.ObserverLogs().TakeAll()
require.Equal(t, 1, len(entries))
entry := entries[0]
assert.Equal(t, logs.Request, entry.LoggerName)
assert.Equal(t, tc.level, entry.Level)
assert.Equal(t, tc.message, entry.Message)

ec := entry.ContextMap()
assert.NotEmpty(t, ec["request_id"])
assert.NotEmpty(t, ec["method"])
assert.Equal(t, c.Request.URL.String(), ec["URL"])
assert.NotEmpty(t, ec["remote_address"])
assert.Equal(t, c.Request.Header.Get(headers.UserAgent), ec["user-agent"])
// zap encoded type
assert.Equal(t, tc.code, int(ec["response_code"].(int64)))
if tc.error != nil {
assert.Equal(t, tc.error.Error(), ec["error"])
}
if tc.stacktrace {
assert.NotZero(t, ec["stacktrace"])
}
if tc.traced {
assert.NotEmpty(t, ec, "trace.id")
assert.NotEmpty(t, ec, "transaction.id")
assert.Equal(t, ec["request_id"], ec["transaction.id"])
} else {
assert.NotContains(t, ec, "trace.id")
assert.NotContains(t, ec, "transaction.id")
}
encoder := zapcore.NewMapObjectEncoder()
ec := common.MapStr{}
for _, f := range entry.Context {
f.AddTo(encoder)
ec.DeepUpdate(encoder.Fields)
}
keys := []string{"http.request.id", "http.request.method", "http.request.body.bytes",
"source.address", "user_agent.original", "http.response.status_code"}
keys = append(keys, tc.ecsKeys...)
for _, key := range keys {
ok, _ := ec.HasKey(key)
assert.True(t, ok, key)
}
})
}
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ https://github.com/elastic/apm-server/compare/7.8\...master[View commits]
* Index Page URL and referer as ECS fields {pull}3857[3857]
* Map the Jaeger attribute message_bus.destination to the Elastic APM type messaging {pull}3884[3884]
* Added user_agent.name to grouping key for page-load transaction metrics {pull}3886[3886]
* Switch logging format to be ECS compliant where possible {pull}3829[3829]
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var libbeatConfigOverrides = common.MustNewConfigFrom(map[string]interface{}{
"files": map[string]interface{}{
"rotateeverybytes": 10 * 1024 * 1024,
},
"ecs": true,
},
"setup": map[string]interface{}{
"template": map[string]interface{}{
Expand Down
3 changes: 1 addition & 2 deletions docs/copied-from-beats/docs/loggingconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ When true, logs messages in JSON format. The default is false.
[float]
==== `logging.ecs`

When true, logs messages with minimal required Elastic Common Schema (ECS)
information.
When true, logs messages in Elastic Common Schema (ECS) compliant format.

ifndef::serverless[]
[float]
Expand Down
4 changes: 2 additions & 2 deletions tests/system/apmserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ def check_for_no_smap(self, doc):
def logged_requests(self, url="/intake/v2/events"):
for line in self.get_log_lines():
jline = json.loads(line)
u = urlparse(jline.get("URL", ""))
if jline.get("logger") == "request" and u.path == url:
u = urlparse(jline.get("url.original", ""))
if jline.get("log.logger") == "request" and u.path == url:
yield jline

def approve_docs(self, base_path, received):
Expand Down
31 changes: 4 additions & 27 deletions tests/system/config/apm-server.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -278,22 +278,9 @@ output.elasticsearch:

############################# Logging #########################################

{% if logging_json or logging_level %}
{% if logging_json or logging_level or logging_ecs_disabled %}
logging:
{% else %}
#logging:
{% endif %}
# Send all logging output to syslog. On Windows default is false, otherwise
# default is true.
#to_syslog: true

# Write all logging output to files. Beats automatically rotate files if configurable
# limit is reached.
#to_files: false

# Enable debug output for selected components.
#selectors: []

{% if logging_json %}
# Set to true to log messages in json format.
json: {{ logging_json }}
Expand All @@ -304,19 +291,9 @@ logging:
level: {{ logging_level }}
{% endif %}

#files:
# The directory where the log files will written to.
#path: /var/log/apm-server

# The name of the files where the logs are written to.
#name: apm-server

# Configure log file size limit. If limit is reached, log file will be
# automatically rotated
#rotateeverybytes: 10485760 # = 10MB

# Number of rotated log files to keep. Oldest files will be deleted first.
#keepfiles: 7
{% if logging_ecs_disabled %}
ecs: false
{% endif %}

queue.mem.flush.min_events: {{ queue_flush }}

Expand Down
Loading