From 51757b1c51b247eb9f04cbb2e730006776e9d937 Mon Sep 17 00:00:00 2001 From: Ron Cohen Date: Fri, 31 Aug 2018 11:12:48 +0200 Subject: [PATCH] Move non-web related processing into processor, add "publish" package (#1324) also simplify stream error handling. --- .../testStreamResponseAdvanced.approved.json | 25 -- .../testStreamResponseSimple.approved.json | 11 - beater/beater.go | 5 +- beater/beater_test.go | 3 +- beater/common_handlers.go | 41 +- beater/route_config.go | 11 +- beater/server.go | 5 +- beater/server_test.go | 3 +- beater/stream_response.go | 168 --------- beater/stream_response_test.go | 78 ---- beater/v2_handler.go | 249 +++--------- beater/v2_handler_test.go | 313 ++-------------- beater/v2_integration_test.go | 88 ----- decoder/req_decoder.go | 9 +- decoder/stream_decoder.go | 12 +- model/error/generated/schema/error.go | 20 +- model/span/generated/schema/transaction.go | 26 ++ .../generated/schema/transaction.go | 20 +- ...estV2IntakeIntegrationErrors.approved.json | 0 ...ntegrationMetadataNullValues.approved.json | 0 ...stV2IntakeIntegrationMetrics.approved.json | 0 ...akeIntegrationMinimalService.approved.json | 0 ...tegrationMixedMinimalProcess.approved.json | 0 ...testV2IntakeIntegrationSpans.approved.json | 0 ...ntakeIntegrationTransactions.approved.json | 0 .../testIntegrationResultErrors.approved.json | 3 + ...ntegrationResultInvalidEvent.approved.json | 9 + ...tionResultMetadataNullValues.approved.json | 3 + ...testIntegrationResultMetrics.approved.json | 3 + ...egrationResultMinimalService.approved.json | 3 + ...ionResultMixedMinimalProcess.approved.json | 3 + .../testIntegrationResultSpans.approved.json | 3 + ...ntegrationResultTransactions.approved.json | 3 + processor/stream/result.go | 77 ++++ processor/stream/result_test.go | 47 +++ processor/stream/stream_processor.go | 239 ++++++++++++ processor/stream/stream_processor_test.go | 353 ++++++++++++++++++ {beater => publish}/pub.go | 40 +- tests/approvals.go | 1 - utility/request_time.go | 42 +++ 40 files changed, 982 insertions(+), 934 deletions(-) delete mode 100644 beater/approved-stream-response/testStreamResponseAdvanced.approved.json delete mode 100644 beater/approved-stream-response/testStreamResponseSimple.approved.json delete mode 100644 beater/stream_response.go delete mode 100644 beater/stream_response_test.go delete mode 100644 beater/v2_integration_test.go rename {beater => processor/stream}/approved-es-documents/testV2IntakeIntegrationErrors.approved.json (100%) rename {beater => processor/stream}/approved-es-documents/testV2IntakeIntegrationMetadataNullValues.approved.json (100%) rename {beater => processor/stream}/approved-es-documents/testV2IntakeIntegrationMetrics.approved.json (100%) rename {beater => processor/stream}/approved-es-documents/testV2IntakeIntegrationMinimalService.approved.json (100%) rename {beater => processor/stream}/approved-es-documents/testV2IntakeIntegrationMixedMinimalProcess.approved.json (100%) rename {beater => processor/stream}/approved-es-documents/testV2IntakeIntegrationSpans.approved.json (100%) rename {beater => processor/stream}/approved-es-documents/testV2IntakeIntegrationTransactions.approved.json (100%) create mode 100644 processor/stream/approved-stream-result/testIntegrationResultErrors.approved.json create mode 100644 processor/stream/approved-stream-result/testIntegrationResultInvalidEvent.approved.json create mode 100644 processor/stream/approved-stream-result/testIntegrationResultMetadataNullValues.approved.json create mode 100644 processor/stream/approved-stream-result/testIntegrationResultMetrics.approved.json create mode 100644 processor/stream/approved-stream-result/testIntegrationResultMinimalService.approved.json create mode 100644 processor/stream/approved-stream-result/testIntegrationResultMixedMinimalProcess.approved.json create mode 100644 processor/stream/approved-stream-result/testIntegrationResultSpans.approved.json create mode 100644 processor/stream/approved-stream-result/testIntegrationResultTransactions.approved.json create mode 100644 processor/stream/result.go create mode 100644 processor/stream/result_test.go create mode 100644 processor/stream/stream_processor.go create mode 100644 processor/stream/stream_processor_test.go rename {beater => publish}/pub.go (81%) create mode 100644 utility/request_time.go diff --git a/beater/approved-stream-response/testStreamResponseAdvanced.approved.json b/beater/approved-stream-response/testStreamResponseAdvanced.approved.json deleted file mode 100644 index 61b37baa19..0000000000 --- a/beater/approved-stream-response/testStreamResponseAdvanced.approved.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "accepted": 0, - "dropped": 0, - "errors": { - "ERR_QUEUE_FULL": { - "count": 23, - "message": "queue is full" - }, - "ERR_SCHEMA_VALIDATION": { - "count": 5, - "documents": [ - { - "error": "transmogrifier error", - "object": "{\"wrong\": \"field\"}" - }, - { - "error": "thing error", - "object": "{\"wrong\": \"value\"}" - } - ], - "message": "validation error" - } - }, - "invalid": 0 -} diff --git a/beater/approved-stream-response/testStreamResponseSimple.approved.json b/beater/approved-stream-response/testStreamResponseSimple.approved.json deleted file mode 100644 index 3602a6cb56..0000000000 --- a/beater/approved-stream-response/testStreamResponseSimple.approved.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "accepted": 0, - "dropped": 0, - "errors": { - "ERR_QUEUE_FULL": { - "count": 23, - "message": "queue is full" - } - }, - "invalid": 0 -} diff --git a/beater/beater.go b/beater/beater.go index b6909222ff..370420bb8a 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/apm-agent-go/transport" "github.com/elastic/apm-server/ingest/pipeline" "github.com/elastic/apm-server/pipelistener" + "github.com/elastic/apm-server/publish" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -138,7 +139,7 @@ func (bt *beater) Run(b *beat.Beat) error { defer traceListener.Close() defer tracer.Close() - pub, err := newPublisher(b.Publisher, bt.config.ConcurrentRequests, bt.config.ShutdownTimeout, tracer) + pub, err := publish.NewPublisher(b.Publisher, bt.config.ConcurrentRequests, bt.config.ShutdownTimeout, tracer) if err != nil { return err } @@ -150,7 +151,7 @@ func (bt *beater) Run(b *beat.Beat) error { return nil } - go notifyListening(bt.config, pub.client.Publish) + go notifyListening(bt.config, pub.Client().Publish) bt.mutex.Lock() if bt.stopped { diff --git a/beater/beater_test.go b/beater/beater_test.go index 8234d267d4..8b186ec44a 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -36,6 +36,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/apm-agent-go" + "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/tests/loader" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -436,7 +437,7 @@ func setupBeater(t *testing.T, publisher beat.Pipeline, ucfg *common.Config, bea func SetupServer(b *testing.B) *http.ServeMux { pip := DummyPipeline() - pub, err := newPublisher(pip, 1, time.Duration(0), elasticapm.DefaultTracer) + pub, err := publish.NewPublisher(pip, 1, time.Duration(0), elasticapm.DefaultTracer) if err != nil { b.Fatal("error initializing publisher", err) } diff --git a/beater/common_handlers.go b/beater/common_handlers.go index dd6294f062..90aaa3a504 100644 --- a/beater/common_handlers.go +++ b/beater/common_handlers.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/processor" + "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" @@ -52,7 +53,7 @@ const ( supportedMethods = "POST, OPTIONS" ) -type ProcessorHandler func(processor.Processor, *Config, reporter) http.Handler +type ProcessorHandler func(processor.Processor, *Config, publish.Reporter) http.Handler type serverResponse struct { err error @@ -148,7 +149,7 @@ var ( } ) -func newMuxer(beaterConfig *Config, report reporter) *http.ServeMux { +func newMuxer(beaterConfig *Config, report publish.Reporter) *http.ServeMux { mux := http.NewServeMux() logger := logp.NewLogger("handler") for path, route := range V1Routes { @@ -234,25 +235,6 @@ func rootHandler(secretToken string) http.Handler { type contextKey string -const requestTimeContextKey = contextKey("requestTime") - -func requestTimeHandler(h http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if requestTime(r).IsZero() { - r = r.WithContext(context.WithValue(r.Context(), requestTimeContextKey, time.Now())) - } - h.ServeHTTP(w, r) - }) -} - -func requestTime(r *http.Request) time.Time { - t, ok := r.Context().Value(requestTimeContextKey).(time.Time) - if !ok { - return time.Time{} - } - return t -} - const reqLoggerContextKey = contextKey("requestLogger") func logHandler(h http.Handler) http.Handler { @@ -285,6 +267,13 @@ func logHandler(h http.Handler) http.Handler { }) } +func requestTimeHandler(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r = r.WithContext(utility.ContextWithRequestTime(r.Context(), time.Now())) + h.ServeHTTP(w, r) + }) +} + // requestLogger is a convenience function to retrieve the logger that was // added to the request context by handler `logHandler`` func requestLogger(r *http.Request) *logp.Logger { @@ -400,14 +389,14 @@ func corsHandler(allowedOrigins []string, h http.Handler) http.Handler { }) } -func processRequestHandler(p processor.Processor, config transform.Config, report reporter, decode decoder.ReqDecoder) http.Handler { +func processRequestHandler(p processor.Processor, config transform.Config, report publish.Reporter, decode decoder.ReqDecoder) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { res := processRequest(r, p, config, report, decode) sendStatus(w, r, res) }) } -func processRequest(r *http.Request, p processor.Processor, config transform.Config, report reporter, decode decoder.ReqDecoder) serverResponse { +func processRequest(r *http.Request, p processor.Processor, config transform.Config, report publish.Reporter, decode decoder.ReqDecoder) serverResponse { if r.Method != "POST" { return methodNotAllowedResponse } @@ -430,13 +419,13 @@ func processRequest(r *http.Request, p processor.Processor, config transform.Con } tctx := &transform.Context{ - RequestTime: requestTime(r), + RequestTime: utility.RequestTime(r.Context()), Config: config, Metadata: *metadata, } - if err = report(r.Context(), pendingReq{transformables: transformables, tcontext: tctx}); err != nil { - if err == errChannelClosed { + if err = report(r.Context(), publish.PendingReq{Transformables: transformables, Tcontext: tctx}); err != nil { + if err == publish.ErrChannelClosed { return serverShuttingDownResponse(err) } return fullQueueResponse(err) diff --git a/beater/route_config.go b/beater/route_config.go index cf86b605a5..ffbab716a1 100644 --- a/beater/route_config.go +++ b/beater/route_config.go @@ -21,11 +21,14 @@ import ( "net/http" "regexp" + "github.com/elastic/apm-server/processor/stream" + "github.com/elastic/apm-server/processor" perr "github.com/elastic/apm-server/processor/error" "github.com/elastic/apm-server/processor/metric" "github.com/elastic/apm-server/processor/sourcemap" "github.com/elastic/apm-server/processor/transaction" + "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/transform" @@ -172,7 +175,7 @@ type v1Route struct { topLevelRequestDecoder func(*Config) decoder.ReqDecoder } -func (v *v1Route) Handler(p processor.Processor, beaterConfig *Config, report reporter) http.Handler { +func (v *v1Route) Handler(p processor.Processor, beaterConfig *Config, report publish.Reporter) http.Handler { decoder := v.configurableDecoder(beaterConfig, v.topLevelRequestDecoder(beaterConfig)) tconfig := v.transformConfig(beaterConfig) @@ -188,15 +191,15 @@ type v2Route struct { routeType } -func (v v2Route) Handler(beaterConfig *Config, report reporter) http.Handler { +func (v v2Route) Handler(beaterConfig *Config, report publish.Reporter) http.Handler { reqDecoder := v.configurableDecoder( beaterConfig, func(*http.Request) (map[string]interface{}, error) { return map[string]interface{}{}, nil }, ) v2Handler := v2Handler{ - requestDecoder: reqDecoder, - tconfig: v.transformConfig(beaterConfig), + requestDecoder: reqDecoder, + streamProcessor: &stream.StreamProcessor{Tconfig: v.transformConfig(beaterConfig)}, } return v.wrappingHandler(beaterConfig, v2Handler.Handle(beaterConfig, report)) diff --git a/beater/server.go b/beater/server.go index 7da235a362..0e622136dc 100644 --- a/beater/server.go +++ b/beater/server.go @@ -28,15 +28,14 @@ import ( "github.com/elastic/apm-agent-go" "github.com/elastic/apm-agent-go/module/apmhttp" + "github.com/elastic/apm-server/publish" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/version" ) -type reporter func(context.Context, pendingReq) error - -func newServer(config *Config, tracer *elasticapm.Tracer, report reporter) *http.Server { +func newServer(config *Config, tracer *elasticapm.Tracer, report publish.Reporter) *http.Server { mux := newMuxer(config, report) return &http.Server{ diff --git a/beater/server_test.go b/beater/server_test.go index 93535a16e3..29a754d35d 100644 --- a/beater/server_test.go +++ b/beater/server_test.go @@ -36,6 +36,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/tests/loader" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -658,4 +659,4 @@ func body(t *testing.T, response *http.Response) string { return string(body) } -func nopReporter(context.Context, pendingReq) error { return nil } +func nopReporter(context.Context, publish.PendingReq) error { return nil } diff --git a/beater/stream_response.go b/beater/stream_response.go deleted file mode 100644 index 78b9cae334..0000000000 --- a/beater/stream_response.go +++ /dev/null @@ -1,168 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package beater - -import ( - "encoding/json" - "fmt" - "net/http" - "sort" - "strings" -) - -type StreamErrorType string - -const ( - QueueFullErr StreamErrorType = "ERR_QUEUE_FULL" - ProcessingTimeoutErr StreamErrorType = "ERR_PROCESSING_TIMEOUT" - SchemaValidationErr StreamErrorType = "ERR_SCHEMA_VALIDATION" - InvalidJSONErr StreamErrorType = "ERR_INVALID_JSON" - ShuttingDownErr StreamErrorType = "ERR_SHUTTING_DOWN" - InvalidContentTypeErr StreamErrorType = "ERR_CONTENT_TYPE" - ServerError StreamErrorType = "ERR_SERVER_ERROR" - - validationErrorsLimit = 5 -) - -var standardMessages = map[StreamErrorType]struct { - err string - code int -}{ - QueueFullErr: {"queue is full", http.StatusTooManyRequests}, - ProcessingTimeoutErr: {"timeout while waiting to process request", http.StatusRequestTimeout}, - SchemaValidationErr: {"validation error", http.StatusBadRequest}, - InvalidJSONErr: {"invalid JSON", http.StatusBadRequest}, - ShuttingDownErr: {"server is shutting down", http.StatusServiceUnavailable}, - InvalidContentTypeErr: {"invalid content-type. Expected 'application/x-ndjson'", http.StatusBadRequest}, - ServerError: {"internal server error", http.StatusInternalServerError}, -} - -var errorTypesDecreasingHTTPStatus = func() []StreamErrorType { - keys := []StreamErrorType{} - for k := range standardMessages { - keys = append(keys, k) - } - - // sort in reverse order - sort.Slice(keys, func(i, j int) bool { return standardMessages[keys[i]].code > standardMessages[keys[j]].code }) - - return keys -}() - -type streamResponse struct { - Accepted int `json:"accepted"` - Invalid int `json:"invalid"` - Dropped int `json:"dropped"` - - Errors map[StreamErrorType]errorDetails `json:"errors"` -} - -type errorDetails struct { - Count int `json:"count"` - Message string `json:"message"` - - Documents []*ValidationError `json:"documents,omitempty"` - - // we only use this for deduplication - errorsMap map[string]struct{} -} - -type ValidationError struct { - Error string `json:"error"` - OffendingEvent string `json:"object"` -} - -func (s *streamResponse) add(err StreamErrorType, count int) { - s.addWithMessage(err, count, standardMessages[err].err) -} - -func (s *streamResponse) addWithMessage(err StreamErrorType, count int, message string) { - if s.Errors == nil { - s.Errors = make(map[StreamErrorType]errorDetails) - } - - var details errorDetails - var ok bool - if details, ok = s.Errors[err]; !ok { - s.Errors[err] = errorDetails{ - Count: count, - Message: message, - } - return - } - - details.Count += count - s.Errors[err] = details -} - -func (s *streamResponse) String() string { - errorList := []string{} - for _, t := range errorTypesDecreasingHTTPStatus { - if s.Errors[t].Count > 0 { - errorStr := fmt.Sprintf("%s (%d)", s.Errors[t].Message, s.Errors[t].Count) - - if len(s.Errors[t].Documents) > 0 { - errorStr += ": " - var docsErrorList []string - for _, d := range s.Errors[t].Documents { - docsErrorList = append(docsErrorList, fmt.Sprintf("%s (%s)", d.Error, d.OffendingEvent)) - } - errorStr += strings.Join(docsErrorList, ", ") - } - - errorList = append(errorList, errorStr) - } - } - return strings.Join(errorList, ", ") -} - -func (s *streamResponse) statusCode() int { - statusCode := http.StatusAccepted - for k := range s.Errors { - if standardMessages[k].code > statusCode { - statusCode = standardMessages[k].code - } - } - return statusCode -} - -func (s *streamResponse) marshal() ([]byte, error) { - return json.Marshal(s) -} - -func (s *streamResponse) addWithOffendingDocument(errType StreamErrorType, errMsg string, offendingDocument []byte) { - s.add(errType, 1) - errorDetails := s.Errors[errType] - if errorDetails.Documents == nil { - errorDetails.Documents = []*ValidationError{} - errorDetails.errorsMap = make(map[string]struct{}) - } - - if len(errorDetails.Documents) < validationErrorsLimit { - // we only want one specimen of each error - if _, ok := errorDetails.errorsMap[errMsg]; !ok { - errorDetails.errorsMap[errMsg] = struct{}{} - - errorDetails.Documents = append(errorDetails.Documents, &ValidationError{ - Error: errMsg, - OffendingEvent: string(offendingDocument), - }) - s.Errors[errType] = errorDetails - } - } -} diff --git a/beater/stream_response_test.go b/beater/stream_response_test.go deleted file mode 100644 index 445586b836..0000000000 --- a/beater/stream_response_test.go +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package beater - -import ( - "encoding/json" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/apm-server/tests" -) - -func TestStreamResponseSimple(t *testing.T) { - sr := streamResponse{} - sr.add(QueueFullErr, 23) - - jsonByte, err := sr.marshal() - require.NoError(t, err) - - var jsonOut map[string]interface{} - err = json.Unmarshal(jsonByte, &jsonOut) - require.NoError(t, err) - - verifyErr := tests.ApproveJson(jsonOut, "approved-stream-response/testStreamResponseSimple", nil) - if verifyErr != nil { - assert.Fail(t, fmt.Sprintf("Test %s failed with error: %s", "testStreamResponseSimple", verifyErr.Error())) - } - - expectedStr := `queue is full (23)` - assert.Equal(t, expectedStr, sr.String()) - assert.Equal(t, 429, sr.statusCode()) -} - -func TestStreamResponseAdvanced(t *testing.T) { - sr := streamResponse{} - - sr.add(SchemaValidationErr, 2) - sr.addWithOffendingDocument(SchemaValidationErr, "transmogrifier error", []byte(`{"wrong": "field"}`)) - sr.addWithOffendingDocument(SchemaValidationErr, "transmogrifier error", []byte(`{"wrong": "field"}`)) - sr.addWithOffendingDocument(SchemaValidationErr, "thing error", []byte(`{"wrong": "value"}`)) - - sr.add(QueueFullErr, 23) - - jsonByte, err := sr.marshal() - require.NoError(t, err) - - var jsonOut map[string]interface{} - err = json.Unmarshal(jsonByte, &jsonOut) - require.NoError(t, err) - - verifyErr := tests.ApproveJson(jsonOut, "approved-stream-response/testStreamResponseAdvanced", nil) - if verifyErr != nil { - assert.Fail(t, fmt.Sprintf("Test %s failed with error: %s", "testStreamResponseAdvanced", verifyErr.Error())) - } - - expectedStr := `queue is full (23), validation error (5): transmogrifier error ({"wrong": "field"}), thing error ({"wrong": "value"})` - assert.Equal(t, expectedStr, sr.String()) - - assert.Equal(t, 429, sr.statusCode()) -} diff --git a/beater/v2_handler.go b/beater/v2_handler.go index 057d52dda0..ac8a7aa49a 100644 --- a/beater/v2_handler.go +++ b/beater/v2_handler.go @@ -18,214 +18,55 @@ package beater import ( - "io" + "encoding/json" "net/http" + "github.com/elastic/apm-server/processor/stream" + "github.com/elastic/apm-server/publish" + "github.com/elastic/beats/libbeat/logp" "github.com/pkg/errors" - "github.com/elastic/apm-server/transform" - "github.com/elastic/apm-server/utility" - - "github.com/santhosh-tekuri/jsonschema" - - "github.com/elastic/apm-server/validation" - "github.com/elastic/apm-server/decoder" - er "github.com/elastic/apm-server/model/error" - "github.com/elastic/apm-server/model/metadata" - "github.com/elastic/apm-server/model/metric" - "github.com/elastic/apm-server/model/span" - "github.com/elastic/apm-server/model/transaction" ) var ( - errUnrecognizedObject = errors.New("did not recognize object type") errInvalidMetadataFormat = errors.New("invalid metadata format") ) -const batchSize = 10 - -var models = []struct { - key string - schema *jsonschema.Schema - modelDecoder func(interface{}, error) (transform.Transformable, error) -}{ - { - "transaction", - transaction.ModelSchema(), - transaction.V2DecodeEvent, - }, - { - "span", - span.ModelSchema(), - span.V2DecodeEvent, - }, - { - "metric", - metric.ModelSchema(), - metric.DecodeMetric, - }, - { - "error", - er.ModelSchema(), - er.V2DecodeEvent, - }, -} - type v2Handler struct { - requestDecoder decoder.ReqDecoder - tconfig transform.Config + requestDecoder decoder.ReqDecoder + streamProcessor *stream.StreamProcessor } -// handleRawModel validates and decodes a single json object into its struct form -func (v *v2Handler) handleRawModel(rawModel map[string]interface{}) (transform.Transformable, error) { - for _, model := range models { - if entry, ok := rawModel[model.key]; ok { - err := validation.Validate(entry, model.schema) - if err != nil { - return nil, err - } - - tr, err := model.modelDecoder(entry, err) - if err != nil { - return nil, err - } - return tr, nil - } - } - return nil, errUnrecognizedObject -} - -// readBatch will read up to `batchSize` objects from the ndjson stream -// it returns a slice of eventables, a serverResponse and a bool that indicates if we're at EOF. -func (v *v2Handler) readBatch(batchSize int, reader *decoder.NDJSONStreamReader, response *streamResponse) ([]transform.Transformable, bool) { - var err error - var rawModel map[string]interface{} - - var eventables []transform.Transformable - for i := 0; i < batchSize && err == nil; i++ { - rawModel, err = reader.Read() - if err != nil && err != io.EOF { - switch e := err.(type) { - case decoder.ReadError: - response.addWithMessage(ServerError, 1, e.Error()) - // return early, we can't recover from a read error - return eventables, true - case decoder.JSONDecodeError: - response.addWithOffendingDocument(InvalidJSONErr, e.Error(), reader.LastLine()) - response.Invalid++ - } - } - - if rawModel != nil { - tr, err := v.handleRawModel(rawModel) - if err != nil { - response.addWithOffendingDocument(SchemaValidationErr, err.Error(), reader.LastLine()) - response.Invalid++ - continue - } - eventables = append(eventables, tr) - } - } - - return eventables, reader.IsEOF() -} -func (v *v2Handler) readMetadata(r *http.Request, ndReader *decoder.NDJSONStreamReader) (*metadata.Metadata, error) { - // first item is the metadata object - rawModel, err := ndReader.Read() - if err != nil { - return nil, err - } - - rawMetadata, ok := rawModel["metadata"].(map[string]interface{}) - if !ok { - return nil, errUnrecognizedObject - } - // augment the metadata object with information from the request, like user-agent or remote address - reqMeta, err := v.requestDecoder(r) - if err != nil { - return nil, err - } - - for k, v := range reqMeta { - utility.InsertInMap(rawMetadata, k, v.(map[string]interface{})) - } - - // validate the metadata object against our jsonschema - err = validation.Validate(rawMetadata, metadata.ModelSchema()) - if err != nil { - return nil, err - } - - // create a metadata struct - metadata, err := metadata.DecodeMetadata(rawMetadata) - if err != nil { - return nil, err - } - - return metadata, nil -} - -func (v *v2Handler) handleRequestBody(r *http.Request, ndReader *decoder.NDJSONStreamReader, report reporter) *streamResponse { - resp := &streamResponse{} - - metadata, err := v.readMetadata(r, ndReader) - - // no point in continuing if we couldn't read the metadata - if err != nil { - switch e := err.(type) { - case decoder.ReadError: - resp.addWithMessage(ServerError, 1, e.Error()) - case decoder.JSONDecodeError: - resp.addWithOffendingDocument(InvalidJSONErr, err.Error(), ndReader.LastLine()) +func (v *v2Handler) statusCode(sr *stream.Result) int { + var code int + higestCode := http.StatusAccepted + for _, err := range sr.Errors { + switch err.Type { + case stream.InvalidInputErrType: + code = http.StatusBadRequest + case stream.ProcessingTimeoutErrType: + code = http.StatusRequestTimeout + case stream.QueueFullErrType: + code = http.StatusTooManyRequests + case stream.ShuttingDownErrType: + code = http.StatusServiceUnavailable + case stream.ServerErrType: + code = http.StatusInternalServerError default: - resp.addWithOffendingDocument(SchemaValidationErr, err.Error(), ndReader.LastLine()) - } - - return resp - } - - tctx := &transform.Context{ - RequestTime: requestTime(r), - Config: v.tconfig, - Metadata: *metadata, - } - - for { - transformables, done := v.readBatch(batchSize, ndReader, resp) - if transformables != nil { - err := report(r.Context(), pendingReq{ - transformables: transformables, - tcontext: tctx, - }) - - if err != nil { - if err == errChannelClosed { - resp.add(ShuttingDownErr, 1) - return resp - } - - if err == errFull { - resp.add(QueueFullErr, len(transformables)) - resp.Dropped += len(transformables) - continue - } - - resp.addWithMessage(ServerError, len(transformables), err.Error()) - } + code = http.StatusInternalServerError } - - if done { - break + if code > higestCode { + higestCode = code } } - return resp + return higestCode } -func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr *streamResponse) { - statusCode := sr.statusCode() +func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr *stream.Result) { + statusCode := v.statusCode(sr) w.WriteHeader(statusCode) if statusCode != http.StatusAccepted { @@ -234,7 +75,7 @@ func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr // https://golang.org/src/net/http/server.go#L1254 w.Header().Add("Connection", "Close") - buf, err := sr.marshal() + buf, err := json.Marshal(sr) if err != nil { logger.Errorw("error sending response", "error", err) } @@ -246,32 +87,32 @@ func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr } } -// handleInvalidHeaders reads out the rest of the body and discards it -// then returns an error response -func (v *v2Handler) handleInvalidHeaders(w http.ResponseWriter, r *http.Request) { - sr := streamResponse{ - Dropped: -1, - Accepted: -1, - Invalid: 1, - } - sr.add(InvalidContentTypeErr, 1) - - v.sendResponse(requestLogger(r), w, &sr) -} - -func (v *v2Handler) Handle(beaterConfig *Config, report reporter) http.Handler { +func (v *v2Handler) Handle(beaterConfig *Config, report publish.Reporter) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { logger := requestLogger(r) ndReader, err := decoder.NDJSONStreamDecodeCompressedWithLimit(r, beaterConfig.MaxUnzippedSize) if err != nil { // if we can't set up the ndjsonreader, // we won't be able to make sense of the body - v.handleInvalidHeaders(w, r) + sr := stream.Result{} + sr.LimitedAdd(&stream.Error{ + Type: stream.InvalidInputErrType, + Message: err.Error(), + }) + v.sendResponse(logger, w, &sr) + return + } + // extract metadata information from the request, like user-agent or remote address + reqMeta, err := v.requestDecoder(r) + if err != nil { + sr := stream.Result{} + sr.LimitedAdd(err) + v.sendResponse(logger, w, &sr) return } - streamResponse := v.handleRequestBody(r, ndReader, report) + res := v.streamProcessor.HandleStream(r.Context(), reqMeta, ndReader, report) - v.sendResponse(logger, w, streamResponse) + v.sendResponse(logger, w, res) }) } diff --git a/beater/v2_handler_test.go b/beater/v2_handler_test.go index 887fcd3983..f9d79dde82 100644 --- a/beater/v2_handler_test.go +++ b/beater/v2_handler_test.go @@ -19,312 +19,65 @@ package beater import ( "bytes" - "context" - "encoding/json" "net/http" "net/http/httptest" - "strings" "testing" - "testing/iotest" - "time" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/elastic/apm-server/model" - errorm "github.com/elastic/apm-server/model/error" - "github.com/elastic/apm-server/model/metric" - "github.com/elastic/apm-server/model/span" - "github.com/elastic/apm-server/model/transaction" + "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/transform" ) -func validMetadata() string { - return `{"metadata": {"service": {"name": "myservice", "agent": {"name": "test", "version": "1.0"}}}}` -} - -func TestV2Handler(t *testing.T) { - var transformables []transform.Transformable - var reportedTCtx *transform.Context - report := func(ctx context.Context, p pendingReq) error { - transformables = append(transformables, p.transformables...) - reportedTCtx = p.tcontext - return nil - } +func TestInvalidContentType(t *testing.T) { + req := httptest.NewRequest("POST", "/v2/intake", nil) + w := httptest.NewRecorder() c := defaultConfig("7.0.0") + handler := (&v2BackendRoute).Handler(c, nil) - handler := (&v2BackendRoute).Handler(c, report) - - tx1 := "tx1" - spanHexId, traceId := "0147258369abcdef", "abcdefabcdef01234567890123456789" - timestamp, err := time.Parse(time.RFC3339, "2018-01-01T10:00:00Z") - assert.NoError(t, err) - reqTimestamp, err := time.Parse(time.RFC3339, "2018-01-02T10:00:00Z") - assert.NoError(t, err) + handler.ServeHTTP(w, req) - transactionId := "fedcba0123456789" - - for idx, test := range []struct { - body string - contentType string - err *streamResponse - expectedCode int - reported []transform.Transformable - }{ - { - body: "", - contentType: "", - err: &streamResponse{ - Errors: map[StreamErrorType]errorDetails{ - "ERR_CONTENT_TYPE": errorDetails{ - Count: 1, - Message: "invalid content-type. Expected 'application/x-ndjson'", - }, - }, - Accepted: -1, - Dropped: -1, - Invalid: 1, - }, - expectedCode: 400, - reported: []transform.Transformable{}, - }, - { - body: strings.Join([]string{ - validMetadata(), - `{"invalid json"}`, - }, "\n"), - contentType: "application/x-ndjson", - err: &streamResponse{ - Errors: map[StreamErrorType]errorDetails{ - "ERR_INVALID_JSON": errorDetails{ - Count: 1, - Message: "invalid JSON", - Documents: []*ValidationError{ - { - Error: "data read error: invalid character '}' after object key", - OffendingEvent: `{"invalid json"}`, - }, - }, - }, - }, - Accepted: 0, - Dropped: 0, - Invalid: 1, - }, - expectedCode: 400, - reported: []transform.Transformable{}, - }, - { - body: strings.Join([]string{ - `{"transaction": {"invalid": "metadata"}}`, // invalid metadata - `{"transaction": {"invalid": "metadata"}}`, - }, "\n"), - contentType: "application/x-ndjson", - expectedCode: 400, - err: &streamResponse{ - Errors: map[StreamErrorType]errorDetails{ - "ERR_SCHEMA_VALIDATION": errorDetails{ - Count: 1, - Message: "validation error", - Documents: []*ValidationError{ - { - Error: "did not recognize object type", - OffendingEvent: "{\"transaction\": {\"invalid\": \"metadata\"}}\n", - }, - }, - }, - }, - }, - reported: []transform.Transformable{}, - }, - { - body: strings.Join([]string{ - `{"metadata": {}}`, - `{"span": {}}`, - }, "\n"), - contentType: "application/x-ndjson", - expectedCode: 400, - err: &streamResponse{ - Errors: map[StreamErrorType]errorDetails{ - "ERR_SCHEMA_VALIDATION": errorDetails{ - Count: 1, - Message: "validation error", - Documents: []*ValidationError{ - { - Error: "Problem validating JSON document against schema: I[#] S[#] doesn't validate with \"metadata#\"\n I[#] S[#/required] missing properties: \"service\"", - OffendingEvent: "{\"metadata\": {}}\n", - }, - }, - }, - }, - }, - reported: []transform.Transformable{}, - }, - { - body: strings.Join([]string{ - validMetadata(), - `{"transaction": {"name": "tx1", "id": "9876543210abcdef", "duration": 12, "type": "request", "timestamp": "2018-01-01T10:00:00Z", "trace_id": "abcdefabcdef01234567890123456789"}}`, - `{"span": {"name": "sp1", "duration": 20, "start": 10, "type": "db", "timestamp": "2018-01-01T10:00:00Z", "id": "0147258369abcdef","trace_id": "abcdefabcdef01234567890123456789", "transaction_id": "fedcba0123456789", "stacktrace": [{"filename": "file.js", "lineno": 10}, {"filename": "file2.js", "lineno": 11}]}}`, - `{"metric": {"samples": {"my-metric": {"value": 99}}, "timestamp": "2018-01-01T10:00:00Z"}}`, - `{"error": {"exception": {"message": "hello world!"}}}`, - }, "\n"), - contentType: "application/x-ndjson", - expectedCode: http.StatusAccepted, - reported: []transform.Transformable{ - &transaction.Event{Name: &tx1, Id: "9876543210abcdef", Duration: 12, Type: "request", Timestamp: timestamp, TraceId: &traceId}, - &span.Event{Name: "sp1", Duration: 20.0, Start: 10, Type: "db", Timestamp: timestamp, HexId: &spanHexId, TransactionId: &transactionId, TraceId: &traceId, Stacktrace: model.Stacktrace{&model.StacktraceFrame{Filename: "file.js", Lineno: 10}, &model.StacktraceFrame{Filename: "file2.js", Lineno: 11}}}, - &metric.Metric{Samples: []*metric.Sample{&metric.Sample{Name: "my-metric", Value: 99}}, Timestamp: timestamp}, - &errorm.Event{Exception: &errorm.Exception{Message: "hello world!", Stacktrace: model.Stacktrace{}}}, - }, - }, - { - // optional timestamps - body: strings.Join([]string{ - validMetadata(), - `{"transaction": {"name": "tx1", "id": "1111222233334444", "trace_id": "abcdefabcdef01234567890123456789", "duration": 12, "type": "request"}}`, - `{"span": {"name": "sp1","trace_id": "abcdefabcdef01234567890123456789", "duration": 20, "start": 10, "type": "db", "id": "0147258369abcdef", "transaction_id": "fedcba0123456789"}}`, - `{"metric": {"samples": {"my-metric": {"value": 99}}, "timestamp": "2018-01-01T10:00:00Z"}}`, - }, "\n"), - contentType: "application/x-ndjson", - expectedCode: http.StatusAccepted, - reported: []transform.Transformable{ - &transaction.Event{Name: &tx1, Id: "1111222233334444", Duration: 12, Type: "request", TraceId: &traceId}, - &span.Event{Name: "sp1", Duration: 20.0, Start: 10, Type: "db", HexId: &spanHexId, TransactionId: &transactionId, TraceId: &traceId}, - &metric.Metric{Timestamp: timestamp, Samples: []*metric.Sample{&metric.Sample{Name: "my-metric", Value: 99}}}, - }, - }, - } { - transformables = []transform.Transformable{} - bodyReader := bytes.NewBufferString(test.body) - - r := httptest.NewRequest("POST", "/v2/intake", bodyReader) - r.Header.Add("Content-Type", test.contentType) - - w := httptest.NewRecorder() - - // set request time - r = r.WithContext(context.WithValue(r.Context(), requestTimeContextKey, reqTimestamp)) - - handler.ServeHTTP(w, r) - - assert.Equal(t, test.expectedCode, w.Code, "Failed at index %d: %s", idx, w.Body.String()) - if test.err != nil { - var actualResponse streamResponse - assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &actualResponse)) - assert.Equal(t, *test.err, actualResponse, "Failed at index %d", idx) - } else { - assert.Equal(t, reqTimestamp, reportedTCtx.RequestTime) - } - - assert.Equal(t, test.reported, transformables) - } + assert.Equal(t, http.StatusBadRequest, w.Code, w.Body.String()) } -func TestV2HandlerReadError(t *testing.T) { - var transformables []transform.Transformable - report := func(ctx context.Context, p pendingReq) error { - transformables = append(transformables, p.transformables...) - return nil - } +func TestEmptyRequest(t *testing.T) { + req := httptest.NewRequest("POST", "/v2/intake", nil) + req.Header.Add("Content-Type", "application/x-ndjson") + + w := httptest.NewRecorder() c := defaultConfig("7.0.0") + handler := (&v2BackendRoute).Handler(c, nil) - body := strings.Join([]string{ - validMetadata(), - `{"transaction": {"name": "tx1", "id": "8ace3f94cd01462c", "trace_id": "0123456789", "duration": 12, "type": "request", "timestamp": "2018-01-01T10:00:00Z"}}`, - `{"span": {"name": "sp1", "duration": 20, "start": 10, "type": "db", "trace_id": "0123456789", "id": "0000111122223333", "timestamp": "2018-01-01T10:00:01Z", "transaction_id": "fedcba0123456789"}}`, - `{"metric": {"samples": {"my-metric": {"value": 99}}, "timestamp": "2018-01-01T10:00:00Z"}}`, - }, "\n") + handler.ServeHTTP(w, req) - bodyReader := bytes.NewBufferString(body) - timeoutReader := iotest.TimeoutReader(bodyReader) + assert.Equal(t, http.StatusBadRequest, w.Code, w.Body.String()) +} - r := httptest.NewRequest("POST", "/v2/intake", timeoutReader) - r.Header.Add("Content-Type", "application/x-ndjson") +func TestRequestDecoderError(t *testing.T) { + req := httptest.NewRequest("POST", "/v2/intake", bytes.NewBufferString(`asdasd`)) + req.Header.Add("Content-Type", "application/x-ndjson") w := httptest.NewRecorder() - handler := (&v2BackendRoute).Handler(c, report) - handler.ServeHTTP(w, r) - - expected := &streamResponse{ - Errors: map[StreamErrorType]errorDetails{ - "ERR_SERVER_ERROR": errorDetails{ - Count: 1, - Message: "timeout", - }, - }, + c := defaultConfig("7.0.0") + expectedErr := errors.New("Faulty decoder") + faultyDecoder := func(r *http.Request) (map[string]interface{}, error) { + return nil, expectedErr } - expectedBuf, err := expected.marshal() - require.NoError(t, err) - - assert.Equal(t, string(expectedBuf), string(w.Body.Bytes())) - assert.Equal(t, http.StatusInternalServerError, w.Code, w.Body.String()) -} - -func TestV2HandlerReportingError(t *testing.T) { - for _, test := range []struct { - err *streamResponse - expectedCode int - report func(ctx context.Context, p pendingReq) error - }{ - { - err: &streamResponse{ - Errors: map[StreamErrorType]errorDetails{ - "ERR_SHUTTING_DOWN": errorDetails{ - Count: 1, - Message: "server is shutting down", - }, - }, - Accepted: 0, - Dropped: 0, - Invalid: 0, - }, - expectedCode: 503, - report: func(ctx context.Context, p pendingReq) error { - return errChannelClosed - }, - }, { - err: &streamResponse{ - Errors: map[StreamErrorType]errorDetails{ - "ERR_QUEUE_FULL": errorDetails{ - Count: 1, - Message: "queue is full", - }, - }, - Accepted: 0, - Dropped: 1, - Invalid: 0, - }, - expectedCode: 429, - report: func(ctx context.Context, p pendingReq) error { - return errFull - }, + testRouteWithFaultyDecoder := v2Route{ + routeType{ + v2backendHandler, + func(*Config, decoder.ReqDecoder) decoder.ReqDecoder { return faultyDecoder }, + func(*Config) transform.Config { return transform.Config{} }, }, - } { - - c := defaultConfig("7.0.0") - - body := strings.Join([]string{ - validMetadata(), - `{"transaction": {"name": "tx1","trace_id": "01234567890123456789abcdefabcdef", "id": "8ace3f94462ab069", "duration": 12, "type": "request", "timestamp": "2018-01-01T10:00:00Z"}}`, - }, "\n") - - bodyReader := bytes.NewBufferString(body) - - r := httptest.NewRequest("POST", "/v2/intake", bodyReader) - r.Header.Add("Content-Type", "application/x-ndjson") - - w := httptest.NewRecorder() + } - handler := (&v2BackendRoute).Handler(c, test.report) - handler.ServeHTTP(w, r) + handler := testRouteWithFaultyDecoder.Handler(c, nil) - assert.Equal(t, test.expectedCode, w.Code, w.Body.String()) + handler.ServeHTTP(w, req) - expectedBuf, err := test.err.marshal() - require.NoError(t, err) - assert.Equal(t, string(expectedBuf), string(w.Body.Bytes())) - } + assert.Equal(t, http.StatusInternalServerError, w.Code, w.Body.String()) } diff --git a/beater/v2_integration_test.go b/beater/v2_integration_test.go deleted file mode 100644 index 563551432d..0000000000 --- a/beater/v2_integration_test.go +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package beater - -import ( - "bytes" - "context" - "fmt" - "net/http/httptest" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/apm-server/tests" - "github.com/elastic/apm-server/tests/loader" - "github.com/elastic/beats/libbeat/beat" -) - -func TestV2IntakeIntegration(t *testing.T) { - report := func(ctx context.Context, p pendingReq) error { - var events []beat.Event - for _, transformable := range p.transformables { - events = append(events, transformable.Transform(p.tcontext)...) - } - name := ctx.Value("name").(string) - verifyErr := tests.ApproveEvents(events, name, nil) - if verifyErr != nil { - assert.Fail(t, fmt.Sprintf("Test %s failed with error: %s", name, verifyErr.Error())) - } - return nil - } - - c := defaultConfig("7.0.0") - - handler := (&v2BackendRoute).Handler(c, report) - - for _, test := range []struct { - path string - name string - status int - }{ - {status: 202, path: "../testdata/intake-v2/errors.ndjson", name: "Errors"}, - {status: 202, path: "../testdata/intake-v2/transactions.ndjson", name: "Transactions"}, - {status: 202, path: "../testdata/intake-v2/spans.ndjson", name: "Spans"}, - {status: 202, path: "../testdata/intake-v2/metrics.ndjson", name: "Metrics"}, - {status: 202, path: "../testdata/intake-v2/minimal_process.ndjson", name: "MixedMinimalProcess"}, - {status: 202, path: "../testdata/intake-v2/minimal_service.ndjson", name: "MinimalService"}, - {status: 202, path: "../testdata/intake-v2/metadata_null_values.ndjson", name: "MetadataNullValues"}, - {status: 400, path: "../testdata/intake-v2/invalid-event.ndjson", name: "InvalidEvent"}, - } { - - b, err := loader.LoadDataAsBytes(test.path) - require.NoError(t, err) - bodyReader := bytes.NewBufferString(string(b)) - - r := httptest.NewRequest("POST", "/v2/intake", bodyReader) - r.Header.Add("Content-Type", "application/x-ndjson") - r.Header.Add("X-Real-Ip", "192.0.0.1") - - w := httptest.NewRecorder() - - name := fmt.Sprintf("approved-es-documents/testV2IntakeIntegration%s", test.name) - r = r.WithContext(context.WithValue(r.Context(), "name", name)) - reqTimestamp, err := time.Parse(time.RFC3339, "2018-08-01T10:00:00Z") - r = r.WithContext(context.WithValue(r.Context(), requestTimeContextKey, reqTimestamp)) - handler.ServeHTTP(w, r) - - assert.Equal(t, test.status, w.Code) - - } -} diff --git a/decoder/req_decoder.go b/decoder/req_decoder.go index a506785784..5fbb25a37f 100644 --- a/decoder/req_decoder.go +++ b/decoder/req_decoder.go @@ -74,12 +74,15 @@ func DecodeLimitJSONData(maxSize int64) ReqDecoder { if err != nil { return nil, err } + + reader = http.MaxBytesReader(nil, reader, maxSize) + return DecodeJSONData(monitoringReader{reader}) } } -// CompressedRequestReader makes a function that uses information from an http request to construct a Limited ReadCloser -// from the body of the request, handling any decompression necessary +// CompressedRequestReader returns a `ReqReader` that will decompress +// the body according to the supplied Content-Encoding header in the request func CompressedRequestReader(maxSize int64) ReqReader { return func(req *http.Request) (io.ReadCloser, error) { reader := req.Body @@ -121,7 +124,7 @@ func CompressedRequestReader(maxSize int64) ReqReader { } } readerCounter.Inc() - return http.MaxBytesReader(nil, reader, maxSize), nil + return reader, nil } } diff --git a/decoder/stream_decoder.go b/decoder/stream_decoder.go index ee485747c9..408c8d7f4f 100644 --- a/decoder/stream_decoder.go +++ b/decoder/stream_decoder.go @@ -38,7 +38,11 @@ func NDJSONStreamDecodeCompressedWithLimit(req *http.Request, maxSize int64) (*N return nil, err } - return &NDJSONStreamReader{bufio.NewReader(reader), false, nil}, nil + return NewNDJSONStreamReader(reader), nil +} + +func NewNDJSONStreamReader(reader io.Reader) *NDJSONStreamReader { + return &NDJSONStreamReader{bufio.NewReader(reader), false, nil} } type NDJSONStreamReader struct { @@ -54,7 +58,7 @@ func (s JSONDecodeError) Error() string { return string(s) } func (s ReadError) Error() string { return string(s) } func (sr *NDJSONStreamReader) Read() (map[string]interface{}, error) { - // ReadBytes can return valid data in `buf` _and_ also an io.EOF + // readLine can return valid data in `buf` _and_ also an io.EOF buf, readErr := sr.stream.ReadBytes('\n') if readErr != nil && readErr != io.EOF { return nil, ReadError(readErr.Error()) @@ -77,5 +81,5 @@ func (sr *NDJSONStreamReader) Read() (map[string]interface{}, error) { return decoded, readErr // this might be io.EOF } -func (sr *NDJSONStreamReader) IsEOF() bool { return sr.isEOF } -func (sr *NDJSONStreamReader) LastLine() []byte { return sr.latestLine } +func (sr *NDJSONStreamReader) IsEOF() bool { return sr.isEOF } +func (sr *NDJSONStreamReader) LatestLine() []byte { return sr.latestLine } diff --git a/model/error/generated/schema/error.go b/model/error/generated/schema/error.go index 9c3ce002f9..c0796e03f3 100644 --- a/model/error/generated/schema/error.go +++ b/model/error/generated/schema/error.go @@ -173,15 +173,17 @@ const ModelSchema = `{ "required": ["url", "method"] }, "tags": { - "type": ["object", "null"], - "description": "A flat mapping of user-defined tags with string values.", - "patternProperties": { - "^[^.*\"]*$": { - "type": ["string", "null"], - "maxLength": 1024 - } - }, - "additionalProperties": false + "$id": "doc/spec/tags.json", + "title": "Tags", + "type": ["object", "null"], + "description": "A flat mapping of user-defined tags with string values.", + "patternProperties": { + "^[^.*\"]*$": { + "type": ["string", "null"], + "maxLength": 1024 + } + }, + "additionalProperties": false }, "user": { "$id": "docs/spec/user.json", diff --git a/model/span/generated/schema/transaction.go b/model/span/generated/schema/transaction.go index be921fcab1..c9e9340b8b 100644 --- a/model/span/generated/schema/transaction.go +++ b/model/span/generated/schema/transaction.go @@ -58,6 +58,19 @@ const ModelSchema = `{ "description": "The raw url of the correlating http request." } } + }, + "tags": { + "$id": "doc/spec/tags.json", + "title": "Tags", + "type": ["object", "null"], + "description": "A flat mapping of user-defined tags with string values.", + "patternProperties": { + "^[^.*\"]*$": { + "type": ["string", "null"], + "maxLength": 1024 + } + }, + "additionalProperties": false } } }, @@ -187,6 +200,19 @@ const ModelSchema = `{ "description": "The raw url of the correlating http request." } } + }, + "tags": { + "$id": "doc/spec/tags.json", + "title": "Tags", + "type": ["object", "null"], + "description": "A flat mapping of user-defined tags with string values.", + "patternProperties": { + "^[^.*\"]*$": { + "type": ["string", "null"], + "maxLength": 1024 + } + }, + "additionalProperties": false } } }, diff --git a/model/transaction/generated/schema/transaction.go b/model/transaction/generated/schema/transaction.go index 189e783bbd..de4bf1f99b 100644 --- a/model/transaction/generated/schema/transaction.go +++ b/model/transaction/generated/schema/transaction.go @@ -172,15 +172,17 @@ const ModelSchema = `{ "required": ["url", "method"] }, "tags": { - "type": ["object", "null"], - "description": "A flat mapping of user-defined tags with string values.", - "patternProperties": { - "^[^.*\"]*$": { - "type": ["string", "null"], - "maxLength": 1024 - } - }, - "additionalProperties": false + "$id": "doc/spec/tags.json", + "title": "Tags", + "type": ["object", "null"], + "description": "A flat mapping of user-defined tags with string values.", + "patternProperties": { + "^[^.*\"]*$": { + "type": ["string", "null"], + "maxLength": 1024 + } + }, + "additionalProperties": false }, "user": { "$id": "docs/spec/user.json", diff --git a/beater/approved-es-documents/testV2IntakeIntegrationErrors.approved.json b/processor/stream/approved-es-documents/testV2IntakeIntegrationErrors.approved.json similarity index 100% rename from beater/approved-es-documents/testV2IntakeIntegrationErrors.approved.json rename to processor/stream/approved-es-documents/testV2IntakeIntegrationErrors.approved.json diff --git a/beater/approved-es-documents/testV2IntakeIntegrationMetadataNullValues.approved.json b/processor/stream/approved-es-documents/testV2IntakeIntegrationMetadataNullValues.approved.json similarity index 100% rename from beater/approved-es-documents/testV2IntakeIntegrationMetadataNullValues.approved.json rename to processor/stream/approved-es-documents/testV2IntakeIntegrationMetadataNullValues.approved.json diff --git a/beater/approved-es-documents/testV2IntakeIntegrationMetrics.approved.json b/processor/stream/approved-es-documents/testV2IntakeIntegrationMetrics.approved.json similarity index 100% rename from beater/approved-es-documents/testV2IntakeIntegrationMetrics.approved.json rename to processor/stream/approved-es-documents/testV2IntakeIntegrationMetrics.approved.json diff --git a/beater/approved-es-documents/testV2IntakeIntegrationMinimalService.approved.json b/processor/stream/approved-es-documents/testV2IntakeIntegrationMinimalService.approved.json similarity index 100% rename from beater/approved-es-documents/testV2IntakeIntegrationMinimalService.approved.json rename to processor/stream/approved-es-documents/testV2IntakeIntegrationMinimalService.approved.json diff --git a/beater/approved-es-documents/testV2IntakeIntegrationMixedMinimalProcess.approved.json b/processor/stream/approved-es-documents/testV2IntakeIntegrationMixedMinimalProcess.approved.json similarity index 100% rename from beater/approved-es-documents/testV2IntakeIntegrationMixedMinimalProcess.approved.json rename to processor/stream/approved-es-documents/testV2IntakeIntegrationMixedMinimalProcess.approved.json diff --git a/beater/approved-es-documents/testV2IntakeIntegrationSpans.approved.json b/processor/stream/approved-es-documents/testV2IntakeIntegrationSpans.approved.json similarity index 100% rename from beater/approved-es-documents/testV2IntakeIntegrationSpans.approved.json rename to processor/stream/approved-es-documents/testV2IntakeIntegrationSpans.approved.json diff --git a/beater/approved-es-documents/testV2IntakeIntegrationTransactions.approved.json b/processor/stream/approved-es-documents/testV2IntakeIntegrationTransactions.approved.json similarity index 100% rename from beater/approved-es-documents/testV2IntakeIntegrationTransactions.approved.json rename to processor/stream/approved-es-documents/testV2IntakeIntegrationTransactions.approved.json diff --git a/processor/stream/approved-stream-result/testIntegrationResultErrors.approved.json b/processor/stream/approved-stream-result/testIntegrationResultErrors.approved.json new file mode 100644 index 0000000000..e36e2c0c42 --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultErrors.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 3 +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultInvalidEvent.approved.json b/processor/stream/approved-stream-result/testIntegrationResultInvalidEvent.approved.json new file mode 100644 index 0000000000..b27dceca0e --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultInvalidEvent.approved.json @@ -0,0 +1,9 @@ +{ + "accepted": 0, + "errors": [ + { + "document": "{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcdef\", \"parent_id\": \"abcdefabcdef01234567\", \"type\": \"request\", \"duration\": 32.592981 } } \n", + "message": "Problem validating JSON document against schema: I[#] S[#] doesn't validate with \"transaction#\"\n I[#] S[#/allOf/1] allOf failed\n I[#/id] S[#/allOf/1/properties/id/type] expected string, but got number" + } + ] +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultMetadataNullValues.approved.json b/processor/stream/approved-stream-result/testIntegrationResultMetadataNullValues.approved.json new file mode 100644 index 0000000000..172488d4a5 --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultMetadataNullValues.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 1 +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultMetrics.approved.json b/processor/stream/approved-stream-result/testIntegrationResultMetrics.approved.json new file mode 100644 index 0000000000..9421fa0d0b --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultMetrics.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 2 +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultMinimalService.approved.json b/processor/stream/approved-stream-result/testIntegrationResultMinimalService.approved.json new file mode 100644 index 0000000000..9421fa0d0b --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultMinimalService.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 2 +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultMixedMinimalProcess.approved.json b/processor/stream/approved-stream-result/testIntegrationResultMixedMinimalProcess.approved.json new file mode 100644 index 0000000000..6d6a93fd6b --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultMixedMinimalProcess.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 4 +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultSpans.approved.json b/processor/stream/approved-stream-result/testIntegrationResultSpans.approved.json new file mode 100644 index 0000000000..93ed508ba6 --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultSpans.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 5 +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultTransactions.approved.json b/processor/stream/approved-stream-result/testIntegrationResultTransactions.approved.json new file mode 100644 index 0000000000..1ea3d31648 --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultTransactions.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 8 +} diff --git a/processor/stream/result.go b/processor/stream/result.go new file mode 100644 index 0000000000..8c3136be6a --- /dev/null +++ b/processor/stream/result.go @@ -0,0 +1,77 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "fmt" + "strings" +) + +type Error struct { + Type StreamError `json:"-"` + Message string `json:"message"` + Document string `json:"document"` +} + +func (s *Error) Error() string { + if s.Document != "" { + return fmt.Sprintf("%s [%s]", s.Message, string(s.Document)) + } + return s.Message +} + +type StreamError int + +const ( + QueueFullErrType StreamError = iota + ProcessingTimeoutErrType + InvalidInputErrType + ShuttingDownErrType + ServerErrType +) + +const ( + errorsLimit = 5 +) + +type Result struct { + Accepted int `json:"accepted"` + Errors []*Error `json:"errors,omitempty"` +} + +func (r *Result) LimitedAdd(err error) { + if len(r.Errors) < errorsLimit { + r.Add(err) + } +} + +func (r *Result) Add(err error) { + if e, ok := err.(*Error); ok { + r.Errors = append(r.Errors, e) + } else { + r.Errors = append(r.Errors, &Error{Message: err.Error(), Type: ServerErrType}) + } +} + +func (r *Result) String() string { + errorList := []string{} + for _, e := range r.Errors { + errorList = append(errorList, e.Error()) + } + return strings.Join(errorList, ", ") +} diff --git a/processor/stream/result_test.go b/processor/stream/result_test.go new file mode 100644 index 0000000000..1d64c61781 --- /dev/null +++ b/processor/stream/result_test.go @@ -0,0 +1,47 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestStreamResponseSimple(t *testing.T) { + sr := Result{} + sr.LimitedAdd(&Error{Type: QueueFullErrType, Message: "err1", Document: "buf1"}) + sr.LimitedAdd(errors.New("transmogrifier error")) + sr.LimitedAdd(&Error{Type: InvalidInputErrType, Message: "err2", Document: "buf2"}) + sr.LimitedAdd(&Error{Type: ProcessingTimeoutErrType, Message: "err3", Document: "buf3"}) + + sr.LimitedAdd(&Error{Message: "err4"}) + sr.LimitedAdd(&Error{Message: "err5"}) + + // not added + sr.LimitedAdd(&Error{Message: "err6"}) + + // added + sr.Add(&Error{Message: "err6"}) + + assert.Len(t, sr.Errors, 6) + + expectedStr := `err1 [buf1], transmogrifier error, err2 [buf2], err3 [buf3], err4, err6` + assert.Equal(t, expectedStr, sr.String()) +} diff --git a/processor/stream/stream_processor.go b/processor/stream/stream_processor.go new file mode 100644 index 0000000000..5ec0f705ac --- /dev/null +++ b/processor/stream/stream_processor.go @@ -0,0 +1,239 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + "errors" + "io" + + "github.com/santhosh-tekuri/jsonschema" + + "github.com/elastic/apm-server/decoder" + er "github.com/elastic/apm-server/model/error" + "github.com/elastic/apm-server/model/metadata" + "github.com/elastic/apm-server/model/metric" + "github.com/elastic/apm-server/model/span" + "github.com/elastic/apm-server/model/transaction" + "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/transform" + "github.com/elastic/apm-server/utility" + "github.com/elastic/apm-server/validation" +) + +var ( + ErrUnrecognizedObject = errors.New("did not recognize object type") +) + +type StreamReader interface { + Read() (map[string]interface{}, error) + IsEOF() bool + LatestLine() []byte +} + +type StreamProcessor struct { + Tconfig transform.Config +} + +const batchSize = 10 + +var models = []struct { + key string + schema *jsonschema.Schema + modelDecoder func(interface{}, error) (transform.Transformable, error) +}{ + { + "transaction", + transaction.ModelSchema(), + transaction.V2DecodeEvent, + }, + { + "span", + span.ModelSchema(), + span.V2DecodeEvent, + }, + { + "metric", + metric.ModelSchema(), + metric.DecodeMetric, + }, + { + "error", + er.ModelSchema(), + er.V2DecodeEvent, + }, +} + +func (v *StreamProcessor) readMetadata(reqMeta map[string]interface{}, reader StreamReader) (*metadata.Metadata, error) { + // first item is the metadata object + rawModel, err := reader.Read() + if err != nil { + if e, ok := err.(decoder.JSONDecodeError); ok || err == io.EOF { + return nil, &Error{ + Type: InvalidInputErrType, + Message: e.Error(), + Document: string(reader.LatestLine()), + } + } + return nil, err + } + + rawMetadata, ok := rawModel["metadata"].(map[string]interface{}) + if !ok { + return nil, &Error{ + Type: InvalidInputErrType, + Message: ErrUnrecognizedObject.Error(), + Document: string(reader.LatestLine()), + } + } + + for k, v := range reqMeta { + utility.InsertInMap(rawMetadata, k, v.(map[string]interface{})) + } + + // validate the metadata object against our jsonschema + err = validation.Validate(rawMetadata, metadata.ModelSchema()) + if err != nil { + return nil, &Error{ + Type: InvalidInputErrType, + Message: err.Error(), + Document: string(reader.LatestLine()), + } + } + + // create a metadata struct + metadata, err := metadata.DecodeMetadata(rawMetadata) + if err != nil { + return nil, err + } + + return metadata, nil +} + +// handleRawModel validates and decodes a single json object into its struct form +func (v *StreamProcessor) handleRawModel(rawModel map[string]interface{}) (transform.Transformable, error) { + for _, model := range models { + if entry, ok := rawModel[model.key]; ok { + err := validation.Validate(entry, model.schema) + if err != nil { + return nil, err + } + + tr, err := model.modelDecoder(entry, err) + if err != nil { + return nil, err + } + return tr, nil + } + } + return nil, ErrUnrecognizedObject +} + +// readBatch will read up to `batchSize` objects from the ndjson stream +// it returns a slice of eventables and a bool that indicates if there might be more to read. +func (v *StreamProcessor) readBatch(batchSize int, reader StreamReader, response *Result) ([]transform.Transformable, bool) { + var err error + var rawModel map[string]interface{} + + var eventables []transform.Transformable + for i := 0; i < batchSize && err == nil; i++ { + rawModel, err = reader.Read() + if err != nil && err != io.EOF { + + if e, ok := err.(decoder.JSONDecodeError); ok { + response.LimitedAdd(&Error{ + Type: InvalidInputErrType, + Message: e.Error(), + Document: string(reader.LatestLine()), + }) + continue + } + + // return early, we assume we can only recover from a JSON decode error + response.Add(err) + return eventables, true + } + + if rawModel != nil { + tr, err := v.handleRawModel(rawModel) + if err != nil { + response.LimitedAdd(&Error{ + Type: InvalidInputErrType, + Message: err.Error(), + Document: string(reader.LatestLine()), + }) + continue + } + eventables = append(eventables, tr) + } + } + + return eventables, reader.IsEOF() +} + +func (s *StreamProcessor) HandleStream(ctx context.Context, meta map[string]interface{}, jsonReader StreamReader, report publish.Reporter) *Result { + res := &Result{} + metadata, err := s.readMetadata(meta, jsonReader) + // no point in continuing if we couldn't read the metadata + if err != nil { + res.Add(err) + return res + } + + tctx := &transform.Context{ + RequestTime: utility.RequestTime(ctx), + Config: s.Tconfig, + Metadata: *metadata, + } + + for { + transformables, done := s.readBatch(batchSize, jsonReader, res) + if transformables != nil { + err := report(ctx, publish.PendingReq{ + Transformables: transformables, + Tcontext: tctx, + }) + + if err != nil { + switch err { + case publish.ErrChannelClosed: + res.Add(&Error{ + Type: ShuttingDownErrType, + Message: "server is shutting down", + }) + case publish.ErrFull: + res.Add(&Error{ + Type: QueueFullErrType, + Message: err.Error(), + }) + default: + res.Add(err) + } + + return res + } + + res.Accepted += len(transformables) + } + + if done { + break + } + } + return res +} diff --git a/processor/stream/stream_processor_test.go b/processor/stream/stream_processor_test.go new file mode 100644 index 0000000000..4ca3d348a3 --- /dev/null +++ b/processor/stream/stream_processor_test.go @@ -0,0 +1,353 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strings" + "testing" + "testing/iotest" + "time" + + "github.com/elastic/apm-server/decoder" + "github.com/elastic/apm-server/tests" + "github.com/elastic/apm-server/tests/loader" + "github.com/elastic/apm-server/utility" + "github.com/elastic/beats/libbeat/beat" + + "github.com/elastic/apm-server/model" + errorm "github.com/elastic/apm-server/model/error" + "github.com/elastic/apm-server/model/metric" + "github.com/elastic/apm-server/model/span" + "github.com/elastic/apm-server/model/transaction" + "github.com/elastic/apm-server/publish" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/apm-server/transform" +) + +func validMetadata() string { + return `{"metadata": {"service": {"name": "myservice", "agent": {"name": "test", "version": "1.0"}}}}` +} + +func TestV2Handler(t *testing.T) { + var transformables []transform.Transformable + var reportedTCtx *transform.Context + report := func(ctx context.Context, p publish.PendingReq) error { + transformables = append(transformables, p.Transformables...) + reportedTCtx = p.Tcontext + return nil + } + + tx1 := "tx1" + spanHexId, traceId := "0147258369abcdef", "abcdefabcdef01234567890123456789" + + timestamp, err := time.Parse(time.RFC3339, "2018-01-01T10:00:00Z") + assert.NoError(t, err) + reqTimestamp, err := time.Parse(time.RFC3339, "2018-01-02T10:00:00Z") + assert.NoError(t, err) + + transactionId := "fedcba0123456789" + + for idx, test := range []struct { + body string + err *Result + reported []transform.Transformable + }{ + { + body: strings.Join([]string{ + validMetadata(), + `{"invalid json"}`, + }, "\n"), + err: &Result{ + Errors: []*Error{ + { + Type: InvalidInputErrType, + Message: "data read error: invalid character '}' after object key", + Document: `{"invalid json"}`, + }, + }, + Accepted: 0, + }, + reported: []transform.Transformable{}, + }, + { + body: strings.Join([]string{ + `{"transaction": {"invalid": "metadata"}}`, // invalid metadata + `{"transaction": {"invalid": "metadata"}}`, + }, "\n"), + err: &Result{ + Errors: []*Error{ + { + Type: InvalidInputErrType, + Message: "did not recognize object type", + Document: "{\"transaction\": {\"invalid\": \"metadata\"}}\n", + }, + }, + }, + reported: []transform.Transformable{}, + }, + { + body: strings.Join([]string{ + `{"metadata": {}}`, + `{"span": {}}`, + }, "\n"), + err: &Result{ + Errors: []*Error{ + { + Type: InvalidInputErrType, + Message: "Problem validating JSON document against schema: I[#] S[#] doesn't validate with \"metadata#\"\n I[#] S[#/required] missing properties: \"service\"", + Document: "{\"metadata\": {}}\n", + }, + }, + }, + reported: []transform.Transformable{}, + }, + { + body: strings.Join([]string{ + validMetadata(), + `{"transaction": {"name": "tx1", "id": "9876543210abcdef", "duration": 12, "type": "request", "timestamp": "2018-01-01T10:00:00Z", "trace_id": "abcdefabcdef01234567890123456789"}}`, + `{"span": {"name": "sp1", "duration": 20, "start": 10, "type": "db", "timestamp": "2018-01-01T10:00:00Z", "id": "0147258369abcdef","trace_id": "abcdefabcdef01234567890123456789", "transaction_id": "fedcba0123456789", "stacktrace": [{"filename": "file.js", "lineno": 10}, {"filename": "file2.js", "lineno": 11}]}}`, + `{"metric": {"samples": {"my-metric": {"value": 99}}, "timestamp": "2018-01-01T10:00:00Z"}}`, + `{"error": {}`, // invalid json + `{"error": {"exception": {"message": "hello world!"}}}`, + }, "\n"), + err: &Result{ + Errors: []*Error{ + { + Type: InvalidInputErrType, + Message: "data read error: unexpected EOF", + Document: "{\"error\": {}\n", + }, + }, + Accepted: 4, + }, + reported: []transform.Transformable{ + &transaction.Event{Name: &tx1, Id: "9876543210abcdef", Duration: 12, Type: "request", Timestamp: timestamp, TraceId: &traceId}, + &span.Event{Name: "sp1", Duration: 20.0, Start: 10, Type: "db", Timestamp: timestamp, HexId: &spanHexId, TransactionId: &transactionId, TraceId: &traceId, Stacktrace: model.Stacktrace{&model.StacktraceFrame{Filename: "file.js", Lineno: 10}, &model.StacktraceFrame{Filename: "file2.js", Lineno: 11}}}, + &metric.Metric{Samples: []*metric.Sample{&metric.Sample{Name: "my-metric", Value: 99}}, Timestamp: timestamp}, + &errorm.Event{Exception: &errorm.Exception{Message: "hello world!", Stacktrace: model.Stacktrace{}}}, + }, + }, + { + body: strings.Join([]string{ + validMetadata(), + `{"transaction": {"name": "tx1", "id": "9876543210abcdef", "duration": 12, "type": "request", "timestamp": "2018-01-01T10:00:00Z", "trace_id": "abcdefabcdef01234567890123456789"}}`, + `{"error": {"log": {}}}`, // schema validation error + `{"metric": {"samples": {"my-metric": {"value": 99}}, "timestamp": "2018-01-01T10:00:00Z"}}`, + }, "\n"), + err: &Result{ + Errors: []*Error{ + { + Type: InvalidInputErrType, + Message: "Problem validating JSON document against schema: I[#] S[#] doesn't validate with \"error#\"\n I[#] S[#/allOf/0] allOf failed\n I[#/log] S[#/allOf/0/properties/log/required] missing properties: \"message\"", + Document: "{\"error\": {\"log\": {}}}\n", + }, + }, + Accepted: 2, + }, + reported: []transform.Transformable{ + &transaction.Event{Name: &tx1, Id: "9876543210abcdef", Duration: 12, Type: "request", Timestamp: timestamp, TraceId: &traceId}, + &metric.Metric{Samples: []*metric.Sample{&metric.Sample{Name: "my-metric", Value: 99}}, Timestamp: timestamp}, + }, + }, + { + // optional timestamps + body: strings.Join([]string{ + validMetadata(), + `{"transaction": {"name": "tx1", "id": "1111222233334444", "trace_id": "abcdefabcdef01234567890123456789", "duration": 12, "type": "request"}}`, + `{"span": {"name": "sp1","trace_id": "abcdefabcdef01234567890123456789", "duration": 20, "start": 10, "type": "db", "id": "0147258369abcdef", "transaction_id": "fedcba0123456789"}}`, + `{"metric": {"samples": {"my-metric": {"value": 99}}, "timestamp": "2018-01-01T10:00:00Z"}}`, + }, "\n"), + reported: []transform.Transformable{ + &transaction.Event{Name: &tx1, Id: "1111222233334444", Duration: 12, Type: "request", TraceId: &traceId}, + &span.Event{Name: "sp1", Duration: 20.0, Start: 10, Type: "db", HexId: &spanHexId, TransactionId: &transactionId, TraceId: &traceId}, + &metric.Metric{Timestamp: timestamp, Samples: []*metric.Sample{&metric.Sample{Name: "my-metric", Value: 99}}}, + }, + }, + } { + transformables = []transform.Transformable{} + bodyReader := bytes.NewBufferString(test.body) + + // set request time + ctx := utility.ContextWithRequestTime(context.Background(), reqTimestamp) + reader := decoder.NewNDJSONStreamReader(bodyReader) + sp := StreamProcessor{} + + actualResponse := sp.HandleStream(ctx, map[string]interface{}{}, reader, report) + + if test.err != nil { + assert.Equal(t, test.err, actualResponse, "Failed at index %d (%#v - %#v)", idx, test.err, actualResponse) + } else { + assert.Equal(t, reqTimestamp, reportedTCtx.RequestTime) + } + + assert.Equal(t, test.reported, transformables) + } +} + +func TestV2HandlerReadStreamError(t *testing.T) { + var transformables []transform.Transformable + report := func(ctx context.Context, p publish.PendingReq) error { + transformables = append(transformables, p.Transformables...) + return nil + } + + body := strings.Join([]string{ + validMetadata(), + `{"transaction": {"name": "tx1", "id": "8ace3f94cd01462c", "trace_id": "0123456789", "duration": 12, "type": "request", "timestamp": "2018-01-01T10:00:00Z"}}`, + `{"span": {"name": "sp1", "duration": 20, "start": 10, "type": "db", "trace_id": "0123456789", "id": "0000111122223333", "timestamp": "2018-01-01T10:00:01Z", "transaction_id": "fedcba0123456789"}}`, + `{"metric": {"samples": {"my-metric": {"value": 99}}, "timestamp": "2018-01-01T10:00:00Z"}}`, + }, "\n") + + bodyReader := bytes.NewBufferString(body) + timeoutReader := iotest.TimeoutReader(bodyReader) + + reader := decoder.NewNDJSONStreamReader(timeoutReader) + sp := StreamProcessor{} + + actualResponse := sp.HandleStream(context.Background(), map[string]interface{}{}, reader, report) + + expected := &Result{ + Errors: []*Error{ + { + Message: "timeout", + Type: ServerErrType, + }, + }, + Accepted: 2, + } + + assert.Equal(t, expected, actualResponse, "%#v - %#v", expected, actualResponse) +} + +func TestV2HandlerReportingStreamError(t *testing.T) { + for idx, test := range []struct { + err *Result + report func(ctx context.Context, p publish.PendingReq) error + }{ + { + err: &Result{ + Errors: []*Error{ + { + Type: ShuttingDownErrType, + Message: "server is shutting down", + }, + }, + }, + report: func(ctx context.Context, p publish.PendingReq) error { + return publish.ErrChannelClosed + }, + }, { + err: &Result{ + Errors: []*Error{ + { + Type: QueueFullErrType, + Message: "queue is full", + }, + }, + }, + report: func(ctx context.Context, p publish.PendingReq) error { + return publish.ErrFull + }, + }, + } { + body := strings.Join([]string{ + validMetadata(), + `{"transaction": {"name": "tx1","trace_id": "01234567890123456789abcdefabcdef", "id": "8ace3f94462ab069", "duration": 12, "type": "request", "timestamp": "2018-01-01T10:00:00Z"}}`, + `{"transaction": {"name": "tx1","trace_id": "01234567890123456789abcdefabcdef", "id": "8ace3f94462ab069", "duration": 12, "type": "request", "timestamp": "2018-01-01T10:00:00Z"}}`, + }, "\n") + + bodyReader := bytes.NewBufferString(body) + + reader := decoder.NewNDJSONStreamReader(bodyReader) + + sp := StreamProcessor{} + actualResponse := sp.HandleStream(context.Background(), map[string]interface{}{}, reader, test.report) + + assert.Equal(t, test.err, actualResponse, "Failed at idx %d", idx) + } +} + +func TestIntegration(t *testing.T) { + report := func(ctx context.Context, p publish.PendingReq) error { + var events []beat.Event + for _, transformable := range p.Transformables { + events = append(events, transformable.Transform(p.Tcontext)...) + } + name := ctx.Value("name").(string) + verifyErr := tests.ApproveEvents(events, name, nil) + if verifyErr != nil { + assert.Fail(t, fmt.Sprintf("Test %s failed with error: %s", name, verifyErr.Error())) + } + return nil + } + + for _, test := range []struct { + path string + name string + }{ + {path: "../testdata/intake-v2/errors.ndjson", name: "Errors"}, + {path: "../testdata/intake-v2/transactions.ndjson", name: "Transactions"}, + {path: "../testdata/intake-v2/spans.ndjson", name: "Spans"}, + {path: "../testdata/intake-v2/metrics.ndjson", name: "Metrics"}, + {path: "../testdata/intake-v2/minimal_process.ndjson", name: "MixedMinimalProcess"}, + {path: "../testdata/intake-v2/minimal_service.ndjson", name: "MinimalService"}, + {path: "../testdata/intake-v2/metadata_null_values.ndjson", name: "MetadataNullValues"}, + {path: "../testdata/intake-v2/invalid-event.ndjson", name: "InvalidEvent"}, + } { + t.Run(test.name, func(t *testing.T) { + b, err := loader.LoadDataAsBytes(test.path) + require.NoError(t, err) + bodyReader := bytes.NewBuffer(b) + + name := fmt.Sprintf("approved-es-documents/testV2IntakeIntegration%s", test.name) + ctx := context.WithValue(context.Background(), "name", name) + reqTimestamp, err := time.Parse(time.RFC3339, "2018-08-01T10:00:00Z") + ctx = utility.ContextWithRequestTime(ctx, reqTimestamp) + + reader := decoder.NewNDJSONStreamReader(bodyReader) + + reqDecoderMeta := map[string]interface{}{ + "system": map[string]interface{}{ + "ip": "192.0.0.1", + }, + } + + result := (&StreamProcessor{}).HandleStream(ctx, reqDecoderMeta, reader, report) + + resultName := fmt.Sprintf("approved-stream-result/testIntegrationResult%s", test.name) + resultJSON, err := json.Marshal(result) + require.NoError(t, err) + + var resultmap map[string]interface{} + err = json.Unmarshal(resultJSON, &resultmap) + require.NoError(t, err) + + verifyErr := tests.ApproveJson(resultmap, resultName, map[string]string{}) + if verifyErr != nil { + assert.Fail(t, fmt.Sprintf("Test %s failed with error: %s", name, verifyErr.Error())) + } + }) + } +} diff --git a/beater/pub.go b/publish/pub.go similarity index 81% rename from beater/pub.go rename to publish/pub.go index f71b793d43..122ce279c8 100644 --- a/beater/pub.go +++ b/publish/pub.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package beater +package publish import ( "context" @@ -30,6 +30,8 @@ import ( "github.com/elastic/beats/libbeat/beat" ) +type Reporter func(context.Context, PendingReq) error + // Publisher forwards batches of events to libbeat. It uses GuaranteedSend // to enable infinite retry of events being processed. // If the publisher's input channel is full, an error is returned immediately. @@ -38,31 +40,31 @@ import ( // number requests(events) active in the system can exceed the queue size. Only // the number of concurrent HTTP requests trying to publish at the same time is limited. type publisher struct { - pendingRequests chan pendingReq + pendingRequests chan PendingReq tracer *elasticapm.Tracer client beat.Client m sync.RWMutex stopped bool } -type pendingReq struct { - transformables []transform.Transformable - tcontext *transform.Context +type PendingReq struct { + Transformables []transform.Transformable + Tcontext *transform.Context trace bool } var ( - errFull = errors.New("Queue is full") - errInvalidBufferSize = errors.New("Request buffer must be > 0") - errChannelClosed = errors.New("Can't send batch, publisher is being stopped") + ErrFull = errors.New("queue is full") + ErrInvalidBufferSize = errors.New("request buffer must be > 0") + ErrChannelClosed = errors.New("can't send batch, publisher is being stopped") ) // newPublisher creates a new publisher instance. //MaxCPU new go-routines are started for forwarding events to libbeat. //Stop must be called to close the beat.Client and free resources. -func newPublisher(pipeline beat.Pipeline, N int, shutdownTimeout time.Duration, tracer *elasticapm.Tracer) (*publisher, error) { +func NewPublisher(pipeline beat.Pipeline, N int, shutdownTimeout time.Duration, tracer *elasticapm.Tracer) (*publisher, error) { if N <= 0 { - return nil, errInvalidBufferSize + return nil, ErrInvalidBufferSize } client, err := pipeline.ConnectWith(beat.ClientConfig{ @@ -82,7 +84,7 @@ func newPublisher(pipeline beat.Pipeline, N int, shutdownTimeout time.Duration, // Set channel size to N - 1. One request will be actively processed by the // worker, while the other concurrent requests will be buffered in the queue. - pendingRequests: make(chan pendingReq, N-1), + pendingRequests: make(chan PendingReq, N-1), } for i := 0; i < runtime.GOMAXPROCS(0); i++ { @@ -92,6 +94,10 @@ func newPublisher(pipeline beat.Pipeline, N int, shutdownTimeout time.Duration, return p, nil } +func (p *publisher) Client() beat.Client { + return p.client +} + // Stop closes all channels and waits for the the worker to stop. // The worker will drain the queue on shutdown, but no more pending requests // will be published. @@ -106,11 +112,11 @@ func (p *publisher) Stop() { // Send tries to forward pendingReq to the publishers worker. If the queue is full, // an error is returned. // Calling send after Stop will return an error. -func (p *publisher) Send(ctx context.Context, req pendingReq) error { +func (p *publisher) Send(ctx context.Context, req PendingReq) error { p.m.RLock() defer p.m.RUnlock() if p.stopped { - return errChannelClosed + return ErrChannelClosed } span, ctx := elasticapm.StartSpan(ctx, "Send", "Publisher") @@ -125,7 +131,7 @@ func (p *publisher) Send(ctx context.Context, req pendingReq) error { case p.pendingRequests <- req: return nil case <-time.After(time.Second * 1): // this forces the go scheduler to try something else for a while - return errFull + return ErrFull } } @@ -135,16 +141,16 @@ func (p *publisher) run() { } } -func (p *publisher) processPendingReq(req pendingReq) { +func (p *publisher) processPendingReq(req PendingReq) { var tx *elasticapm.Transaction if req.trace { tx = p.tracer.StartTransaction("ProcessPending", "Publisher") defer tx.End() } - for _, transformable := range req.transformables { + for _, transformable := range req.Transformables { span := tx.StartSpan("Transform", "Publisher", nil) - events := transformable.Transform(req.tcontext) + events := transformable.Transform(req.Tcontext) span.End() span = tx.StartSpan("PublishAll", "Publisher", nil) diff --git a/tests/approvals.go b/tests/approvals.go index cc54399bb1..7f4d89f413 100644 --- a/tests/approvals.go +++ b/tests/approvals.go @@ -60,7 +60,6 @@ func ApproveJson(received map[string]interface{}, name string, ignored map[strin r, _ := json.MarshalIndent(received, "", " ") ioutil.WriteFile(receivedPath, r, 0644) - received, _, diff, err := Compare(path, ignored) if err != nil { return err diff --git a/utility/request_time.go b/utility/request_time.go new file mode 100644 index 0000000000..d3e65bbc82 --- /dev/null +++ b/utility/request_time.go @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package utility + +import ( + "context" + "time" +) + +type contextKey string + +const requestTimeContextKey = contextKey("requestTime") + +func ContextWithRequestTime(ctx context.Context, t time.Time) context.Context { + if RequestTime(ctx).IsZero() { + return context.WithValue(ctx, requestTimeContextKey, t) + } + return ctx +} + +func RequestTime(ctx context.Context) time.Time { + t, ok := ctx.Value(requestTimeContextKey).(time.Time) + if !ok { + return time.Time{} + } + return t +}