Skip to content

Commit

Permalink
publish: implement BatchProcessor; remove tracing (elastic#6243)
Browse files Browse the repository at this point in the history
Modify publish.Publisher so that it implements
model.BatchProcessor directly. Remove the tracing
instrumentation from publisher. We no longer use
and don't support libbeat processors, so this is
not useful.
  • Loading branch information
axw authored Oct 4, 2021
1 parent 1176879 commit 79b43c1
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 167 deletions.
1 change: 0 additions & 1 deletion beater/api/asset/sourcemap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func Handler(report publish.Reporter, notifier AddedNotifier) request.Handler {
req := publish.PendingReq{Transformable: &smap}
span, ctx := apm.StartSpan(c.Request.Context(), "Send", "Reporter")
defer span.End()
req.Trace = !span.Dropped()
if err := report(ctx, req); err != nil {
if err == publish.ErrChannelClosed {
c.Result.SetWithError(request.IDResponseErrorsShuttingDown, err)
Expand Down
25 changes: 7 additions & 18 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewCreator(args CreatorParams) beat.Creator {
stopped: false,
logger: logger,
wrapRunServer: args.WrapRunServer,
waitPublished: newWaitPublishedAcker(),
waitPublished: publish.NewWaitPublishedAcker(),
}

var err error
Expand Down Expand Up @@ -126,7 +126,7 @@ type beater struct {
config *config.Config
logger *logp.Logger
wrapRunServer func(RunServerFunc) RunServerFunc
waitPublished *waitPublishedAcker
waitPublished *publish.WaitPublishedAcker

mutex sync.Mutex // guards stopServer and stopped
stopServer func()
Expand Down Expand Up @@ -285,7 +285,7 @@ type serverRunner struct {
done chan struct{}

pipeline beat.PipelineConnector
acker *waitPublishedAcker
acker *publish.WaitPublishedAcker
namespace string
config *config.Config
rawConfig *common.Config
Expand All @@ -311,7 +311,7 @@ type sharedServerRunnerParams struct {
Logger *logp.Logger
Tracer *apm.Tracer
TracerServer *tracerServer
Acker *waitPublishedAcker
Acker *publish.WaitPublishedAcker
}

func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunner, error) {
Expand Down Expand Up @@ -476,7 +476,7 @@ func (s *serverRunner) run(listener net.Listener) error {
}
batchProcessor = append(batchProcessor,
modelprocessor.DroppedSpansStatsDiscarder{},
s.newFinalBatchProcessor(reporter),
s.newFinalBatchProcessor(publisher),
)

g.Go(func() error {
Expand Down Expand Up @@ -573,8 +573,8 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
}

// newFinalBatchProcessor returns the final model.BatchProcessor that publishes events.
func (s *serverRunner) newFinalBatchProcessor(libbeatReporter publish.Reporter) model.BatchProcessor {
return &reporterBatchProcessor{libbeatReporter}
func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) model.BatchProcessor {
return p
}

func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc {
Expand Down Expand Up @@ -804,17 +804,6 @@ func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...model.Ba
}
}

type disablePublisherTracingKey struct{}

type reporterBatchProcessor struct {
reporter publish.Reporter
}

func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.Batch) error {
disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool)
return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing})
}

// augmentedReporter wraps publish.Reporter such that the events it reports have
// `observer` and `ecs.version` fields injected.
func augmentedReporter(reporter publish.Reporter, info beat.Info) publish.Reporter {
Expand Down
3 changes: 0 additions & 3 deletions beater/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ func (s *tracerServer) serve(ctx context.Context, batchProcessor model.BatchProc
case <-ctx.Done():
return ctx.Err()
case req := <-s.requests:
// Disable tracing for requests that come through the
// tracer server, to avoid recursive tracing.
req.ctx = context.WithValue(req.ctx, disablePublisherTracingKey{}, true)
req.res <- batchProcessor.ProcessBatch(req.ctx, req.batch)
}
}
Expand Down
149 changes: 33 additions & 116 deletions beater/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package beater

import (
"fmt"
"os"
"testing"
"time"
Expand All @@ -27,128 +28,44 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"

"github.com/elastic/apm-server/beater/api"
)

// transactions from testdata/intake-v2/transactions.ndjson used to trigger tracing
var testTransactionIds = map[string]bool{
"945254c567a5417e": true,
"4340a8e0df1906ecbfa9": true,
"cdef4340a8e0df19": true,
"00xxxxFFaaaa1234": true,
"142e61450efb8574": true,
}

func TestServerTracingEnabled(t *testing.T) {
events, teardown := setupTestServerInstrumentation(t, true)
defer teardown()
os.Setenv("ELASTIC_APM_API_REQUEST_TIME", "10ms")
defer os.Unsetenv("ELASTIC_APM_API_REQUEST_TIME")

txEvents := transactionEvents(events)
var selfTransactions []string
for len(selfTransactions) < 2 {
select {
case e := <-txEvents:
if testTransactionIds[eventTransactionId(e)] {
continue
for _, enabled := range []bool{false, true} {
t.Run(fmt.Sprint(enabled), func(t *testing.T) {
cfg := common.MustNewConfigFrom(m{
"host": "localhost:0",
"data_streams.enabled": true,
"data_streams.wait_for_integration": false,
"instrumentation.enabled": enabled,
})
events := make(chan beat.Event, 10)
beater, err := setupServer(t, cfg, nil, events)
require.NoError(t, err)

// Make an HTTP request to the server, which should be traced
// if instrumentation is enabled.
resp, err := beater.client.Get(beater.baseURL + "/foo")
assert.NoError(t, err)
resp.Body.Close()

if enabled {
select {
case <-events:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for event")
}
}

// Check that self-instrumentation goes through the
// reporter wrapped by setupBeater.
wrapped, _ := e.GetValue("labels.wrapped_reporter")
assert.Equal(t, true, wrapped)

selfTransactions = append(selfTransactions, eventTransactionName(e))
case <-time.After(5 * time.Second):
assert.FailNow(t, "timed out waiting for transaction")
}
}
assert.Contains(t, selfTransactions, "POST "+api.IntakePath)
assert.Contains(t, selfTransactions, "ProcessPending")

// We expect no more events, i.e. no recursive self-tracing.
for {
select {
case e := <-txEvents:
assert.FailNowf(t, "unexpected event", "%v", e)
case <-time.After(time.Second):
return
}
}
}

func TestServerTracingDisabled(t *testing.T) {
events, teardown := setupTestServerInstrumentation(t, false)
defer teardown()

txEvents := transactionEvents(events)
for {
select {
case e := <-txEvents:
assert.Contains(t, testTransactionIds, eventTransactionId(e))
case <-time.After(time.Second):
return
}
}
}

func eventTransactionId(event beat.Event) string {
transaction := event.Fields["transaction"].(common.MapStr)
return transaction["id"].(string)
}

func eventTransactionName(event beat.Event) string {
transaction := event.Fields["transaction"].(common.MapStr)
return transaction["name"].(string)
}

func transactionEvents(events <-chan beat.Event) <-chan beat.Event {
out := make(chan beat.Event, 1)
go func() {
defer close(out)
for event := range events {
processor := event.Fields["processor"].(common.MapStr)
if processor["event"] == "transaction" {
out <- event
// We expect no more events, i.e. no recursive self-tracing.
select {
case e := <-events:
t.Errorf("unexpected event: %v", e)
case <-time.After(100 * time.Millisecond):
}
}
}()
return out
}

// setupTestServerInstrumentation sets up a beater with or without instrumentation enabled,
// and returns a channel to which events are published, and a function to be
// called to teardown the beater. The initial onboarding event is consumed
// and a transactions request is made before returning.
func setupTestServerInstrumentation(t *testing.T, enabled bool) (chan beat.Event, func()) {
if testing.Short() {
t.Skip("skipping server test")
})
}

os.Setenv("ELASTIC_APM_API_REQUEST_TIME", "100ms")
defer os.Unsetenv("ELASTIC_APM_API_REQUEST_TIME")

events := make(chan beat.Event, 10)

cfg := common.MustNewConfigFrom(m{
"instrumentation": m{"enabled": enabled},
"host": "localhost:0",
"secret_token": "foo",
})
beater, err := setupServer(t, cfg, nil, events)
require.NoError(t, err)

// onboarding event
e := <-events
assert.Equal(t, "onboarding", e.Fields["processor"].(common.MapStr)["name"])

// Send a transaction request so we have something to trace.
req := makeTransactionRequest(t, beater.baseURL)
req.Header.Add("Content-Type", "application/x-ndjson")
req.Header.Add("Authorization", "Bearer foo")
resp, err := beater.client.Do(req)
assert.NoError(t, err)
resp.Body.Close()

return events, beater.Stop
}
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits]
- Removed `http.request.socket` fields {pull}6152[6152]
- Removed unused `transaction.duration.{count,sum.us}` metric fields {pull}6174[6174]
- Removed `apm-server.sampling.tail.storage_dir` config {pull}6236[6236]
- Removed `ProcessPending` self-instrumentation events {pull}6243[6243]

[float]
==== Bug fixes
Expand Down
28 changes: 14 additions & 14 deletions beater/acker.go → publish/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package beater
package publish

import (
"context"
Expand All @@ -25,64 +25,64 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
)

// waitPublishedAcker is a beat.ACKer which keeps track of the number of
// events published. waitPublishedAcker provides an interruptible Wait method
// WaitPublishedAcker is a beat.ACKer which keeps track of the number of
// events published. WaitPublishedAcker provides an interruptible Wait method
// that blocks until all clients are closed, and all published events at the
// time the clients are closed are acknowledged.
type waitPublishedAcker struct {
type WaitPublishedAcker struct {
active int64 // atomic

mu sync.Mutex
empty *sync.Cond
}

// newWaitPublishedAcker returns a new waitPublishedAcker.
func newWaitPublishedAcker() *waitPublishedAcker {
acker := &waitPublishedAcker{}
// NewWaitPublishedAcker returns a new WaitPublishedAcker.
func NewWaitPublishedAcker() *WaitPublishedAcker {
acker := &WaitPublishedAcker{}
acker.empty = sync.NewCond(&acker.mu)
return acker
}

// AddEvent is called when an event has been published or dropped by the client,
// and increments a counter for published events.
func (w *waitPublishedAcker) AddEvent(event beat.Event, published bool) {
func (w *WaitPublishedAcker) AddEvent(event beat.Event, published bool) {
if published {
w.incref(1)
}
}

// ACKEvents is called when published events have been acknowledged.
func (w *waitPublishedAcker) ACKEvents(n int) {
func (w *WaitPublishedAcker) ACKEvents(n int) {
w.decref(int64(n))
}

// Open must be called exactly once before any new pipeline client is opened,
// incrementing the acker's reference count.
func (w *waitPublishedAcker) Open() {
func (w *WaitPublishedAcker) Open() {
w.incref(1)
}

// Close is called when a pipeline client is closed, and decrements the
// acker's reference count.
//
// This must be called at most once for each call to Open.
func (w *waitPublishedAcker) Close() {
func (w *WaitPublishedAcker) Close() {
w.decref(1)
}

func (w *waitPublishedAcker) incref(n int64) {
func (w *WaitPublishedAcker) incref(n int64) {
atomic.AddInt64(&w.active, 1)
}

func (w *waitPublishedAcker) decref(n int64) {
func (w *WaitPublishedAcker) decref(n int64) {
if atomic.AddInt64(&w.active, int64(-n)) == 0 {
w.empty.Broadcast()
}
}

// Wait waits for w to be closed and all previously published events to be
// acknowledged.
func (w *waitPublishedAcker) Wait(ctx context.Context) error {
func (w *WaitPublishedAcker) Wait(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
Expand Down
Loading

0 comments on commit 79b43c1

Please sign in to comment.