From e6cae466ab81bfcd3f6df94332991ee0669a4c8e Mon Sep 17 00:00:00 2001 From: Craig MacKenzie Date: Tue, 23 Apr 2024 08:54:05 -0400 Subject: [PATCH 1/6] Agentbeat: remove unused files from package and silence new error message (#39064) * Remove Beat config files from agentbeat. Beats no longer reads config files at startup when run as part of agent. * Silence new error message with agentbeat. * Add comment to log line. --- dev-tools/packaging/package_test.go | 4 +-- filebeat/fileset/modules.go | 6 +++- .../dev-tools/packaging/packages.yml | 31 ------------------- 3 files changed, 7 insertions(+), 34 deletions(-) diff --git a/dev-tools/packaging/package_test.go b/dev-tools/packaging/package_test.go index fff920b429c..308610b4760 100644 --- a/dev-tools/packaging/package_test.go +++ b/dev-tools/packaging/package_test.go @@ -263,7 +263,7 @@ func checkConfigPermissionsWithMode(t *testing.T, p *packageFile, expectedMode o return } } - t.Errorf("no config file found matching %v", configFilePattern) + t.Logf("no config file found matching %v", configFilePattern) }) } @@ -288,7 +288,7 @@ func checkConfigOwner(t *testing.T, p *packageFile, expectRoot bool) { return } } - t.Errorf("no config file found matching %v", configFilePattern) + t.Logf("no config file found matching %v", configFilePattern) }) } diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index b1482033c91..a3d2c82f6ec 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -29,6 +29,7 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/fleetmode" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/paths" @@ -149,7 +150,10 @@ func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool, f stat, err := os.Stat(modulesPath) if err != nil || !stat.IsDir() { log := logp.NewLogger(logName) - log.Errorf("Not loading modules. Module directory not found: %s", modulesPath) + if !fleetmode.Enabled() { + // When run under agent via agentbeat there is no modules directory and this is expected. + log.Errorf("Not loading modules. Module directory not found: %s", modulesPath) + } return &ModuleRegistry{log: log}, nil //nolint:nilerr // empty registry, no error } diff --git a/x-pack/agentbeat/dev-tools/packaging/packages.yml b/x-pack/agentbeat/dev-tools/packaging/packages.yml index b753d88bf2a..98c5c0f8289 100644 --- a/x-pack/agentbeat/dev-tools/packaging/packages.yml +++ b/x-pack/agentbeat/dev-tools/packaging/packages.yml @@ -24,9 +24,6 @@ shared: NOTICE.txt: source: '{{ repo.RootDir }}/NOTICE.txt' mode: 0644 - README.md: - template: '{{ elastic_beats_dir }}/dev-tools/packaging/templates/common/README.md.tmpl' - mode: 0644 .build_hash.txt: content: > {{ commit }} @@ -35,32 +32,6 @@ shared: source: '{{.BeatName}}.spec.yml' mode: 0644 - - &config_files - 'auditbeat.yml': - source: '{{ repo.RootDir }}/x-pack/auditbeat/auditbeat.yml' - mode: 0600 - config: true - 'filebeat.yml': - source: '{{ repo.RootDir }}/x-pack/filebeat/filebeat.yml' - mode: 0600 - config: true - 'heartbeat.yml': - source: '{{ repo.RootDir }}/x-pack/heartbeat/heartbeat.yml' - mode: 0600 - config: true - 'metricbeat.yml': - source: '{{ repo.RootDir }}/x-pack/metricbeat/metricbeat.yml' - mode: 0600 - config: true - 'osquerybeat.yml': - source: '{{ repo.RootDir }}/x-pack/osquerybeat/osquerybeat.yml' - mode: 0600 - config: true - 'packetbeat.yml': - source: '{{ repo.RootDir }}/x-pack/packetbeat/packetbeat.yml' - mode: 0600 - config: true - - &unix_osquery_files 'osquery-extension.ext': source: '{{ repo.RootDir }}/x-pack/osquerybeat/ext/osquery-extension/build/golang-crossbuild/osquery-extension-{{.GOOS}}-{{.Platform.Arch}}{{.BinaryExt}}' @@ -76,14 +47,12 @@ shared: <<: *common files: <<: *binary_files - <<: *config_files <<: *unix_osquery_files - &windows_binary_spec <<: *common files: <<: *binary_files - <<: *config_files <<: *windows_osquery_files # License modifiers for the Elastic License From e53eb0c4aca1a966e5b79937c7aea7085e151215 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constan=C3=A7a=20Manteigas?= <113898685+constanca-m@users.noreply.github.com> Date: Tue, 23 Apr 2024 17:42:38 +0200 Subject: [PATCH 2/6] [Metricbeat][Kubernetes] Remove mandatory permissions for namespace and node (#38762) * remove mandatory permissions --------- Signed-off-by: constanca Co-authored-by: Michael Katsoulis --- CHANGELOG-developer.next.asciidoc | 1 + .../autodiscover/providers/kubernetes/pod.go | 35 +++--- .../providers/kubernetes/pod_test.go | 107 ++++++++++++++++++ .../providers/kubernetes/service.go | 20 ++-- .../providers/kubernetes/service_test.go | 98 ++++++++++++++++ .../add_kubernetes_metadata/kubernetes.go | 28 +++-- .../module/kubernetes/util/kubernetes.go | 57 ++++++++-- .../module/kubernetes/util/kubernetes_test.go | 12 ++ 8 files changed, 314 insertions(+), 44 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 8875b834e66..f74e4a72782 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -70,6 +70,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. ==== Bugfixes +- Handle the starting of namespace and node watchers for metadata enrichment according to `add_resource_metadata` configuration.{pull}38762[38762] - Fix multiple metricbeat instances reporting same metrics when using autodiscover with provider kubernetes, and ensure leader elector is always running in autodiscover mode.{pull}38471[38471] - Fix how Prometheus histograms are calculated when percentiles are provide.{pull}36537[36537] - Stop using `mage:import` in community beats. This was ignoring the vendorized beats directory for some mage targets, using the code available in GOPATH, this causes inconsistencies and compilation problems if the version of the code in the GOPATH is different to the vendored one. Use of `mage:import` will continue to be unsupported in custom beats till beats is migrated to go modules, or mage supports vendored dependencies. {issue}13998[13998] {pull}14162[14162] diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index d849039a66e..569b2d21cd4 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -59,7 +59,7 @@ type pod struct { func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) { logger := logp.NewLogger("autodiscover.pod") - var replicaSetWatcher, jobWatcher kubernetes.Watcher + var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher config := defaultConfig() err := cfg.Unpack(&config) @@ -96,22 +96,27 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Pod{}, err) } - options := kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Node: config.Node, - Namespace: config.Namespace, - } - metaConf := config.AddResourceMetadata - nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil) - if err != nil { - logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + + if metaConf.Node.Enabled() || config.Hints.Enabled() { + options := kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Node: config.Node, + Namespace: config.Namespace, + } + nodeWatcher, err = kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + } } - namespaceWatcher, err := kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + + if metaConf.Namespace.Enabled() || config.Hints.Enabled() { + namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + }, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } } // Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 4cc2d8bb393..3a60342444a 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -2108,6 +2108,113 @@ func TestNodePodUpdater(t *testing.T) { } } +func TestPodEventer_Namespace_Node_Watcher(t *testing.T) { + client := k8sfake.NewSimpleClientset() + uuid, err := uuid.NewV4() + if err != nil { + t.Fatal(err) + } + + tests := []struct { + cfg mapstr.M + expectedNil bool + name string + msg string + }{ + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + "node.enabled": false, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: true, + name: "add_resource_metadata.namespace and add_resource_metadata.node disabled and hints disabled.", + msg: "Watcher should be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + "node.enabled": false, + }, + "hints.enabled": true, + }, + expectedNil: false, + name: "add_resource_metadata.namespace and add_resource_metadata.node disabled and hints enabled.", + msg: "Watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": true, + "node.enabled": true, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata.namespace and add_resource_metadata.node enabled and hints disabled.", + msg: "Watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata default and hints default.", + msg: "Watcher should not be nil.", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + config := conf.MustNewConfigFrom(&test.cfg) + c := defaultConfig() + err = config.Unpack(&c) + assert.NoError(t, err) + + eventer, err := NewPodEventer(uuid, config, client, nil) + if err != nil { + t.Fatal(err) + } + + namespaceWatcher := eventer.(*pod).namespaceWatcher + nodeWatcher := eventer.(*pod).nodeWatcher + + if test.expectedNil { + assert.Equalf(t, nil, namespaceWatcher, "Namespace "+test.msg) + assert.Equalf(t, nil, nodeWatcher, "Node "+test.msg) + } else { + assert.NotEqualf(t, nil, namespaceWatcher, "Namespace "+test.msg) + assert.NotEqualf(t, nil, nodeWatcher, "Node "+test.msg) + } + }) + } +} + type mockUpdaterHandler struct { objects []interface{} } diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go index 5a0c6b3cc3f..de6287f7466 100644 --- a/libbeat/autodiscover/providers/kubernetes/service.go +++ b/libbeat/autodiscover/providers/kubernetes/service.go @@ -70,17 +70,19 @@ func NewServiceEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publis var namespaceMeta metadata.MetaGen var namespaceWatcher kubernetes.Watcher - metaConf := metadata.GetDefaultResourceMetadataConfig() - namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Namespace: config.Namespace, - }, nil) - if err != nil { - return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err) + metaConf := config.AddResourceMetadata + + if metaConf.Namespace.Enabled() || config.Hints.Enabled() { + namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Namespace: config.Namespace, + }, nil) + if err != nil { + return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err) + } + namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) } - namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) - p := &service{ config: config, uuid: uuid, diff --git a/libbeat/autodiscover/providers/kubernetes/service_test.go b/libbeat/autodiscover/providers/kubernetes/service_test.go index 510ac6ebd0d..90ff678e11c 100644 --- a/libbeat/autodiscover/providers/kubernetes/service_test.go +++ b/libbeat/autodiscover/providers/kubernetes/service_test.go @@ -432,6 +432,104 @@ func TestEmitEvent_Service(t *testing.T) { } } +func TestServiceEventer_NamespaceWatcher(t *testing.T) { + client := k8sfake.NewSimpleClientset() + uuid, err := uuid.NewV4() + if err != nil { + t.Fatal(err) + } + + tests := []struct { + cfg mapstr.M + expectedNil bool + name string + msg string + }{ + { + cfg: mapstr.M{ + "resource": "service", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: true, + name: "add_resource_metadata.namespace disabled and hints disabled.", + msg: "Namespace watcher should be nil.", + }, + { + cfg: mapstr.M{ + "resource": "service", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + }, + "hints.enabled": true, + }, + expectedNil: false, + name: "add_resource_metadata.namespace disabled and hints enabled.", + msg: "Namespace watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "service", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": true, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata.namespace enabled and hints disabled.", + msg: "Namespace watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata default and hints default.", + msg: "Watcher should not be nil.", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + config := conf.MustNewConfigFrom(&test.cfg) + + eventer, err := NewServiceEventer(uuid, config, client, nil) + if err != nil { + t.Fatal(err) + } + + namespaceWatcher := eventer.(*service).namespaceWatcher + + if test.expectedNil { + assert.Equalf(t, nil, namespaceWatcher, test.msg) + } else { + assert.NotEqualf(t, nil, namespaceWatcher, test.msg) + } + }) + } +} + func NewMockServiceEventerManager(svc *service) EventManager { em := &eventerManager{} em.eventer = svc diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 954a59ab3f1..f9143cdf289 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -27,13 +27,14 @@ import ( k8sclient "k8s.io/client-go/kubernetes" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/processors" ) const ( @@ -144,7 +145,7 @@ func newProcessorConfig(cfg *config.C, register *Register) (kubeAnnotatorConfig, func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { k.initOnce.Do(func() { - var replicaSetWatcher, jobWatcher kubernetes.Watcher + var replicaSetWatcher, jobWatcher, namespaceWatcher, nodeWatcher kubernetes.Watcher client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions) if err != nil { @@ -203,15 +204,20 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { Namespace: config.Namespace, } - nodeWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, options, nil) - if err != nil { - k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + if metaConf.Node.Enabled() { + nodeWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, options, nil) + if err != nil { + k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + } } - namespaceWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + + if metaConf.Namespace.Enabled() { + namespaceWatcher, err = kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + }, nil) + if err != nil { + k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } } // Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index d89ca006f0e..a0c409ca14e 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -166,7 +166,14 @@ func getResource(resourceName string) kubernetes.Resource { func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddResourceMetadataConfig) []string { switch resourceName { case PodResource: - extra := []string{NamespaceResource, NodeResource} + extra := []string{} + if addResourceMetadata.Node.Enabled() { + extra = append(extra, NodeResource) + } + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + // We need to create watchers for ReplicaSets and Jobs that it might belong to, // in order to be able to retrieve 2nd layer Owner metadata like in case of: // Deployment -> Replicaset -> Pod @@ -179,23 +186,55 @@ func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddReso } return extra case ServiceResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case DeploymentResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case ReplicaSetResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case StatefulSetResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case DaemonSetResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case JobResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case CronJobResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case PersistentVolumeResource: return []string{} case PersistentVolumeClaimResource: - return []string{NamespaceResource} + extra := []string{} + if addResourceMetadata.Namespace.Enabled() { + extra = append(extra, NamespaceResource) + } + return extra case StorageClassResource: return []string{} case NodeResource: diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index b4e528100a6..8235e73a277 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -265,6 +265,12 @@ func TestCreateMetaGenSpecific(t *testing.T) { require.NoError(t, err) log := logp.NewLogger("test") + + namespaceConfig, err := conf.NewConfigFrom(map[string]interface{}{ + "enabled": true, + }) + require.NoError(t, err) + config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -272,6 +278,7 @@ func TestCreateMetaGenSpecific(t *testing.T) { AddResourceMetadata: &metadata.AddResourceMetadataConfig{ CronJob: false, Deployment: true, + Namespace: namespaceConfig, }, } client := k8sfake.NewSimpleClientset() @@ -326,6 +333,10 @@ func TestBuildMetadataEnricher_Start_Stop(t *testing.T) { resourceWatchers.lock.Unlock() funcs := mockFuncs{} + namespaceConfig, err := conf.NewConfigFrom(map[string]interface{}{ + "enabled": true, + }) + require.NoError(t, err) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -333,6 +344,7 @@ func TestBuildMetadataEnricher_Start_Stop(t *testing.T) { AddResourceMetadata: &metadata.AddResourceMetadataConfig{ CronJob: false, Deployment: false, + Namespace: namespaceConfig, }, } From deece39d2a5897014bea6228bfbd141c98fe63ed Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 23 Apr 2024 12:27:48 -0400 Subject: [PATCH 3/6] Fix awscloudwatch worker allocation (#38953) Fix a bug in cloudwatch worker allocation that could cause data loss (https://github.com/elastic/beats/issues/38918). The previous behavior wasn't really tested, since worker tasks were computed in cloudwatchPoller's polling loop which required live AWS connections. So in addition to the basic logical fix, I did some refactoring to cloudwatchPoller that makes the task iteration visible to unit tests. --- CHANGELOG.next.asciidoc | 1 + .../input/awscloudwatch/cloudwatch.go | 134 +++++++++--- .../input/awscloudwatch/cloudwatch_test.go | 207 ++++++++++++++++++ x-pack/filebeat/input/awscloudwatch/input.go | 101 +-------- .../input/awscloudwatch/input_test.go | 103 --------- 5 files changed, 316 insertions(+), 230 deletions(-) create mode 100644 x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c28b56a184e..3ee1edbffa3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -132,6 +132,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556] - Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] - [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917] +- Fix a bug in cloudwatch task allocation that could skip some logs {issue}38918[38918] {pull}38953[38953] - Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985] - entity-analytics input: Improve structured logging. {pull}38990[38990] - Fix config validation for CEL and HTTPJSON inputs when using password grant authentication and `client.id` or `client.secret` are not present. {pull}38962[38962] diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go index ca54721bd27..d85480891a0 100644 --- a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -14,61 +14,69 @@ import ( awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" ) type cloudwatchPoller struct { - numberOfWorkers int - apiSleep time.Duration + config config region string - logStreams []*string - logStreamPrefix string - startTime int64 - endTime int64 - workerSem *awscommon.Sem log *logp.Logger metrics *inputMetrics workersListingMap *sync.Map workersProcessingMap *sync.Map + + // When a worker is ready for its next task, it should + // send to workRequestChan and then read from workResponseChan. + // The worker can cancel the request based on other context + // cancellations, but if the write succeeds it _must_ read from + // workResponseChan to avoid deadlocking the main loop. + workRequestChan chan struct{} + workResponseChan chan workResponse + + workerWg sync.WaitGroup +} + +type workResponse struct { + logGroup string + startTime, endTime time.Time } func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, - awsRegion string, apiSleep time.Duration, - numberOfWorkers int, logStreams []*string, logStreamPrefix string) *cloudwatchPoller { + awsRegion string, config config) *cloudwatchPoller { if metrics == nil { metrics = newInputMetrics("", nil) } return &cloudwatchPoller{ - numberOfWorkers: numberOfWorkers, - apiSleep: apiSleep, - region: awsRegion, - logStreams: logStreams, - logStreamPrefix: logStreamPrefix, - startTime: int64(0), - endTime: int64(0), - workerSem: awscommon.NewSem(numberOfWorkers), log: log, metrics: metrics, + region: awsRegion, + config: config, workersListingMap: new(sync.Map), workersProcessingMap: new(sync.Map), + // workRequestChan is unbuffered to guarantee that + // the worker and main loop agree whether a request + // was sent. workerResponseChan is buffered so the + // main loop doesn't have to block on the workers + // while distributing new data. + workRequestChan: make(chan struct{}), + workResponseChan: make(chan workResponse, 10), } } -func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) { +func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) { err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) if err != nil { var errRequestCanceled *awssdk.RequestCanceledError if errors.As(err, &errRequestCanceled) { - p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) + p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", errRequestCanceled) } p.log.Error("getLogEventsFromCloudWatch failed: ", err) } } // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch -func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { +func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error { // construct FilterLogEventsInput filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput) @@ -83,8 +91,8 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents))) // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). - p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) - time.Sleep(p.apiSleep) + p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.config.APISleep) + time.Sleep(p.config.APISleep) p.log.Debug("done sleeping") p.log.Debugf("Processing #%v events", len(logEvents)) @@ -93,21 +101,87 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client return nil } -func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { +func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput { filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ LogGroupName: awssdk.String(logGroup), - StartTime: awssdk.Int64(startTime), - EndTime: awssdk.Int64(endTime), + StartTime: awssdk.Int64(startTime.UnixNano() / int64(time.Millisecond)), + EndTime: awssdk.Int64(endTime.UnixNano() / int64(time.Millisecond)), } - if len(p.logStreams) > 0 { - for _, stream := range p.logStreams { + if len(p.config.LogStreams) > 0 { + for _, stream := range p.config.LogStreams { filterLogEventsInput.LogStreamNames = append(filterLogEventsInput.LogStreamNames, *stream) } } - if p.logStreamPrefix != "" { - filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix) + if p.config.LogStreamPrefix != "" { + filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.config.LogStreamPrefix) } return filterLogEventsInput } + +func (p *cloudwatchPoller) startWorkers( + ctx context.Context, + svc *cloudwatchlogs.Client, + logProcessor *logProcessor, +) { + for i := 0; i < p.config.NumberOfWorkers; i++ { + p.workerWg.Add(1) + go func() { + defer p.workerWg.Done() + for { + var work workResponse + select { + case <-ctx.Done(): + return + case p.workRequestChan <- struct{}{}: + work = <-p.workResponseChan + } + + p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup) + p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor) + p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup) + } + }() + } +} + +// receive implements the main run loop that distributes tasks to the worker +// goroutines. It accepts a "clock" callback (which on a live input should +// equal time.Now) to allow deterministic unit tests. +func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) { + defer p.workerWg.Wait() + // startTime and endTime are the bounds of the current scanning interval. + // If we're starting at the end of the logs, advance the start time to the + // most recent scan window + var startTime time.Time + endTime := clock().Add(-p.config.Latency) + if p.config.StartPosition == "end" { + startTime = endTime.Add(-p.config.ScanFrequency) + } + for ctx.Err() == nil { + for _, lg := range logGroupNames { + select { + case <-ctx.Done(): + return + case <-p.workRequestChan: + p.workResponseChan <- workResponse{ + logGroup: lg, + startTime: startTime, + endTime: endTime, + } + } + } + + // Delay for ScanFrequency after finishing a time span + p.log.Debugf("sleeping for %v before checking new logs", p.config.ScanFrequency) + select { + case <-time.After(p.config.ScanFrequency): + case <-ctx.Done(): + } + p.log.Debug("done sleeping") + + // Advance to the next time span + startTime, endTime = endTime, clock().Add(-p.config.Latency) + } +} diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go new file mode 100644 index 00000000000..2f8198c021d --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go @@ -0,0 +1,207 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awscloudwatch + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" +) + +type clock struct { + time time.Time +} + +func (c *clock) now() time.Time { + return c.time +} + +type receiveTestStep struct { + expected []workResponse + nextTime time.Time +} + +type receiveTestCase struct { + name string + logGroups []string + configOverrides func(*config) + startTime time.Time + steps []receiveTestStep +} + +func TestReceive(t *testing.T) { + // We use a mocked clock so scan frequency can be any positive value. + const defaultScanFrequency = time.Microsecond + t0 := time.Time{} + t1 := t0.Add(time.Hour) + t2 := t1.Add(time.Minute) + t3 := t2.Add(time.Hour) + testCases := []receiveTestCase{ + { + name: "Default config with one log group", + logGroups: []string{"a"}, + startTime: t1, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t0, endTime: t1}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1, endTime: t2}, + }, + nextTime: t3, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t2, endTime: t3}, + }, + }, + }, + }, + { + name: "Default config with two log groups", + logGroups: []string{"a", "b"}, + startTime: t1, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t0, endTime: t1}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + // start/end times for the second log group should be the same + // even though the clock has changed. + {logGroup: "b", startTime: t0, endTime: t1}, + }, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1, endTime: t2}, + {logGroup: "b", startTime: t1, endTime: t2}, + }, + nextTime: t3, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t2, endTime: t3}, + {logGroup: "b", startTime: t2, endTime: t3}, + }, + }, + }, + }, + { + name: "One log group with start_position: end", + logGroups: []string{"a"}, + startTime: t1, + configOverrides: func(c *config) { + c.StartPosition = "end" + }, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-defaultScanFrequency), endTime: t1}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1, endTime: t2}, + }, + }, + }, + }, + { + name: "Two log group with start_position: end and latency", + logGroups: []string{"a", "b"}, + startTime: t1, + configOverrides: func(c *config) { + c.StartPosition = "end" + c.Latency = time.Second + }, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, + {logGroup: "b", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + }, + }, + }, + }, + { + name: "Three log groups with latency", + logGroups: []string{"a", "b", "c"}, + startTime: t1, + configOverrides: func(c *config) { + c.Latency = time.Second + }, + steps: []receiveTestStep{ + { + expected: []workResponse{ + {logGroup: "a", startTime: t0, endTime: t1.Add(-time.Second)}, + {logGroup: "b", startTime: t0, endTime: t1.Add(-time.Second)}, + {logGroup: "c", startTime: t0, endTime: t1.Add(-time.Second)}, + }, + nextTime: t2, + }, + { + expected: []workResponse{ + {logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + {logGroup: "c", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)}, + }, + }, + }, + }, + } + clock := &clock{} + for stepIndex, test := range testCases { + ctx, cancel := context.WithCancel(context.Background()) + p := &cloudwatchPoller{ + workRequestChan: make(chan struct{}), + // Unlike the live cwPoller, we make workResponseChan unbuffered, + // so we can guarantee that clock updates happen when cwPoller has already + // decided on its output + workResponseChan: make(chan workResponse), + log: logp.NewLogger("test"), + } + + p.config = defaultConfig() + p.config.ScanFrequency = defaultScanFrequency + if test.configOverrides != nil { + test.configOverrides(&p.config) + } + clock.time = test.startTime + go p.receive(ctx, test.logGroups, clock.now) + for _, step := range test.steps { + for i, expected := range step.expected { + p.workRequestChan <- struct{}{} + if i+1 == len(step.expected) && !step.nextTime.Equal(time.Time{}) { + // On the last request of the step, we advance the clock if a + // time is set + clock.time = step.nextTime + } + response := <-p.workResponseChan + assert.Equalf(t, expected, response, "%v: step %v response %v doesn't match", test.name, stepIndex, i) + } + } + cancel() + } +} diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 4ee9daa05ad..75f22e6625a 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -6,10 +6,8 @@ package awscloudwatch import ( "context" - "errors" "fmt" "strings" - "sync" "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -137,82 +135,12 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) log.Named("cloudwatch_poller"), in.metrics, in.awsConfig.Region, - in.config.APISleep, - in.config.NumberOfWorkers, - in.config.LogStreams, - in.config.LogStreamPrefix) + in.config) logProcessor := newLogProcessor(log.Named("log_processor"), in.metrics, client, ctx) cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames))) - return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames) -} - -func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupNames []string) error { - // This loop tries to keep the workers busy as much as possible while - // honoring the number in config opposed to a simpler loop that does one - // listing, sequentially processes every object and then does another listing - start := true - workerWg := new(sync.WaitGroup) - lastLogGroupOffset := 0 - for ctx.Err() == nil { - if !start { - cwPoller.log.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) - time.Sleep(in.config.ScanFrequency) - cwPoller.log.Debug("done sleeping") - } - start = false - - currentTime := time.Now() - cwPoller.startTime, cwPoller.endTime = getStartPosition(in.config.StartPosition, currentTime, cwPoller.endTime, in.config.ScanFrequency, in.config.Latency) - cwPoller.log.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(cwPoller.startTime/1000, 0), time.Unix(cwPoller.endTime/1000, 0)) - availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx) - if err != nil { - break - } - - if availableWorkers == 0 { - continue - } - - workerWg.Add(availableWorkers) - logGroupNamesLength := len(logGroupNames) - runningGoroutines := 0 - - for i := lastLogGroupOffset; i < logGroupNamesLength; i++ { - if runningGoroutines >= availableWorkers { - break - } - - runningGoroutines++ - lastLogGroupOffset = i + 1 - if lastLogGroupOffset >= logGroupNamesLength { - // release unused workers - cwPoller.workerSem.Release(availableWorkers - runningGoroutines) - for j := 0; j < availableWorkers-runningGoroutines; j++ { - workerWg.Done() - } - lastLogGroupOffset = 0 - } - - lg := logGroupNames[i] - go func(logGroup string, startTime int64, endTime int64) { - defer func() { - cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) - workerWg.Done() - cwPoller.workerSem.Release(1) - }() - cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) - cwPoller.run(svc, logGroup, startTime, endTime, logProcessor) - }(lg, cwPoller.startTime, cwPoller.endTime) - } - } - - // Wait for all workers to finish. - workerWg.Wait() - if errors.Is(ctx.Err(), context.Canceled) { - // A canceled context is a normal shutdown. - return nil - } - return ctx.Err() + cwPoller.startWorkers(ctx, svc, logProcessor) + cwPoller.receive(ctx, logGroupNames, time.Now) + return nil } func parseARN(logGroupARN string) (string, string, error) { @@ -256,24 +184,3 @@ func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, log } return logGroupNames, nil } - -func getStartPosition(startPosition string, currentTime time.Time, endTime int64, scanFrequency time.Duration, latency time.Duration) (int64, int64) { - if latency != 0 { - // add latency if config is not 0 - currentTime = currentTime.Add(latency * -1) - } - - switch startPosition { - case "beginning": - if endTime != int64(0) { - return endTime, currentTime.UnixNano() / int64(time.Millisecond) - } - return 0, currentTime.UnixNano() / int64(time.Millisecond) - case "end": - if endTime != int64(0) { - return endTime, currentTime.UnixNano() / int64(time.Millisecond) - } - return currentTime.Add(-scanFrequency).UnixNano() / int64(time.Millisecond), currentTime.UnixNano() / int64(time.Millisecond) - } - return 0, 0 -} diff --git a/x-pack/filebeat/input/awscloudwatch/input_test.go b/x-pack/filebeat/input/awscloudwatch/input_test.go index c51c6a072f4..4f9754c6a13 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_test.go @@ -15,109 +15,6 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -func TestGetStartPosition(t *testing.T) { - currentTime := time.Date(2020, time.June, 1, 0, 0, 0, 0, time.UTC) - cases := []struct { - title string - startPosition string - prevEndTime int64 - scanFrequency time.Duration - latency time.Duration - expectedStartTime int64 - expectedEndTime int64 - }{ - { - "startPosition=beginning", - "beginning", - int64(0), - 30 * time.Second, - 0, - int64(0), - int64(1590969600000), - }, - { - "startPosition=end", - "end", - int64(0), - 30 * time.Second, - 0, - int64(1590969570000), - int64(1590969600000), - }, - { - "startPosition=typo", - "typo", - int64(0), - 30 * time.Second, - 0, - int64(0), - int64(0), - }, - { - "startPosition=beginning with prevEndTime", - "beginning", - int64(1590000000000), - 30 * time.Second, - 0, - int64(1590000000000), - int64(1590969600000), - }, - { - "startPosition=end with prevEndTime", - "end", - int64(1590000000000), - 30 * time.Second, - 0, - int64(1590000000000), - int64(1590969600000), - }, - { - "startPosition=beginning with latency", - "beginning", - int64(0), - 30 * time.Second, - 10 * time.Minute, - int64(0), - int64(1590969000000), - }, - { - "startPosition=beginning with prevEndTime and latency", - "beginning", - int64(1590000000000), - 30 * time.Second, - 10 * time.Minute, - int64(1590000000000), - int64(1590969000000), - }, - { - "startPosition=end with latency", - "end", - int64(0), - 30 * time.Second, - 10 * time.Minute, - int64(1590968970000), - int64(1590969000000), - }, - { - "startPosition=end with prevEndTime and latency", - "end", - int64(1590000000000), - 30 * time.Second, - 10 * time.Minute, - int64(1590000000000), - int64(1590969000000), - }, - } - - for _, c := range cases { - t.Run(c.title, func(t *testing.T) { - startTime, endTime := getStartPosition(c.startPosition, currentTime, c.prevEndTime, c.scanFrequency, c.latency) - assert.Equal(t, c.expectedStartTime, startTime) - assert.Equal(t, c.expectedEndTime, endTime) - }) - } -} - func TestCreateEvent(t *testing.T) { logEvent := &types.FilteredLogEvent{ EventId: awssdk.String("id-1"), From 38f49a9da9c102179c5c2a9a73bf8747b2008e7c Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 23 Apr 2024 12:29:33 -0400 Subject: [PATCH 4/6] Remove unused ack tracking (#38983) Remove `EventACKTracker` from `awscloudwatch` event handling. `EventACKTracker` is buggy (see https://github.com/elastic/beats/issues/38961) but in the `awscloudwatch` input it's also never used -- its callbacks are attached to events, but their result doesn't affect any of the event handling. This PR doesn't change any functional behavior. --- x-pack/filebeat/input/awscloudwatch/input.go | 4 +--- .../input/awscloudwatch/input_integration_test.go | 7 ------- x-pack/filebeat/input/awscloudwatch/processor.go | 13 ++----------- 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 75f22e6625a..80ed3f2b2bf 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -109,9 +109,7 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) defer cancelInputCtx() // Create client for publishing events and receive notification of their ACKs. - client, err := pipeline.ConnectWith(beat.ClientConfig{ - EventListener: awscommon.NewEventACKHandler(), - }) + client, err := pipeline.ConnectWith(beat.ClientConfig{}) if err != nil { return fmt.Errorf("failed to create pipeline client: %w", err) } diff --git a/x-pack/filebeat/input/awscloudwatch/input_integration_test.go b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go index f3a45fb5c40..3a5aa179cf0 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_integration_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go @@ -32,7 +32,6 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -163,12 +162,6 @@ func TestInputWithLogGroupNamePrefix(t *testing.T) { client := pubtest.NewChanClient(0) defer close(client.Channel) - go func() { - for event := range client.Channel { - // Fake the ACK handling that's not implemented in pubtest. - event.Private.(*awscommon.EventACKTracker).ACK() - } - }() var errGroup errgroup.Group errGroup.Go(func() error { diff --git a/x-pack/filebeat/input/awscloudwatch/processor.go b/x-pack/filebeat/input/awscloudwatch/processor.go index 999cad4d7f0..0ac2bc244d5 100644 --- a/x-pack/filebeat/input/awscloudwatch/processor.go +++ b/x-pack/filebeat/input/awscloudwatch/processor.go @@ -11,7 +11,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" "github.com/elastic/beats/v7/libbeat/beat" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -20,7 +19,6 @@ type logProcessor struct { log *logp.Logger metrics *inputMetrics publisher beat.Client - ack *awscommon.EventACKTracker } func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Client, ctx context.Context) *logProcessor { @@ -31,24 +29,17 @@ func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Cli log: log, metrics: metrics, publisher: publisher, - ack: awscommon.NewEventACKTracker(ctx), } } func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroup string, regionName string) { for _, logEvent := range logEvents { event := createEvent(logEvent, logGroup, regionName) - p.publish(p.ack, &event) + p.metrics.cloudwatchEventsCreatedTotal.Inc() + p.publisher.Publish(event) } } -func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { - ack.Add() - event.Private = ack - p.metrics.cloudwatchEventsCreatedTotal.Inc() - p.publisher.Publish(*event) -} - func createEvent(logEvent types.FilteredLogEvent, logGroup string, regionName string) beat.Event { event := beat.Event{ Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(), From 73c6b25b6804db5a02ab5ae79c0b24e16d897051 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 23 Apr 2024 12:30:55 -0400 Subject: [PATCH 5/6] Cleanup: Replace custom goroutine shutdown listeners with shared context wrapper (#38957) Simplify context handling in the awscloudwatch and awss3 inputs. The awscloudwatch and awss3 inputs both need to convert their Beats input context to a standard golang context for use with the AWS API, and both of them use custom wrappers that create an extra goroutine to listen for Beat shutdown. I added an explicit wrapper object to the `v2.InputContext` API that is a single function call and requires no extra goroutines, and replaced the context wrapper in both inputs. This doesn't change any functional behavior. --- filebeat/input/v2/input.go | 19 +++++++++++++++++++ x-pack/filebeat/input/awscloudwatch/input.go | 14 +------------- x-pack/filebeat/input/awss3/input.go | 12 +----------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/filebeat/input/v2/input.go b/filebeat/input/v2/input.go index f816e285eb3..cdfde85c846 100644 --- a/filebeat/input/v2/input.go +++ b/filebeat/input/v2/input.go @@ -18,6 +18,9 @@ package v2 import ( + "context" + "time" + "github.com/elastic/beats/v7/libbeat/beat" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -111,3 +114,19 @@ type Canceler interface { Done() <-chan struct{} Err() error } + +type cancelerCtx struct { + Canceler +} + +func GoContextFromCanceler(c Canceler) context.Context { + return cancelerCtx{c} +} + +func (c cancelerCtx) Deadline() (deadline time.Time, ok bool) { + return time.Time{}, false +} + +func (c cancelerCtx) Value(_ any) any { + return nil +} diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 80ed3f2b2bf..f274fb5fcc9 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -94,19 +94,7 @@ func (in *cloudwatchInput) Test(ctx v2.TestContext) error { } func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error { - var err error - - // Wrap input Context's cancellation Done channel a context.Context. This - // goroutine stops with the parent closes the Done channel. - ctx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Cancelation.Done(): - case <-ctx.Done(): - } - }() - defer cancelInputCtx() + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) // Create client for publishing events and receive notification of their ACKs. client, err := pipeline.ConnectWith(beat.ClientConfig{}) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 855403e5dc4..733de949f29 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -114,17 +114,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { return fmt.Errorf("can not start persistent store: %w", err) } - // Wrap input Context's cancellation Done channel a context.Context. This - // goroutine stops with the parent closes the Done channel. - ctx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Cancelation.Done(): - case <-ctx.Done(): - } - }() - defer cancelInputCtx() + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) if in.config.QueueURL != "" { regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName) From 3ba900be92cf4c207787f05a08c6d5d5fcc8fcd2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 23 Apr 2024 15:26:30 -0400 Subject: [PATCH 6/6] build(deps): bump go.elastic.co/apm/module/apmelasticsearch/v2 from 2.4.8 to 2.6.0 (#39157) * build(deps): bump go.elastic.co/apm/module/apmelasticsearch/v2 Bumps [go.elastic.co/apm/module/apmelasticsearch/v2](https://github.com/elastic/apm-agent-go) from 2.4.8 to 2.6.0. - [Release notes](https://github.com/elastic/apm-agent-go/releases) - [Changelog](https://github.com/elastic/apm-agent-go/blob/main/CHANGELOG.asciidoc) - [Commits](https://github.com/elastic/apm-agent-go/compare/v2.4.8...v2.6.0) --- updated-dependencies: - dependency-name: go.elastic.co/apm/module/apmelasticsearch/v2 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] * Update NOTICE.txt --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- NOTICE.txt | 8 ++++---- go.mod | 4 ++-- go.sum | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 9cfb93da670..d6d6bc76958 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -23901,11 +23901,11 @@ Contents of probable licence file $GOMODCACHE/github.com/xdg/scram@v1.0.3/LICENS -------------------------------------------------------------------------------- Dependency : go.elastic.co/apm/module/apmelasticsearch/v2 -Version: v2.4.8 +Version: v2.6.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/go.elastic.co/apm/module/apmelasticsearch/v2@v2.4.8/LICENSE: +Contents of probable licence file $GOMODCACHE/go.elastic.co/apm/module/apmelasticsearch/v2@v2.6.0/LICENSE: Apache License Version 2.0, January 2004 @@ -24112,11 +24112,11 @@ Contents of probable licence file $GOMODCACHE/go.elastic.co/apm/module/apmelasti -------------------------------------------------------------------------------- Dependency : go.elastic.co/apm/module/apmhttp/v2 -Version: v2.5.0 +Version: v2.6.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/go.elastic.co/apm/module/apmhttp/v2@v2.5.0/LICENSE: +Contents of probable licence file $GOMODCACHE/go.elastic.co/apm/module/apmhttp/v2@v2.6.0/LICENSE: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index ec251e13402..eafbc70a171 100644 --- a/go.mod +++ b/go.mod @@ -228,8 +228,8 @@ require ( github.com/sergi/go-diff v1.3.1 github.com/shirou/gopsutil/v3 v3.22.10 github.com/tklauser/go-sysconf v0.3.10 - go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.8 - go.elastic.co/apm/module/apmhttp/v2 v2.5.0 + go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 + go.elastic.co/apm/module/apmhttp/v2 v2.6.0 go.elastic.co/apm/v2 v2.6.0 go.mongodb.org/mongo-driver v1.5.1 golang.org/x/exp v0.0.0-20231127185646-65229373498e diff --git a/go.sum b/go.sum index 519bc82cd0b..d92adb12260 100644 --- a/go.sum +++ b/go.sum @@ -1951,10 +1951,10 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= -go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.8 h1:4j3wI1e+WV6u+9ZR7lorkJI2rnJfjOWtkMeZG08PbRI= -go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.8/go.mod h1:C9ajbSjZ3akTrFOjBr+pMq8bPVOH9vhIG+knZAuPW3s= -go.elastic.co/apm/module/apmhttp/v2 v2.5.0 h1:4AWlw8giL7hRYBQiwF1/Thm0GDsbQH/Ofe4eySAnURo= -go.elastic.co/apm/module/apmhttp/v2 v2.5.0/go.mod h1:ZP7gLEzY/OAPTqNZjp8AzA06HF82zfwXEpKI2sSVTgk= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 h1:ukMcwyMaDXsS1dRK2qRYXT2AsfwaUy74TOOYCqkWJow= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0/go.mod h1:YpfiTTrqX5LB/CKBwX89oDCBAxuLJTFv40gcfxJyehM= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0 h1:s8UeNFQmVBCNd4eoz7KDD9rEFhQC0HeUFXz3z9gpAmQ= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0/go.mod h1:D0GLppLuI0Ddwvtl595GUxRgn6Z8L5KaDFVMv2H3GK0= go.elastic.co/apm/v2 v2.6.0 h1:VieBMLQFtXua2YxpYxaSdYGnmmxhLT46gosI5yErJgY= go.elastic.co/apm/v2 v2.6.0/go.mod h1:33rOXgtHwbgZcDgi6I/GtCSMZQqgxkHC0IQT3gudKvo= go.elastic.co/ecszap v1.0.2 h1:iW5OGx8IiokiUzx/shD4AJCPFMC9uUtr7ycaiEIU++I=