Skip to content

Commit

Permalink
[v2] Add metrics counter handling for v2. (elastic#1403)
Browse files Browse the repository at this point in the history
implements elastic#1355
  • Loading branch information
simitt authored and Ron cohen committed Oct 15, 2018
1 parent 6285ada commit 0a1a437
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 59 deletions.
22 changes: 13 additions & 9 deletions beater/common_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,22 @@ var (
counter = func(s string) *monitoring.Int {
return monitoring.NewInt(serverMetrics, s)
}
requestCounter = counter("request.count")
concurrentWait = counter("concurrent.wait.ms")
responseCounter = counter("response.count")
responseErrors = counter("response.errors.count")
responseSuccesses = counter("response.valid.count")
responseOk = counter("response.valid.ok")
requestCounter = counter("request.count")
concurrentWait = counter("concurrent.wait.ms")
responseCounter = counter("response.count")
responseErrors = counter("response.errors.count")
responseSuccesses = counter("response.valid.count")
responseOk = counter("response.valid.ok")
responseAccepted = counter("response.valid.accepted")
responseErrorsOthers = counter("response.errors.other")

okResponse = serverResponse{
code: http.StatusOK,
counter: responseOk,
}
acceptedResponse = serverResponse{
code: http.StatusAccepted,
counter: counter("response.valid.accepted"),
counter: responseAccepted,
}
forbiddenCounter = counter("response.errors.forbidden")
forbiddenResponse = func(err error) serverResponse {
Expand All @@ -95,10 +97,11 @@ var (
code: http.StatusUnauthorized,
counter: counter("response.errors.unauthorized"),
}
requestTooLargeCounter = counter("response.errors.toolarge")
requestTooLargeResponse = serverResponse{
err: errors.New("request body too large"),
code: http.StatusRequestEntityTooLarge,
counter: counter("response.errors.toolarge"),
counter: requestTooLargeCounter,
}
decodeCounter = counter("response.errors.decode")
cannotDecodeResponse = func(err error) serverResponse {
Expand All @@ -121,10 +124,11 @@ var (
code: http.StatusTooManyRequests,
counter: counter("response.errors.ratelimit"),
}
methodNotAllowedCounter = counter("response.errors.method")
methodNotAllowedResponse = serverResponse{
err: errors.New("only POST requests are supported"),
code: http.StatusMethodNotAllowed,
counter: counter("response.errors.method"),
counter: methodNotAllowedCounter,
}
tooManyConcurrentRequestsResponse = serverResponse{
err: errors.New("timeout waiting to be processed"),
Expand Down
47 changes: 35 additions & 12 deletions beater/v2_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,54 +21,64 @@ import (
"encoding/json"
"net/http"

"github.com/elastic/beats/libbeat/monitoring"

"github.com/elastic/apm-server/processor/stream"
"github.com/elastic/apm-server/publish"

"github.com/elastic/beats/libbeat/logp"

"github.com/pkg/errors"

"github.com/elastic/apm-server/decoder"
)

var (
errInvalidMetadataFormat = errors.New("invalid metadata format")
)

type v2Handler struct {
requestDecoder decoder.ReqDecoder
streamProcessor *stream.StreamProcessor
}

func (v *v2Handler) statusCode(sr *stream.Result) int {
func (v *v2Handler) statusCode(sr *stream.Result) (int, *monitoring.Int) {
var code int
var ct *monitoring.Int
highestCode := http.StatusAccepted
monitoringCt := responseAccepted
for _, err := range sr.Errors {
switch err.Type {
case stream.MethodForbiddenErrType:
code = http.StatusBadRequest
ct = methodNotAllowedCounter
case stream.InputTooLargeErrType:
code = http.StatusBadRequest
ct = requestTooLargeCounter
case stream.InvalidInputErrType:
code = http.StatusBadRequest
ct = validateCounter
case stream.QueueFullErrType:
code = http.StatusTooManyRequests
ct = fullQueueCounter
case stream.ShuttingDownErrType:
code = http.StatusServiceUnavailable
case stream.ServerErrType:
code = http.StatusInternalServerError
ct = serverShuttingDownCounter
default:
code = http.StatusInternalServerError
ct = responseErrorsOthers
}
if code > highestCode {
highestCode = code
monitoringCt = ct
}
}
return highestCode
return highestCode, monitoringCt
}

func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr *stream.Result) {
statusCode := v.statusCode(sr)
statusCode, counter := v.statusCode(sr)
responseCounter.Inc()
counter.Inc()

w.WriteHeader(statusCode)
if statusCode != http.StatusAccepted {
// this singals to the client that we're closing the connection
responseErrors.Inc()
// this signals to the client that we're closing the connection
// but also signals to http.Server that it should close it:
// https://golang.org/src/net/http/server.go#L1254
w.Header().Add("Connection", "Close")
Expand All @@ -82,12 +92,25 @@ func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr
logger.Errorw("error sending response", "error", err)
}
logger.Infow("error handling request", "error", sr.String())
} else {
responseSuccesses.Inc()
}
}

func (v *v2Handler) Handle(beaterConfig *Config, report publish.Reporter) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger := requestLogger(r)

if r.Method != "POST" {
sr := stream.Result{}
sr.Add(&stream.Error{
Type: stream.MethodForbiddenErrType,
Message: "only POST requests are supported",
})
v.sendResponse(logger, w, &sr)
return
}

ndReader, err := decoder.NDJSONStreamDecodeCompressedWithLimit(r, beaterConfig.MaxEventSize)
if err != nil {
// if we can't set up the ndjsonreader,
Expand Down
49 changes: 37 additions & 12 deletions beater/v2_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"context"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"

"github.com/elastic/beats/libbeat/monitoring"

"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/tests"

Expand Down Expand Up @@ -94,19 +97,20 @@ func TestRequestIntegration(t *testing.T) {
code int
path string
reportingErr error
counter *monitoring.Int
}{
{name: "Success", code: http.StatusAccepted, path: "../testdata/intake-v2/errors.ndjson"},
{name: "InvalidEvent", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-event.ndjson"},
{name: "InvalidJSONEvent", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-json-event.ndjson"},
{name: "InvalidJSONMetadata", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-json-metadata.ndjson"},
{name: "InvalidMetadata", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-metadata.ndjson"},
{name: "InvalidMetadata2", code: http.StatusBadRequest, path: "../testdata/intake-v2/invalid-metadata-2.ndjson"},
{name: "UnrecognizedEvent", code: http.StatusBadRequest, path: "../testdata/intake-v2/unrecognized-event.ndjson"},
{name: "Closing", code: http.StatusServiceUnavailable, path: "../testdata/intake-v2/errors.ndjson", reportingErr: publish.ErrChannelClosed},
{name: "FullQueue", code: http.StatusTooManyRequests, path: "../testdata/intake-v2/errors.ndjson", reportingErr: publish.ErrFull},
{name: "Success", code: http.StatusAccepted, path: "errors.ndjson", counter: responseAccepted},
{name: "InvalidEvent", code: http.StatusBadRequest, path: "invalid-event.ndjson", counter: validateCounter},
{name: "InvalidJSONEvent", code: http.StatusBadRequest, path: "invalid-json-event.ndjson", counter: validateCounter},
{name: "InvalidJSONMetadata", code: http.StatusBadRequest, path: "invalid-json-metadata.ndjson", counter: validateCounter},
{name: "InvalidMetadata", code: http.StatusBadRequest, path: "invalid-metadata.ndjson", counter: validateCounter},
{name: "InvalidMetadata2", code: http.StatusBadRequest, path: "invalid-metadata-2.ndjson", counter: validateCounter},
{name: "UnrecognizedEvent", code: http.StatusBadRequest, path: "unrecognized-event.ndjson", counter: validateCounter},
{name: "Closing", code: http.StatusServiceUnavailable, path: "errors.ndjson", reportingErr: publish.ErrChannelClosed, counter: serverShuttingDownCounter},
{name: "FullQueue", code: http.StatusTooManyRequests, path: "errors.ndjson", reportingErr: publish.ErrFull, counter: fullQueueCounter},
} {
t.Run(test.name, func(t *testing.T) {
b, err := loader.LoadDataAsBytes(test.path)
b, err := loader.LoadDataAsBytes(filepath.Join("../testdata/intake-v2/", test.path))
require.NoError(t, err)
bodyReader := bytes.NewBuffer(b)

Expand All @@ -121,14 +125,22 @@ func TestRequestIntegration(t *testing.T) {
}
handler := (&v2BackendRoute).Handler(c, report)

ctSuccess := responseSuccesses.Get()
ctFailure := responseErrors.Get()
ct := test.counter.Get()
handler.ServeHTTP(w, req)

assert.Equal(t, test.code, w.Code, w.Body.String())
assert.Equal(t, ct+1, test.counter.Get())
if test.code == http.StatusAccepted {
assert.Equal(t, 0, w.Body.Len())
assert.Equal(t, w.HeaderMap.Get("Content-Type"), "")
assert.Equal(t, ctSuccess+1, responseSuccesses.Get())
assert.Equal(t, ctFailure, responseErrors.Get())
} else {
assert.Equal(t, w.HeaderMap.Get("Content-Type"), "application/json")
assert.Equal(t, ctSuccess, responseSuccesses.Get())
assert.Equal(t, ctFailure+1, responseErrors.Get())

body := w.Body.Bytes()
tests.AssertApproveResult(t, "approved-stream-result/TestRequestIntegration"+test.name, body)
Expand All @@ -137,6 +149,19 @@ func TestRequestIntegration(t *testing.T) {
}
}

func TestV2WrongMethod(t *testing.T) {
req := httptest.NewRequest("GET", "/intake/v2/events", nil)
req.Header.Add("Content-Type", "application/x-ndjson")
w := httptest.NewRecorder()
handler := (&v2BackendRoute).Handler(defaultConfig("7.0.0"), nil)

ct := methodNotAllowedCounter.Get()
handler.ServeHTTP(w, req)

assert.Equal(t, http.StatusBadRequest, w.Code)
assert.Equal(t, ct+1, methodNotAllowedCounter.Get())
}

func TestV2LineExceeded(t *testing.T) {
b, err := loader.LoadDataAsBytes("../testdata/intake-v2/transactions.ndjson")
require.NoError(t, err)
Expand All @@ -156,11 +181,9 @@ func TestV2LineExceeded(t *testing.T) {
req.Header.Add("Content-Type", "application/x-ndjson")

w := httptest.NewRecorder()

report := func(ctx context.Context, p publish.PendingReq) error {
return nil
}

c := defaultConfig("7.0.0")
assert.False(t, lineLimitExceededInTestData(c.MaxEventSize))
handler := (&v2BackendRoute).Handler(c, report)
Expand All @@ -177,7 +200,9 @@ func TestV2LineExceeded(t *testing.T) {
req.Header.Add("Content-Type", "application/x-ndjson")
w = httptest.NewRecorder()

ct := requestTooLargeCounter.Get()
handler.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code, w.Body.String())
assert.Equal(t, ct+1, requestTooLargeCounter.Get())
tests.AssertApproveResult(t, "approved-stream-result/TestV2LineExceeded", w.Body.Bytes())
}
50 changes: 42 additions & 8 deletions processor/stream/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package stream
import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/monitoring"
)

type Error struct {
Expand All @@ -40,31 +42,44 @@ type StreamError int
const (
QueueFullErrType StreamError = iota
InvalidInputErrType
InputTooLargeErrType
ShuttingDownErrType
ServerErrType
MethodForbiddenErrType
)

const (
errorsLimit = 5
)

var (
m = monitoring.Default.NewRegistry("apm-server.processor.stream")
mAccepted = monitoring.NewInt(m, "accepted")
monitoringMap = map[StreamError]*monitoring.Int{
QueueFullErrType: monitoring.NewInt(m, "errors.queue"),
InvalidInputErrType: monitoring.NewInt(m, "errors.invalid"),
InputTooLargeErrType: monitoring.NewInt(m, "errors.toolarge"),
ShuttingDownErrType: monitoring.NewInt(m, "errors.server"),
ServerErrType: monitoring.NewInt(m, "errors.closed"),
}
)

type Result struct {
Accepted int `json:"accepted"`
Errors []*Error `json:"errors,omitempty"`
}

func (r *Result) LimitedAdd(err error) {
if len(r.Errors) < errorsLimit {
r.Add(err)
}
r.add(err, len(r.Errors) < errorsLimit)
}

func (r *Result) Add(err error) {
if e, ok := err.(*Error); ok {
r.Errors = append(r.Errors, e)
} else {
r.Errors = append(r.Errors, &Error{Message: err.Error(), Type: ServerErrType})
}
r.add(err, true)
}

func (r *Result) AddAccepted(ct int) {
r.Accepted += ct
mAccepted.Add(int64(ct))
}

func (r *Result) String() string {
Expand All @@ -74,3 +89,22 @@ func (r *Result) String() string {
}
return strings.Join(errorList, ", ")
}

func (r *Result) add(err error, add bool) {
e, ok := err.(*Error)
if !ok {
e = &Error{Message: err.Error(), Type: ServerErrType}
}
if add {
r.Errors = append(r.Errors, e)
}
countErr(e.Type)
}

func countErr(e StreamError) {
if i, ok := monitoringMap[e]; ok {
i.Inc()
} else {
monitoringMap[ServerErrType].Inc()
}
}
32 changes: 32 additions & 0 deletions processor/stream/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/monitoring"
)

func TestStreamResponseSimple(t *testing.T) {
Expand All @@ -45,3 +47,33 @@ func TestStreamResponseSimple(t *testing.T) {
expectedStr := `err1 [buf1], transmogrifier error, err2 [buf2], err3 [buf3], err4, err6`
assert.Equal(t, expectedStr, sr.String())
}

func TestMonitoring(t *testing.T) {
for _, test := range []struct {
counter *monitoring.Int
expected int64
}{
{monitoringMap[QueueFullErrType], 1},
{monitoringMap[InvalidInputErrType], 2},
{monitoringMap[InputTooLargeErrType], 1},
{monitoringMap[ShuttingDownErrType], 1},
{monitoringMap[ServerErrType], 2},
{mAccepted, 12},
} {
// get current value for counter
ct := test.counter.Get()

sr := Result{}
sr.AddAccepted(9)
sr.AddAccepted(3)
sr.LimitedAdd(&Error{Type: QueueFullErrType})
sr.LimitedAdd(errors.New("error"))
sr.LimitedAdd(&Error{Type: InvalidInputErrType})
sr.LimitedAdd(&Error{Type: ShuttingDownErrType})
sr.LimitedAdd(&Error{Type: ServerErrType})
sr.LimitedAdd(&Error{Type: InputTooLargeErrType, Message: "err3", Document: "buf3"})
sr.Add(&Error{Type: InvalidInputErrType})

assert.Equal(t, ct+test.expected, test.counter.Get())
}
}
Loading

0 comments on commit 0a1a437

Please sign in to comment.