Skip to content

Commit

Permalink
Move non-web related processing into processor, add "publish" package (
Browse files Browse the repository at this point in the history
…elastic#1324)

also simplify stream error handling.
  • Loading branch information
roncohen authored and simitt committed Sep 7, 2018
1 parent 0dc40a9 commit 51757b1
Show file tree
Hide file tree
Showing 40 changed files with 982 additions and 934 deletions.

This file was deleted.

This file was deleted.

5 changes: 3 additions & 2 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/apm-agent-go/transport"
"github.com/elastic/apm-server/ingest/pipeline"
"github.com/elastic/apm-server/pipelistener"
"github.com/elastic/apm-server/publish"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -138,7 +139,7 @@ func (bt *beater) Run(b *beat.Beat) error {
defer traceListener.Close()
defer tracer.Close()

pub, err := newPublisher(b.Publisher, bt.config.ConcurrentRequests, bt.config.ShutdownTimeout, tracer)
pub, err := publish.NewPublisher(b.Publisher, bt.config.ConcurrentRequests, bt.config.ShutdownTimeout, tracer)
if err != nil {
return err
}
Expand All @@ -150,7 +151,7 @@ func (bt *beater) Run(b *beat.Beat) error {
return nil
}

go notifyListening(bt.config, pub.client.Publish)
go notifyListening(bt.config, pub.Client().Publish)

bt.mutex.Lock()
if bt.stopped {
Expand Down
3 changes: 2 additions & 1 deletion beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/apm-agent-go"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/tests/loader"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -436,7 +437,7 @@ func setupBeater(t *testing.T, publisher beat.Pipeline, ucfg *common.Config, bea

func SetupServer(b *testing.B) *http.ServeMux {
pip := DummyPipeline()
pub, err := newPublisher(pip, 1, time.Duration(0), elasticapm.DefaultTracer)
pub, err := publish.NewPublisher(pip, 1, time.Duration(0), elasticapm.DefaultTracer)
if err != nil {
b.Fatal("error initializing publisher", err)
}
Expand Down
41 changes: 15 additions & 26 deletions beater/common_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/processor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/transform"

"github.com/elastic/apm-server/utility"
Expand All @@ -52,7 +53,7 @@ const (
supportedMethods = "POST, OPTIONS"
)

type ProcessorHandler func(processor.Processor, *Config, reporter) http.Handler
type ProcessorHandler func(processor.Processor, *Config, publish.Reporter) http.Handler

type serverResponse struct {
err error
Expand Down Expand Up @@ -148,7 +149,7 @@ var (
}
)

func newMuxer(beaterConfig *Config, report reporter) *http.ServeMux {
func newMuxer(beaterConfig *Config, report publish.Reporter) *http.ServeMux {
mux := http.NewServeMux()
logger := logp.NewLogger("handler")
for path, route := range V1Routes {
Expand Down Expand Up @@ -234,25 +235,6 @@ func rootHandler(secretToken string) http.Handler {

type contextKey string

const requestTimeContextKey = contextKey("requestTime")

func requestTimeHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if requestTime(r).IsZero() {
r = r.WithContext(context.WithValue(r.Context(), requestTimeContextKey, time.Now()))
}
h.ServeHTTP(w, r)
})
}

func requestTime(r *http.Request) time.Time {
t, ok := r.Context().Value(requestTimeContextKey).(time.Time)
if !ok {
return time.Time{}
}
return t
}

const reqLoggerContextKey = contextKey("requestLogger")

func logHandler(h http.Handler) http.Handler {
Expand Down Expand Up @@ -285,6 +267,13 @@ func logHandler(h http.Handler) http.Handler {
})
}

func requestTimeHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(utility.ContextWithRequestTime(r.Context(), time.Now()))
h.ServeHTTP(w, r)
})
}

// requestLogger is a convenience function to retrieve the logger that was
// added to the request context by handler `logHandler``
func requestLogger(r *http.Request) *logp.Logger {
Expand Down Expand Up @@ -400,14 +389,14 @@ func corsHandler(allowedOrigins []string, h http.Handler) http.Handler {
})
}

func processRequestHandler(p processor.Processor, config transform.Config, report reporter, decode decoder.ReqDecoder) http.Handler {
func processRequestHandler(p processor.Processor, config transform.Config, report publish.Reporter, decode decoder.ReqDecoder) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
res := processRequest(r, p, config, report, decode)
sendStatus(w, r, res)
})
}

