Skip to content

Commit

Permalink
Make publish.Reporter implement BatchProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Sep 13, 2021
1 parent 0faf630 commit a00c818
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 50 deletions.
1 change: 0 additions & 1 deletion beater/api/asset/sourcemap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func Handler(report publish.Reporter, notifier AddedNotifier) request.Handler {
req := publish.PendingReq{Transformable: &smap}
span, ctx := apm.StartSpan(c.Request.Context(), "Send", "Reporter")
defer span.End()
req.Trace = !span.Dropped()
if err := report(ctx, req); err != nil {
if err == publish.ErrChannelClosed {
c.Result.SetWithError(request.IDResponseErrorsShuttingDown, err)
Expand Down
23 changes: 6 additions & 17 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewCreator(args CreatorParams) beat.Creator {
stopped: false,
logger: logger,
wrapRunServer: args.WrapRunServer,
waitPublished: newWaitPublishedAcker(),
waitPublished: publish.NewWaitPublishedAcker(),
}

var err error
Expand Down Expand Up @@ -127,7 +127,7 @@ type beater struct {
config *config.Config
logger *logp.Logger
wrapRunServer func(RunServerFunc) RunServerFunc
waitPublished *waitPublishedAcker
waitPublished *publish.WaitPublishedAcker

mutex sync.Mutex // guards stopServer and stopped
stopServer func()
Expand Down Expand Up @@ -286,7 +286,7 @@ type serverRunner struct {
done chan struct{}

pipeline beat.PipelineConnector
acker *waitPublishedAcker
acker *publish.WaitPublishedAcker
namespace string
config *config.Config
rawConfig *common.Config
Expand All @@ -312,7 +312,7 @@ type sharedServerRunnerParams struct {
Logger *logp.Logger
Tracer *apm.Tracer
TracerServer *tracerServer
Acker *waitPublishedAcker
Acker *publish.WaitPublishedAcker
}

func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunner, error) {
Expand Down Expand Up @@ -576,7 +576,7 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
func (s *serverRunner) newFinalBatchProcessor(libbeatReporter publish.Reporter) (model.BatchProcessor, error) {
esOutputConfig := elasticsearchOutputConfig(s.beat)
if esOutputConfig == nil || !s.config.DataStreams.Enabled {
return &reporterBatchProcessor{libbeatReporter}, nil
return libbeatReporter, nil
}

// Add `output.elasticsearch.experimental` config. If this is true and
Expand All @@ -599,7 +599,7 @@ func (s *serverRunner) newFinalBatchProcessor(libbeatReporter publish.Reporter)
}
}
if !esConfig.Experimental {
return &reporterBatchProcessor{libbeatReporter}, nil
return libbeatReporter, nil
}

s.logger.Info("using experimental model indexer")
Expand Down Expand Up @@ -860,17 +860,6 @@ func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...model.Ba
}
}

type disablePublisherTracingKey struct{}

type reporterBatchProcessor struct {
reporter publish.Reporter
}

func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.Batch) error {
disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool)
return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing})
}

// augmentedReporter wraps publish.Reporter such that the events it reports have
// `observer` and `ecs.version` fields injected.
func augmentedReporter(reporter publish.Reporter, info beat.Info) publish.Reporter {
Expand Down
3 changes: 0 additions & 3 deletions beater/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ func (s *tracerServer) serve(ctx context.Context, batchProcessor model.BatchProc
case <-ctx.Done():
return ctx.Err()
case req := <-s.requests:
// Disable tracing for requests that come through the
// tracer server, to avoid recursive tracing.
req.ctx = context.WithValue(req.ctx, disablePublisherTracingKey{}, true)
req.res <- batchProcessor.ProcessBatch(req.ctx, req.batch)
}
}
Expand Down
2 changes: 2 additions & 0 deletions beater/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var testTransactionIds = map[string]bool{
}

