From a00c8187a1962074648f1aa18478f2991073650c Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Sun, 12 Sep 2021 10:20:39 +0800 Subject: [PATCH] Make publish.Reporter implement BatchProcessor --- beater/api/asset/sourcemap/handler.go | 1 - beater/beater.go | 23 ++------ beater/tracing.go | 3 - beater/tracing_test.go | 2 + {beater => publish}/acker.go | 28 ++++----- publish/pub.go | 23 +++----- publish/pub_test.go | 84 +++++++++++++++++++++++++++ 7 files changed, 114 insertions(+), 50 deletions(-) rename {beater => publish}/acker.go (76%) diff --git a/beater/api/asset/sourcemap/handler.go b/beater/api/asset/sourcemap/handler.go index 0d627a9372d..c684030c4c5 100644 --- a/beater/api/asset/sourcemap/handler.go +++ b/beater/api/asset/sourcemap/handler.go @@ -97,7 +97,6 @@ func Handler(report publish.Reporter, notifier AddedNotifier) request.Handler { req := publish.PendingReq{Transformable: &smap} span, ctx := apm.StartSpan(c.Request.Context(), "Send", "Reporter") defer span.End() - req.Trace = !span.Dropped() if err := report(ctx, req); err != nil { if err == publish.ErrChannelClosed { c.Result.SetWithError(request.IDResponseErrorsShuttingDown, err) diff --git a/beater/beater.go b/beater/beater.go index cb075a8f0f7..6bb20ff2684 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -91,7 +91,7 @@ func NewCreator(args CreatorParams) beat.Creator { stopped: false, logger: logger, wrapRunServer: args.WrapRunServer, - waitPublished: newWaitPublishedAcker(), + waitPublished: publish.NewWaitPublishedAcker(), } var err error @@ -127,7 +127,7 @@ type beater struct { config *config.Config logger *logp.Logger wrapRunServer func(RunServerFunc) RunServerFunc - waitPublished *waitPublishedAcker + waitPublished *publish.WaitPublishedAcker mutex sync.Mutex // guards stopServer and stopped stopServer func() @@ -286,7 +286,7 @@ type serverRunner struct { done chan struct{} pipeline beat.PipelineConnector - acker *waitPublishedAcker + acker *publish.WaitPublishedAcker namespace string config *config.Config rawConfig *common.Config @@ -312,7 +312,7 @@ type sharedServerRunnerParams struct { Logger *logp.Logger Tracer *apm.Tracer TracerServer *tracerServer - Acker *waitPublishedAcker + Acker *publish.WaitPublishedAcker } func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunner, error) { @@ -576,7 +576,7 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client func (s *serverRunner) newFinalBatchProcessor(libbeatReporter publish.Reporter) (model.BatchProcessor, error) { esOutputConfig := elasticsearchOutputConfig(s.beat) if esOutputConfig == nil || !s.config.DataStreams.Enabled { - return &reporterBatchProcessor{libbeatReporter}, nil + return libbeatReporter, nil } // Add `output.elasticsearch.experimental` config. If this is true and @@ -599,7 +599,7 @@ func (s *serverRunner) newFinalBatchProcessor(libbeatReporter publish.Reporter) } } if !esConfig.Experimental { - return &reporterBatchProcessor{libbeatReporter}, nil + return libbeatReporter, nil } s.logger.Info("using experimental model indexer") @@ -860,17 +860,6 @@ func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...model.Ba } } -type disablePublisherTracingKey struct{} - -type reporterBatchProcessor struct { - reporter publish.Reporter -} - -func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.Batch) error { - disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool) - return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing}) -} - // augmentedReporter wraps publish.Reporter such that the events it reports have // `observer` and `ecs.version` fields injected. func augmentedReporter(reporter publish.Reporter, info beat.Info) publish.Reporter { diff --git a/beater/tracing.go b/beater/tracing.go index 4817aef0d4f..04cb19ec18e 100644 --- a/beater/tracing.go +++ b/beater/tracing.go @@ -117,9 +117,6 @@ func (s *tracerServer) serve(ctx context.Context, batchProcessor model.BatchProc case <-ctx.Done(): return ctx.Err() case req := <-s.requests: - // Disable tracing for requests that come through the - // tracer server, to avoid recursive tracing. - req.ctx = context.WithValue(req.ctx, disablePublisherTracingKey{}, true) req.res <- batchProcessor.ProcessBatch(req.ctx, req.batch) } } diff --git a/beater/tracing_test.go b/beater/tracing_test.go index 0476663000e..bac33492b66 100644 --- a/beater/tracing_test.go +++ b/beater/tracing_test.go @@ -40,6 +40,8 @@ var testTransactionIds = map[string]bool{ } func TestServerTracingEnabled(t *testing.T) { + // FIXME + events, teardown := setupTestServerInstrumentation(t, true) defer teardown() diff --git a/beater/acker.go b/publish/acker.go similarity index 76% rename from beater/acker.go rename to publish/acker.go index b06035a851c..f88d63ff29a 100644 --- a/beater/acker.go +++ b/publish/acker.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package beater +package publish import ( "context" @@ -25,40 +25,40 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" ) -// waitPublishedAcker is a beat.ACKer which keeps track of the number of -// events published. waitPublishedAcker provides an interruptible Wait method +// WaitPublishedAcker is a beat.ACKer which keeps track of the number of +// events published. WaitPublishedAcker provides an interruptible Wait method // that blocks until all clients are closed, and all published events at the // time the clients are closed are acknowledged. -type waitPublishedAcker struct { +type WaitPublishedAcker struct { active int64 // atomic mu sync.Mutex empty *sync.Cond } -// newWaitPublishedAcker returns a new waitPublishedAcker. -func newWaitPublishedAcker() *waitPublishedAcker { - acker := &waitPublishedAcker{} +// NewWaitPublishedAcker returns a new WaitPublishedAcker. +func NewWaitPublishedAcker() *WaitPublishedAcker { + acker := &WaitPublishedAcker{} acker.empty = sync.NewCond(&acker.mu) return acker } // AddEvent is called when an event has been published or dropped by the client, // and increments a counter for published events. -func (w *waitPublishedAcker) AddEvent(event beat.Event, published bool) { +func (w *WaitPublishedAcker) AddEvent(event beat.Event, published bool) { if published { w.incref(1) } } // ACKEvents is called when published events have been acknowledged. -func (w *waitPublishedAcker) ACKEvents(n int) { +func (w *WaitPublishedAcker) ACKEvents(n int) { w.decref(int64(n)) } // Open must be called exactly once before any new pipeline client is opened, // incrementing the acker's reference count. -func (w *waitPublishedAcker) Open() { +func (w *WaitPublishedAcker) Open() { w.incref(1) } @@ -66,15 +66,15 @@ func (w *waitPublishedAcker) Open() { // acker's reference count. // // This must be called at most once for each call to Open. -func (w *waitPublishedAcker) Close() { +func (w *WaitPublishedAcker) Close() { w.decref(1) } -func (w *waitPublishedAcker) incref(n int64) { +func (w *WaitPublishedAcker) incref(n int64) { atomic.AddInt64(&w.active, 1) } -func (w *waitPublishedAcker) decref(n int64) { +func (w *WaitPublishedAcker) decref(n int64) { if atomic.AddInt64(&w.active, int64(-n)) == 0 { w.empty.Broadcast() } @@ -82,7 +82,7 @@ func (w *waitPublishedAcker) decref(n int64) { // Wait waits for w to be closed and all previously published events to be // acknowledged. -func (w *waitPublishedAcker) Wait(ctx context.Context) error { +func (w *WaitPublishedAcker) Wait(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() go func() { diff --git a/publish/pub.go b/publish/pub.go index 5d0c84de203..74df4eb89f4 100644 --- a/publish/pub.go +++ b/publish/pub.go @@ -27,10 +27,16 @@ import ( "go.elastic.co/apm" "github.com/elastic/beats/v7/libbeat/beat" + + "github.com/elastic/apm-server/model" ) type Reporter func(context.Context, PendingReq) error +func (r Reporter) ProcessBatch(ctx context.Context, batch *model.Batch) error { + return r(ctx, PendingReq{Transformable: batch}) +} + // Publisher forwards batches of events to libbeat. // // If the publisher's input channel is full, an error is returned immediately. @@ -52,7 +58,6 @@ type Publisher struct { type PendingReq struct { Transformable Transformer - Trace bool } // Transformer is an interface implemented by types that can be transformed into beat.Events. @@ -181,19 +186,7 @@ func (p *Publisher) Send(ctx context.Context, req PendingReq) error { func (p *Publisher) run() { ctx := context.Background() for req := range p.pendingRequests { - p.processPendingReq(ctx, req) - } -} - -func (p *Publisher) processPendingReq(ctx context.Context, req PendingReq) { - var tx *apm.Transaction - if req.Trace { - tx = p.tracer.StartTransaction("ProcessPending", "Publisher") - defer tx.End() - ctx = apm.ContextWithTransaction(ctx, tx) + events := req.Transformable.Transform(ctx) + p.client.PublishAll(events) } - events := req.Transformable.Transform(ctx) - span := tx.StartSpan("PublishAll", "Publisher", nil) - defer span.End() - p.client.PublishAll(events) } diff --git a/publish/pub_test.go b/publish/pub_test.go index e206eee2e59..60ef099ac97 100644 --- a/publish/pub_test.go +++ b/publish/pub_test.go @@ -18,7 +18,12 @@ package publish_test import ( + "bufio" "context" + "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" "testing" "time" @@ -29,12 +34,17 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/idxmgmt" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" + _ "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/publish" ) @@ -95,6 +105,80 @@ func TestPublisherStopShutdownInactive(t *testing.T) { assert.NoError(t, publisher.Stop(context.Background())) } +func BenchmarkPublisher(b *testing.B) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Elastic-Product", "Elasticsearch") + fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) + }) + + var indexed int64 + mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { + scanner := bufio.NewScanner(r.Body) + var n int64 + for scanner.Scan() && scanner.Scan() { + n++ + } + atomic.AddInt64(&indexed, n) + // TODO(axw) respond properly + }) + srv := httptest.NewServer(mux) + defer srv.Close() + + supporter, err := idxmgmt.DefaultSupport(logp.NewLogger("beater_test"), beat.Info{}, nil) + require.NoError(b, err) + outputGroup, err := outputs.Load(supporter, beat.Info{}, nil, "elasticsearch", common.MustNewConfigFrom(map[string]interface{}{ + "hosts": []interface{}{srv.URL}, + })) + require.NoError(b, err) + pipeline, err := pipeline.New( + beat.Info{}, + pipeline.Monitors{}, + func(lis queue.ACKListener) (queue.Queue, error) { + return memqueue.NewQueue(nil, memqueue.Settings{ + ACKListener: lis, + FlushMinEvents: 2048, + FlushTimeout: time.Second, + Events: 4096, + }), nil + }, + outputGroup, + pipeline.Settings{ + WaitCloseMode: pipeline.WaitOnClientClose, + InputQueueSize: 2048, + }, + ) + require.NoError(b, err) + + acker := publish.NewWaitPublishedAcker() + acker.Open() + publisher, err := publish.NewPublisher( + pipetool.WithACKer(pipeline, acker), + apmtest.DiscardTracer, + &publish.PublisherConfig{}, + ) + require.NoError(b, err) + + batch := model.Batch{ + model.APMEvent{ + Processor: model.TransactionProcessor, + Timestamp: time.Now(), + }, + } + ctx := context.Background() + for i := 0; i < b.N; i++ { + if err := publisher.Send(ctx, publish.PendingReq{&batch}); err != nil { + b.Fatal(err) + } + } + + // Close the publisher and wait for enqueued events to be published. + assert.NoError(b, publisher.Stop(context.Background())) + assert.NoError(b, acker.Wait(context.Background())) + assert.NoError(b, pipeline.Close()) + assert.Equal(b, int64(b.N), indexed) +} + func newBlockingPipeline(t testing.TB) *pipeline.Pipeline { pipeline, err := pipeline.New( beat.Info{},