diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go
index e5fa370e292c..a3f0c822b9e8 100644
--- a/libbeat/publisher/pipeline/client_test.go
+++ b/libbeat/publisher/pipeline/client_test.go
@@ -19,6 +19,8 @@ package pipeline
 
 import (
 	"context"
+	"errors"
+	"io"
 	"sync"
 	"testing"
 	"time"
@@ -28,6 +30,7 @@ import (
 
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/outputs"
+	"github.com/elastic/beats/v7/libbeat/processors"
 	"github.com/elastic/beats/v7/libbeat/publisher"
 	"github.com/elastic/beats/v7/libbeat/publisher/processing"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
@@ -35,26 +38,25 @@ import (
 	"github.com/elastic/beats/v7/libbeat/tests/resources"
 	conf "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/mapstr"
 	"github.com/elastic/elastic-agent-libs/monitoring"
 )
 
-func TestClient(t *testing.T) {
-	makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
-		p, err := New(beat.Info{},
-			Monitors{},
-			conf.Namespace{},
-			outputs.Group{},
-			settings,
-		)
-		if err != nil {
-			panic(err)
-		}
-		// Inject a test queue so the outputController doesn't create one
-		p.outputController.queue = qu
+func makePipeline(t *testing.T, settings Settings, qu queue.Queue) *Pipeline {
+	p, err := New(beat.Info{},
+		Monitors{},
+		conf.Namespace{},
+		outputs.Group{},
+		settings,
+	)
+	require.NoError(t, err)
+	// Inject a test queue so the outputController doesn't create one
+	p.outputController.queue = qu
 
-		return p
-	}
+	return p
+}
 
+func TestClient(t *testing.T) {
 	t.Run("client close", func(t *testing.T) {
 		// Note: no asserts. If closing fails we have a deadlock, because Publish
 		// would block forever
@@ -90,7 +92,7 @@ func TestClient(t *testing.T) {
 		routinesChecker := resources.NewGoroutinesChecker()
 		defer routinesChecker.Check(t)
 
-		pipeline := makePipeline(Settings{}, makeTestQueue())
+		pipeline := makePipeline(t, Settings{}, makeTestQueue())
 		defer pipeline.Close()
 
 		var ctx context.Context
@@ -119,6 +121,106 @@ func TestClient(t *testing.T) {
 			})
 		}
 	})
+
+	t.Run("no infinite loop when processing fails", func(t *testing.T) {
+		logp.TestingSetup()
+		l := logp.L()
+
+		// a small in-memory queue with a very short flush interval
+		q := memqueue.NewQueue(l, nil, memqueue.Settings{
+			Events:         5,
+			FlushMinEvents: 1,
+			FlushTimeout:   time.Millisecond,
+		}, 5)
+
+		// model a processor that we can switch into a failing mode mid-test
+		p := &testProcessor{}
+		ps := testProcessorSupporter{Processor: p}
+
+		// create a pipeline that waits for all in-flight events
+		// to be acked while shutting down
+		pipeline := makePipeline(t, Settings{
+			WaitClose:     100 * time.Millisecond,
+			WaitCloseMode: WaitOnPipelineClose,
+			Processors:    ps,
+		}, q)
+		client, err := pipeline.Connect()
+		require.NoError(t, err)
+		defer client.Close()
+
+		// consume all the published events in the background
+		var received []beat.Event
+		done := make(chan struct{})
+		go func() {
+			for {
+				batch, err := q.Get(2)
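+				// the queue's Get returns io.EOF once the queue is closed and drained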
+				if errors.Is(err, io.EOF) {
+					break
+				}
+				assert.NoError(t, err)
+				if batch == nil {
+					continue
+				}
+				for i := 0; i < batch.Count(); i++ {
+					e := batch.Entry(i).(publisher.Event)
+					received = append(received, e.Content)
+				}
+				batch.Done()
+			}
+			close(done)
+		}()
+
+		sent := []beat.Event{
+			{
+				Fields: mapstr.M{"number": 1},
+			},
+			{
+				Fields: mapstr.M{"number": 2},
+			},
+			{
+				Fields: mapstr.M{"number": 3},
+			},
+			{
+				Fields: mapstr.M{"number": 4},
+			},
+		}
+
+		expected := []beat.Event{
+			{
+				Fields: mapstr.M{"number": 1, "test": "value"},
+			},
+			{
+				Fields: mapstr.M{"number": 2, "test": "value"},
+			},
+			// {
+			// 	// this event must be excluded due to the processor error
+			// 	Fields: mapstr.M{"number": 3},
+			// },
+			{
+				Fields: mapstr.M{"number": 4, "test": "value"},
+			},
+		}
+
+		client.PublishAll(sent[:2]) // first 2
+
+		// this causes our processor to malfunction and produce errors for all events
+		p.ErrorSwitch()
+
+		client.PublishAll(sent[2:3]) // number 3
+
+		// back to normal
+		p.ErrorSwitch()
+
+		client.PublishAll(sent[3:]) // number 4
+
+		client.Close()
+		pipeline.Close()
+
+		// waiting for all events to be consumed from the queue
+		<-done
+		require.Equal(t, expected, received)
+	})
 }
 
 func TestClientWaitClose(t *testing.T) {
@@ -258,3 +360,42 @@ func TestMonitoring(t *testing.T) {
 	assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"])
 	assert.Equal(t, int64(numClients), telemetrySnapshot.Ints["output.clients"])
 }
+
+type testProcessor struct{ error bool }
+
+func (p *testProcessor) String() string {
+	return "testProcessor"
+}
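+
+// Run adds a "test" field to the event, or returns an error while error mode is on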
+func (p *testProcessor) Run(in *beat.Event) (event *beat.Event, err error) {
+	if p.error {
+		return nil, errors.New("test error")
+	}
+	_, err = in.Fields.Put("test", "value")
+	return in, err
+}
+
+// ErrorSwitch toggles the processor's error mode on and off
+func (p *testProcessor) ErrorSwitch() {
+	p.error = !p.error
+}
+
+type testProcessorSupporter struct {
+	beat.Processor
+}
+
+// Create a running processor interface based on the given config
+func (p testProcessorSupporter) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) {
+	return p.Processor, nil
+}
+
+// Processors returns a list of config strings for the given processor, for debug purposes
+func (p testProcessorSupporter) Processors() []string {
+	return []string{p.Processor.String()}
+}
+
+// Close the processor supporter
+func (p testProcessorSupporter) Close() error {
+	return processors.Close(p.Processor)
+}