func processRequest(r *http.Request, p processor.Processor, config transform.Config, report reporter, decode decoder.ReqDecoder) serverResponse {
func processRequest(r *http.Request, p processor.Processor, config transform.Config, report publish.Reporter, decode decoder.ReqDecoder) serverResponse {
if r.Method != "POST" {
return methodNotAllowedResponse
}
Expand All @@ -430,13 +419,13 @@ func processRequest(r *http.Request, p processor.Processor, config transform.Con
}

tctx := &transform.Context{
RequestTime: requestTime(r),
RequestTime: utility.RequestTime(r.Context()),
Config: config,
Metadata: *metadata,
}

if err = report(r.Context(), pendingReq{transformables: transformables, tcontext: tctx}); err != nil {
if err == errChannelClosed {
if err = report(r.Context(), publish.PendingReq{Transformables: transformables, Tcontext: tctx}); err != nil {
if err == publish.ErrChannelClosed {
return serverShuttingDownResponse(err)
}
return fullQueueResponse(err)
Expand Down
11 changes: 7 additions & 4 deletions beater/route_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"net/http"
"regexp"

"github.com/elastic/apm-server/processor/stream"

"github.com/elastic/apm-server/processor"
perr "github.com/elastic/apm-server/processor/error"
"github.com/elastic/apm-server/processor/metric"
"github.com/elastic/apm-server/processor/sourcemap"
"github.com/elastic/apm-server/processor/transaction"
"github.com/elastic/apm-server/publish"

"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/transform"
Expand Down Expand Up @@ -172,7 +175,7 @@ type v1Route struct {
topLevelRequestDecoder func(*Config) decoder.ReqDecoder
}

func (v *v1Route) Handler(p processor.Processor, beaterConfig *Config, report reporter) http.Handler {
func (v *v1Route) Handler(p processor.Processor, beaterConfig *Config, report publish.Reporter) http.Handler {
decoder := v.configurableDecoder(beaterConfig, v.topLevelRequestDecoder(beaterConfig))
tconfig := v.transformConfig(beaterConfig)

Expand All @@ -188,15 +191,15 @@ type v2Route struct {
routeType
}

func (v v2Route) Handler(beaterConfig *Config, report reporter) http.Handler {
func (v v2Route) Handler(beaterConfig *Config, report publish.Reporter) http.Handler {
reqDecoder := v.configurableDecoder(
beaterConfig,
func(*http.Request) (map[string]interface{}, error) { return map[string]interface{}{}, nil },
)

v2Handler := v2Handler{
requestDecoder: reqDecoder,
tconfig: v.transformConfig(beaterConfig),
requestDecoder: reqDecoder,
streamProcessor: &stream.StreamProcessor{Tconfig: v.transformConfig(beaterConfig)},
}

return v.wrappingHandler(beaterConfig, v2Handler.Handle(beaterConfig, report))
Expand Down
5 changes: 2 additions & 3 deletions beater/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ import (

"github.com/elastic/apm-agent-go"
"github.com/elastic/apm-agent-go/module/apmhttp"
"github.com/elastic/apm-server/publish"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/version"
)

type reporter func(context.Context, pendingReq) error

func newServer(config *Config, tracer *elasticapm.Tracer, report reporter) *http.Server {
func newServer(config *Config, tracer *elasticapm.Tracer, report publish.Reporter) *http.Server {
mux := newMuxer(config, report)

return &http.Server{
Expand Down
3 changes: 2 additions & 1 deletion beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/tests/loader"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -658,4 +659,4 @@ func body(t *testing.T, response *http.Response) string {
return string(body)
}

func nopReporter(context.Context, pendingReq) error { return nil }
func nopReporter(context.Context, publish.PendingReq) error { return nil }
Loading

0 comments on commit 51757b1

Please sign in to comment.