diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 049c46e6647..ee8433325a1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -577,6 +577,7 @@ field. You can revert this change by configuring tags for the module and omittin - Add memory metrics into compute googlecloud. {pull}18802[18802] - Add new fields to HAProxy module. {issue}18523[18523] - Add Tomcat overview dashboard {pull}14026[14026] +- Add support for v1 consumer API in Cloud Foundry module, use it by default. {pull}19268[19268] *Packetbeat* diff --git a/metricbeat/docs/modules/cloudfoundry.asciidoc b/metricbeat/docs/modules/cloudfoundry.asciidoc index 0baeb3f4180..3a8ba132280 100644 --- a/metricbeat/docs/modules/cloudfoundry.asciidoc +++ b/metricbeat/docs/modules/cloudfoundry.asciidoc @@ -85,7 +85,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr [float] === `rlp_address` -The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)". +The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)". [float] === `client_id` @@ -103,6 +103,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "". Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events from the RLP Gateway. Default: "(generated UUID)". +[float] +==== `version` + +Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control. +Use `v2` to collect events from the RLP Gateway. Default: "`v1`". + [float] === `ssl` @@ -130,6 +136,7 @@ metricbeat.modules: rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}' client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}' client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}' + version: v1 ---- [float] diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index 8d068a4fd84..10ea50dd928 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -122,9 +122,11 @@ func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeF } cb(event) case err := <-errChan: - // This error is an error on the connection, not a cloud foundry - // error envelope. Firehose should be able to reconnect, so just log it. - c.log.Infof("Error received on firehose: %v", err) + if err != nil { + // This error is an error on the connection, not a cloud foundry + // error envelope. Firehose should be able to reconnect, so just log it. + c.log.Infof("Error received on firehose: %v", err) + } case <-c.stop: return } diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 179a4076319..5651cd69357 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -372,6 +372,7 @@ metricbeat.modules: rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}' client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}' client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}' + version: v1 #----------------------------- CockroachDB Module ----------------------------- - module: cockroachdb diff --git a/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml b/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml index c157d5deeff..be15db23b65 100644 --- a/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml +++ b/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml @@ -10,3 +10,4 @@ rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}' client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}' client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}' + version: v1 diff --git a/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc b/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc index 762d3e4f34f..b36a41bf891 100644 --- a/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc @@ -75,7 +75,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr [float] === `rlp_address` -The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)". +The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)". [float] === `client_id` @@ -93,6 +93,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "". Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events from the RLP Gateway. Default: "(generated UUID)". +[float] +==== `version` + +Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control. +Use `v2` to collect events from the RLP Gateway. Default: "`v1`". + [float] === `ssl` diff --git a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go index 96cded6fa7e..961827469dd 100644 --- a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go +++ b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go @@ -5,8 +5,7 @@ package cloudfoundry import ( - "context" - "sync" + "fmt" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" @@ -16,26 +15,18 @@ import ( // ModuleName is the name of this module. const ModuleName = "cloudfoundry" -type Module struct { - mb.BaseModule - - log *logp.Logger - - hub *cfcommon.Hub - listener *cfcommon.RlpListener - listenerLock sync.Mutex - - counterReporter mb.PushReporterV2 - valueReporter mb.PushReporterV2 - containerReporter mb.PushReporterV2 -} - func init() { if err := mb.Registry.AddModule(ModuleName, newModule); err != nil { panic(err) } } +type Module interface { + RunCounterReporter(mb.PushReporterV2) + RunContainerReporter(mb.PushReporterV2) + RunValueReporter(mb.PushReporterV2) +} + func newModule(base mb.BaseModule) (mb.Module, error) { var cfg cfcommon.Config if err := base.UnpackConfig(&cfg); err != nil { @@ -45,101 +36,12 @@ func newModule(base mb.BaseModule) (mb.Module, error) { log := logp.NewLogger("cloudfoundry") hub := cfcommon.NewHub(&cfg, "metricbeat", log) - // early check that listener can be created - _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) - if err != nil { - return nil, err - - } - - return &Module{ - BaseModule: base, - log: log, - hub: hub, - }, nil -} - -func (m *Module) RunCounterReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() - m.runReporters(reporter, m.valueReporter, m.containerReporter) - m.listenerLock.Unlock() - - <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(nil, m.valueReporter, m.containerReporter) - m.listenerLock.Unlock() -} - -func (m *Module) RunValueReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() - m.runReporters(m.counterReporter, reporter, m.containerReporter) - m.listenerLock.Unlock() - - <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(m.counterReporter, nil, m.containerReporter) - m.listenerLock.Unlock() -} - -func (m *Module) RunContainerReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() - m.runReporters(m.counterReporter, m.valueReporter, reporter) - m.listenerLock.Unlock() - - <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(m.counterReporter, m.valueReporter, nil) - m.listenerLock.Unlock() -} - -func (m *Module) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) { - if m.listener != nil { - m.listener.Stop() - m.listener = nil - } - m.counterReporter = counterReporter - m.valueReporter = valueReporter - m.containerReporter = containerReporter - - start := false - callbacks := cfcommon.RlpListenerCallbacks{} - if m.counterReporter != nil { - start = true - callbacks.Counter = func(evt *cfcommon.EventCounter) { - m.counterReporter.Event(mb.Event{ - Timestamp: evt.Timestamp(), - RootFields: evt.ToFields(), - }) - } - } - if m.valueReporter != nil { - start = true - callbacks.ValueMetric = func(evt *cfcommon.EventValueMetric) { - m.valueReporter.Event(mb.Event{ - Timestamp: evt.Timestamp(), - RootFields: evt.ToFields(), - }) - } - } - if m.containerReporter != nil { - start = true - callbacks.ContainerMetric = func(evt *cfcommon.EventContainerMetric) { - m.containerReporter.Event(mb.Event{ - Timestamp: evt.Timestamp(), - RootFields: evt.ToFields(), - }) - } - } - if start { - l, err := m.hub.RlpListener(callbacks) - if err != nil { - m.log.Errorf("failed to create RlpListener: %v", err) - return - } - l.Start(context.Background()) - m.listener = l + switch cfg.Version { + case cfcommon.ConsumerVersionV1: + return newModuleV1(base, hub, log) + case cfcommon.ConsumerVersionV2: + return newModuleV2(base, hub, log) + default: + return nil, fmt.Errorf("not supported consumer version: %s", cfg.Version) } } diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container.go b/x-pack/metricbeat/module/cloudfoundry/container/container.go index 73019fb8808..4f8c6227103 100644 --- a/x-pack/metricbeat/module/cloudfoundry/container/container.go +++ b/x-pack/metricbeat/module/cloudfoundry/container/container.go @@ -25,14 +25,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet - mod *cloudfoundry.Module + mod cloudfoundry.Module } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - mod, ok := base.Module().(*cloudfoundry.Module) + mod, ok := base.Module().(cloudfoundry.Module) if !ok { return nil, fmt.Errorf("must be child of cloudfoundry module") } diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go b/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go index b05683b487e..e871a5823fc 100644 --- a/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go +++ b/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go @@ -13,12 +13,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" ) func TestFetch(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testFetch(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testFetch(t, "v2") + }) +} + +func testFetch(t *testing.T, version string) { config := mtest.GetConfig(t, "container") + config["version"] = version ms := mbtest.NewPushMetricSetV2(t, config) events := mbtest.RunPushMetricSetV2(60*time.Second, 1, ms) diff --git a/x-pack/metricbeat/module/cloudfoundry/counter/counter.go b/x-pack/metricbeat/module/cloudfoundry/counter/counter.go index 10022f87d04..53d3833d810 100644 --- a/x-pack/metricbeat/module/cloudfoundry/counter/counter.go +++ b/x-pack/metricbeat/module/cloudfoundry/counter/counter.go @@ -25,14 +25,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet - mod *cloudfoundry.Module + mod cloudfoundry.Module } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - mod, ok := base.Module().(*cloudfoundry.Module) + mod, ok := base.Module().(cloudfoundry.Module) if !ok { return nil, fmt.Errorf("must be child of cloudfoundry module") } diff --git a/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go b/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go index 6a87ce6f951..44cb4935e70 100644 --- a/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go +++ b/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go @@ -13,12 +13,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" ) func TestFetch(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testFetch(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testFetch(t, "v2") + }) +} + +func testFetch(t *testing.T, version string) { config := mtest.GetConfig(t, "counter") + config["version"] = version ms := mbtest.NewPushMetricSetV2(t, config) events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go new file mode 100644 index 00000000000..7d9daf24673 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -0,0 +1,164 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +type ModuleV1 struct { + mb.BaseModule + + log *logp.Logger + + running atomic.Bool + consumer *cfcommon.DopplerConsumer + + events chan cfcommon.Event + subscriptions chan subscription +} + +func newModuleV1(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (*ModuleV1, error) { + m := ModuleV1{ + BaseModule: base, + log: log, + running: atomic.MakeBool(false), + } + consumer, err := hub.DopplerConsumer(cfcommon.DopplerCallbacks{ + Metric: m.callback, + }) + if err != nil { + return nil, err + } + m.consumer = consumer + m.events = make(chan cfcommon.Event) + m.subscriptions = make(chan subscription) + + return &m, nil +} + +func (m *ModuleV1) RunCounterReporter(reporter mb.PushReporterV2) { + m.subscribe(cfcommon.EventTypeCounter, reporter) + defer m.unsubscribe(cfcommon.EventTypeCounter, reporter) + <-reporter.Done() +} + +func (m *ModuleV1) RunValueReporter(reporter mb.PushReporterV2) { + m.subscribe(cfcommon.EventTypeValueMetric, reporter) + defer m.unsubscribe(cfcommon.EventTypeValueMetric, reporter) + <-reporter.Done() +} + +func (m *ModuleV1) RunContainerReporter(reporter mb.PushReporterV2) { + m.subscribe(cfcommon.EventTypeContainerMetric, reporter) + defer m.unsubscribe(cfcommon.EventTypeContainerMetric, reporter) + <-reporter.Done() +} + +func (m *ModuleV1) subscribe(eventType cfcommon.EventType, reporter mb.PushReporterV2) { + go m.run() + m.subscriptions <- subscription{ + eventType: eventType, + reporter: reporter, + } +} + +func (m *ModuleV1) unsubscribe(eventType cfcommon.EventType, reporter mb.PushReporterV2) { + m.subscriptions <- subscription{ + eventType: eventType, + reporter: reporter, + unsubscribe: true, + } +} + +func (m *ModuleV1) callback(event cfcommon.Event) { + m.events <- event +} + +func (m *ModuleV1) run() { + if !m.running.CAS(false, true) { + return + } + defer func() { m.running.Store(false) }() + + m.consumer.Run() + defer m.consumer.Stop() + + dispatcher := newEventDispatcher(m.log) + + for { + // Handle subscriptions and events dispatching on the same + // goroutine so locking is not needed. + select { + case e := <-m.events: + dispatcher.dispatch(e) + case s := <-m.subscriptions: + dispatcher.handleSubscription(s) + if dispatcher.empty() { + return + } + } + } +} + +type subscription struct { + eventType cfcommon.EventType + reporter mb.PushReporterV2 + + unsubscribe bool +} + +// eventDispatcher keeps track on the reporters that are subscribed to each event type +// and dispatches events to them when received. +type eventDispatcher struct { + log *logp.Logger + reporters map[cfcommon.EventType]mb.PushReporterV2 +} + +func newEventDispatcher(log *logp.Logger) *eventDispatcher { + return &eventDispatcher{ + log: log, + reporters: make(map[cfcommon.EventType]mb.PushReporterV2), + } +} + +func (d *eventDispatcher) handleSubscription(s subscription) { + current, subscribed := d.reporters[s.eventType] + if s.unsubscribe { + if !subscribed || current != s.reporter { + // This can happen if same metricset is used twice + d.log.Warnf("Ignoring unsubscription of not subscribed reporter for %s", s.eventType) + return + } + delete(d.reporters, s.eventType) + } else { + if subscribed { + if s.reporter != current { + // This can happen if same metricset is used twice + d.log.Warnf("Ignoring subscription of multiple reporters for %s", s.eventType) + } + return + } + d.reporters[s.eventType] = s.reporter + } +} + +func (d *eventDispatcher) dispatch(e cfcommon.Event) { + reporter, found := d.reporters[e.EventType()] + if !found { + return + } + reporter.Event(mb.Event{ + Timestamp: e.Timestamp(), + RootFields: e.ToFields(), + }) +} + +func (d *eventDispatcher) empty() bool { + return len(d.reporters) == 0 +} diff --git a/x-pack/metricbeat/module/cloudfoundry/v1_test.go b/x-pack/metricbeat/module/cloudfoundry/v1_test.go new file mode 100644 index 00000000000..b30952a9189 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/v1_test.go @@ -0,0 +1,181 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +func TestDispatcher(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + log := logp.NewLogger("cloudfoundry") + + assertEventType := func(t *testing.T, expected string, e mb.Event) { + t.Helper() + cf := e.RootFields["cloudfoundry"].(common.MapStr) + assert.Equal(t, expected, cf["type"]) + } + + waitFor := func(t *testing.T, expected string, r pushReporter) { + t.Helper() + select { + case e := <-r.events: + assertEventType(t, expected, e) + default: + t.Errorf("expected %s event", expected) + } + } + + t.Run("subscribe to one type", func(t *testing.T) { + d := newEventDispatcher(log) + r := pushReporter{events: make(chan mb.Event, 1)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + }) + + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + }) + + t.Run("subscribe and unsubscribe", func(t *testing.T) { + d := newEventDispatcher(log) + r := pushReporter{events: make(chan mb.Event, 1)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + }) + + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + unsubscribe: true, + }) + + assert.True(t, d.empty()) + d.dispatch(&cfcommon.EventCounter{}) + + select { + case <-r.events: + t.Errorf("shouldn't receive on this reporter") + default: + } + }) + + t.Run("subscribe to two types", func(t *testing.T) { + d := newEventDispatcher(log) + + counterReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &counterReporter, + }) + + valueReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeValueMetric, + reporter: &valueReporter, + }) + + d.dispatch(&cfcommon.EventCounter{}) + d.dispatch(&cfcommon.EventValueMetric{}) + + waitFor(t, "counter", counterReporter) + waitFor(t, "value", valueReporter) + }) + + t.Run("subscribe to two types, receive only from one", func(t *testing.T) { + d := newEventDispatcher(log) + + counterReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &counterReporter, + }) + + valueReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeValueMetric, + reporter: &valueReporter, + }) + + d.dispatch(&cfcommon.EventCounter{}) + d.dispatch(&cfcommon.EventCounter{}) + + select { + case <-valueReporter.events: + t.Errorf("shouldn't receive on this reporter") + default: + } + + waitFor(t, "counter", counterReporter) + waitFor(t, "counter", counterReporter) + }) + + t.Run("subscribe twice to same type, ignore second", func(t *testing.T) { + d := newEventDispatcher(log) + first := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &first, + }) + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", first) + + second := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &second, + }) + + d.dispatch(&cfcommon.EventCounter{}) + select { + case <-second.events: + t.Errorf("shouldn't receive on this reporter") + default: + } + waitFor(t, "counter", first) + }) + + t.Run("unsubscribe not subscribed reporters, first one continues subscribed", func(t *testing.T) { + d := newEventDispatcher(log) + r := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + }) + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &pushReporter{}, + unsubscribe: true, + }) + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + }) +} + +type pushReporter struct { + events chan mb.Event +} + +func (r *pushReporter) Done() <-chan struct{} { return nil } +func (r *pushReporter) Error(err error) bool { return true } +func (r *pushReporter) Event(e mb.Event) bool { + r.events <- e + return true +} diff --git a/x-pack/metricbeat/module/cloudfoundry/v2.go b/x-pack/metricbeat/module/cloudfoundry/v2.go new file mode 100644 index 00000000000..5cf7de6c103 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/v2.go @@ -0,0 +1,128 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +type ModuleV2 struct { + mb.BaseModule + + log *logp.Logger + + hub *cfcommon.Hub + listener *cfcommon.RlpListener + listenerLock sync.Mutex + + counterReporter mb.PushReporterV2 + valueReporter mb.PushReporterV2 + containerReporter mb.PushReporterV2 +} + +func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (mb.Module, error) { + // early check that listener can be created + _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) + if err != nil { + return nil, err + + } + + return &ModuleV2{ + BaseModule: base, + log: log, + hub: hub, + }, nil +} + +func (m *ModuleV2) RunCounterReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() + m.runReporters(reporter, m.valueReporter, m.containerReporter) + m.listenerLock.Unlock() + + <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(nil, m.valueReporter, m.containerReporter) + m.listenerLock.Unlock() +} + +func (m *ModuleV2) RunValueReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() + m.runReporters(m.counterReporter, reporter, m.containerReporter) + m.listenerLock.Unlock() + + <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(m.counterReporter, nil, m.containerReporter) + m.listenerLock.Unlock() +} + +func (m *ModuleV2) RunContainerReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() + m.runReporters(m.counterReporter, m.valueReporter, reporter) + m.listenerLock.Unlock() + + <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(m.counterReporter, m.valueReporter, nil) + m.listenerLock.Unlock() +} + +func (m *ModuleV2) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) { + if m.listener != nil { + m.listener.Stop() + m.listener = nil + } + m.counterReporter = counterReporter + m.valueReporter = valueReporter + m.containerReporter = containerReporter + + start := false + callbacks := cfcommon.RlpListenerCallbacks{} + if m.counterReporter != nil { + start = true + callbacks.Counter = func(evt *cfcommon.EventCounter) { + m.counterReporter.Event(mb.Event{ + Timestamp: evt.Timestamp(), + RootFields: evt.ToFields(), + }) + } + } + if m.valueReporter != nil { + start = true + callbacks.ValueMetric = func(evt *cfcommon.EventValueMetric) { + m.valueReporter.Event(mb.Event{ + Timestamp: evt.Timestamp(), + RootFields: evt.ToFields(), + }) + } + } + if m.containerReporter != nil { + start = true + callbacks.ContainerMetric = func(evt *cfcommon.EventContainerMetric) { + m.containerReporter.Event(mb.Event{ + Timestamp: evt.Timestamp(), + RootFields: evt.ToFields(), + }) + } + } + if start { + l, err := m.hub.RlpListener(callbacks) + if err != nil { + m.log.Errorf("failed to create RlpListener: %v", err) + return + } + l.Start(context.TODO()) + m.listener = l + } +} diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value.go b/x-pack/metricbeat/module/cloudfoundry/value/value.go index 7a30a2c67db..55cb6ca689a 100644 --- a/x-pack/metricbeat/module/cloudfoundry/value/value.go +++ b/x-pack/metricbeat/module/cloudfoundry/value/value.go @@ -25,14 +25,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet - mod *cloudfoundry.Module + mod cloudfoundry.Module } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - mod, ok := base.Module().(*cloudfoundry.Module) + mod, ok := base.Module().(cloudfoundry.Module) if !ok { return nil, fmt.Errorf("must be child of cloudfoundry module") } diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go b/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go index 03d11bb6b7e..610a0a8e029 100644 --- a/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go +++ b/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go @@ -13,12 +13,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" ) func TestFetch(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testFetch(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testFetch(t, "v2") + }) +} + +func testFetch(t *testing.T, version string) { config := mtest.GetConfig(t, "value") + config["version"] = version ms := mbtest.NewPushMetricSetV2(t, config) events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)