diff --git a/beater/common_handlers.go b/beater/common_handlers.go index 029395ba8f..6aa161b9a2 100644 --- a/beater/common_handlers.go +++ b/beater/common_handlers.go @@ -67,12 +67,14 @@ var ( counter = func(s string) *monitoring.Int { return monitoring.NewInt(serverMetrics, s) } - requestCounter = counter("request.count") - concurrentWait = counter("concurrent.wait.ms") - responseCounter = counter("response.count") - responseErrors = counter("response.errors.count") - responseSuccesses = counter("response.valid.count") - responseOk = counter("response.valid.ok") + requestCounter = counter("request.count") + concurrentWait = counter("concurrent.wait.ms") + responseCounter = counter("response.count") + responseErrors = counter("response.errors.count") + responseSuccesses = counter("response.valid.count") + responseOk = counter("response.valid.ok") + responseAccepted = counter("response.valid.accepted") + responseErrorsOthers = counter("response.errors.other") okResponse = serverResponse{ code: http.StatusOK, @@ -80,7 +82,7 @@ var ( } acceptedResponse = serverResponse{ code: http.StatusAccepted, - counter: counter("response.valid.accepted"), + counter: responseAccepted, } internalErrorCounter = counter("response.errors.internal") internalErrorResponse = func(err error) serverResponse { @@ -103,10 +105,11 @@ var ( code: http.StatusUnauthorized, counter: counter("response.errors.unauthorized"), } + requestTooLargeCounter = counter("response.errors.toolarge") requestTooLargeResponse = serverResponse{ err: errors.New("request body too large"), code: http.StatusRequestEntityTooLarge, - counter: counter("response.errors.toolarge"), + counter: requestTooLargeCounter, } decodeCounter = counter("response.errors.decode") cannotDecodeResponse = func(err error) serverResponse { @@ -129,10 +132,11 @@ var ( code: http.StatusTooManyRequests, counter: counter("response.errors.ratelimit"), } + methodNotAllowedCounter = counter("response.errors.method") methodNotAllowedResponse = serverResponse{ err: errors.New("only POST requests are supported"), code: http.StatusMethodNotAllowed, - counter: counter("response.errors.method"), + counter: methodNotAllowedCounter, } tooManyConcurrentRequestsResponse = serverResponse{ err: errors.New("timeout waiting to be processed"), diff --git a/beater/v2_handler.go b/beater/v2_handler.go index c78ec2e828..c7a158d664 100644 --- a/beater/v2_handler.go +++ b/beater/v2_handler.go @@ -21,54 +21,64 @@ import ( "encoding/json" "net/http" + "github.com/elastic/beats/libbeat/monitoring" + "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" "github.com/elastic/beats/libbeat/logp" - "github.com/pkg/errors" - "github.com/elastic/apm-server/decoder" ) -var ( - errInvalidMetadataFormat = errors.New("invalid metadata format") -) - type v2Handler struct { requestDecoder decoder.ReqDecoder streamProcessor *stream.StreamProcessor } -func (v *v2Handler) statusCode(sr *stream.Result) int { +func (v *v2Handler) statusCode(sr *stream.Result) (int, *monitoring.Int) { var code int + var ct *monitoring.Int highestCode := http.StatusAccepted + monitoringCt := responseAccepted for _, err := range sr.Errors { switch err.Type { + case stream.MethodForbiddenErrType: + code = http.StatusBadRequest + ct = methodNotAllowedCounter + case stream.InputTooLargeErrType: + code = http.StatusBadRequest + ct = requestTooLargeCounter case stream.InvalidInputErrType: code = http.StatusBadRequest + ct = validateCounter case stream.QueueFullErrType: code = http.StatusTooManyRequests + ct = fullQueueCounter case stream.ShuttingDownErrType: code = http.StatusServiceUnavailable - case stream.ServerErrType: - code = http.StatusInternalServerError + ct = serverShuttingDownCounter default: code = http.StatusInternalServerError + ct = responseErrorsOthers } if code > highestCode { highestCode = code + monitoringCt = ct } } - return highestCode + return highestCode, monitoringCt } func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr *stream.Result) { - statusCode := v.statusCode(sr) + statusCode, counter := v.statusCode(sr) + responseCounter.Inc() + counter.Inc() w.WriteHeader(statusCode) if statusCode != http.StatusAccepted { - // this singals to the client that we're closing the connection + responseErrors.Inc() + // this signals to the client that we're closing the connection // but also signals to http.Server that it should close it: // https://golang.org/src/net/http/server.go#L1254 w.Header().Add("Connection", "Close") @@ -82,12 +92,25 @@ func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr logger.Errorw("error sending response", "error", err) } logger.Infow("error handling request", "error", sr.String()) + } else { + responseSuccesses.Inc() } } func (v *v2Handler) Handle(beaterConfig *Config, report publish.Reporter) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { logger := requestLogger(r) + + if r.Method != "POST" { + sr := stream.Result{} + sr.Add(&stream.Error{ + Type: stream.MethodForbiddenErrType, + Message: "only POST requests are supported", + }) + v.sendResponse(logger, w, &sr) + return + } + ndReader, err := decoder.NDJSONStreamDecodeCompressedWithLimit(r, beaterConfig.MaxEventSize) if err != nil { // if we can't set up the ndjsonreader, diff --git a/beater/v2_handler_test.go b/beater/v2_handler_test.go index 9b87dcad78..36e277e233 100644 --- a/beater/v2_handler_test.go +++ b/beater/v2_handler_test.go @@ -22,8 +22,11 @@ import ( "context" "net/http" "net/http/httptest" + "path/filepath" "testing" + "github.com/elastic/beats/libbeat/monitoring" + "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/tests" @@ -94,19 +97,20 @@ func TestRequestIntegration(t *testing.T) { code int path string reportingErr error + counter *monitoring.Int }{ - {name: "Success", code: http.StatusAccepted, path: "../testdata/intake-v2/errors.ndjson"}, - {name: "InvalidEvent", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-event.ndjson"}, - {name: "InvalidJSONEvent", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-json-event.ndjson"}, - {name: "InvalidJSONMetadata", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-json-metadata.ndjson"}, - {name: "InvalidMetadata", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-metadata.ndjson"}, - {name: "InvalidMetadata2", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-metadata-2.ndjson"}, - {name: "UnrecognizedEvent", code: http.StatusBadRequest, path: "../testdata/intake-v2/unrecognized-event.ndjson"}, - {name: "Closing", code: http.StatusServiceUnavailable, path: "../testdata/intake-v2/errors.ndjson", reportingErr: publish.ErrChannelClosed}, - {name: "FullQueue", code: http.StatusTooManyRequests, path: "../testdata/intake-v2/errors.ndjson", reportingErr: publish.ErrFull}, + {name: "Success", code: http.StatusAccepted, path: "errors.ndjson", counter: responseAccepted}, + {name: "InvalidEvent", code: http.StatusBadRequest, path: "invalid-event.ndjson", counter: validateCounter}, + {name: "InvalidJSONEvent", code: http.StatusBadRequest, path: "invalid-json-event.ndjson", counter: validateCounter}, + {name: "InvalidJSONMetadata", code: http.StatusBadRequest, path: "invalid-json-metadata.ndjson", counter: validateCounter}, + {name: "InvalidMetadata", code: http.StatusBadRequest, path: "invalid-metadata.ndjson", counter: validateCounter}, + {name: "InvalidMetadata2", code: http.StatusBadRequest, path: "invalid-metadata-2.ndjson", counter: validateCounter}, + {name: "UnrecognizedEvent", code: http.StatusBadRequest, path: "unrecognized-event.ndjson", counter: validateCounter}, + {name: "Closing", code: http.StatusServiceUnavailable, path: "errors.ndjson", reportingErr: publish.ErrChannelClosed, counter: serverShuttingDownCounter}, + {name: "FullQueue", code: http.StatusTooManyRequests, path: "errors.ndjson", reportingErr: publish.ErrFull, counter: fullQueueCounter}, } { t.Run(test.name, func(t *testing.T) { - b, err := loader.LoadDataAsBytes(test.path) + b, err := loader.LoadDataAsBytes(filepath.Join("../testdata/intake-v2/", test.path)) require.NoError(t, err) bodyReader := bytes.NewBuffer(b) @@ -121,14 +125,22 @@ func TestRequestIntegration(t *testing.T) { } handler := (&v2BackendRoute).Handler(c, report) + ctSuccess := responseSuccesses.Get() + ctFailure := responseErrors.Get() + ct := test.counter.Get() handler.ServeHTTP(w, req) assert.Equal(t, test.code, w.Code, w.Body.String()) + assert.Equal(t, ct+1, test.counter.Get()) if test.code == http.StatusAccepted { assert.Equal(t, 0, w.Body.Len()) assert.Equal(t, w.HeaderMap.Get("Content-Type"), "") + assert.Equal(t, ctSuccess+1, responseSuccesses.Get()) + assert.Equal(t, ctFailure, responseErrors.Get()) } else { assert.Equal(t, w.HeaderMap.Get("Content-Type"), "application/json") + assert.Equal(t, ctSuccess, responseSuccesses.Get()) + assert.Equal(t, ctFailure+1, responseErrors.Get()) body := w.Body.Bytes() tests.AssertApproveResult(t, "approved-stream-result/TestRequestIntegration"+test.name, body) @@ -137,6 +149,19 @@ func TestRequestIntegration(t *testing.T) { } } +func TestV2WrongMethod(t *testing.T) { + req := httptest.NewRequest("GET", "/intake/v2/events", nil) + req.Header.Add("Content-Type", "application/x-ndjson") + w := httptest.NewRecorder() + handler := (&v2BackendRoute).Handler(defaultConfig("7.0.0"), nil) + + ct := methodNotAllowedCounter.Get() + handler.ServeHTTP(w, req) + + assert.Equal(t, http.StatusBadRequest, w.Code) + assert.Equal(t, ct+1, methodNotAllowedCounter.Get()) +} + func TestV2LineExceeded(t *testing.T) { b, err := loader.LoadDataAsBytes("../testdata/intake-v2/transactions.ndjson") require.NoError(t, err) @@ -156,11 +181,9 @@ func TestV2LineExceeded(t *testing.T) { req.Header.Add("Content-Type", "application/x-ndjson") w := httptest.NewRecorder() - report := func(ctx context.Context, p publish.PendingReq) error { return nil } - c := defaultConfig("7.0.0") assert.False(t, lineLimitExceededInTestData(c.MaxEventSize)) handler := (&v2BackendRoute).Handler(c, report) @@ -177,7 +200,9 @@ func TestV2LineExceeded(t *testing.T) { req.Header.Add("Content-Type", "application/x-ndjson") w = httptest.NewRecorder() + ct := requestTooLargeCounter.Get() handler.ServeHTTP(w, req) assert.Equal(t, http.StatusBadRequest, w.Code, w.Body.String()) + assert.Equal(t, ct+1, requestTooLargeCounter.Get()) tests.AssertApproveResult(t, "approved-stream-result/TestV2LineExceeded", w.Body.Bytes()) } diff --git a/processor/stream/result.go b/processor/stream/result.go index 1e28b72858..e6abc34c3b 100644 --- a/processor/stream/result.go +++ b/processor/stream/result.go @@ -20,6 +20,8 @@ package stream import ( "fmt" "strings" + + "github.com/elastic/beats/libbeat/monitoring" ) type Error struct { @@ -40,31 +42,44 @@ type StreamError int const ( QueueFullErrType StreamError = iota InvalidInputErrType + InputTooLargeErrType ShuttingDownErrType ServerErrType + MethodForbiddenErrType ) const ( errorsLimit = 5 ) +var ( + m = monitoring.Default.NewRegistry("apm-server.processor.stream") + mAccepted = monitoring.NewInt(m, "accepted") + monitoringMap = map[StreamError]*monitoring.Int{ + QueueFullErrType: monitoring.NewInt(m, "errors.queue"), + InvalidInputErrType: monitoring.NewInt(m, "errors.invalid"), + InputTooLargeErrType: monitoring.NewInt(m, "errors.toolarge"), + ShuttingDownErrType: monitoring.NewInt(m, "errors.server"), + ServerErrType: monitoring.NewInt(m, "errors.closed"), + } +) + type Result struct { Accepted int `json:"accepted"` Errors []*Error `json:"errors,omitempty"` } func (r *Result) LimitedAdd(err error) { - if len(r.Errors) < errorsLimit { - r.Add(err) - } + r.add(err, len(r.Errors) < errorsLimit) } func (r *Result) Add(err error) { - if e, ok := err.(*Error); ok { - r.Errors = append(r.Errors, e) - } else { - r.Errors = append(r.Errors, &Error{Message: err.Error(), Type: ServerErrType}) - } + r.add(err, true) +} + +func (r *Result) AddAccepted(ct int) { + r.Accepted += ct + mAccepted.Add(int64(ct)) } func (r *Result) String() string { @@ -74,3 +89,22 @@ func (r *Result) String() string { } return strings.Join(errorList, ", ") } + +func (r *Result) add(err error, add bool) { + e, ok := err.(*Error) + if !ok { + e = &Error{Message: err.Error(), Type: ServerErrType} + } + if add { + r.Errors = append(r.Errors, e) + } + countErr(e.Type) +} + +func countErr(e StreamError) { + if i, ok := monitoringMap[e]; ok { + i.Inc() + } else { + monitoringMap[ServerErrType].Inc() + } +} diff --git a/processor/stream/result_test.go b/processor/stream/result_test.go index f5127f18e2..6207d38daf 100644 --- a/processor/stream/result_test.go +++ b/processor/stream/result_test.go @@ -22,6 +22,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/monitoring" ) func TestStreamResponseSimple(t *testing.T) { @@ -45,3 +47,33 @@ func TestStreamResponseSimple(t *testing.T) { expectedStr := `err1 [buf1], transmogrifier error, err2 [buf2], err3 [buf3], err4, err6` assert.Equal(t, expectedStr, sr.String()) } + +func TestMonitoring(t *testing.T) { + for _, test := range []struct { + counter *monitoring.Int + expected int64 + }{ + {monitoringMap[QueueFullErrType], 1}, + {monitoringMap[InvalidInputErrType], 2}, + {monitoringMap[InputTooLargeErrType], 1}, + {monitoringMap[ShuttingDownErrType], 1}, + {monitoringMap[ServerErrType], 2}, + {mAccepted, 12}, + } { + // get current value for counter + ct := test.counter.Get() + + sr := Result{} + sr.AddAccepted(9) + sr.AddAccepted(3) + sr.LimitedAdd(&Error{Type: QueueFullErrType}) + sr.LimitedAdd(errors.New("error")) + sr.LimitedAdd(&Error{Type: InvalidInputErrType}) + sr.LimitedAdd(&Error{Type: ShuttingDownErrType}) + sr.LimitedAdd(&Error{Type: ServerErrType}) + sr.LimitedAdd(&Error{Type: InputTooLargeErrType, Message: "err3", Document: "buf3"}) + sr.Add(&Error{Type: InvalidInputErrType}) + + assert.Equal(t, ct+test.expected, test.counter.Get()) + } +} diff --git a/processor/stream/stream_processor.go b/processor/stream/stream_processor.go index 6182d0ccd0..ec795f2c13 100644 --- a/processor/stream/stream_processor.go +++ b/processor/stream/stream_processor.go @@ -65,7 +65,7 @@ func (s *srErrorWrapper) Read() (map[string]interface{}, error) { if err == decoder.ErrLineTooLong { return nil, &Error{ - Type: InvalidInputErrType, + Type: InputTooLargeErrType, Message: "event exceeded the permitted size.", Document: string(s.StreamReader.LatestLine()), } @@ -183,7 +183,7 @@ func (s *StreamProcessor) readBatch(batchSize int, reader StreamReader, response rawModel, err = reader.Read() if err != nil && err != io.EOF { - if e, ok := err.(*Error); ok && e.Type == InvalidInputErrType { + if e, ok := err.(*Error); ok && (e.Type == InvalidInputErrType || e.Type == InputTooLargeErrType) { response.LimitedAdd(e) continue } @@ -255,7 +255,7 @@ func (s *StreamProcessor) HandleStream(ctx context.Context, meta map[string]inte return res } - res.Accepted += len(transformables) + res.AddAccepted(len(transformables)) } if done { diff --git a/processor/stream/stream_processor_test.go b/processor/stream/stream_processor_test.go index 45ee2ab605..bf4802acfb 100644 --- a/processor/stream/stream_processor_test.go +++ b/processor/stream/stream_processor_test.go @@ -22,6 +22,7 @@ import ( "context" "encoding/json" "fmt" + "path/filepath" "testing" "testing/iotest" "time" @@ -117,23 +118,23 @@ func TestIntegration(t *testing.T) { path string name string }{ - {path: "../testdata/intake-v2/errors.ndjson", name: "Errors"}, - {path: "../testdata/intake-v2/transactions.ndjson", name: "Transactions"}, - {path: "../testdata/intake-v2/spans.ndjson", name: "Spans"}, - {path: "../testdata/intake-v2/metrics.ndjson", name: "Metrics"}, - {path: "../testdata/intake-v2/minimal_process.ndjson", name: "MixedMinimalProcess"}, - {path: "../testdata/intake-v2/minimal_service.ndjson", name: "MinimalService"}, - {path: "../testdata/intake-v2/metadata_null_values.ndjson", name: "MetadataNullValues"}, - {path: "../testdata/intake-v2/invalid-event.ndjson", name: "InvalidEvent"}, - {path: "../testdata/intake-v2/invalid-json-event.ndjson", name: "InvalidJSONEvent"}, - {path: "../testdata/intake-v2/invalid-json-metadata.ndjson", name: "InvalidJSONMetadata"}, - {path: "../testdata/intake-v2/invalid-metadata.ndjson", name: "InvalidMetadata"}, - {path: "../testdata/intake-v2/invalid-metadata-2.ndjson", name: "InvalidMetadata2"}, - {path: "../testdata/intake-v2/unrecognized-event.ndjson", name: "UnrecognizedEvent"}, - {path: "../testdata/intake-v2/optional-timestamps.ndjson", name: "OptionalTimestamps"}, + {path: "errors.ndjson", name: "Errors"}, + {path: "transactions.ndjson", name: "Transactions"}, + {path: "spans.ndjson", name: "Spans"}, + {path: "metrics.ndjson", name: "Metrics"}, + {path: "minimal_process.ndjson", name: "MixedMinimalProcess"}, + {path: "minimal_service.ndjson", name: "MinimalService"}, + {path: "metadata_null_values.ndjson", name: "MetadataNullValues"}, + {path: "invalid-event.ndjson", name: "InvalidEvent"}, + {path: "invalid-json-event.ndjson", name: "InvalidJSONEvent"}, + {path: "invalid-json-metadata.ndjson", name: "InvalidJSONMetadata"}, + {path: "invalid-metadata.ndjson", name: "InvalidMetadata"}, + {path: "invalid-metadata-2.ndjson", name: "InvalidMetadata2"}, + {path: "unrecognized-event.ndjson", name: "UnrecognizedEvent"}, + {path: "optional-timestamps.ndjson", name: "OptionalTimestamps"}, } { t.Run(test.name, func(t *testing.T) { - b, err := loader.LoadDataAsBytes(test.path) + b, err := loader.LoadDataAsBytes(filepath.Join("../testdata/intake-v2/", test.path)) require.NoError(t, err) bodyReader := bytes.NewBuffer(b)