Skip to content

Commit

Permalink
WIP on intake api v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Ron cohen committed Mar 19, 2018
1 parent f33412d commit 6e9418d
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 89 deletions.
11 changes: 11 additions & 0 deletions beater/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

const (
BackendTransactionsURL = "/v1/transactions"
V2BackendURL = "/v2/transactions"
FrontendTransactionsURL = "/v1/client-side/transactions"
BackendErrorsURL = "/v1/errors"
FrontendErrorsURL = "/v1/client-side/errors"
Expand Down Expand Up @@ -63,6 +64,7 @@ var (

Routes = map[string]routeMapping{
BackendTransactionsURL: {backendHandler, transaction.NewProcessor},
V2BackendURL: {backendHandler, transaction.NewProcessor},
FrontendTransactionsURL: {frontendHandler, transaction.NewProcessor},
BackendErrorsURL: {backendHandler, perr.NewProcessor},
FrontendErrorsURL: {frontendHandler, perr.NewProcessor},
Expand All @@ -87,6 +89,15 @@ func newMuxer(config *Config, report reporter) *http.ServeMux {
return mux
}

// func multiHandler(http.Handler) http.Handler {
// decoder, err := decoder.GetJSONDecoder(config.MaxUnzippedSize)
// if err != nil {
// return err
// }
// for val, err := decoder.

// }

func backendHandler(pf ProcessorFactory, config *Config, report reporter) http.Handler {
return logHandler(
authHandler(config.SecretToken,
Expand Down
126 changes: 126 additions & 0 deletions beater/streaming-api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package beater

import (
"errors"
"io/ioutil"
"net/http"

"github.com/mitchellh/mapstructure"
"github.com/santhosh-tekuri/jsonschema"

"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/processor"
"github.com/elastic/apm-server/processor/transaction"
"github.com/elastic/apm-server/utility"
)

// func SillyDecoder(fn func(r *http.Request) (decoder.DecodeReader, error)) decoder.Decoder {
// return func(r *http.Request) (map[string]interface{}, error) {
// return fn(r)
// }
// }

// func backendV2Handler(pf ProcessorFactory, config *Config, report reporter) http.Handler {

// decoder := decoder.GetJSONDecoder(config.MaxUnzippedSize)

// process := processRequestHandler(pf, nil, report,
// decoder.DecodeSystemData(
// SillyDecoder(), config.AugmentEnabled

// return logHandler(
// authHandler(config.SecretToken, process))
// }

func validateTransaction() {

}

func readSchema(file string) *jsonschema.Schema {
schemaData, err := ioutil.ReadFile(file)
if err != nil {
panic(err)
}
return processor.CreateSchema(string(schemaData), "Bleh")
}

func decodeHeader(rawHeader map[string]interface{}) (*model.Header, error) {
header := model.Header{}
decoder, _ := mapstructure.NewDecoder(
&mapstructure.DecoderConfig{
DecodeHook: utility.RFC3339DecoderHook,
Result: &header,
},
)
err := decoder.Decode(rawHeader)
if err != nil {
return nil, err
}
return &header, nil
}

func processStreamingRequestHandler(pf ProcessorFactory, prConfig *processor.Config, report reporter, decode decoder.DecodeReader) http.Handler {
transactionSchema := readSchema("docs/specs/transactions/tranasction.json")
errorSchema := readSchema("docs/specs/transactions/tranasction.json")
spanSchema := readSchema("docs/specs/transactions/span.json")

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

code, err := func() (int, error) {
// read header
rawHeader, err := decode.Read()
if err != nil {
return http.StatusBadRequest, err
}

header, err := decodeHeader(rawHeader)
if err != nil {
return http.StatusBadRequest, err
}
context := model.NewContext(header.Service, header.Process, header.System)

decoder.DecodeUserData(func(*http.Request) {

})

var schema *jsonschema.Schema
for {
item, err := decode.Read()
if err != nil {
return http.StatusBadRequest, err
}

if item == nil {
return http.StatusAccepted, nil
}

itemType, ok := item["type"]
if !ok {
return http.StatusBadRequest, errors.New("missing 'type' field")
}

switch itemType {
case "transaction":
schema = transactionSchema
case "span":
schema = spanSchema
case "error":
schema = errorSchema
}

err = schema.ValidateInterface(item)
if err != nil {
return http.StatusBadRequest, err
}
switch itemType {
case "transaction":
transaction.New
}
}
}()

// code, err := processRequest(r, pf, prConfig, report, decode)
sendStatus(w, r, code, err)
})
}
60 changes: 60 additions & 0 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package decoder

import (
"bufio"
"compress/gzip"
"compress/zlib"
"encoding/json"
"fmt"
"io"
"net/http"

"io/ioutil"
Expand Down Expand Up @@ -53,6 +55,64 @@ func DecodeLimitJSONData(maxSize int64) Decoder {
}
}

type DecodeReader interface {
Read() (map[string]interface{}, error)
}

func GetJSONDecoder(maxSize int64) func(req *http.Request) (DecodeReader, error) {
return func(req *http.Request) (DecodeReader, error) {
contentType := req.Header.Get("Content-Type")

if !strings.Contains(contentType, "application/x-ndjson") {
return nil, fmt.Errorf("invalid content type: %s", req.Header.Get("Content-Type"))
}

reader := req.Body
if reader == nil {
return nil, errors.New("no content")
}

switch req.Header.Get("Content-Encoding") {
case "deflate":
var err error
reader, err = zlib.NewReader(reader)
if err != nil {
return nil, err
}

case "gzip":
var err error
reader, err = gzip.NewReader(reader)
if err != nil {
return nil, err
}
}
reader = http.MaxBytesReader(nil, reader, maxSize)
return &NDJSONDecoder{bufio.NewReader(reader)}, nil
}
}

type NDJSONDecoder struct {
reader *bufio.Reader
}

func (n *NDJSONDecoder) Read() (map[string]interface{}, error) {
buf, err := n.reader.ReadBytes('\n')
if err == io.EOF {
return nil, nil
}
if err != nil {
return nil, err
}
var v map[string]interface{}
err = json.Unmarshal(buf, &v)
return v, err
}

func (n *NDJSONDecoder) Decode(r *http.Request) (map[string]interface{}, error) {
return n.Read()
}

func DecodeSourcemapFormData(req *http.Request) (map[string]interface{}, error) {
contentType := req.Header.Get("Content-Type")
if !strings.Contains(contentType, "multipart/form-data") {
Expand Down
7 changes: 7 additions & 0 deletions model/header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package model

type Header struct {
Service Service
Process *Process
System *System
}
53 changes: 0 additions & 53 deletions processor/transaction/event.go

This file was deleted.

50 changes: 15 additions & 35 deletions processor/transaction/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,29 @@ var (
)

type payload struct {
Service m.Service
System *m.System
Process *m.Process
Events []Event `mapstructure:"transactions"`
Context common.MapStr
User common.MapStr
Service m.Service
System *m.System
Process *m.Process
Transactions []Transaction `mapstructure:"transactions"`
Context common.MapStr
User common.MapStr
}

func (pa *payload) transform(config *pr.Config) []beat.Event {
func (pa *payload) transform(config *pr.Config) ([]beat.Event, error) {
var events []beat.Event
context := m.NewContext(&pa.Service, pa.Process, pa.System, pa.User)
spanContext := NewSpanContext(&pa.Service)

logp.NewLogger("transaction").Debugf("Transform transaction events: events=%d, service=%s, agent=%s:%s", len(pa.Events), pa.Service.Name, pa.Service.Agent.Name, pa.Service.Agent.Version)
logp.NewLogger("transaction").Debugf("Transform transaction events: events=%d, service=%s, agent=%s:%s", len(pa.Transactions), pa.Service.Name, pa.Service.Agent.Name, pa.Service.Agent.Version)

transactionCounter.Add(int64(len(pa.Events)))
for _, event := range pa.Events {

ev := beat.Event{
Fields: common.MapStr{
"processor": processorTransEntry,
transactionDocType: event.Transform(),
"context": context.Transform(event.Context),
},
Timestamp: event.Timestamp,
}
events = append(events, ev)

trId := common.MapStr{"id": event.Id}
spanCounter.Add(int64(len(event.Spans)))
for _, sp := range event.Spans {
ev := beat.Event{
Fields: common.MapStr{
"processor": processorSpanEntry,
spanDocType: sp.Transform(config, pa.Service),
"transaction": trId,
"context": spanContext.Transform(sp.Context),
},
Timestamp: event.Timestamp,
}
events = append(events, ev)
transactionCounter.Add(int64(len(pa.Transactions)))
for _, event := range pa.Transactions {
evs, err := event.Transform(config, pa.Service, context, spanContext)
if err != nil {
return nil, err
}
events = append(events, evs...)
}

return events
return events, nil
}
2 changes: 1 addition & 1 deletion processor/transaction/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *processor) Transform(raw interface{}) ([]beat.Event, error) {
return nil, err
}

return pa.transform(p.config), nil
return pa.transform(p.config)
}

func (p *processor) Name() string {
Expand Down
Loading

0 comments on commit 6e9418d

Please sign in to comment.