diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 978309d1f70b..0c9b3d89cb20 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -204,6 +204,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update CEL mito extensions to v1.10.0 to add keys/values helper. {pull}38504[38504] - Add support for Active Directory an entity analytics provider. {pull}37919[37919] - Add debugging breadcrumb to logs when writing request trace log. {pull}38636[38636] +- added benchmark input {pull}37437[37437] +- added benchmark input and discard output {pull}37437[37437] *Auditbeat* diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index 1e9f9cac6e06..13e4ffde4993 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -70,6 +70,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-aws-s3>> * <<{beatname_lc}-input-azure-eventhub>> * <<{beatname_lc}-input-azure-blob-storage>> +* <<{beatname_lc}-input-benchmark>> * <<{beatname_lc}-input-cel>> * <<{beatname_lc}-input-cloudfoundry>> * <<{beatname_lc}-input-cometd>> @@ -104,6 +105,8 @@ include::../../x-pack/filebeat/docs/inputs/input-azure-eventhub.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc[] +include::../../x-pack/filebeat/docs/inputs/input-benchmark.asciidoc[] + include::../../x-pack/filebeat/docs/inputs/input-cel.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc[] diff --git a/libbeat/docs/outputs-list.asciidoc b/libbeat/docs/outputs-list.asciidoc index 4181c10f64f6..bf6bda35094b 100644 --- a/libbeat/docs/outputs-list.asciidoc +++ b/libbeat/docs/outputs-list.asciidoc @@ -24,6 +24,9 @@ endif::[] ifndef::no_console_output[] * <> endif::[] +ifndef::no_discard_output[] +* <> +endif::[] //# end::outputs-list[] @@ -77,6 +80,13 @@ endif::[] include::{libbeat-outputs-dir}/console/docs/console.asciidoc[] endif::[] +ifndef::no_discard_output[] +ifdef::requires_xpack[] +[role="xpack"] +endif::[] +include::{libbeat-outputs-dir}/discard/docs/discard.asciidoc[] +endif::[] + ifndef::no_codec[] ifdef::requires_xpack[] [role="xpack"] diff --git a/libbeat/outputs/discard/config.go b/libbeat/outputs/discard/config.go new file mode 100644 index 000000000000..ffdb6c038b37 --- /dev/null +++ b/libbeat/outputs/discard/config.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package discard + +import ( + "github.com/elastic/elastic-agent-libs/config" +) + +type discardOutConfig struct { + Queue config.Namespace `config:"queue"` +} + +func defaultConfig() discardOutConfig { + return discardOutConfig{} +} diff --git a/libbeat/outputs/discard/discard.go b/libbeat/outputs/discard/discard.go new file mode 100644 index 000000000000..c9a51b0f33df --- /dev/null +++ b/libbeat/outputs/discard/discard.go @@ -0,0 +1,79 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package discard + +import ( + "context" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +func init() { + outputs.RegisterType("discard", makeDiscard) +} + +type discardOutput struct { + log *logp.Logger + beat beat.Info + observer outputs.Observer +} + +func makeDiscard( + _ outputs.IndexManager, + beat beat.Info, + observer outputs.Observer, + cfg *config.C, +) (outputs.Group, error) { + out := &discardOutput{ + log: logp.NewLogger("discard"), + beat: beat, + observer: observer, + } + doConfig := defaultConfig() + if err := cfg.Unpack(&doConfig); err != nil { + return outputs.Fail(err) + } + + // disable bulk support in publisher pipeline + _ = cfg.SetInt("bulk_max_size", -1, -1) + out.log.Infof("Initialized discard output") + return outputs.Success(doConfig.Queue, -1, 0, nil, out) +} + +// Implement Outputer +func (out *discardOutput) Close() error { + return nil +} + +func (out *discardOutput) Publish(_ context.Context, batch publisher.Batch) error { + defer batch.ACK() + + st := out.observer + events := batch.Events() + st.NewBatch(len(events)) + st.Acked(len(events)) + return nil +} + +func (out *discardOutput) String() string { + return "discard" +} diff --git a/libbeat/outputs/discard/docs/discard.asciidoc b/libbeat/outputs/discard/docs/discard.asciidoc new file mode 100644 index 000000000000..3e2990cb93ba --- /dev/null +++ b/libbeat/outputs/discard/docs/discard.asciidoc @@ -0,0 +1,34 @@ +[[discard-output]] +=== Configure the Discard output + +++++ +Discard +++++ + +The Discard output throws away data. + +WARNING: The Discard output should be used only for development or +debugging issues. Data is lost. + +This can be useful if you want to work on your input configuration +without needing to configure an output. It can also be useful to test +how changes in input and processor configuration affect performance. + +Example configuration: + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------ +output.discard: + enabled: true +------------------------------------------------------------------------------ + +==== Configuration options + +You can specify the following `output.discard` options in the +{beatname_lc}.yml+ config file: + +===== `enabled` + +The enabled config is a boolean setting to enable or disable the output. If set +to false, the output is disabled. + +The default value is `true`. diff --git a/libbeat/publisher/includes/includes.go b/libbeat/publisher/includes/includes.go index 84622ad10f1a..c1e2d02e3cfd 100644 --- a/libbeat/publisher/includes/includes.go +++ b/libbeat/publisher/includes/includes.go @@ -22,6 +22,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/codec/format" _ "github.com/elastic/beats/v7/libbeat/outputs/codec/json" _ "github.com/elastic/beats/v7/libbeat/outputs/console" + _ "github.com/elastic/beats/v7/libbeat/outputs/discard" _ "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" _ "github.com/elastic/beats/v7/libbeat/outputs/fileout" _ "github.com/elastic/beats/v7/libbeat/outputs/kafka" diff --git a/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc b/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc new file mode 100644 index 000000000000..db8036973357 --- /dev/null +++ b/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc @@ -0,0 +1,93 @@ +[role="xpack"] + +:type: benchmark + +[id="{beatname_lc}-input-{type}"] +=== Benchmark input + +++++ +Benchmark +++++ + +beta[] + +The Benchmark input generates generic events and sends them to the output. This can be useful when you want to benchmark the difference between outputs or output settings. + +Example configurations: + +Basic example, infinite events as quickly as possible: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: benchmark + enabled: true + message: "test message" + threads: 1 +---- + +Send 1024 events and stop example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: benchmark + enabled: true + message: "test message" + threads: 1 + count: 1024 +---- + +Send 5 events per second example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: benchmark + enabled: true + message: "test message" + threads: 1 + eps: 5 +---- + +==== Configuration options + +The Benchmark input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +==== `message` + +This is the value that will be in the `message` field of the json document. + +[float] +==== `threads` + +This is the number of goroutines that will be started generating messages. Normally 1 thread can saturate an output but if necessary this can be increased. + +[float] +==== `count` + +This is the number of messages to send. 0 represents sending infinite messages. This is mutually exclusive with the `eps` option. + +[float] +==== `eps` + +This is the number of events per second to send. 0 represents sending as quickly as possible. This is mutually exclusive with the `count` option. + + +[float] +=== Metrics + +This input exposes metrics under the <>. +These metrics are exposed under the `/inputs` path. They can be used to +observe the activity of the input. + +[options="header"] +|======= +| Metric | Description +| `events_published_total` | Number of events published. +| `publishing_time` | Histogram of the elapsed in nanoseconds (time of publisher.Publish). +|======= + +[id="{beatname_lc}-input-{type}-common-options"] +include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] + +:type!: diff --git a/x-pack/filebeat/input/benchmark/config.go b/x-pack/filebeat/input/benchmark/config.go new file mode 100644 index 000000000000..e26182476a95 --- /dev/null +++ b/x-pack/filebeat/input/benchmark/config.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package benchmark + +import "fmt" + +type benchmarkConfig struct { + Message string `config:"message"` + Count uint64 `config:"count"` + Threads uint8 `config:"threads"` + Eps uint64 `config:"eps"` +} + +var ( + defaultConfig = benchmarkConfig{ + Message: "generic benchmark message", + Threads: 1, + } +) + +func (c *benchmarkConfig) Validate() error { + if c.Count > 0 && c.Eps > 0 { + return fmt.Errorf("only one of count or eps may be specified, not both") + } + if c.Message == "" { + return fmt.Errorf("message must be specified") + } + return nil +} diff --git a/x-pack/filebeat/input/benchmark/config_test.go b/x-pack/filebeat/input/benchmark/config_test.go new file mode 100644 index 000000000000..0481485d7e83 --- /dev/null +++ b/x-pack/filebeat/input/benchmark/config_test.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package benchmark + +import ( + "strings" + "testing" +) + +func TestValidate(t *testing.T) { + tests := map[string]struct { + cfg benchmarkConfig + expectError bool + errorString string + }{ + "default": {cfg: defaultConfig}, + "countAndEps": {cfg: benchmarkConfig{Message: "a", Count: 1, Eps: 1}, expectError: true, errorString: "only one of count or eps may be specified"}, + "empty": {cfg: benchmarkConfig{}, expectError: true, errorString: "message must be specified"}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + err := tc.cfg.Validate() + if err == nil && tc.expectError == true { + t.Fatalf("expected validation error, didn't get it") + } + if err != nil && tc.expectError == false { + t.Fatalf("unexpected validation error: %s", err) + } + if err != nil && !strings.Contains(err.Error(), tc.errorString) { + t.Fatalf("error: '%s' didn't contain expected string: '%s'", err, tc.errorString) + } + }) + } +} diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go new file mode 100644 index 000000000000..dd6d198cc409 --- /dev/null +++ b/x-pack/filebeat/input/benchmark/input.go @@ -0,0 +1,176 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package benchmark + +import ( + "sync" + "time" + + "github.com/rcrowley/go-metrics" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +const ( + inputName = "benchmark" +) + +// Plugin registers the input +func Plugin() v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Experimental, + Manager: stateless.NewInputManager(configure), + } +} + +func configure(cfg *config.C) (stateless.Input, error) { + bConf := defaultConfig + if err := cfg.Unpack(&bConf); err != nil { + return nil, err + } + return &benchmarkInput{cfg: bConf}, nil +} + +// benchmarkInput is the main runtime object for the input +type benchmarkInput struct { + cfg benchmarkConfig +} + +// Name returns the name of the input +func (bi *benchmarkInput) Name() string { + return inputName +} + +// Test validates the configuration +func (bi *benchmarkInput) Test(ctx v2.TestContext) error { + return bi.cfg.Validate() +} + +// Run starts the data generation. +func (bi *benchmarkInput) Run(ctx v2.Context, publisher stateless.Publisher) error { + var wg sync.WaitGroup + metrics := newInputMetrics(ctx.ID) + + for i := uint8(0); i < bi.cfg.Threads; i++ { + wg.Add(1) + go func(thread uint8) { + defer wg.Done() + runThread(ctx, publisher, thread, bi.cfg, metrics) + }(i) + } + wg.Wait() + return ctx.Cancelation.Err() +} + +func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg benchmarkConfig, metrics *inputMetrics) { + ctx.Logger.Infof("starting benchmark input thread: %d", thread) + defer ctx.Logger.Infof("stopping benchmark input thread: %d", thread) + + var line uint64 + var name uint64 + + switch { + case cfg.Count > 0: + for { + select { + case <-ctx.Cancelation.Done(): + return + default: + publishEvt(publisher, cfg.Message, line, name, thread, metrics) + line++ + if line == cfg.Count { + return + } + } + } + case cfg.Eps > 0: + ticker := time.NewTicker(1 * time.Second) + pubChan := make(chan bool, int(cfg.Eps)) + for { + select { + case <-ctx.Cancelation.Done(): + ticker.Stop() + return + case <-ticker.C: + //don't want to block on filling doPublish channel + //so only send as many as it can hold right now + numToSend := cap(pubChan) - len(pubChan) + for i := 0; i < numToSend; i++ { + pubChan <- true + } + case <-pubChan: + publishEvt(publisher, cfg.Message, line, name, thread, metrics) + line++ + if line == 0 { + name++ + } + } + } + default: + for { + select { + case <-ctx.Cancelation.Done(): + return + default: + publishEvt(publisher, cfg.Message, line, name, thread, metrics) + line++ + if line == 0 { + name++ + } + } + } + } +} + +func publishEvt(publisher stateless.Publisher, msg string, line uint64, filename uint64, thread uint8, metrics *inputMetrics) { + timestamp := time.Now() + evt := beat.Event{ + Timestamp: timestamp, + Fields: mapstr.M{ + "message": msg, + "line": line, + "filename": filename, + "thread": thread, + }, + } + publisher.Publish(evt) + metrics.publishingTime.Update(time.Since(timestamp).Nanoseconds()) + metrics.eventsPublished.Add(1) +} + +type inputMetrics struct { + unregister func() + + eventsPublished *monitoring.Uint // number of events published + publishingTime metrics.Sample // histogram of the elapsed times in nanoseconds (time of publisher.Publish) +} + +// newInputMetrics returns an input metric for the benchmark processor. +func newInputMetrics(id string) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) + out := &inputMetrics{ + unregister: unreg, + eventsPublished: monitoring.NewUint(reg, "events_published_total"), + publishingTime: metrics.NewUniformSample(1024), + } + + _ = adapter.NewGoMetrics(reg, "publishing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.publishingTime)) + + return out +} + +func (m *inputMetrics) Close() { + m.unregister() +} diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index ab682e4e0010..e53538fbcef8 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage" + "github.com/elastic/beats/v7/x-pack/filebeat/input/benchmark" "github.com/elastic/beats/v7/x-pack/filebeat/input/cel" "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics" @@ -45,5 +46,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 shipper.Plugin(log, store), websocket.Plugin(log, store), netflow.Plugin(log), + benchmark.Plugin(), } }