From 5e4c60b80da6ed3458bfaf2e21cf84357ee890fd Mon Sep 17 00:00:00 2001 From: Silvia Mitter Date: Mon, 25 Feb 2019 14:31:46 +0100 Subject: [PATCH] Use IndexPrefix for kafka and logstash output. (#10841) Output index differed to Elasticsearch output. fixes #10839 --- libbeat/outputs/kafka/config.go | 8 ++ libbeat/outputs/kafka/config_test.go | 13 +-- libbeat/outputs/kafka/kafka.go | 8 +- .../outputs/kafka/kafka_integration_test.go | 3 +- libbeat/outputs/logstash/async_test.go | 2 +- libbeat/outputs/logstash/config.go | 55 ++++++---- libbeat/outputs/logstash/config_test.go | 100 ++++++++++++++++++ libbeat/outputs/logstash/logstash.go | 12 +-- libbeat/outputs/logstash/sync_test.go | 2 +- libbeat/outputs/logstash/window_test.go | 2 +- 10 files changed, 160 insertions(+), 45 deletions(-) create mode 100644 libbeat/outputs/logstash/config_test.go diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 9cf81d342611..48a46491f918 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -106,6 +106,14 @@ func defaultConfig() kafkaConfig { } } +func readConfig(cfg *common.Config) (*kafkaConfig, error) { + c := defaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + return &c, nil +} + func (c *kafkaConfig) Validate() error { if len(c.Hosts) == 0 { return errors.New("no hosts configured") diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index 262da4f9c056..dd74b50ed3d7 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -39,18 +39,13 @@ func TestConfigAcceptValid(t *testing.T) { for name, test := range tests { test := test t.Run(name, func(t *testing.T) { - c, err := common.NewConfigFrom(test) + c := common.MustNewConfigFrom(test) + c.SetString("hosts", 0, "localhost") + cfg, err := readConfig(c) if err != nil { t.Fatalf("Can not create test configuration: %v", err) } - c.SetString("hosts", 0, "localhost") - - cfg := defaultConfig() - if err := c.Unpack(&cfg); err != nil { - t.Fatalf("Unpacking configuration failed: %v", err) - } - - if _, err := newSaramaConfig(&cfg); err != nil { + if _, err := newSaramaConfig(cfg); err != nil { t.Fatalf("Failure creating sarama config: %v", err) } }) diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index d9f9e86ac18c..e795dbd3671c 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -76,8 +76,8 @@ func makeKafka( ) (outputs.Group, error) { debugf("initialize kafka output") - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { + config, err := readConfig(cfg) + if err != nil { return outputs.Fail(err) } @@ -91,7 +91,7 @@ func makeKafka( return outputs.Fail(err) } - libCfg, err := newSaramaConfig(&config) + libCfg, err := newSaramaConfig(config) if err != nil { return outputs.Fail(err) } @@ -106,7 +106,7 @@ func makeKafka( return outputs.Fail(err) } - client, err := newKafkaClient(observer, hosts, beat.Beat, config.Key, topic, codec, libCfg) + client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, codec, libCfg) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 78df0a12a081..142cf478b1e0 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -199,7 +199,7 @@ func TestKafkaPublish(t *testing.T) { } t.Run(name, func(t *testing.T) { - grp, err := makeKafka(beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), cfg) + grp, err := makeKafka(beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, outputs.NewNilObserver(), cfg) if err != nil { t.Fatal(err) } @@ -208,6 +208,7 @@ func TestKafkaPublish(t *testing.T) { if err := output.Connect(); err != nil { t.Fatal(err) } + assert.Equal(t, output.index, "testbeat") defer output.Close() // publish test events diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go index d02c3d348e88..b99fb9a57491 100644 --- a/libbeat/outputs/logstash/async_test.go +++ b/libbeat/outputs/logstash/async_test.go @@ -50,7 +50,7 @@ func TestAsyncStructuredEvent(t *testing.T) { } func makeAsyncTestClient(conn *transport.Client) testClientDriver { - config := defaultConfig + config := defaultConfig() config.Timeout = 1 * time.Second config.Pipelining = 3 client, err := newAsyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config) diff --git a/libbeat/outputs/logstash/config.go b/libbeat/outputs/logstash/config.go index 36e2a6187166..b5122ccb8feb 100644 --- a/libbeat/outputs/logstash/config.go +++ b/libbeat/outputs/logstash/config.go @@ -20,6 +20,10 @@ package logstash import ( "time" + "github.com/elastic/beats/libbeat/beat" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/outputs/transport" ) @@ -46,24 +50,39 @@ type Backoff struct { Max time.Duration } -var defaultConfig = Config{ - Port: 5044, - LoadBalance: false, - Pipelining: 2, - BulkMaxSize: 2048, - SlowStart: false, - CompressionLevel: 3, - Timeout: 30 * time.Second, - MaxRetries: 3, - TTL: 0 * time.Second, - Backoff: Backoff{ - Init: 1 * time.Second, - Max: 60 * time.Second, - }, - EscapeHTML: true, +func defaultConfig() Config { + return Config{ + Port: 5044, + LoadBalance: false, + Pipelining: 2, + BulkMaxSize: 2048, + SlowStart: false, + CompressionLevel: 3, + Timeout: 30 * time.Second, + MaxRetries: 3, + TTL: 0 * time.Second, + Backoff: Backoff{ + Init: 1 * time.Second, + Max: 60 * time.Second, + }, + EscapeHTML: false, + } } -func newConfig() *Config { - c := defaultConfig - return &c +func readConfig(cfg *common.Config, info beat.Info) (*Config, error) { + c := defaultConfig() + + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + + if cfg.HasField("port") { + cfgwarn.Deprecate("7.0.0", "The Logstash outputs port setting") + } + + if c.Index == "" { + c.Index = info.IndexPrefix + } + + return &c, nil } diff --git a/libbeat/outputs/logstash/config_test.go b/libbeat/outputs/logstash/config_test.go new file mode 100644 index 000000000000..ee3ffe179786 --- /dev/null +++ b/libbeat/outputs/logstash/config_test.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logstash + +import ( + "testing" + "time" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + + "github.com/stretchr/testify/assert" +) + +func TestConfig(t *testing.T) { + + info := beat.Info{Beat: "testbeat", Name: "foo", IndexPrefix: "bar"} + for name, test := range map[string]struct { + config *common.Config + expectedConfig *Config + err bool + }{ + "default config": { + config: common.MustNewConfigFrom([]byte(`{ }`)), + expectedConfig: &Config{ + LoadBalance: false, + Pipelining: 2, + BulkMaxSize: 2048, + SlowStart: false, + CompressionLevel: 3, + Timeout: 30 * time.Second, + MaxRetries: 3, + TTL: 0 * time.Second, + Backoff: Backoff{ + Init: 1 * time.Second, + Max: 60 * time.Second, + }, + EscapeHTML: false, + Index: "bar", + }, + }, + "config given": { + config: common.MustNewConfigFrom(common.MapStr{ + "index": "beat-index", + "loadbalance": true, + "bulk_max_size": 1024, + "slow_start": false, + }), + expectedConfig: &Config{ + LoadBalance: true, + BulkMaxSize: 1024, + Pipelining: 2, + SlowStart: false, + CompressionLevel: 3, + Timeout: 30 * time.Second, + MaxRetries: 3, + TTL: 0 * time.Second, + Backoff: Backoff{ + Init: 1 * time.Second, + Max: 60 * time.Second, + }, + EscapeHTML: false, + Index: "beat-index", + }, + }, + "removed config setting": { + config: common.MustNewConfigFrom(common.MapStr{ + "port": "8080", + }), + expectedConfig: nil, + err: true, + }, + } { + t.Run(name, func(t *testing.T) { + cfg, err := readConfig(test.config, info) + if test.err { + assert.Error(t, err) + assert.Nil(t, cfg) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expectedConfig, cfg) + } + }) + } +} diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index cf6a40ed88f8..81785d2797a4 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -20,7 +20,6 @@ package logstash import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" @@ -43,19 +42,12 @@ func makeLogstash( observer outputs.Observer, cfg *common.Config, ) (outputs.Group, error) { - if !cfg.HasField("index") { - cfg.SetString("index", -1, beat.Beat) - } - config := newConfig() - if err := cfg.Unpack(config); err != nil { + config, err := readConfig(cfg, beat) + if err != nil { return outputs.Fail(err) } - if cfg.HasField("port") { - cfgwarn.Deprecate("7.0.0", "The Logstash outputs port setting") - } - hosts, err := outputs.ReadHostList(cfg) if err != nil { return outputs.Fail(err) diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index dc48309211dd..f9d74cb69cba 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -63,7 +63,7 @@ func newClientServerTCP(t *testing.T, to time.Duration) *clientServer { } func makeTestClient(conn *transport.Client) testClientDriver { - config := defaultConfig + config := defaultConfig() config.Timeout = 1 * time.Second config.TTL = 5 * time.Second client, err := newSyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config) diff --git a/libbeat/outputs/logstash/window_test.go b/libbeat/outputs/logstash/window_test.go index 773f9c374df9..ab5e64f85087 100644 --- a/libbeat/outputs/logstash/window_test.go +++ b/libbeat/outputs/logstash/window_test.go @@ -30,7 +30,7 @@ func TestShrinkWindowSizeNeverZero(t *testing.T) { windowSize := 124 var w window - w.init(windowSize, defaultConfig.BulkMaxSize) + w.init(windowSize, defaultConfig().BulkMaxSize) w.windowSize = int32(windowSize) for i := 0; i < 100; i++ {