diff --git a/beater/api/asset/sourcemap/handler.go b/beater/api/asset/sourcemap/handler.go index 0d627a9372d..c684030c4c5 100644 --- a/beater/api/asset/sourcemap/handler.go +++ b/beater/api/asset/sourcemap/handler.go @@ -97,7 +97,6 @@ func Handler(report publish.Reporter, notifier AddedNotifier) request.Handler { req := publish.PendingReq{Transformable: &smap} span, ctx := apm.StartSpan(c.Request.Context(), "Send", "Reporter") defer span.End() - req.Trace = !span.Dropped() if err := report(ctx, req); err != nil { if err == publish.ErrChannelClosed { c.Result.SetWithError(request.IDResponseErrorsShuttingDown, err) diff --git a/beater/beater.go b/beater/beater.go index cbf008ef546..08f6dfe0c23 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -90,7 +90,7 @@ func NewCreator(args CreatorParams) beat.Creator { stopped: false, logger: logger, wrapRunServer: args.WrapRunServer, - waitPublished: newWaitPublishedAcker(), + waitPublished: publish.NewWaitPublishedAcker(), } var err error @@ -126,7 +126,7 @@ type beater struct { config *config.Config logger *logp.Logger wrapRunServer func(RunServerFunc) RunServerFunc - waitPublished *waitPublishedAcker + waitPublished *publish.WaitPublishedAcker mutex sync.Mutex // guards stopServer and stopped stopServer func() @@ -285,7 +285,7 @@ type serverRunner struct { done chan struct{} pipeline beat.PipelineConnector - acker *waitPublishedAcker + acker *publish.WaitPublishedAcker namespace string config *config.Config rawConfig *common.Config @@ -311,7 +311,7 @@ type sharedServerRunnerParams struct { Logger *logp.Logger Tracer *apm.Tracer TracerServer *tracerServer - Acker *waitPublishedAcker + Acker *publish.WaitPublishedAcker } func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunner, error) { @@ -476,7 +476,7 @@ func (s *serverRunner) run(listener net.Listener) error { } batchProcessor = append(batchProcessor, modelprocessor.DroppedSpansStatsDiscarder{}, - s.newFinalBatchProcessor(reporter), + s.newFinalBatchProcessor(publisher), ) g.Go(func() error { @@ -573,8 +573,8 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client } // newFinalBatchProcessor returns the final model.BatchProcessor that publishes events. -func (s *serverRunner) newFinalBatchProcessor(libbeatReporter publish.Reporter) model.BatchProcessor { - return &reporterBatchProcessor{libbeatReporter} +func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) model.BatchProcessor { + return p } func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc { @@ -804,17 +804,6 @@ func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...model.Ba } } -type disablePublisherTracingKey struct{} - -type reporterBatchProcessor struct { - reporter publish.Reporter -} - -func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.Batch) error { - disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool) - return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing}) -} - // augmentedReporter wraps publish.Reporter such that the events it reports have // `observer` and `ecs.version` fields injected. func augmentedReporter(reporter publish.Reporter, info beat.Info) publish.Reporter { diff --git a/beater/tracing.go b/beater/tracing.go index 4817aef0d4f..04cb19ec18e 100644 --- a/beater/tracing.go +++ b/beater/tracing.go @@ -117,9 +117,6 @@ func (s *tracerServer) serve(ctx context.Context, batchProcessor model.BatchProc case <-ctx.Done(): return ctx.Err() case req := <-s.requests: - // Disable tracing for requests that come through the - // tracer server, to avoid recursive tracing. - req.ctx = context.WithValue(req.ctx, disablePublisherTracingKey{}, true) req.res <- batchProcessor.ProcessBatch(req.ctx, req.batch) } } diff --git a/beater/tracing_test.go b/beater/tracing_test.go index 4b002f6d798..aff89f7ae75 100644 --- a/beater/tracing_test.go +++ b/beater/tracing_test.go @@ -18,6 +18,7 @@ package beater import ( + "fmt" "os" "testing" "time" @@ -27,128 +28,44 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - - "github.com/elastic/apm-server/beater/api" ) -// transactions from testdata/intake-v2/transactions.ndjson used to trigger tracing -var testTransactionIds = map[string]bool{ - "945254c567a5417e": true, - "4340a8e0df1906ecbfa9": true, - "cdef4340a8e0df19": true, - "00xxxxFFaaaa1234": true, - "142e61450efb8574": true, -} - func TestServerTracingEnabled(t *testing.T) { - events, teardown := setupTestServerInstrumentation(t, true) - defer teardown() + os.Setenv("ELASTIC_APM_API_REQUEST_TIME", "10ms") + defer os.Unsetenv("ELASTIC_APM_API_REQUEST_TIME") - txEvents := transactionEvents(events) - var selfTransactions []string - for len(selfTransactions) < 2 { - select { - case e := <-txEvents: - if testTransactionIds[eventTransactionId(e)] { - continue + for _, enabled := range []bool{false, true} { + t.Run(fmt.Sprint(enabled), func(t *testing.T) { + cfg := common.MustNewConfigFrom(m{ + "host": "localhost:0", + "data_streams.enabled": true, + "data_streams.wait_for_integration": false, + "instrumentation.enabled": enabled, + }) + events := make(chan beat.Event, 10) + beater, err := setupServer(t, cfg, nil, events) + require.NoError(t, err) + + // Make an HTTP request to the server, which should be traced + // if instrumentation is enabled. + resp, err := beater.client.Get(beater.baseURL + "/foo") + assert.NoError(t, err) + resp.Body.Close() + + if enabled { + select { + case <-events: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for event") + } } - // Check that self-instrumentation goes through the - // reporter wrapped by setupBeater. - wrapped, _ := e.GetValue("labels.wrapped_reporter") - assert.Equal(t, true, wrapped) - - selfTransactions = append(selfTransactions, eventTransactionName(e)) - case <-time.After(5 * time.Second): - assert.FailNow(t, "timed out waiting for transaction") - } - } - assert.Contains(t, selfTransactions, "POST "+api.IntakePath) - assert.Contains(t, selfTransactions, "ProcessPending") - - // We expect no more events, i.e. no recursive self-tracing. - for { - select { - case e := <-txEvents: - assert.FailNowf(t, "unexpected event", "%v", e) - case <-time.After(time.Second): - return - } - } -} - -func TestServerTracingDisabled(t *testing.T) { - events, teardown := setupTestServerInstrumentation(t, false) - defer teardown() - - txEvents := transactionEvents(events) - for { - select { - case e := <-txEvents: - assert.Contains(t, testTransactionIds, eventTransactionId(e)) - case <-time.After(time.Second): - return - } - } -} - -func eventTransactionId(event beat.Event) string { - transaction := event.Fields["transaction"].(common.MapStr) - return transaction["id"].(string) -} - -func eventTransactionName(event beat.Event) string { - transaction := event.Fields["transaction"].(common.MapStr) - return transaction["name"].(string) -} - -func transactionEvents(events <-chan beat.Event) <-chan beat.Event { - out := make(chan beat.Event, 1) - go func() { - defer close(out) - for event := range events { - processor := event.Fields["processor"].(common.MapStr) - if processor["event"] == "transaction" { - out <- event + // We expect no more events, i.e. no recursive self-tracing. + select { + case e := <-events: + t.Errorf("unexpected event: %v", e) + case <-time.After(100 * time.Millisecond): } - } - }() - return out -} - -// setupTestServerInstrumentation sets up a beater with or without instrumentation enabled, -// and returns a channel to which events are published, and a function to be -// called to teardown the beater. The initial onboarding event is consumed -// and a transactions request is made before returning. -func setupTestServerInstrumentation(t *testing.T, enabled bool) (chan beat.Event, func()) { - if testing.Short() { - t.Skip("skipping server test") + }) } - - os.Setenv("ELASTIC_APM_API_REQUEST_TIME", "100ms") - defer os.Unsetenv("ELASTIC_APM_API_REQUEST_TIME") - - events := make(chan beat.Event, 10) - - cfg := common.MustNewConfigFrom(m{ - "instrumentation": m{"enabled": enabled}, - "host": "localhost:0", - "secret_token": "foo", - }) - beater, err := setupServer(t, cfg, nil, events) - require.NoError(t, err) - - // onboarding event - e := <-events - assert.Equal(t, "onboarding", e.Fields["processor"].(common.MapStr)["name"]) - - // Send a transaction request so we have something to trace. - req := makeTransactionRequest(t, beater.baseURL) - req.Header.Add("Content-Type", "application/x-ndjson") - req.Header.Add("Authorization", "Bearer foo") - resp, err := beater.client.Do(req) - assert.NoError(t, err) - resp.Body.Close() - - return events, beater.Stop } diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 1963e43fb47..666c03690b7 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits] - Removed `http.request.socket` fields {pull}6152[6152] - Removed unused `transaction.duration.{count,sum.us}` metric fields {pull}6174[6174] - Removed `apm-server.sampling.tail.storage_dir` config {pull}6236[6236] +- Removed `ProcessPending` self-instrumentation events {pull}6243[6243] [float] ==== Bug fixes diff --git a/beater/acker.go b/publish/acker.go similarity index 76% rename from beater/acker.go rename to publish/acker.go index b06035a851c..f88d63ff29a 100644 --- a/beater/acker.go +++ b/publish/acker.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package beater +package publish import ( "context" @@ -25,40 +25,40 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" ) -// waitPublishedAcker is a beat.ACKer which keeps track of the number of -// events published. waitPublishedAcker provides an interruptible Wait method +// WaitPublishedAcker is a beat.ACKer which keeps track of the number of +// events published. WaitPublishedAcker provides an interruptible Wait method // that blocks until all clients are closed, and all published events at the // time the clients are closed are acknowledged. -type waitPublishedAcker struct { +type WaitPublishedAcker struct { active int64 // atomic mu sync.Mutex empty *sync.Cond } -// newWaitPublishedAcker returns a new waitPublishedAcker. -func newWaitPublishedAcker() *waitPublishedAcker { - acker := &waitPublishedAcker{} +// NewWaitPublishedAcker returns a new WaitPublishedAcker. +func NewWaitPublishedAcker() *WaitPublishedAcker { + acker := &WaitPublishedAcker{} acker.empty = sync.NewCond(&acker.mu) return acker } // AddEvent is called when an event has been published or dropped by the client, // and increments a counter for published events. -func (w *waitPublishedAcker) AddEvent(event beat.Event, published bool) { +func (w *WaitPublishedAcker) AddEvent(event beat.Event, published bool) { if published { w.incref(1) } } // ACKEvents is called when published events have been acknowledged. -func (w *waitPublishedAcker) ACKEvents(n int) { +func (w *WaitPublishedAcker) ACKEvents(n int) { w.decref(int64(n)) } // Open must be called exactly once before any new pipeline client is opened, // incrementing the acker's reference count. -func (w *waitPublishedAcker) Open() { +func (w *WaitPublishedAcker) Open() { w.incref(1) } @@ -66,15 +66,15 @@ func (w *waitPublishedAcker) Open() { // acker's reference count. // // This must be called at most once for each call to Open. -func (w *waitPublishedAcker) Close() { +func (w *WaitPublishedAcker) Close() { w.decref(1) } -func (w *waitPublishedAcker) incref(n int64) { +func (w *WaitPublishedAcker) incref(n int64) { atomic.AddInt64(&w.active, 1) } -func (w *waitPublishedAcker) decref(n int64) { +func (w *WaitPublishedAcker) decref(n int64) { if atomic.AddInt64(&w.active, int64(-n)) == 0 { w.empty.Broadcast() } @@ -82,7 +82,7 @@ func (w *waitPublishedAcker) decref(n int64) { // Wait waits for w to be closed and all previously published events to be // acknowledged. -func (w *waitPublishedAcker) Wait(ctx context.Context) error { +func (w *WaitPublishedAcker) Wait(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() go func() { diff --git a/publish/pub.go b/publish/pub.go index 5d0c84de203..dda30e3d4db 100644 --- a/publish/pub.go +++ b/publish/pub.go @@ -27,6 +27,8 @@ import ( "go.elastic.co/apm" "github.com/elastic/beats/v7/libbeat/beat" + + "github.com/elastic/apm-server/model" ) type Reporter func(context.Context, PendingReq) error @@ -52,7 +54,6 @@ type Publisher struct { type PendingReq struct { Transformable Transformer - Trace bool } // Transformer is an interface implemented by types that can be transformed into beat.Events. @@ -154,6 +155,12 @@ func (p *Publisher) Stop(ctx context.Context) error { return p.client.Close() } +// ProcessBatch transforms batch to beat.Events, and sends them to the libbeat +// publishing pipeline. +func (p *Publisher) ProcessBatch(ctx context.Context, batch *model.Batch) error { + return p.Send(ctx, PendingReq{Transformable: batch}) +} + // Send tries to forward pendingReq to the publishers worker. If the queue is full, // an error is returned. // @@ -181,19 +188,7 @@ func (p *Publisher) Send(ctx context.Context, req PendingReq) error { func (p *Publisher) run() { ctx := context.Background() for req := range p.pendingRequests { - p.processPendingReq(ctx, req) - } -} - -func (p *Publisher) processPendingReq(ctx context.Context, req PendingReq) { - var tx *apm.Transaction - if req.Trace { - tx = p.tracer.StartTransaction("ProcessPending", "Publisher") - defer tx.End() - ctx = apm.ContextWithTransaction(ctx, tx) + events := req.Transformable.Transform(ctx) + p.client.PublishAll(events) } - events := req.Transformable.Transform(ctx) - span := tx.StartSpan("PublishAll", "Publisher", nil) - defer span.End() - p.client.PublishAll(events) } diff --git a/publish/pub_test.go b/publish/pub_test.go index e206eee2e59..035651987fa 100644 --- a/publish/pub_test.go +++ b/publish/pub_test.go @@ -18,7 +18,12 @@ package publish_test import ( + "bufio" "context" + "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" "testing" "time" @@ -29,12 +34,17 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/idxmgmt" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" + _ "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/publish" ) @@ -95,6 +105,81 @@ func TestPublisherStopShutdownInactive(t *testing.T) { assert.NoError(t, publisher.Stop(context.Background())) } +func BenchmarkPublisher(b *testing.B) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Elastic-Product", "Elasticsearch") + fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) + }) + + var indexed int64 + mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { + scanner := bufio.NewScanner(r.Body) + var n int64 + for scanner.Scan() { + if scanner.Scan() { + n++ + } + } + atomic.AddInt64(&indexed, n) + }) + srv := httptest.NewServer(mux) + defer srv.Close() + + supporter, err := idxmgmt.DefaultSupport(logp.NewLogger("beater_test"), beat.Info{}, nil) + require.NoError(b, err) + outputGroup, err := outputs.Load(supporter, beat.Info{}, nil, "elasticsearch", common.MustNewConfigFrom(map[string]interface{}{ + "hosts": []interface{}{srv.URL}, + })) + require.NoError(b, err) + pipeline, err := pipeline.New( + beat.Info{}, + pipeline.Monitors{}, + func(lis queue.ACKListener) (queue.Queue, error) { + return memqueue.NewQueue(nil, memqueue.Settings{ + ACKListener: lis, + FlushMinEvents: 2048, + FlushTimeout: time.Second, + Events: 4096, + }), nil + }, + outputGroup, + pipeline.Settings{ + WaitCloseMode: pipeline.WaitOnClientClose, + InputQueueSize: 2048, + }, + ) + require.NoError(b, err) + + acker := publish.NewWaitPublishedAcker() + acker.Open() + publisher, err := publish.NewPublisher( + pipetool.WithACKer(pipeline, acker), + apmtest.DiscardTracer, + &publish.PublisherConfig{}, + ) + require.NoError(b, err) + + batch := model.Batch{ + model.APMEvent{ + Processor: model.TransactionProcessor, + Timestamp: time.Now(), + }, + } + ctx := context.Background() + for i := 0; i < b.N; i++ { + if err := publisher.ProcessBatch(ctx, &batch); err != nil { + b.Fatal(err) + } + } + + // Close the publisher and wait for enqueued events to be published. + assert.NoError(b, publisher.Stop(context.Background())) + assert.NoError(b, acker.Wait(context.Background())) + assert.NoError(b, pipeline.Close()) + assert.Equal(b, int64(b.N), indexed) +} + func newBlockingPipeline(t testing.TB) *pipeline.Pipeline { pipeline, err := pipeline.New( beat.Info{},