func TestServerTracingEnabled(t *testing.T) {
// FIXME

events, teardown := setupTestServerInstrumentation(t, true)
defer teardown()

Expand Down
28 changes: 14 additions & 14 deletions beater/acker.go → publish/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package beater
package publish

import (
"context"
Expand All @@ -25,64 +25,64 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
)

// waitPublishedAcker is a beat.ACKer which keeps track of the number of
// events published. waitPublishedAcker provides an interruptible Wait method
// WaitPublishedAcker is a beat.ACKer which keeps track of the number of
// events published. WaitPublishedAcker provides an interruptible Wait method
// that blocks until all clients are closed, and all published events at the
// time the clients are closed are acknowledged.
type waitPublishedAcker struct {
type WaitPublishedAcker struct {
// active is the number of outstanding references: it is incremented by
// Open and by AddEvent (for published events), and decremented by Close
// and ACKEvents. incref and decref update it with sync/atomic.
// NOTE(review): presumably kept as the first field so it is 64-bit
// aligned on 32-bit platforms -- confirm before reordering fields.
active int64 // atomic

// mu is the lock associated with the empty condition variable.
mu sync.Mutex
// empty is broadcast by decref when active drops to zero.
empty *sync.Cond
}

// newWaitPublishedAcker returns a new waitPublishedAcker.
func newWaitPublishedAcker() *waitPublishedAcker {
acker := &waitPublishedAcker{}
// NewWaitPublishedAcker returns a new WaitPublishedAcker.
func NewWaitPublishedAcker() *WaitPublishedAcker {
	var w WaitPublishedAcker
	// The condition variable shares the acker's own mutex, so it has to be
	// wired up after the struct exists.
	w.empty = sync.NewCond(&w.mu)
	return &w
}

// AddEvent is called when an event has been published or dropped by the client,
// and increments a counter for published events.
func (w *waitPublishedAcker) AddEvent(event beat.Event, published bool) {
func (w *WaitPublishedAcker) AddEvent(event beat.Event, published bool) {
	// Events that were not published are not counted.
	if !published {
		return
	}
	w.incref(1)
}

// ACKEvents is called when published events have been acknowledged.
func (w *waitPublishedAcker) ACKEvents(n int) {
func (w *WaitPublishedAcker) ACKEvents(n int) {
	// decref works on int64, matching the type of the active counter.
	acked := int64(n)
	w.decref(acked)
}

// Open must be called exactly once before any new pipeline client is opened,
// incrementing the acker's reference count.
func (w *waitPublishedAcker) Open() {
func (w *WaitPublishedAcker) Open() {
	// Take the single reference that the matching Close call releases.
	const clientRef = 1
	w.incref(clientRef)
}

// Close is called when a pipeline client is closed, and decrements the
// acker's reference count.
//
// This must be called at most once for each call to Open.
func (w *waitPublishedAcker) Close() {
func (w *WaitPublishedAcker) Close() {
	// Release the single reference taken by Open.
	const clientRef = 1
	w.decref(clientRef)
}

func (w *waitPublishedAcker) incref(n int64) {
// incref atomically adds n to the count of active references.
func (w *WaitPublishedAcker) incref(n int64) {
	// Add n rather than a hard-coded 1, so that incref mirrors decref, which
	// subtracts its argument. Every caller in this file passes 1, so existing
	// behavior is unchanged.
	atomic.AddInt64(&w.active, n)
}

func (w *waitPublishedAcker) decref(n int64) {
// decref atomically subtracts n from the count of active references, and
// broadcasts on the empty condition variable when the count reaches zero.
func (w *WaitPublishedAcker) decref(n int64) {
	remaining := atomic.AddInt64(&w.active, -n)
	if remaining != 0 {
		return
	}
	w.empty.Broadcast()
}

// Wait waits for w to be closed and all previously published events to be
// acknowledged.
func (w *waitPublishedAcker) Wait(ctx context.Context) error {
func (w *WaitPublishedAcker) Wait(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
Expand Down
23 changes: 8 additions & 15 deletions publish/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ import (
"go.elastic.co/apm"

"github.com/elastic/beats/v7/libbeat/beat"

"github.com/elastic/apm-server/model"
)

// Reporter is a function type that takes a context and a PendingReq and
// returns an error. Callers compare the returned error with ErrChannelClosed
// to detect that the server is shutting down.
type Reporter func(context.Context, PendingReq) error

// ProcessBatch wraps batch in a PendingReq and passes it to r, which allows a
// Reporter to be used wherever a model.BatchProcessor is expected.
func (r Reporter) ProcessBatch(ctx context.Context, batch *model.Batch) error {
	pending := PendingReq{Transformable: batch}
	return r(ctx, pending)
}

// Publisher forwards batches of events to libbeat.
//
// If the publisher's input channel is full, an error is returned immediately.
Expand All @@ -52,7 +58,6 @@ type Publisher struct {

// PendingReq is the request type accepted by Reporter functions and by
// Publisher.Send.
type PendingReq struct {
// Transformable is the payload; its Transform method is called to obtain
// the events that are passed to the pipeline client.
Transformable Transformer
// Trace, when true, makes processPendingReq start a "ProcessPending"
// transaction around the processing of this request.
Trace bool
}

// Transformer is an interface implemented by types that can be transformed into beat.Events.
Expand Down Expand Up @@ -181,19 +186,7 @@ func (p *Publisher) Send(ctx context.Context, req PendingReq) error {
func (p *Publisher) run() {
ctx := context.Background()
for req := range p.pendingRequests {
p.processPendingReq(ctx, req)
}
}

func (p *Publisher) processPendingReq(ctx context.Context, req PendingReq) {
var tx *apm.Transaction
if req.Trace {
tx = p.tracer.StartTransaction("ProcessPending", "Publisher")
defer tx.End()
ctx = apm.ContextWithTransaction(ctx, tx)
events := req.Transformable.Transform(ctx)
p.client.PublishAll(events)
}
events := req.Transformable.Transform(ctx)
span := tx.StartSpan("PublishAll", "Publisher", nil)
defer span.End()
p.client.PublishAll(events)
}
84 changes: 84 additions & 0 deletions publish/pub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
package publish_test

import (
"bufio"
"context"
"fmt"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

Expand All @@ -29,12 +34,17 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
_ "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"

"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/publish"
)

Expand Down Expand Up @@ -95,6 +105,80 @@ func TestPublisherStopShutdownInactive(t *testing.T) {
assert.NoError(t, publisher.Stop(context.Background()))
}

// BenchmarkPublisher measures Publisher.Send with events flowing through a
// libbeat pipeline whose "elasticsearch" output is pointed at a local stub
// HTTP server. The stub counts the documents it receives on /_bulk, and the
// benchmark finally asserts that the count equals the number of batches sent.
func BenchmarkPublisher(b *testing.B) {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("X-Elastic-Product", "Elasticsearch")
		fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
	})

	// indexed is updated by handler goroutines, so it must only be accessed
	// through sync/atomic.
	var indexed int64
	mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
		scanner := bufio.NewScanner(r.Body)
		var n int64
		// Count one document per pair of lines in the request body.
		for scanner.Scan() && scanner.Scan() {
			n++
		}
		atomic.AddInt64(&indexed, n)
		// TODO(axw) respond properly
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()

	supporter, err := idxmgmt.DefaultSupport(logp.NewLogger("beater_test"), beat.Info{}, nil)
	require.NoError(b, err)
	outputGroup, err := outputs.Load(supporter, beat.Info{}, nil, "elasticsearch", common.MustNewConfigFrom(map[string]interface{}{
		"hosts": []interface{}{srv.URL},
	}))
	require.NoError(b, err)
	pipeline, err := pipeline.New(
		beat.Info{},
		pipeline.Monitors{},
		func(lis queue.ACKListener) (queue.Queue, error) {
			return memqueue.NewQueue(nil, memqueue.Settings{
				ACKListener:    lis,
				FlushMinEvents: 2048,
				FlushTimeout:   time.Second,
				Events:         4096,
			}), nil
		},
		outputGroup,
		pipeline.Settings{
			WaitCloseMode:  pipeline.WaitOnClientClose,
			InputQueueSize: 2048,
		},
	)
	require.NoError(b, err)

	acker := publish.NewWaitPublishedAcker()
	acker.Open()
	publisher, err := publish.NewPublisher(
		pipetool.WithACKer(pipeline, acker),
		apmtest.DiscardTracer,
		&publish.PublisherConfig{},
	)
	require.NoError(b, err)

	batch := model.Batch{
		model.APMEvent{
			Processor: model.TransactionProcessor,
			Timestamp: time.Now(),
		},
	}
	ctx := context.Background()

	// Exclude the server, output and pipeline setup above from the timing.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Use a keyed literal so the code does not depend on the number or
		// order of PendingReq fields.
		if err := publisher.Send(ctx, publish.PendingReq{Transformable: &batch}); err != nil {
			b.Fatal(err)
		}
	}

	// Close the publisher and wait for enqueued events to be published.
	assert.NoError(b, publisher.Stop(context.Background()))
	assert.NoError(b, acker.Wait(context.Background()))
	assert.NoError(b, pipeline.Close())
	assert.Equal(b, int64(b.N), atomic.LoadInt64(&indexed))
}

func newBlockingPipeline(t testing.TB) *pipeline.Pipeline {
pipeline, err := pipeline.New(
beat.Info{},
Expand Down

0 comments on commit a00c818

Please sign in to comment.