From 6e9418d529f625993f4913f40e5bfd5f9a2e26d9 Mon Sep 17 00:00:00 2001 From: Ron cohen Date: Mon, 19 Mar 2018 21:31:38 +0100 Subject: [PATCH] WIP on intake api v2 --- beater/handlers.go | 11 +++ beater/streaming-api.go | 126 +++++++++++++++++++++++++++ decoder/decoder.go | 60 +++++++++++++ model/header.go | 7 ++ processor/transaction/event.go | 53 ----------- processor/transaction/payload.go | 50 ++++------- processor/transaction/processor.go | 2 +- processor/transaction/transaction.go | 89 +++++++++++++++++++ 8 files changed, 309 insertions(+), 89 deletions(-) create mode 100644 beater/streaming-api.go create mode 100644 model/header.go delete mode 100644 processor/transaction/event.go create mode 100644 processor/transaction/transaction.go diff --git a/beater/handlers.go b/beater/handlers.go index a3f31e98317..01a4966425e 100644 --- a/beater/handlers.go +++ b/beater/handlers.go @@ -28,6 +28,7 @@ import ( const ( BackendTransactionsURL = "/v1/transactions" + V2BackendURL = "/v2/transactions" FrontendTransactionsURL = "/v1/client-side/transactions" BackendErrorsURL = "/v1/errors" FrontendErrorsURL = "/v1/client-side/errors" @@ -63,6 +64,7 @@ var ( Routes = map[string]routeMapping{ BackendTransactionsURL: {backendHandler, transaction.NewProcessor}, + V2BackendURL: {backendHandler, transaction.NewProcessor}, FrontendTransactionsURL: {frontendHandler, transaction.NewProcessor}, BackendErrorsURL: {backendHandler, perr.NewProcessor}, FrontendErrorsURL: {frontendHandler, perr.NewProcessor}, @@ -87,6 +89,15 @@ func newMuxer(config *Config, report reporter) *http.ServeMux { return mux } +// func multiHandler(http.Handler) http.Handler { +// decoder, err := decoder.GetJSONDecoder(config.MaxUnzippedSize) +// if err != nil { +// return err +// } +// for val, err := decoder. + +// } + func backendHandler(pf ProcessorFactory, config *Config, report reporter) http.Handler { return logHandler( authHandler(config.SecretToken, diff --git a/beater/streaming-api.go b/beater/streaming-api.go new file mode 100644 index 00000000000..639cbcafd38 --- /dev/null +++ b/beater/streaming-api.go @@ -0,0 +1,126 @@ +package beater + +import ( + "errors" + "io/ioutil" + "net/http" + + "github.com/mitchellh/mapstructure" + "github.com/santhosh-tekuri/jsonschema" + + "github.com/elastic/apm-server/decoder" + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/processor" + "github.com/elastic/apm-server/processor/transaction" + "github.com/elastic/apm-server/utility" +) + +// func SillyDecoder(fn func(r *http.Request) (decoder.DecodeReader, error)) decoder.Decoder { +// return func(r *http.Request) (map[string]interface{}, error) { +// return fn(r) +// } +// } + +// func backendV2Handler(pf ProcessorFactory, config *Config, report reporter) http.Handler { + +// decoder := decoder.GetJSONDecoder(config.MaxUnzippedSize) + +// process := processRequestHandler(pf, nil, report, +// decoder.DecodeSystemData( +// SillyDecoder(), config.AugmentEnabled + +// return logHandler( +// authHandler(config.SecretToken, process)) +// } + +func validateTransaction() { + +} + +func readSchema(file string) *jsonschema.Schema { + schemaData, err := ioutil.ReadFile(file) + if err != nil { + panic(err) + } + return processor.CreateSchema(string(schemaData), "Bleh") +} + +func decodeHeader(rawHeader map[string]interface{}) (*model.Header, error) { + header := model.Header{} + decoder, _ := mapstructure.NewDecoder( + &mapstructure.DecoderConfig{ + DecodeHook: utility.RFC3339DecoderHook, + Result: &header, + }, + ) + err := decoder.Decode(rawHeader) + if err != nil { + return nil, err + } + return &header, nil +} + +func processStreamingRequestHandler(pf ProcessorFactory, prConfig *processor.Config, report reporter, decode decoder.DecodeReader) http.Handler { + transactionSchema := readSchema("docs/specs/transactions/tranasction.json") + errorSchema := readSchema("docs/specs/transactions/tranasction.json") + spanSchema := readSchema("docs/specs/transactions/span.json") + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + code, err := func() (int, error) { + // read header + rawHeader, err := decode.Read() + if err != nil { + return http.StatusBadRequest, err + } + + header, err := decodeHeader(rawHeader) + if err != nil { + return http.StatusBadRequest, err + } + context := model.NewContext(header.Service, header.Process, header.System) + + decoder.DecodeUserData(func(*http.Request) { + + }) + + var schema *jsonschema.Schema + for { + item, err := decode.Read() + if err != nil { + return http.StatusBadRequest, err + } + + if item == nil { + return http.StatusAccepted, nil + } + + itemType, ok := item["type"] + if !ok { + return http.StatusBadRequest, errors.New("missing 'type' field") + } + + switch itemType { + case "transaction": + schema = transactionSchema + case "span": + schema = spanSchema + case "error": + schema = errorSchema + } + + err = schema.ValidateInterface(item) + if err != nil { + return http.StatusBadRequest, err + } + switch itemType { + case "transaction": + transaction.New + } + } + }() + + // code, err := processRequest(r, pf, prConfig, report, decode) + sendStatus(w, r, code, err) + }) +} diff --git a/decoder/decoder.go b/decoder/decoder.go index b8865030935..58f5767612e 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -1,10 +1,12 @@ package decoder import ( + "bufio" "compress/gzip" "compress/zlib" "encoding/json" "fmt" + "io" "net/http" "io/ioutil" @@ -53,6 +55,64 @@ func DecodeLimitJSONData(maxSize int64) Decoder { } } +type DecodeReader interface { + Read() (map[string]interface{}, error) +} + +func GetJSONDecoder(maxSize int64) func(req *http.Request) (DecodeReader, error) { + return func(req *http.Request) (DecodeReader, error) { + contentType := req.Header.Get("Content-Type") + + if !strings.Contains(contentType, "application/x-ndjson") { + return nil, fmt.Errorf("invalid content type: %s", req.Header.Get("Content-Type")) + } + + reader := req.Body + if reader == nil { + return nil, errors.New("no content") + } + + switch req.Header.Get("Content-Encoding") { + case "deflate": + var err error + reader, err = zlib.NewReader(reader) + if err != nil { + return nil, err + } + + case "gzip": + var err error + reader, err = gzip.NewReader(reader) + if err != nil { + return nil, err + } + } + reader = http.MaxBytesReader(nil, reader, maxSize) + return &NDJSONDecoder{bufio.NewReader(reader)}, nil + } +} + +type NDJSONDecoder struct { + reader *bufio.Reader +} + +func (n *NDJSONDecoder) Read() (map[string]interface{}, error) { + buf, err := n.reader.ReadBytes('\n') + if err == io.EOF { + return nil, nil + } + if err != nil { + return nil, err + } + var v map[string]interface{} + err = json.Unmarshal(buf, &v) + return v, err +} + +func (n *NDJSONDecoder) Decode(r *http.Request) (map[string]interface{}, error) { + return n.Read() +} + func DecodeSourcemapFormData(req *http.Request) (map[string]interface{}, error) { contentType := req.Header.Get("Content-Type") if !strings.Contains(contentType, "multipart/form-data") { diff --git a/model/header.go b/model/header.go new file mode 100644 index 00000000000..376884c7ae8 --- /dev/null +++ b/model/header.go @@ -0,0 +1,7 @@ +package model + +type Header struct { + Service Service + Process *Process + System *System +} diff --git a/processor/transaction/event.go b/processor/transaction/event.go deleted file mode 100644 index 33d8190e67b..00000000000 --- a/processor/transaction/event.go +++ /dev/null @@ -1,53 +0,0 @@ -package transaction - -import ( - "time" - - "github.com/elastic/apm-server/utility" - "github.com/elastic/beats/libbeat/common" -) - -type Event struct { - Id string - Name *string - Type string - Result *string - Duration float64 - Timestamp time.Time - Context common.MapStr - Spans []*Span - Marks common.MapStr - Sampled *bool - SpanCount SpanCount `mapstructure:"span_count"` -} -type SpanCount struct { - Dropped Dropped -} -type Dropped struct { - Total *int -} - -func (t *Event) Transform() common.MapStr { - tx := common.MapStr{"id": t.Id} - utility.Add(tx, "name", t.Name) - utility.Add(tx, "duration", utility.MillisAsMicros(t.Duration)) - utility.Add(tx, "type", t.Type) - utility.Add(tx, "result", t.Result) - utility.Add(tx, "marks", t.Marks) - - if t.Sampled == nil { - utility.Add(tx, "sampled", true) - } else { - utility.Add(tx, "sampled", t.Sampled) - } - - if t.SpanCount.Dropped.Total != nil { - s := common.MapStr{ - "dropped": common.MapStr{ - "total": *t.SpanCount.Dropped.Total, - }, - } - utility.Add(tx, "span_count", s) - } - return tx -} diff --git a/processor/transaction/payload.go b/processor/transaction/payload.go index 200a86fd236..a93fe45b45f 100644 --- a/processor/transaction/payload.go +++ b/processor/transaction/payload.go @@ -17,49 +17,29 @@ var ( ) type payload struct { - Service m.Service - System *m.System - Process *m.Process - Events []Event `mapstructure:"transactions"` - Context common.MapStr - User common.MapStr + Service m.Service + System *m.System + Process *m.Process + Transactions []Transaction `mapstructure:"transactions"` + Context common.MapStr + User common.MapStr } -func (pa *payload) transform(config *pr.Config) []beat.Event { +func (pa *payload) transform(config *pr.Config) ([]beat.Event, error) { var events []beat.Event context := m.NewContext(&pa.Service, pa.Process, pa.System, pa.User) spanContext := NewSpanContext(&pa.Service) - logp.NewLogger("transaction").Debugf("Transform transaction events: events=%d, service=%s, agent=%s:%s", len(pa.Events), pa.Service.Name, pa.Service.Agent.Name, pa.Service.Agent.Version) + logp.NewLogger("transaction").Debugf("Transform transaction events: events=%d, service=%s, agent=%s:%s", len(pa.Transactions), pa.Service.Name, pa.Service.Agent.Name, pa.Service.Agent.Version) - transactionCounter.Add(int64(len(pa.Events))) - for _, event := range pa.Events { - - ev := beat.Event{ - Fields: common.MapStr{ - "processor": processorTransEntry, - transactionDocType: event.Transform(), - "context": context.Transform(event.Context), - }, - Timestamp: event.Timestamp, - } - events = append(events, ev) - - trId := common.MapStr{"id": event.Id} - spanCounter.Add(int64(len(event.Spans))) - for _, sp := range event.Spans { - ev := beat.Event{ - Fields: common.MapStr{ - "processor": processorSpanEntry, - spanDocType: sp.Transform(config, pa.Service), - "transaction": trId, - "context": spanContext.Transform(sp.Context), - }, - Timestamp: event.Timestamp, - } - events = append(events, ev) + transactionCounter.Add(int64(len(pa.Transactions))) + for _, event := range pa.Transactions { + evs, err := event.Transform(config, pa.Service, context, spanContext) + if err != nil { + return nil, err } + events = append(events, evs...) } - return events + return events, nil } diff --git a/processor/transaction/processor.go b/processor/transaction/processor.go index 528a5c8da14..fbc9cf6aaa0 100644 --- a/processor/transaction/processor.go +++ b/processor/transaction/processor.go @@ -61,7 +61,7 @@ func (p *processor) Transform(raw interface{}) ([]beat.Event, error) { return nil, err } - return pa.transform(p.config), nil + return pa.transform(p.config) } func (p *processor) Name() string { diff --git a/processor/transaction/transaction.go b/processor/transaction/transaction.go new file mode 100644 index 00000000000..48923256cd8 --- /dev/null +++ b/processor/transaction/transaction.go @@ -0,0 +1,89 @@ +package transaction + +import ( + "time" + + m "github.com/elastic/apm-server/model" + pr "github.com/elastic/apm-server/processor" + + "github.com/elastic/beats/libbeat/beat" + + "github.com/elastic/apm-server/utility" + "github.com/elastic/beats/libbeat/common" +) + +type Transaction struct { + Id string + Name *string + Type string + Result *string + Duration float64 + Timestamp time.Time + Context common.MapStr + Spans []*Span + Marks common.MapStr + Sampled *bool + SpanCount SpanCount `mapstructure:"span_count"` +} +type SpanCount struct { + Dropped Dropped +} +type Dropped struct { + Total *int +} + +func (t *Transaction) transform() common.MapStr { + tx := common.MapStr{"id": t.Id} + utility.Add(tx, "name", t.Name) + utility.Add(tx, "duration", utility.MillisAsMicros(t.Duration)) + utility.Add(tx, "type", t.Type) + utility.Add(tx, "result", t.Result) + utility.Add(tx, "marks", t.Marks) + + if t.Sampled == nil { + utility.Add(tx, "sampled", true) + } else { + utility.Add(tx, "sampled", t.Sampled) + } + + if t.SpanCount.Dropped.Total != nil { + s := common.MapStr{ + "dropped": common.MapStr{ + "total": *t.SpanCount.Dropped.Total, + }, + } + utility.Add(tx, "span_count", s) + } + return tx +} + +func (t *Transaction) Transform(config *pr.Config, service m.Service, context *m.Context, spanContext *SpanContext) ([]beat.Event, error) { + var events []beat.Event + // context := m.NewContext(&pa.Service, pa.Process, pa.System, pa.User) + + ev := beat.Event{ + Fields: common.MapStr{ + "processor": processorTransEntry, + transactionDocType: t.transform(), + "context": context.Transform(t.Context), + }, + Timestamp: t.Timestamp, + } + events = append(events, ev) + + trId := common.MapStr{"id": t.Id} + spanCounter.Add(int64(len(t.Spans))) + for _, sp := range t.Spans { + ev := beat.Event{ + Fields: common.MapStr{ + "processor": processorSpanEntry, + spanDocType: sp.Transform(config, service), + "transaction": trId, + "context": spanContext.Transform(sp.Context), + }, + Timestamp: t.Timestamp, + } + events = append(events, ev) + } + return events, nil +}