Skip to content

Commit

Permalink
publish: implement BatchProcessor; remove tracing (#6243)
Browse files Browse the repository at this point in the history
Modify publish.Publisher so that it implements
model.BatchProcessor directly. Remove the tracing
instrumentation from the publisher: we no longer use
or support libbeat processors, so that instrumentation
is no longer useful.

(cherry picked from commit 79b43c1)

# Conflicts:
#	changelogs/head.asciidoc
  • Loading branch information
axw authored and mergify-bot committed Oct 4, 2021
1 parent f846deb commit 2bc7f61
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 167 deletions.
1 change: 0 additions & 1 deletion beater/api/asset/sourcemap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func Handler(report publish.Reporter, notifier AddedNotifier) request.Handler {
req := publish.PendingReq{Transformable: &smap}
span, ctx := apm.StartSpan(c.Request.Context(), "Send", "Reporter")
defer span.End()
req.Trace = !span.Dropped()
if err := report(ctx, req); err != nil {
if err == publish.ErrChannelClosed {
c.Result.SetWithError(request.IDResponseErrorsShuttingDown, err)
Expand Down
25 changes: 7 additions & 18 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewCreator(args CreatorParams) beat.Creator {
stopped: false,
logger: logger,
wrapRunServer: args.WrapRunServer,
waitPublished: newWaitPublishedAcker(),
waitPublished: publish.NewWaitPublishedAcker(),
}

var err error
Expand Down Expand Up @@ -126,7 +126,7 @@ type beater struct {
config *config.Config
logger *logp.Logger
wrapRunServer func(RunServerFunc) RunServerFunc
waitPublished *waitPublishedAcker
waitPublished *publish.WaitPublishedAcker

mutex sync.Mutex // guards stopServer and stopped
stopServer func()
Expand Down Expand Up @@ -285,7 +285,7 @@ type serverRunner struct {
done chan struct{}

pipeline beat.PipelineConnector
acker *waitPublishedAcker
acker *publish.WaitPublishedAcker
namespace string
config *config.Config
rawConfig *common.Config
Expand All @@ -311,7 +311,7 @@ type sharedServerRunnerParams struct {
Logger *logp.Logger
Tracer *apm.Tracer
TracerServer *tracerServer
Acker *waitPublishedAcker
Acker *publish.WaitPublishedAcker
}

func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunner, error) {
Expand Down Expand Up @@ -476,7 +476,7 @@ func (s *serverRunner) run(listener net.Listener) error {
}
batchProcessor = append(batchProcessor,
modelprocessor.DroppedSpansStatsDiscarder{},
s.newFinalBatchProcessor(reporter),
s.newFinalBatchProcessor(publisher),
)

g.Go(func() error {
Expand Down Expand Up @@ -573,8 +573,8 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
}

// newFinalBatchProcessor returns the final model.BatchProcessor that publishes events.
func (s *serverRunner) newFinalBatchProcessor(libbeatReporter publish.Reporter) model.BatchProcessor {
return &reporterBatchProcessor{libbeatReporter}
func (s *serverRunner) newFinalBatchProcessor(p *publish.Publisher) model.BatchProcessor {
return p
}

func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc {
Expand Down Expand Up @@ -803,17 +803,6 @@ func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...model.Ba
}
}

type disablePublisherTracingKey struct{}

type reporterBatchProcessor struct {
reporter publish.Reporter
}

func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.Batch) error {
disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool)
return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing})
}

// augmentedReporter wraps publish.Reporter such that the events it reports have
// `observer` and `ecs.version` fields injected.
func augmentedReporter(reporter publish.Reporter, info beat.Info) publish.Reporter {
Expand Down
3 changes: 0 additions & 3 deletions beater/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ func (s *tracerServer) serve(ctx context.Context, batchProcessor model.BatchProc
case <-ctx.Done():
return ctx.Err()
case req := <-s.requests:
// Disable tracing for requests that come through the
// tracer server, to avoid recursive tracing.
req.ctx = context.WithValue(req.ctx, disablePublisherTracingKey{}, true)
req.res <- batchProcessor.ProcessBatch(req.ctx, req.batch)
}
}
Expand Down
149 changes: 33 additions & 116 deletions beater/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package beater

import (
"fmt"
"os"
"testing"
"time"
Expand All @@ -27,128 +28,44 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"

"github.com/elastic/apm-server/beater/api"
)

// transactions from testdata/intake-v2/transactions.ndjson used to trigger tracing
var testTransactionIds = map[string]bool{
"945254c567a5417e": true,
"4340a8e0df1906ecbfa9": true,
"cdef4340a8e0df19": true,
"00xxxxFFaaaa1234": true,
"142e61450efb8574": true,
}

func TestServerTracingEnabled(t *testing.T) {
events, teardown := setupTestServerInstrumentation(t, true)
defer teardown()
os.Setenv("ELASTIC_APM_API_REQUEST_TIME", "10ms")
defer os.Unsetenv("ELASTIC_APM_API_REQUEST_TIME")

txEvents := transactionEvents(events)
var selfTransactions []string
for len(selfTransactions) < 2 {
select {
case e := <-txEvents:
if testTransactionIds[eventTransactionId(e)] {
continue
for _, enabled := range []bool{false, true} {
t.Run(fmt.Sprint(enabled), func(t *testing.T) {
cfg := common.MustNewConfigFrom(m{
"host": "localhost:0",
"data_streams.enabled": true,
"data_streams.wait_for_integration": false,
"instrumentation.enabled": enabled,
})
events := make(chan beat.Event, 10)
beater, err := setupServer(t, cfg, nil, events)
require.NoError(t, err)

// Make an HTTP request to the server, which should be traced
// if instrumentation is enabled.
resp, err := beater.client.Get(beater.baseURL + "/foo")
assert.NoError(t, err)
resp.Body.Close()

if enabled {
select {
case <-events:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for event")
}
}

// Check that self-instrumentation goes through the
// reporter wrapped by setupBeater.
wrapped, _ := e.GetValue("labels.wrapped_reporter")
assert.Equal(t, true, wrapped)

selfTransactions = append(selfTransactions, eventTransactionName(e))
case <-time.After(5 * time.Second):
assert.FailNow(t, "timed out waiting for transaction")
}
}
assert.Contains(t, selfTransactions, "POST "+api.IntakePath)
assert.Contains(t, selfTransactions, "ProcessPending")

// We expect no more events, i.e. no recursive self-tracing.
for {
select {
case e := <-txEvents:
assert.FailNowf(t, "unexpected event", "%v", e)
case <-time.After(time.Second):
return
}
}
}

func TestServerTracingDisabled(t *testing.T) {
events, teardown := setupTestServerInstrumentation(t, false)
defer teardown()

txEvents := transactionEvents(events)
for {
select {
case e := <-txEvents:
assert.Contains(t, testTransactionIds, eventTransactionId(e))
case <-time.After(time.Second):
return
}
}
}

func eventTransactionId(event beat.Event) string {
transaction := event.Fields["transaction"].(common.MapStr)
return transaction["id"].(string)
}

func eventTransactionName(event beat.Event) string {
transaction := event.Fields["transaction"].(common.MapStr)
return transaction["name"].(string)
}

func transactionEvents(events <-chan beat.Event) <-chan beat.Event {
out := make(chan beat.Event, 1)
go func() {
defer close(out)
for event := range events {
processor := event.Fields["processor"].(common.MapStr)
if processor["event"] == "transaction" {
out <- event
// We expect no more events, i.e. no recursive self-tracing.
select {
case e := <-events:
t.Errorf("unexpected event: %v", e)
case <-time.After(100 * time.Millisecond):
}
}
}()
return out
}

// setupTestServerInstrumentation sets up a beater with or without instrumentation enabled,
// and returns a channel to which events are published, and a function to be
// called to teardown the beater. The initial onboarding event is consumed
// and a transactions request is made before returning.
func setupTestServerInstrumentation(t *testing.T, enabled bool) (chan beat.Event, func()) {
if testing.Short() {
t.Skip("skipping server test")
})
}

os.Setenv("ELASTIC_APM_API_REQUEST_TIME", "100ms")
defer os.Unsetenv("ELASTIC_APM_API_REQUEST_TIME")

events := make(chan beat.Event, 10)

cfg := common.MustNewConfigFrom(m{
"instrumentation": m{"enabled": enabled},
"host": "localhost:0",
"secret_token": "foo",
})
beater, err := setupServer(t, cfg, nil, events)
require.NoError(t, err)

// onboarding event
e := <-events
assert.Equal(t, "onboarding", e.Fields["processor"].(common.MapStr)["name"])

// Send a transaction request so we have something to trace.
req := makeTransactionRequest(t, beater.baseURL)
req.Header.Add("Content-Type", "application/x-ndjson")
req.Header.Add("Authorization", "Bearer foo")
resp, err := beater.client.Do(req)
assert.NoError(t, err)
resp.Body.Close()

return events, beater.Stop
}
44 changes: 44 additions & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[[release-notes-head]]
== APM Server version HEAD

https://github.com/elastic/apm-server/compare/7.15\...master[View commits]

[float]
==== Breaking Changes
- Removed unused stacktrace/frame monitoring counters {pull}5984[5984]
- Removed unused support for top-level metricsets and metricset tags for RUMv3 {pull}6065[6065]
- Removed `apm-server.mode` configuration, and "experimental" fields {pull}6086[6086]
- `transaction.sampled` is now only set for sampled transactions {pull}6066[6066]
- Unknown metrics are dropped when `transaction.*` or `span.*` are present in a metricset {pull}6111[6111]
- Removed `metricset.period` from service_destination metrics {pull}6111[6111]
- Removed `http.request.socket` fields {pull}6152[6152]
- Removed unused `transaction.duration.{count,sum.us}` metric fields {pull}6174[6174]
- Removed `apm-server.sampling.tail.storage_dir` config {pull}6236[6236]
- Removed `ProcessPending` self-instrumentation events {pull}6243[6243]

[float]
==== Bug fixes

[float]
==== Intake API Changes
- `faas`, `service.origin.*`, and `cloud.origin.*` added for supporting function as a service fields {pull}6161[6161]
- `context.message.routing_key` was added to the intake API {pull}6177[6177]
- `transaction.dropped_spans_stats` was added to the intake API {pull}6200[6200]

[float]
==== Added
- The `error.log.message` or `error.exception.message` field of errors will be copied to the ECS field `message` {pull}5974[5974]
- Define index sorting for internal metrics data stream {pull}6116[6116]
- Add histogram dynamic_template to app metrics data stream {pull}6043[6043]
- Index OpenTelemetry span events and Jaeger logs into a log data stream {pull}6122[6122]
- With `apm-server.data_streams.enabled` in standalone mode, the server now accepts and enqueues events while waiting for the integration to be installed {pull}6130[6130]
- HTTP server errors (e.g. TLS handshake errors) are now logged {pull}6141[6141]
- Span documents now duplicate extended HTTP fields, which were previously only under `span.http.*`, under `http.*` {pull}6147[6147]
- We now record the direct network peer for incoming requests as `source.ip` and `source.port`; origin IP is recorded in `client.ip` {pull}6152[6152]
- We now collect span destination metrics for transactions with too many spans (for example due to transaction_max_spans or exit_span_min_duration) when collected and sent by APM agents {pull}6200[6200]

[float]
==== Deprecated
- Setting `service.version` as a span tag (Jaeger) or attribute (OTel) is deprecated; use tracer tags (Jaeger) and resource attributes (OTel) {pull}6131[6131]
- Setting up Elasticsearch templates, ILM policies, and pipelines directly with apm-server is now deprecated. Users should use the integration package {pull}6145[6145]
- `span.http.*` fields are deprecated, replaced by `http.*`, and will be removed in 8.0 {pull}6147[6147]
28 changes: 14 additions & 14 deletions beater/acker.go → publish/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package beater
package publish

import (
"context"
Expand All @@ -25,64 +25,64 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
)

// waitPublishedAcker is a beat.ACKer which keeps track of the number of
// events published. waitPublishedAcker provides an interruptible Wait method
// WaitPublishedAcker is a beat.ACKer which keeps track of the number of
// events published. WaitPublishedAcker provides an interruptible Wait method
// that blocks until all clients are closed, and all published events at the
// time the clients are closed are acknowledged.
type waitPublishedAcker struct {
type WaitPublishedAcker struct {
active int64 // atomic

mu sync.Mutex
empty *sync.Cond
}

// newWaitPublishedAcker returns a new waitPublishedAcker.
func newWaitPublishedAcker() *waitPublishedAcker {
acker := &waitPublishedAcker{}
// NewWaitPublishedAcker returns a new WaitPublishedAcker.
func NewWaitPublishedAcker() *WaitPublishedAcker {
acker := &WaitPublishedAcker{}
acker.empty = sync.NewCond(&acker.mu)
return acker
}

// AddEvent is called when an event has been published or dropped by the client,
// and increments a counter for published events.
func (w *waitPublishedAcker) AddEvent(event beat.Event, published bool) {
func (w *WaitPublishedAcker) AddEvent(event beat.Event, published bool) {
if published {
w.incref(1)
}
}

// ACKEvents is called when published events have been acknowledged.
func (w *waitPublishedAcker) ACKEvents(n int) {
func (w *WaitPublishedAcker) ACKEvents(n int) {
w.decref(int64(n))
}

// Open must be called exactly once for each new pipeline client, before that
// client is opened; it increments the acker's reference count.
func (w *waitPublishedAcker) Open() {
func (w *WaitPublishedAcker) Open() {
w.incref(1)
}

// Close is called when a pipeline client is closed, and decrements the
// acker's reference count.
//
// This must be called at most once for each call to Open.
func (w *waitPublishedAcker) Close() {
func (w *WaitPublishedAcker) Close() {
w.decref(1)
}

func (w *waitPublishedAcker) incref(n int64) {
// incref atomically adds n to the acker's count of outstanding references
// (open clients plus published events that have not yet been acknowledged).
func (w *WaitPublishedAcker) incref(n int64) {
	// Add n rather than a hard-coded 1 so that the parameter is honoured and
	// incref stays symmetric with decref, which subtracts n.
	atomic.AddInt64(&w.active, n)
}

func (w *waitPublishedAcker) decref(n int64) {
func (w *WaitPublishedAcker) decref(n int64) {
if atomic.AddInt64(&w.active, int64(-n)) == 0 {
w.empty.Broadcast()
}
}

// Wait waits for w to be closed and all previously published events to be
// acknowledged.
func (w *waitPublishedAcker) Wait(ctx context.Context) error {
func (w *WaitPublishedAcker) Wait(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
Expand Down
Loading

0 comments on commit 2bc7f61

Please sign in to comment.