Skip to content

Commit

Permalink
beater/middleware: add trace/log correlation (#3136)
Browse files Browse the repository at this point in the history
  • Loading branch information
axw authored Jan 13, 2020
1 parent 40d0beb commit a2f56dd
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 7 deletions.
32 changes: 26 additions & 6 deletions beater/middleware/log_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/gofrs/uuid"

"github.com/elastic/beats/libbeat/logp"
"go.elastic.co/apm"

"github.com/elastic/apm-server/beater/headers"
"github.com/elastic/apm-server/beater/request"
Expand All @@ -34,12 +35,24 @@ func LogMiddleware() Middleware {
return func(h request.Handler) (request.Handler, error) {

return func(c *request.Context) {
reqID, err := uuid.NewV4()
if err != nil {
id := request.IDResponseErrorsInternal
logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err)
c.Result.SetWithError(id, err)
c.Write()
var reqID, transactionID, traceID string
tx := apm.TransactionFromContext(c.Request.Context())
if tx != nil {
// This request is being traced, grab its IDs to add to logs.
traceContext := tx.TraceContext()
transactionID = traceContext.Span.String()
traceID = traceContext.Trace.String()
reqID = transactionID
} else {
uuid, err := uuid.NewV4()
if err != nil {
id := request.IDResponseErrorsInternal
logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err)
c.Result.SetWithError(id, err)
c.Write()
return
}
reqID = uuid.String()
}

reqLogger := logger.With(
Expand All @@ -50,6 +63,13 @@ func LogMiddleware() Middleware {
"remote_address", utility.RemoteAddr(c.Request),
"user-agent", c.Request.Header.Get(headers.UserAgent))

if traceID != "" {
reqLogger = reqLogger.With(
"trace.id", traceID,
"transaction.id", transactionID,
)
}

c.Logger = reqLogger
h(c)

Expand Down
26 changes: 25 additions & 1 deletion beater/middleware/log_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"testing"

"github.com/pkg/errors"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"github.com/elastic/beats/libbeat/logp"
"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"

"github.com/elastic/apm-server/beater/beatertest"
"github.com/elastic/apm-server/beater/headers"
Expand All @@ -46,6 +47,7 @@ func TestLogMiddleware(t *testing.T) {
code int
error error
stacktrace bool
traced bool
}{
{
name: "Accepted",
Expand All @@ -54,6 +56,14 @@ func TestLogMiddleware(t *testing.T) {
handler: beatertest.Handler202,
code: http.StatusAccepted,
},
{
name: "Traced",
message: "request accepted",
level: zapcore.InfoLevel,
handler: beatertest.Handler202,
code: http.StatusAccepted,
traced: true,
},
{
name: "Error",
message: "forbidden request",
Expand Down Expand Up @@ -87,7 +97,13 @@ func TestLogMiddleware(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
c, rec := beatertest.DefaultContextWithResponseRecorder()
c.Request.Header.Set(headers.UserAgent, tc.name)
if tc.traced {
tx := apmtest.DiscardTracer.StartTransaction("name", "type")
c.Request = c.Request.WithContext(apm.ContextWithTransaction(c.Request.Context(), tx))
defer tx.End()
}
Apply(LogMiddleware(), tc.handler)(c)

assert.Equal(t, tc.code, rec.Code)
for i, entry := range logp.ObserverLogs().TakeAll() {
// expect only one log entry per request
Expand All @@ -110,6 +126,14 @@ func TestLogMiddleware(t *testing.T) {
if tc.stacktrace {
assert.NotZero(t, ec["stacktrace"])
}
if tc.traced {
assert.NotEmpty(t, ec, "trace.id")
assert.NotEmpty(t, ec, "transaction.id")
assert.Equal(t, ec["request_id"], ec["transaction.id"])
} else {
assert.NotContains(t, ec, "trace.id")
assert.NotContains(t, ec, "transaction.id")
}
}
})
}
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ https://github.com/elastic/apm-server/compare/7.5\...master[View commits]
- Upgrade Go to 1.13.5 {pull}3069[3069].
- Add experimental support for receiving Jaeger trace data {pull}3129[3129]
- Upgrade APM Go agent to 1.7.0, and add support for API Key auth for self-instrumentation {pull}3134[3134]
- Add correlation for server trace/log data {pull}3136[3136]

21 changes: 21 additions & 0 deletions tests/system/test_integration_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,24 @@ def test_log_event_size_exceeded(self):
}, req)
error = req.get("error")
assert error.startswith("event exceeded the permitted size."), json.dumps(req)


@integration_test
class LoggingIntegrationTraceCorrelationTest(ElasticTest):
config_overrides = {
"logging_json": "true",
"instrumentation_enabled": "true",
}

def test_trace_ids(self):
with open(self.get_transaction_payload_path()) as f:
r = requests.post(self.intake_url,
data=f,
headers={'content-type': 'application/x-ndjson'})
assert r.status_code == 202, r.status_code
intake_request_logs = list(self.logged_requests())
assert len(intake_request_logs) == 1, "multiple requests found"
req = intake_request_logs[0]
self.assertIn("trace.id", req)
self.assertIn("transaction.id", req)
self.assertEqual(req["transaction.id"], req["request_id"])

0 comments on commit a2f56dd

Please sign in to comment.