diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b042045e0ef..59d4014b863 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -195,6 +195,7 @@ field. You can revert this change by configuring tags for the module and omittin - Explicitly detect missing variables in autodiscover configuration, log them at the debug level. {issue}20568[20568] {pull}20898[20898] - Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197] - The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21258[21258] +- Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349] *Auditbeat* diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 699c96d8b62..f13b3c39ff2 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -138,6 +138,7 @@ type ClientEventer interface { type ProcessorList interface { Processor + Close() error All() []Processor } diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 3a0375120ff..9b3b19b294c 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -372,6 +372,11 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { if err != nil { return err } + defer func() { + if err := b.processing.Close(); err != nil { + logp.Warn("Failed to close global processing: %v", err) + } + }() // Windows: Mark service as stopped. // After this is run, a Beat service is considered by the OS to be stopped diff --git a/libbeat/common/docker/watcher.go b/libbeat/common/docker/watcher.go index 0543d37e9c3..2421c232eee 100644 --- a/libbeat/common/docker/watcher.go +++ b/libbeat/common/docker/watcher.go @@ -138,6 +138,7 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool // Extra check to confirm that Docker is available _, err = client.Info(context.Background()) if err != nil { + client.Close() return nil, err } @@ -395,14 +396,12 @@ func (w *watcher) cleanupWorker() { log := w.log for { - // Wait a full period - time.Sleep(w.cleanupTimeout) - select { case <-w.ctx.Done(): w.stopped.Done() return - default: + // Wait a full period + case <-time.After(w.cleanupTimeout): // Check entries for timeout var toDelete []string timeout := time.Now().Add(-w.cleanupTimeout) diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata.go b/libbeat/processors/add_docker_metadata/add_docker_metadata.go index cf0fda79d8d..beaca3bb46b 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata.go @@ -209,6 +209,18 @@ func (d *addDockerMetadata) Run(event *beat.Event) (*beat.Event, error) { return event, nil } +func (d *addDockerMetadata) Close() error { + if d.cgroups != nil { + d.cgroups.StopJanitor() + } + d.watcher.Stop() + err := processors.Close(d.sourceProcessor) + if err != nil { + return errors.Wrap(err, "closing source processor of add_docker_metadata") + } + return nil +} + func (d *addDockerMetadata) String() string { return fmt.Sprintf("%v=[match_fields=[%v] match_pids=[%v]]", processorName, strings.Join(d.fields, ", "), strings.Join(d.pidFields, ", ")) diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata_integration_test.go b/libbeat/processors/add_docker_metadata/add_docker_metadata_integration_test.go new file mode 100644 index 00000000000..91d3315a401 --- /dev/null +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata_integration_test.go @@ -0,0 +1,120 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build linux darwin windows +// +build integration + +package add_docker_metadata + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/docker" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/processors" + dockertest "github.com/elastic/beats/v7/libbeat/tests/docker" + "github.com/elastic/beats/v7/libbeat/tests/resources" +) + +func TestAddDockerMetadata(t *testing.T) { + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + client, err := docker.NewClient(defaultConfig().Host, nil, nil) + require.NoError(t, err) + + // Docker clients can affect the goroutines checker because they keep + // idle keep-alive connections, so we explicitly close them. + // These idle connections in principle wouldn't represent leaks even if + // the client is not explicitly closed because they are eventually closed. + defer client.Close() + + // Start a container to have some data to enrich events + testClient, err := dockertest.NewClient() + require.NoError(t, err) + // Explicitly close client to don't affect goroutines checker + defer testClient.Close() + + image := "busybox" + cmd := []string{"sleep", "60"} + labels := map[string]string{"label": "foo"} + id, err := testClient.ContainerStart(image, cmd, labels) + require.NoError(t, err) + defer testClient.ContainerRemove(id) + + info, err := testClient.ContainerInspect(id) + require.NoError(t, err) + pid := info.State.Pid + + config, err := common.NewConfigFrom(map[string]interface{}{ + "match_fields": []string{"cid"}, + }) + watcherConstructor := newWatcherWith(client) + processor, err := buildDockerMetadataProcessor(logp.L(), config, watcherConstructor) + require.NoError(t, err) + + t.Run("match container by container id", func(t *testing.T) { + input := &beat.Event{Fields: common.MapStr{ + "cid": id, + }} + result, err := processor.Run(input) + require.NoError(t, err) + + resultLabels, _ := result.Fields.GetValue("container.labels") + expectedLabels := common.MapStr{"label": "foo"} + assert.Equal(t, expectedLabels, resultLabels) + assert.Equal(t, id, result.Fields["cid"]) + }) + + t.Run("match container by process id", func(t *testing.T) { + input := &beat.Event{Fields: common.MapStr{ + "cid": id, + "process.pid": pid, + }} + result, err := processor.Run(input) + require.NoError(t, err) + + resultLabels, _ := result.Fields.GetValue("container.labels") + expectedLabels := common.MapStr{"label": "foo"} + assert.Equal(t, expectedLabels, resultLabels) + assert.Equal(t, id, result.Fields["cid"]) + }) + + t.Run("don't enrich non existing container", func(t *testing.T) { + input := &beat.Event{Fields: common.MapStr{ + "cid": "notexists", + }} + result, err := processor.Run(input) + require.NoError(t, err) + assert.Equal(t, input.Fields, result.Fields) + }) + + err = processors.Close(processor) + require.NoError(t, err) +} + +func newWatcherWith(client docker.Client) docker.WatcherConstructor { + return func(log *logp.Logger, host string, tls *docker.TLSConfig, storeShortID bool) (docker.Watcher, error) { + return docker.NewWatcherWithClient(log, client, 60*time.Second, storeShortID) + } +} diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go index 2d8a5a9e970..f246d597e13 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go @@ -16,6 +16,7 @@ // under the License. // +build linux darwin windows +// +build !integration package add_docker_metadata diff --git a/libbeat/processors/add_kubernetes_metadata/cache.go b/libbeat/processors/add_kubernetes_metadata/cache.go index da7492b43a9..5de52efdc1b 100644 --- a/libbeat/processors/add_kubernetes_metadata/cache.go +++ b/libbeat/processors/add_kubernetes_metadata/cache.go @@ -31,6 +31,7 @@ type cache struct { timeout time.Duration deleted map[string]time.Time // key -> when should this obj be deleted metadata map[string]common.MapStr + done chan struct{} } func newCache(cleanupTimeout time.Duration) *cache { @@ -38,6 +39,7 @@ func newCache(cleanupTimeout time.Duration) *cache { timeout: cleanupTimeout, deleted: make(map[string]time.Time), metadata: make(map[string]common.MapStr), + done: make(chan struct{}), } go c.cleanup() return c @@ -67,15 +69,29 @@ func (c *cache) set(key string, data common.MapStr) { } func (c *cache) cleanup() { - ticker := time.Tick(timeout) - for now := range ticker { - c.Lock() - for k, t := range c.deleted { - if now.After(t) { - delete(c.deleted, k) - delete(c.metadata, k) + if timeout <= 0 { + return + } + + ticker := time.NewTicker(timeout) + defer ticker.Stop() + for { + select { + case <-c.done: + return + case now := <-ticker.C: + c.Lock() + for k, t := range c.deleted { + if now.After(t) { + delete(c.deleted, k) + delete(c.metadata, k) + } } + c.Unlock() } - c.Unlock() } } + +func (c *cache) stop() { + close(c.done) +} diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 2a5f4d2faed..535eb1187ea 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -242,6 +242,16 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { return event, nil } +func (k *kubernetesAnnotator) Close() error { + if k.watcher != nil { + k.watcher.Stop() + } + if k.cache != nil { + k.cache.stop() + } + return nil +} + func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) { metadata := k.indexers.GetMetadata(pod) for _, m := range metadata { diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index c41ca9a73d6..01e2cf1e9fe 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -55,11 +55,12 @@ var ( ) type addProcessMetadata struct { - config config - provider processMetadataProvider - cidProvider cidProvider - log *logp.Logger - mappings common.MapStr + config config + provider processMetadataProvider + cgroupsCache *common.Cache + cidProvider cidProvider + log *logp.Logger + mappings common.MapStr } type processMetadata struct { @@ -81,16 +82,22 @@ type cidProvider interface { } func init() { - processors.RegisterPlugin(processorName, New) + processors.RegisterPlugin(processorName, NewWithCache) jsprocessor.RegisterPlugin("AddProcessMetadata", New) } // New constructs a new add_process_metadata processor. func New(cfg *common.Config) (processors.Processor, error) { - return newProcessMetadataProcessorWithProvider(cfg, &procCache) + return newProcessMetadataProcessorWithProvider(cfg, &procCache, false) } -func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider) (proc processors.Processor, err error) { +// NewWithCache construct a new add_process_metadata processor with cache for container IDs. +// Resulting processor implements `Close()` to release the cache resources. +func NewWithCache(cfg *common.Config) (processors.Processor, error) { + return newProcessMetadataProcessorWithProvider(cfg, &procCache, true) +} + +func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider, withCache bool) (proc processors.Processor, err error) { // Logging (each processor instance has a unique ID). var ( id = int(instanceID.Inc()) @@ -118,21 +125,25 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces } // don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled if ok := containsValue(mappings, "container.id"); ok { - if config.CgroupCacheExpireTime != 0 { + if withCache && config.CgroupCacheExpireTime != 0 { p.log.Debug("Initializing cgroup cache") evictionListener := func(k common.Key, v common.Value) { p.log.Debugf("Evicted cached cgroups for PID=%v", k) } - cgroupsCache := common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener) - cgroupsCache.StartJanitor(config.CgroupCacheExpireTime) - p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, cgroupsCache) + p.cgroupsCache = common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener) + p.cgroupsCache.StartJanitor(config.CgroupCacheExpireTime) + p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, p.cgroupsCache) } else { p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, nil) } } + if withCache { + return &addProcessMetadataCloser{p}, nil + } + return &p, nil } @@ -253,6 +264,17 @@ func (p *addProcessMetadata) getContainerID(pid int) (string, error) { return cid, nil } +type addProcessMetadataCloser struct { + addProcessMetadata +} + +func (p *addProcessMetadataCloser) Close() error { + if p.addProcessMetadata.cgroupsCache != nil { + p.addProcessMetadata.cgroupsCache.StopJanitor() + } + return nil +} + // String returns the processor representation formatted as a string func (p *addProcessMetadata) String() string { return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v, host_path=%v, cgroup_prefixes=%v]", diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index f9b4aaa681c..493034098cf 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -651,7 +651,7 @@ func TestAddProcessMetadata(t *testing.T) { t.Fatal(err) } - proc, err := newProcessMetadataProcessorWithProvider(config, testProcs) + proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true) if test.initErr == nil { if err != nil { t.Fatal(err) diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go index c5002b7cebd..0e772a524da 100644 --- a/libbeat/processors/processor.go +++ b/libbeat/processors/processor.go @@ -20,6 +20,7 @@ package processors import ( "strings" + "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/beat" @@ -35,11 +36,29 @@ type Processors struct { log *logp.Logger } +// Processor is the interface that all processors must implement type Processor interface { Run(event *beat.Event) (*beat.Event, error) String() string } +// Closer defines the interface for processors that should be closed after using +// them. +// Close() is not part of the Processor interface because implementing this method +// is also a way to indicate that the processor keeps some resource that needs to +// be released or orderly closed. +type Closer interface { + Close() error +} + +// Close closes a processor if it implements the Closer interface +func Close(p Processor) error { + if closer, ok := p.(Closer); ok { + return closer.Close() + } + return nil +} + // NewList creates a new empty processor list. // Additional processors can be added to the List field. func NewList(log *logp.Logger) *Processors { @@ -153,6 +172,17 @@ func (procs *Processors) All() []beat.Processor { return ret } +func (procs *Processors) Close() error { + var errs multierror.Errors + for _, p := range procs.List { + err := Close(p) + if err != nil { + errs = append(errs, err) + } + } + return errs.Err() +} + // Run executes the all processors serially and returns the event and possibly // an error. If the event has been dropped (canceled) by a processor in the // list then a nil event is returned. diff --git a/libbeat/processors/script/javascript/module/processor/chain.go b/libbeat/processors/script/javascript/module/processor/chain.go index e58aac29372..9ef3da7859e 100644 --- a/libbeat/processors/script/javascript/module/processor/chain.go +++ b/libbeat/processors/script/javascript/module/processor/chain.go @@ -151,6 +151,17 @@ func newNativeProcessor(constructor processors.Constructor, call gojaCall) (proc if err != nil { return nil, err } + + if closer, ok := p.(processors.Closer); ok { + closer.Close() + // Script processor doesn't support releasing resources of stateful processors, + // what can lead to leaks, so prevent use of these processors. They shouldn't + // be registered. If this error happens, a processor that needs to be closed is + // being registered, this should be avoided. + // See https://github.com/elastic/beats/pull/16349 + return nil, errors.Errorf("stateful processor cannot be used in script processor, this is probably a bug: %s", p) + } + return &nativeProcessor{p}, nil } diff --git a/libbeat/processors/script/javascript/module/processor/processor_test.go b/libbeat/processors/script/javascript/module/processor/processor_test.go index 6ea66f409ff..9e958ee7035 100644 --- a/libbeat/processors/script/javascript/module/processor/processor_test.go +++ b/libbeat/processors/script/javascript/module/processor/processor_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -35,6 +36,7 @@ import ( func init() { RegisterPlugin("Mock", newMock) + RegisterPlugin("MockWithCloser", newMockWithCloser) } func testEvent() *beat.Event { @@ -67,14 +69,10 @@ function process(evt) { logp.TestingSetup() p, err := javascript.NewFromConfig(javascript.Config{Source: script}, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) evt, err := p.Run(testEvent()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) checkEvent(t, evt, "added", "new_value") } @@ -107,14 +105,10 @@ function process(evt) { logp.TestingSetup() p, err := javascript.NewFromConfig(javascript.Config{Source: script}, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) evt, err := p.Run(testEvent()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // checking if hello world is added to the event in different languages checkEvent(t, evt, "helló", "világ") @@ -123,6 +117,22 @@ function process(evt) { checkEvent(t, evt, "hallo", "Welt") } +func TestProcessorWithCloser(t *testing.T) { + const script = ` +var processor = require('processor'); + +var processorWithCloser = new processor.MockWithCloser().Build() + +function process(evt) { + processorWithCloser.Run(evt); +} +` + + logp.TestingSetup() + _, err := javascript.NewFromConfig(javascript.Config{Source: script}, nil) + require.Error(t, err, "processor that implements Closer() shouldn't be allowed") +} + func checkEvent(t *testing.T, evt *beat.Event, key, value string) { s, err := evt.GetValue(key) assert.NoError(t, err) @@ -162,3 +172,23 @@ func (m *mockProcessor) String() string { s, _ := json.Marshal(m.fields) return fmt.Sprintf("mock=%s", s) } + +type mockProcessorWithCloser struct{} + +func newMockWithCloser(c *common.Config) (processors.Processor, error) { + return &mockProcessorWithCloser{}, nil +} + +func (m *mockProcessorWithCloser) Run(event *beat.Event) (*beat.Event, error) { + // Nothing to do, we only want this struct to implement processors.Closer + return event, nil +} + +func (m *mockProcessorWithCloser) Close() error { + // Nothing to do, we only want this struct to implement processors.Closer + return nil +} + +func (m *mockProcessorWithCloser) String() string { + return "mockWithCloser" +} diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index 2ce792ed887..edb5a3f1eb3 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -164,6 +165,15 @@ func (c *client) Close() error { log.Debug("client: unlink from queue") c.unlink() log.Debug("client: done unlink") + + if c.processors != nil { + log.Debug("client: closing processors") + err := processors.Close(c.processors) + if err != nil { + log.Errorf("client: error closing processors: %v", err) + } + log.Debug("client: done closing processors") + } }) return nil } diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index cf99e03d4d3..6a9083acd1a 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -341,7 +341,10 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, } // setup 8: pipeline processors list - processors.add(b.processors) + if b.processors != nil { + // Add the global pipeline as a function processor, so clients cannot close it + processors.add(newProcessor(b.processors.title, b.processors.Run)) + } // setup 9: time series metadata if b.timeSeries { @@ -361,6 +364,13 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, return processors, nil } +func (b *builder) Close() error { + if b.processors != nil { + return b.processors.Close() + } + return nil +} + func makeClientProcessors( log *logp.Logger, cfg beat.ProcessingConfig, diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 967119fcd39..bdd0dc9b736 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/ecs/code/go/ecs" ) @@ -319,6 +320,9 @@ func TestNormalization(t *testing.T) { fields.DeepUpdate(test.mod) assert.Equal(t, test.want, actual.Fields) + + err = s.Close() + require.NoError(t, err) }) } } @@ -333,6 +337,9 @@ func TestAlwaysDrop(t *testing.T) { actual, err := prog.Run(&beat.Event{}) require.NoError(t, err) assert.Nil(t, actual) + + err = s.Close() + require.NoError(t, err) } func TestDynamicFields(t *testing.T) { @@ -353,6 +360,52 @@ func TestDynamicFields(t *testing.T) { actual, err = prog.Run(&beat.Event{Fields: common.MapStr{"hello": "world"}}) require.NoError(t, err) assert.Equal(t, common.MapStr{"hello": "world", "dyn": "field"}, actual.Fields) + + err = factory.Close() + require.NoError(t, err) +} + +func TestProcessingClose(t *testing.T) { + factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) + require.NoError(t, err) + + // Inject a processor in the builder that we can check if has been closed. + factoryProcessor := &processorWithClose{} + b := factory.(*builder) + if b.processors == nil { + b.processors = newGroup("global", logp.L()) + } + b.processors.add(factoryProcessor) + + clientProcessor := &processorWithClose{} + g := newGroup("test", logp.L()) + g.add(clientProcessor) + + prog, err := factory.Create(beat.ProcessingConfig{ + Processor: g, + }, false) + require.NoError(t, err) + + // Check that both processors are called + assert.False(t, factoryProcessor.called) + assert.False(t, clientProcessor.called) + _, err = prog.Run(&beat.Event{Fields: common.MapStr{"hello": "world"}}) + require.NoError(t, err) + assert.True(t, factoryProcessor.called) + assert.True(t, clientProcessor.called) + + // Check that closing the client processing pipeline doesn't close the global pipeline + assert.False(t, factoryProcessor.closed) + assert.False(t, clientProcessor.closed) + err = processors.Close(prog) + require.NoError(t, err) + assert.False(t, factoryProcessor.closed) + assert.True(t, clientProcessor.closed) + + // Check that closing the factory closes the processor in the global pipeline + err = factory.Close() + require.NoError(t, err) + assert.True(t, factoryProcessor.closed) } func fromJSON(in string) common.MapStr { @@ -363,3 +416,22 @@ func fromJSON(in string) common.MapStr { } return tmp } + +type processorWithClose struct { + closed bool + called bool +} + +func (p *processorWithClose) Run(e *beat.Event) (*beat.Event, error) { + p.called = true + return e, nil +} + +func (p *processorWithClose) Close() error { + p.closed = true + return nil +} + +func (p *processorWithClose) String() string { + return "processorWithClose" +} diff --git a/libbeat/publisher/processing/processing.go b/libbeat/publisher/processing/processing.go index 4f615fb1422..88feb62c7b2 100644 --- a/libbeat/publisher/processing/processing.go +++ b/libbeat/publisher/processing/processing.go @@ -33,6 +33,8 @@ type SupportFactory func(info beat.Info, log *logp.Logger, cfg *common.Config) ( // will merge the global and local configurations into a common event // processor. // If `drop` is set, then the processor generated must always drop all events. +// A Supporter needs to be closed with `Close()` to release its global resources. type Supporter interface { Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) + Close() error } diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index e994eef48cc..3a400d36dad 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -22,6 +22,8 @@ import ( "strings" "sync" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -77,6 +79,20 @@ func (p *group) add(processor processors.Processor) { } } +func (p *group) Close() error { + if p == nil { + return nil + } + var errs multierror.Errors + for _, processor := range p.list { + err := processors.Close(processor) + if err != nil { + errs = append(errs, err) + } + } + return errs.Err() +} + func (p *group) String() string { var s []string for _, p := range p.list { diff --git a/libbeat/tests/docker/docker.go b/libbeat/tests/docker/docker.go index b81ffa285ea..888347c5cc7 100644 --- a/libbeat/tests/docker/docker.go +++ b/libbeat/tests/docker/docker.go @@ -77,8 +77,28 @@ func (c Client) ContainerWait(ID string) error { return nil } +// ContainerInspect recovers information of the container +func (c Client) ContainerInspect(ID string) (types.ContainerJSON, error) { + ctx := context.Background() + return c.cli.ContainerInspect(ctx, ID) +} + // ContainerKill kills the given container func (c Client) ContainerKill(ID string) error { ctx := context.Background() return c.cli.ContainerKill(ctx, ID, "KILL") } + +// ContainerRemove kills and removed the given container +func (c Client) ContainerRemove(ID string) error { + ctx := context.Background() + return c.cli.ContainerRemove(ctx, ID, types.ContainerRemoveOptions{ + RemoveVolumes: true, + Force: true, + }) +} + +// Close closes the underlying client +func (c *Client) Close() error { + return c.cli.Close() +}