From a72f7c7661e1a7ad85ea21016a05a6c8f910dddb Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Thu, 12 Aug 2021 01:26:36 +0200 Subject: [PATCH 1/9] using input v2 adding config for parsers --- filebeat/input/default-inputs/inputs.go | 2 + filebeat/input/kafka/config.go | 5 +- filebeat/input/kafka/input.go | 233 +++++++++--------- filebeat/input/kafka/input_test.go | 15 +- .../input/kafka/kafka_integration_test.go | 130 ++++------ 5 files changed, 174 insertions(+), 211 deletions(-) diff --git a/filebeat/input/default-inputs/inputs.go b/filebeat/input/default-inputs/inputs.go index 881f3664efd..95dd18e35f6 100644 --- a/filebeat/input/default-inputs/inputs.go +++ b/filebeat/input/default-inputs/inputs.go @@ -20,6 +20,7 @@ package inputs import ( "github.com/elastic/beats/v7/filebeat/beater" "github.com/elastic/beats/v7/filebeat/input/filestream" + "github.com/elastic/beats/v7/filebeat/input/kafka" "github.com/elastic/beats/v7/filebeat/input/unix" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" @@ -36,6 +37,7 @@ func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.P func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin { return []v2.Plugin{ filestream.Plugin(log, components), + kafka.Plugin(), unix.Plugin(), } } diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index c69b2522a4f..69c5f80350d 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -23,14 +23,13 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common/kafka" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/monitoring/adapter" + "github.com/elastic/beats/v7/libbeat/reader/parser" ) type kafkaInputConfig struct { @@ -53,6 +52,7 @@ type kafkaInputConfig struct { Username string `config:"username"` Password string `config:"password"` ExpandEventListFromField string `config:"expand_event_list_from_field"` + Parsers parser.Config `config:",inline"` } type kafkaFetch struct { @@ -215,7 +215,6 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { ) if err := k.Validate(); err != nil { - logp.Err("Invalid kafka configuration: %v", err) return nil, err } return k, nil diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index add3c664224..79856c77417 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -26,50 +26,65 @@ import ( "time" "github.com/Shopify/sarama" - - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/backoff" "github.com/elastic/beats/v7/libbeat/common/kafka" + "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" - + "github.com/elastic/beats/v7/libbeat/reader/parser" 
"github.com/pkg/errors" ) -func init() { - err := input.Register("kafka", NewInput) +const pluginName = "kafka" + +// Plugin creates a new filestream input plugin for creating a stateful input. +func Plugin() input.Plugin { + return input.Plugin{ + Name: pluginName, + Stability: feature.Stable, + Deprecated: false, + Info: "Kafka input", + Doc: "The Kafka input consumes events from topics by connecting to the configured kafka brokers", + Manager: input.ConfigureWith(configure), + } +} + +func configure(cfg *common.Config) (input.Input, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + + saramaConfig, err := newSaramaConfig(config) if err != nil { - panic(err) + return nil, errors.Wrap(err, "initializing Sarama config") } + return NewInput(config, saramaConfig) +} + +func NewInput(config kafkaInputConfig, saramaConfig *sarama.Config) (*kafkaInput, error) { + return &kafkaInput{config: config, saramaConfig: saramaConfig}, nil } -// Input contains the input and its config type kafkaInput struct { config kafkaInputConfig saramaConfig *sarama.Config - context input.Context - outlet channel.Outleter saramaWaitGroup sync.WaitGroup // indicates a sarama consumer group is active - log *logp.Logger - runOnce sync.Once } -// NewInput creates a new kafka input -func NewInput( - cfg *common.Config, - connector channel.Connector, - inputContext input.Context, -) (input.Input, error) { +func (input *kafkaInput) Name() string { return pluginName } - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { - return nil, errors.Wrap(err, "reading kafka input config") - } +func (input *kafkaInput) Test(_ input.TestContext) error { + return nil +} + +func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error { + log := ctx.Logger.Named("kafka input").With("hosts", input.config.Hosts) - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + client, err := pipeline.ConnectWith(beat.ClientConfig{ ACKHandler: acker.ConnectionOnly( acker.EventPrivateReporter(func(_ int, events []interface{}) { for _, event := range events { @@ -79,38 +94,83 @@ func NewInput( } }), ), - CloseRef: doneChannelContext(inputContext.Done), - WaitClose: config.WaitClose, + CloseRef: ctx.Cancelation, + WaitClose: input.config.WaitClose, }) if err != nil { - return nil, err + return err } - saramaConfig, err := newSaramaConfig(config) - if err != nil { - return nil, errors.Wrap(err, "initializing Sarama config") + log.Info("Starting Kafka input") + defer log.Info("Kafka input stopped") + + // Sarama uses standard go contexts to control cancellation, so we need + // to wrap our input context channel in that interface. + goContext := doneChannelContext(ctx) + + // If the consumer fails to connect, we use exponential backoff with + // jitter up to 8 * the initial backoff interval. + connectDelay := backoff.NewEqualJitterBackoff( + ctx.Cancelation.Done(), + input.config.ConnectBackoff, + 8*input.config.ConnectBackoff, + ) + + for goContext.Err() == nil { + // Connect to Kafka with a new consumer group. + consumerGroup, err := sarama.NewConsumerGroup( + input.config.Hosts, + input.config.GroupID, + input.saramaConfig, + ) + if err != nil { + log.Errorw("Error initializing kafka consumer group", "error", err) + connectDelay.Wait() + continue + } + // We've successfully connected, reset the backoff timer. 
+ connectDelay.Reset() + + // We have a connected consumer group now, try to start the main event + // loop by calling Consume (which starts an asynchronous consumer). + // In an ideal run, this function never returns until shutdown; if it + // does, it means the errors have been logged and the consumer group + // has been closed, so we try creating a new one in the next iteration. + input.runConsumerGroup(log, client, goContext, consumerGroup) } - input := &kafkaInput{ - config: config, - saramaConfig: saramaConfig, - context: inputContext, - outlet: out, - log: logp.NewLogger("kafka input").With("hosts", config.Hosts), + if ctx.Cancelation.Err() == context.Canceled { + return nil + } else { + return ctx.Cancelation.Err() } +} + +// Stop doesn't need to do anything because the kafka consumer group and the +// input's outlet both have a context based on input.context.Done and will +// shut themselves down, since the done channel is already closed as part of +// the shutdown process in Runner.Stop(). +func (input *kafkaInput) Stop() { +} - return input, nil +// Wait should shut down the input and wait for it to complete, however (see +// Stop above) we don't need to take actions to shut down as long as the +// input.config.Done channel is closed, so we just make a (currently no-op) +// call to Stop() and then wait for sarama to signal completion. +func (input *kafkaInput) Wait() { + input.Stop() + // Wait for sarama to shut down + input.saramaWaitGroup.Wait() } -func (input *kafkaInput) runConsumerGroup( - context context.Context, consumerGroup sarama.ConsumerGroup, -) { +func (input *kafkaInput) runConsumerGroup(log *logp.Logger, client beat.Client, context context.Context, consumerGroup sarama.ConsumerGroup) { handler := &groupHandler{ version: input.config.Version, - outlet: input.outlet, + client: client, + parsers: input.config.Parsers, // expandEventListFromField will be assigned the configuration option expand_event_list_from_field expandEventListFromField: input.config.ExpandEventListFromField, - log: input.log, + log: log, } input.saramaWaitGroup.Add(1) @@ -122,72 +182,16 @@ func (input *kafkaInput) runConsumerGroup( // Listen asynchronously to any errors during the consume process go func() { for err := range consumerGroup.Errors() { - input.log.Errorw("Error reading from kafka", "error", err) + log.Errorw("Error reading from kafka", "error", err) } }() err := consumerGroup.Consume(context, input.config.Topics, handler) if err != nil { - input.log.Errorw("Kafka consume error", "error", err) + log.Errorw("Kafka consume error", "error", err) } } -// Run starts the input by scanning for incoming messages and errors. -func (input *kafkaInput) Run() { - input.runOnce.Do(func() { - go func() { - // Sarama uses standard go contexts to control cancellation, so we need - // to wrap our input context channel in that interface. - context := doneChannelContext(input.context.Done) - - // If the consumer fails to connect, we use exponential backoff with - // jitter up to 8 * the initial backoff interval. - backoff := backoff.NewEqualJitterBackoff( - input.context.Done, - input.config.ConnectBackoff, - 8*input.config.ConnectBackoff) - - for context.Err() == nil { - // Connect to Kafka with a new consumer group. 
- consumerGroup, err := sarama.NewConsumerGroup( - input.config.Hosts, input.config.GroupID, input.saramaConfig) - if err != nil { - input.log.Errorw( - "Error initializing kafka consumer group", "error", err) - backoff.Wait() - continue - } - // We've successfully connected, reset the backoff timer. - backoff.Reset() - - // We have a connected consumer group now, try to start the main event - // loop by calling Consume (which starts an asynchronous consumer). - // In an ideal run, this function never returns until shutdown; if it - // does, it means the errors have been logged and the consumer group - // has been closed, so we try creating a new one in the next iteration. - input.runConsumerGroup(context, consumerGroup) - } - }() - }) -} - -// Stop doesn't need to do anything because the kafka consumer group and the -// input's outlet both have a context based on input.context.Done and will -// shut themselves down, since the done channel is already closed as part of -// the shutdown process in Runner.Stop(). -func (input *kafkaInput) Stop() { -} - -// Wait should shut down the input and wait for it to complete, however (see -// Stop above) we don't need to take actions to shut down as long as the -// input.config.Done channel is closed, so we just make a (currently no-op) -// call to Stop() and then wait for sarama to signal completion. -func (input *kafkaInput) Wait() { - input.Stop() - // Wait for sarama to shut down - input.saramaWaitGroup.Wait() -} - func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []string { array := []string{} for _, header := range headers { @@ -209,25 +213,23 @@ func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []string { // channels that are more common in the beats codebase. // TODO(faec): Generalize this to a common utility in a shared library // (https://github.com/elastic/beats/issues/13125). -type channelCtx <-chan struct{} - -func doneChannelContext(ch <-chan struct{}) context.Context { - return channelCtx(ch) +type channelCtx struct { + ctx input.Context } -func (c channelCtx) Deadline() (deadline time.Time, ok bool) { return } +func doneChannelContext(ctx input.Context) context.Context { + return channelCtx{ctx} +} +func (c channelCtx) Deadline() (deadline time.Time, ok bool) { + return +} func (c channelCtx) Done() <-chan struct{} { - return (<-chan struct{})(c) + return c.ctx.Cancelation.Done() } func (c channelCtx) Err() error { - select { - case <-c: - return context.Canceled - default: - return nil - } + return c.ctx.Cancelation.Err() } -func (c channelCtx) Value(key interface{}) interface{} { return nil } +func (c channelCtx) Value(_ interface{}) interface{} { return nil } // The group handler for the sarama consumer group interface. In addition to // providing the basic consumption callbacks needed by sarama, groupHandler is @@ -237,7 +239,8 @@ type groupHandler struct { sync.Mutex version kafka.Version session sarama.ConsumerGroupSession - outlet channel.Outleter + client beat.Client + parsers parser.Config // if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned // ex. 
in this case are the azure fielsets where the events are found under the json object "records" expandEventListFromField string @@ -252,7 +255,7 @@ type eventMeta struct { } func (h *groupHandler) createEvents( - sess sarama.ConsumerGroupSession, + _ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, message *sarama.ConsumerMessage, ) []beat.Event { @@ -329,7 +332,7 @@ func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sara for msg := range claim.Messages() { events := h.createEvents(sess, claim, msg) for _, event := range events { - h.outlet.OnEvent(event) + h.client.Publish(event) } } return nil diff --git a/filebeat/input/kafka/input_test.go b/filebeat/input/kafka/input_test.go index e83c0e908a8..ed72dfc5fc6 100644 --- a/filebeat/input/kafka/input_test.go +++ b/filebeat/input/kafka/input_test.go @@ -21,16 +21,13 @@ package kafka import ( "testing" - - "github.com/elastic/beats/v7/filebeat/input/inputtest" - "github.com/elastic/beats/v7/libbeat/common" ) func TestNewInputDone(t *testing.T) { - config := common.MapStr{ - "hosts": "localhost:9092", - "topics": "messages", - "group_id": "filebeat", - } - inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) + //config := common.MapStr{ + // "hosts": "localhost:9092", + // "topics": "messages", + // "group_id": "filebeat", + //} + // TODO find v2 equivalent inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) } diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index c7ad9fc999c..feb4471e201 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -20,20 +20,21 @@ package kafka import ( + "context" "fmt" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/logp" + beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" "math/rand" "os" "strconv" "strings" - "sync" "testing" "time" "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" _ "github.com/elastic/beats/v7/libbeat/outputs/codec/format" @@ -45,41 +46,6 @@ const ( kafkaDefaultPort = "9092" ) -type eventInfo struct { - events []beat.Event -} - -type eventCapturer struct { - closed bool - c chan struct{} - closeOnce sync.Once - events chan beat.Event -} - -func NewEventCapturer(events chan beat.Event) channel.Outleter { - return &eventCapturer{ - c: make(chan struct{}), - events: events, - } -} - -func (o *eventCapturer) OnEvent(event beat.Event) bool { - o.events <- event - return true -} - -func (o *eventCapturer) Close() error { - o.closeOnce.Do(func() { - o.closed = true - close(o.c) - }) - return nil -} - -func (o *eventCapturer) Done() <-chan struct{} { - return o.c -} - type testMessage struct { message string headers []sarama.RecordHeader @@ -93,12 +59,7 @@ func recordHeader(key, value string) sarama.RecordHeader { } func TestInput(t *testing.T) { - id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) - testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) - context := input.Context{ - Done: make(chan struct{}), - BeatDone: make(chan struct{}), - } + testTopic := createTestTopicName() // Send test messages to the topic for the input to read. 
messages := []testMessage{ @@ -129,22 +90,10 @@ func TestInput(t *testing.T) { "wait_close": 0, }) - // Route input events through our capturer instead of sending through ES. - events := make(chan beat.Event, 100) - defer close(events) - capturer := NewEventCapturer(events) - defer capturer.Close() - connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { - return channel.SubOutlet(capturer), nil - }) - - input, err := NewInput(config, connector, context) - if err != nil { - t.Fatal(err) - } - - // Run the input and wait for finalization - input.Run() + client := beattest.NewChanClient(100) + defer client.Close() + events := client.Channel + cancel, input := run(t, config, client) timeout := time.After(30 * time.Second) for range messages { @@ -169,7 +118,7 @@ func TestInput(t *testing.T) { // Close the done channel and make sure the beat shuts down in a reasonable // amount of time. - close(context.Done) + cancel() didClose := make(chan struct{}) go func() { input.Wait() @@ -184,12 +133,7 @@ func TestInput(t *testing.T) { } func TestInputWithMultipleEvents(t *testing.T) { - id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) - testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) - context := input.Context{ - Done: make(chan struct{}), - BeatDone: make(chan struct{}), - } + testTopic := createTestTopicName() // Send test messages to the topic for the input to read. message := testMessage{ @@ -209,22 +153,10 @@ func TestInputWithMultipleEvents(t *testing.T) { "expand_event_list_from_field": "records", }) - // Route input events through our capturer instead of sending through ES. - events := make(chan beat.Event, 100) - defer close(events) - capturer := NewEventCapturer(events) - defer capturer.Close() - connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { - return channel.SubOutlet(capturer), nil - }) - - input, err := NewInput(config, connector, context) - if err != nil { - t.Fatal(err) - } - - // Run the input and wait for finalization - input.Run() + client := beattest.NewChanClient(100) + defer client.Close() + events := client.Channel + cancel, input := run(t, config, client) timeout := time.After(30 * time.Second) select { @@ -240,9 +172,9 @@ func TestInputWithMultipleEvents(t *testing.T) { t.Fatal("timeout waiting for incoming events") } + cancel() // Close the done channel and make sure the beat shuts down in a reasonable // amount of time. 
- close(context.Done) didClose := make(chan struct{}) go func() { input.Wait() @@ -256,6 +188,12 @@ func TestInputWithMultipleEvents(t *testing.T) { } } +func createTestTopicName() string { + id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) + testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) + return testTopic +} + func findMessage(t *testing.T, text string, msgs []testMessage) *testMessage { var msg *testMessage for _, m := range msgs { @@ -357,3 +295,27 @@ func writeToKafkaTopic( t.Fatal(err) } } + +func run(t *testing.T, cfg *common.Config, client *beattest.ChanClient) (func(), *kafkaInput) { + inp, err := Plugin().Manager.Create(cfg) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := newV2Context() + t.Cleanup(cancel) + + pipeline := beattest.ConstClient(client) + input := inp.(*kafkaInput) + go input.Run(ctx, pipeline) + return cancel, input +} + +func newV2Context() (v2.Context, func()) { + ctx, cancel := context.WithCancel(context.Background()) + return v2.Context{ + Logger: logp.NewLogger("kafka_test"), + ID: "test_id", + Cancelation: ctx, + }, cancel +} From 6d1c09e18dd6669bf5a6d3cb558ae13df7e19b90 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Thu, 12 Aug 2021 12:02:54 +0200 Subject: [PATCH 2/9] implemented reader and parsers --- filebeat/input/kafka/input.go | 139 ++++++++++++++++------------------ 1 file changed, 65 insertions(+), 74 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 79856c77417..a6e98077342 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -19,8 +19,9 @@ package kafka import ( "context" - "encoding/json" "fmt" + "github.com/elastic/beats/v7/libbeat/reader" + "io" "strings" "sync" "time" @@ -243,8 +244,9 @@ type groupHandler struct { parsers parser.Config // if the fileset using this input expects to receive multiple messages bundled under a specific field then this value is assigned // ex. 
in this case are the azure fielsets where the events are found under the json object "records" - expandEventListFromField string + expandEventListFromField string // TODO log *logp.Logger + reader reader.Reader } // The metadata attached to incoming events so they can be ACKed once they've @@ -254,56 +256,6 @@ type eventMeta struct { message *sarama.ConsumerMessage } -func (h *groupHandler) createEvents( - _ sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim, - message *sarama.ConsumerMessage, -) []beat.Event { - timestamp := time.Now() - kafkaFields := common.MapStr{ - "topic": claim.Topic(), - "partition": claim.Partition(), - "offset": message.Offset, - "key": string(message.Key), - } - - version, versionOk := h.version.Get() - if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { - timestamp = message.Timestamp - if !message.BlockTimestamp.IsZero() { - kafkaFields["block_timestamp"] = message.BlockTimestamp - } - } - if versionOk && version.IsAtLeast(sarama.V0_11_0_0) { - kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers) - } - - // if expandEventListFromField has been set, then a check for the actual json object will be done and a return for multiple messages is executed - var events []beat.Event - var messages []string - if h.expandEventListFromField == "" { - messages = []string{string(message.Value)} - } else { - messages = h.parseMultipleMessages(message.Value) - } - for _, msg := range messages { - event := beat.Event{ - Timestamp: timestamp, - Fields: common.MapStr{ - "message": msg, - "kafka": kafkaFields, - }, - Private: eventMeta{ - handler: h, - message: message, - }, - } - events = append(events, event) - - } - return events -} - func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error { h.Lock() h.session = session @@ -328,34 +280,73 @@ func (h *groupHandler) ack(message *sarama.ConsumerMessage) { } } -func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - for msg := range claim.Messages() { - events := h.createEvents(sess, claim, msg) - for _, event := range events { - h.client.Publish(event) +func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + reader := messageReader{ + claim: claim, + groupHandler: h, + } + parser := h.parsers.Create(reader) + for h.session.Context().Err() == nil { + message, err := parser.Next() + if err == io.EOF { + return nil } + if err != nil { + return err + } + h.client.Publish(beat.Event{ + Timestamp: message.Ts, + Meta: message.Meta, + Fields: message.Fields, + }) } return nil } -// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration -func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string { - var obj map[string][]interface{} - err := json.Unmarshal(bMessage, &obj) - if err != nil { - h.log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", h.expandEventListFromField), "error", err) - return []string{} +type messageReader struct { + claim sarama.ConsumerGroupClaim + groupHandler *groupHandler +} + +func (m messageReader) Close() error { + return nil +} + +func (m messageReader) Next() (reader.Message, error) { + msg, ok := <-m.claim.Messages() + if !ok { + return reader.Message{}, io.EOF } - var messages []string - if len(obj[h.expandEventListFromField]) > 0 { - for _, ms := range obj[h.expandEventListFromField] { - js, err := json.Marshal(ms) - if err == nil { - 
messages = append(messages, string(js)) - } else { - h.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err) - } + + timestamp := time.Now() + kafkaFields := common.MapStr{ + "topic": m.claim.Topic(), + "partition": m.claim.Partition(), + "offset": msg.Offset, + "key": string(msg.Key), + } + + version, versionOk := m.groupHandler.version.Get() + if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { + timestamp = msg.Timestamp + if !msg.BlockTimestamp.IsZero() { + kafkaFields["block_timestamp"] = msg.BlockTimestamp } } - return messages + if versionOk && version.IsAtLeast(sarama.V0_11_0_0) { + kafkaFields["headers"] = arrayForKafkaHeaders(msg.Headers) + } + + return reader.Message{ + Ts: timestamp, + Content: msg.Value, + Fields: common.MapStr{ + "kafka": kafkaFields, + "message": string(msg.Value), + }, + Meta: common.MapStr{ + "handler": m.groupHandler, + "message": msg, + }, + }, nil } From 80552d8ff57d6e9e6537598fe5e84a891977f406 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Sun, 22 Aug 2021 01:34:28 +0200 Subject: [PATCH 3/9] implemented expandEventListFromField in separate reader Fixing lint issues --- filebeat/include/list.go | 1 - filebeat/input/kafka/config.go | 1 + filebeat/input/kafka/input.go | 143 +++++++++++++++--- .../input/kafka/kafka_integration_test.go | 134 +++++++++++++++- 4 files changed, 250 insertions(+), 29 deletions(-) diff --git a/filebeat/include/list.go b/filebeat/include/list.go index e4c1396d973..51b04d4f92c 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -23,7 +23,6 @@ import ( // Import packages that need to register themselves. _ "github.com/elastic/beats/v7/filebeat/input/container" _ "github.com/elastic/beats/v7/filebeat/input/docker" - _ "github.com/elastic/beats/v7/filebeat/input/kafka" _ "github.com/elastic/beats/v7/filebeat/input/log" _ "github.com/elastic/beats/v7/filebeat/input/mqtt" _ "github.com/elastic/beats/v7/filebeat/input/redis" diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 69c5f80350d..338182e460a 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -23,6 +23,7 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common/kafka" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index a6e98077342..982390d31c7 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -19,14 +19,18 @@ package kafka import ( "context" + "encoding/json" "fmt" - "github.com/elastic/beats/v7/libbeat/reader" "io" "strings" "sync" "time" + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/Shopify/sarama" + "github.com/pkg/errors" + input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -35,8 +39,8 @@ import ( "github.com/elastic/beats/v7/libbeat/common/kafka" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/parser" - "github.com/pkg/errors" ) const pluginName = "kafka" @@ -90,7 +94,7 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline 
beat.Pipeline) error { acker.EventPrivateReporter(func(_ int, events []interface{}) { for _, event := range events { if meta, ok := event.(eventMeta); ok { - meta.handler.ack(meta.message) + meta.ackHandler() } } }), @@ -193,6 +197,12 @@ func (input *kafkaInput) runConsumerGroup(log *logp.Logger, client beat.Client, } } +// The metadata attached to incoming events, so they can be ACKed once they've +// been successfully sent. +type eventMeta struct { + ackHandler func() +} + func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []string { array := []string{} for _, header := range headers { @@ -249,13 +259,6 @@ type groupHandler struct { reader reader.Reader } -// The metadata attached to incoming events so they can be ACKed once they've -// been successfully sent. -type eventMeta struct { - handler *groupHandler - message *sarama.ConsumerMessage -} - func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error { h.Lock() h.session = session @@ -281,10 +284,7 @@ func (h *groupHandler) ack(message *sarama.ConsumerMessage) { } func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - reader := messageReader{ - claim: claim, - groupHandler: h, - } + reader := h.createReader(claim) parser := h.parsers.Create(reader) for h.session.Context().Err() == nil { message, err := parser.Next() @@ -303,30 +303,101 @@ func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s return nil } -type messageReader struct { +func (h *groupHandler) createReader(claim sarama.ConsumerGroupClaim) reader.Reader { + if h.expandEventListFromField != "" { + return &listFromFieldReader{ + claim: claim, + groupHandler: h, + field: h.expandEventListFromField, + log: h.log, + } + } + return &recordReader{ + claim: claim, + groupHandler: h, + log: h.log, + } +} + +type recordReader struct { claim sarama.ConsumerGroupClaim groupHandler *groupHandler + log *logp.Logger } -func (m messageReader) Close() error { +func (m *recordReader) Close() error { return nil } -func (m messageReader) Next() (reader.Message, error) { +func (m *recordReader) Next() (reader.Message, error) { msg, ok := <-m.claim.Messages() if !ok { return reader.Message{}, io.EOF } + timestamp, kafkaFields := composeEventMetadata(m.claim, m.groupHandler, msg) + ackHandler := func() { + m.groupHandler.ack(msg) + } + return composeMessage(timestamp, msg.Value, kafkaFields, ackHandler), nil +} + +type listFromFieldReader struct { + claim sarama.ConsumerGroupClaim + groupHandler *groupHandler + buffer []reader.Message + field string + log *logp.Logger +} + +func (l *listFromFieldReader) Close() error { + return nil +} + +func (l *listFromFieldReader) Next() (reader.Message, error) { + if len(l.buffer) != 0 { + return l.returnFromBuffer() + } + + msg, ok := <-l.claim.Messages() + if !ok { + return reader.Message{}, io.EOF + } + + timestamp, kafkaFields := composeEventMetadata(l.claim, l.groupHandler, msg) + messages := parseMultipleMessages(msg.Value, l.field, l.log) + + neededAcks := atomic.MakeInt(len(messages)) + ackHandler := func() { + if neededAcks.Dec() == 0 { + l.groupHandler.ack(msg) + } + } + for _, message := range messages { + newBuffer := append(l.buffer, composeMessage(timestamp, []byte(message), kafkaFields, ackHandler)) + l.buffer = newBuffer + } + + return l.returnFromBuffer() +} + +func (l *listFromFieldReader) returnFromBuffer() (reader.Message, error) { + next := l.buffer[0] + newBuffer := l.buffer[1:] + l.buffer = newBuffer + return next, nil +} + 
+func composeEventMetadata(claim sarama.ConsumerGroupClaim, handler *groupHandler, msg *sarama.ConsumerMessage) (time.Time, common.MapStr) { timestamp := time.Now() kafkaFields := common.MapStr{ - "topic": m.claim.Topic(), - "partition": m.claim.Partition(), + "topic": claim.Topic(), + "partition": claim.Partition(), "offset": msg.Offset, "key": string(msg.Key), } - version, versionOk := m.groupHandler.version.Get() + version, versionOk := handler.version.Get() if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { timestamp = msg.Timestamp if !msg.BlockTimestamp.IsZero() { @@ -336,17 +407,39 @@ func (m messageReader) Next() (reader.Message, error) { if versionOk && version.IsAtLeast(sarama.V0_11_0_0) { kafkaFields["headers"] = arrayForKafkaHeaders(msg.Headers) } + return timestamp, kafkaFields +} +func composeMessage(timestamp time.Time, content []byte, kafkaFields common.MapStr, ackHandler func()) reader.Message { return reader.Message{ Ts: timestamp, - Content: msg.Value, + Content: content, Fields: common.MapStr{ "kafka": kafkaFields, - "message": string(msg.Value), + "message": string(content), }, Meta: common.MapStr{ - "handler": m.groupHandler, - "message": msg, + "ackHandler": ackHandler, }, - }, nil + } +} + +// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration +func parseMultipleMessages(bMessage []byte, field string, log *logp.Logger) []string { + var obj map[string][]interface{} + err := json.Unmarshal(bMessage, &obj) + if err != nil { + log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", field), "error", err) + return []string{} + } + var messages []string + for _, ms := range obj[field] { + js, err := json.Marshal(ms) + if err == nil { + messages = append(messages, string(js)) + } else { + log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err) + } + } + return messages } diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index feb4471e201..2fd35c78d3b 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -22,9 +22,6 @@ package kafka import ( "context" "fmt" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/logp" - beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" "math/rand" "os" "strconv" @@ -32,6 +29,10 @@ import ( "testing" "time" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/logp" + beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" @@ -188,6 +189,133 @@ func TestInputWithMultipleEvents(t *testing.T) { } } +func TestInputWithJsonPayload(t *testing.T) { + testTopic := createTestTopicName() + + // Send test message to the topic for the input to read. 
+ message := testMessage{ + message: "{\"val\":\"val1\"}", + headers: []sarama.RecordHeader{ + recordHeader("X-Test-Header", "test header value"), + }, + } + writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20) + + // Setup the input config + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": getTestKafkaHost(), + "topics": []string{testTopic}, + "group_id": "filebeat", + "wait_close": 0, + "parsers": []common.MapStr{ + { + "ndjson": common.MapStr{ + "target": "", + }, + }, + }, + }) + + client := beattest.NewChanClient(100) + defer client.Close() + events := client.Channel + cancel, input := run(t, config, client) + + timeout := time.After(30 * time.Second) + select { + case event := <-events: + text, err := event.Fields.GetValue("val") + if err != nil { + t.Fatal(err) + } + msgs := []string{"val1"} + assert.Contains(t, msgs, text) + checkMatchingHeaders(t, event, message.headers) + case <-timeout: + t.Fatal("timeout waiting for incoming events") + } + + cancel() + // Close the done channel and make sure the beat shuts down in a reasonable + // amount of time. + didClose := make(chan struct{}) + go func() { + input.Wait() + close(didClose) + }() + + select { + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for beat to shut down") + case <-didClose: + } +} + +func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) { + testTopic := createTestTopicName() + + // Send test messages to the topic for the input to read. + message := testMessage{ + message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}", + headers: []sarama.RecordHeader{ + recordHeader("X-Test-Header", "test header value"), + }, + } + writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20) + + // Setup the input config + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": getTestKafkaHost(), + "topics": []string{testTopic}, + "group_id": "filebeat", + "wait_close": 0, + "expand_event_list_from_field": "records", + "parsers": []common.MapStr{ + { + "ndjson": common.MapStr{ + "target": "", + }, + }, + }, + }) + + client := beattest.NewChanClient(100) + defer client.Close() + events := client.Channel + cancel, input := run(t, config, client) + + timeout := time.After(30 * time.Second) + for i := 0; i < 2; i++ { + select { + case event := <-events: + text, err := event.Fields.GetValue("val") + if err != nil { + t.Fatal(err) + } + msgs := []string{"val1", "val2"} + assert.Contains(t, msgs, text) + checkMatchingHeaders(t, event, message.headers) + case <-timeout: + t.Fatal("timeout waiting for incoming events") + } + } + + cancel() + // Close the done channel and make sure the beat shuts down in a reasonable + // amount of time. 
+ didClose := make(chan struct{}) + go func() { + input.Wait() + close(didClose) + }() + + select { + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for beat to shut down") + case <-didClose: + } +} + func createTestTopicName() string { id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) From 70ad0b6bf1e85fa1b0569b069e6ac9a57aea8ded Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Mon, 23 Aug 2021 01:29:04 +0200 Subject: [PATCH 4/9] Adding documentation --- filebeat/docs/inputs/input-kafka.asciidoc | 65 +++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc index e134fb86404..33f3347662a 100644 --- a/filebeat/docs/inputs/input-kafka.asciidoc +++ b/filebeat/docs/inputs/input-kafka.asciidoc @@ -168,6 +168,71 @@ Configuration options for Kerberos authentication. See <> for more information. +[float] +===== `parsers` + +This option expects a list of parsers that the payload has to go through. + +Available parsers: + +* `ndjson` +* `multiline` + +[float] +===== `ndjson` + +These options make it possible for {beatname_uc} to decode the payload as +JSON messages. + +Example configuration: + +[source,yaml] +---- +- ndjson: + keys_under_root: true + add_error_key: true + message_key: log +---- + +*`keys_under_root`*:: By default, the decoded JSON is placed under a "json" key +in the output document. If you enable this setting, the keys are copied top +level in the output document. The default is false. + +*`overwrite_keys`*:: If `keys_under_root` and this setting are enabled, then the +values from the decoded JSON object overwrite the fields that {beatname_uc} +normally adds (type, source, offset, etc.) in case of conflicts. + +*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively +de-dot keys in the decoded JSON, and expand them into a hierarchical object +structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`. +This setting should be enabled when the input is produced by an +https://github.com/elastic/ecs-logging[ECS logger]. + +*`add_error_key`*:: If this setting is enabled, {beatname_uc} adds an +"error.message" and "error.type: json" key in case of JSON unmarshalling errors +or when a `message_key` is defined in the configuration but cannot be used. + +*`message_key`*:: An optional configuration setting that specifies a JSON key on +which to apply the line filtering and multiline settings. If specified the key +must be at the top level in the JSON object and the value associated with the +key must be a string, otherwise no filtering or multiline aggregation will +occur. + +*`document_id`*:: Option configuration setting that specifies the JSON key to +set the document id. If configured, the field will be removed from the original +JSON document and stored in `@metadata._id` + +*`ignore_decoding_error`*:: An optional configuration setting that specifies if +JSON decoding errors should be logged or not. If set to true, errors will not +be logged. The default is false. + +[float] +===== `multiline` + +Options that control how {beatname_uc} deals with log messages that span +multiple lines. See <> for more information about +configuring multiline options. 
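+
+For example, a minimal multiline sketch that folds indented continuation
+lines (such as Java stack traces) into the event that precedes them; the
+pattern below is only an illustration and should be adapted to the actual
+log format:
+
+[source,yaml]
+----
+- multiline:
+    type: pattern
+    pattern: '^[[:space:]]'
+    negate: false
+    match: after
+----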
+ [id="{beatname_lc}-input-{type}-common-options"] include::../inputs/input-common-options.asciidoc[] From 84ce842e0f467bf678fd5e76105469bcd06ef687 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Tue, 31 Aug 2021 23:38:21 +0200 Subject: [PATCH 5/9] Adding plugin test method --- filebeat/input/kafka/input.go | 31 ++++++++++++- .../input/kafka/kafka_integration_test.go | 44 +++++++++++++++---- 2 files changed, 66 insertions(+), 9 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 982390d31c7..14b6a3449c7 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -82,7 +82,27 @@ type kafkaInput struct { func (input *kafkaInput) Name() string { return pluginName } -func (input *kafkaInput) Test(_ input.TestContext) error { +func (input *kafkaInput) Test(ctx input.TestContext) error { + client, err := sarama.NewClient(input.config.Hosts, input.saramaConfig) + if err != nil { + ctx.Logger.Error(err) + } + topics, err := client.Topics() + if err != nil { + ctx.Logger.Error(err) + } + + var missingTopics []string + for _, neededTopic := range input.config.Topics { + if !contains(topics, neededTopic) { + missingTopics = append(missingTopics, neededTopic) + } + } + + if len(missingTopics) > 0 { + return fmt.Errorf("Of configured topics %v, topics: %v are not in available topics %v", input.config.Topics, missingTopics, topics) + } + return nil } @@ -443,3 +463,12 @@ func parseMultipleMessages(bMessage []byte, field string, log *logp.Logger) []st } return messages } + +func contains(elements []string, element string) bool { + for _, e := range elements { + if e == element { + return true + } + } + return false +} diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 2fd35c78d3b..f344847eaf0 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -316,6 +316,38 @@ func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) { } } +func TestTest(t *testing.T) { + testTopic := createTestTopicName() + + // Send test messages to the topic for the input to read. 
+ message := testMessage{ + message: "{\"records\": [{\"val\":\"val1\"}, {\"val\":\"val2\"}]}", + headers: []sarama.RecordHeader{ + recordHeader("X-Test-Header", "test header value"), + }, + } + writeToKafkaTopic(t, testTopic, message.message, message.headers, time.Second*20) + + // Setup the input config + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": getTestKafkaHost(), + "topics": []string{testTopic}, + "group_id": "filebeat", + }) + + inp, err := Plugin().Manager.Create(config) + if err != nil { + t.Fatal(err) + } + + err = inp.Test(v2.TestContext{ + Logger: logp.NewLogger("kafka_test"), + }) + if err != nil { + t.Fatal(err) + } +} + func createTestTopicName() string { id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) @@ -340,23 +372,19 @@ func checkMatchingHeaders( ) { kafka, err := event.Fields.GetValue("kafka") if err != nil { - t.Error(err) - return + t.Fatal(err) } kafkaMap, ok := kafka.(common.MapStr) if !ok { - t.Error("event.Fields.kafka isn't MapStr") - return + t.Fatal("event.Fields.kafka isn't MapStr") } headers, err := kafkaMap.GetValue("headers") if err != nil { - t.Error(err) - return + t.Fatal(err) } headerArray, ok := headers.([]string) if !ok { - t.Error("event.Fields.kafka.headers isn't a []string") - return + t.Fatal("event.Fields.kafka.headers isn't a []string") } assert.Equal(t, len(expected), len(headerArray)) for i := 0; i < len(expected); i++ { From 9b08843831905e7f8309f5ea82ecb39504fd8baa Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Tue, 31 Aug 2021 23:47:15 +0200 Subject: [PATCH 6/9] making parseMultipleMessages a function of listFromFieldReader --- filebeat/input/kafka/input.go | 42 +++++++++++++++++------------------ 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 14b6a3449c7..9c872f1a14b 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -385,7 +385,7 @@ func (l *listFromFieldReader) Next() (reader.Message, error) { } timestamp, kafkaFields := composeEventMetadata(l.claim, l.groupHandler, msg) - messages := parseMultipleMessages(msg.Value, l.field, l.log) + messages := l.parseMultipleMessages(msg.Value) neededAcks := atomic.MakeInt(len(messages)) ackHandler := func() { @@ -408,6 +408,26 @@ func (l *listFromFieldReader) returnFromBuffer() (reader.Message, error) { return next, nil } +// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration +func (l *listFromFieldReader) parseMultipleMessages(bMessage []byte) []string { + var obj map[string][]interface{} + err := json.Unmarshal(bMessage, &obj) + if err != nil { + l.log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", l.field), "error", err) + return []string{} + } + var messages []string + for _, ms := range obj[l.field] { + js, err := json.Marshal(ms) + if err == nil { + messages = append(messages, string(js)) + } else { + l.log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err) + } + } + return messages +} + func composeEventMetadata(claim sarama.ConsumerGroupClaim, handler *groupHandler, msg *sarama.ConsumerMessage) (time.Time, common.MapStr) { timestamp := time.Now() kafkaFields := common.MapStr{ @@ -444,26 +464,6 @@ func composeMessage(timestamp time.Time, content []byte, kafkaFields common.MapS } } -// parseMultipleMessages will try to split the message 
into multiple ones based on the group field provided by the configuration -func parseMultipleMessages(bMessage []byte, field string, log *logp.Logger) []string { - var obj map[string][]interface{} - err := json.Unmarshal(bMessage, &obj) - if err != nil { - log.Errorw(fmt.Sprintf("Kafka desirializing multiple messages using the group object %s", field), "error", err) - return []string{} - } - var messages []string - for _, ms := range obj[field] { - js, err := json.Marshal(ms) - if err == nil { - messages = append(messages, string(js)) - } else { - log.Errorw(fmt.Sprintf("Kafka serializing message %s", ms), "error", err) - } - } - return messages -} - func contains(elements []string, element string) bool { for _, e := range elements { if e == element { From ad521cd749fef1e5cfac8ac75eca3325d3e5cb64 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Wed, 1 Sep 2021 14:46:22 +0200 Subject: [PATCH 7/9] changing order of return values --- filebeat/input/kafka/kafka_integration_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index f344847eaf0..2f168eb6daf 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -94,7 +94,7 @@ func TestInput(t *testing.T) { client := beattest.NewChanClient(100) defer client.Close() events := client.Channel - cancel, input := run(t, config, client) + input, cancel := run(t, config, client) timeout := time.After(30 * time.Second) for range messages { @@ -157,7 +157,7 @@ func TestInputWithMultipleEvents(t *testing.T) { client := beattest.NewChanClient(100) defer client.Close() events := client.Channel - cancel, input := run(t, config, client) + input, cancel := run(t, config, client) timeout := time.After(30 * time.Second) select { @@ -219,7 +219,7 @@ func TestInputWithJsonPayload(t *testing.T) { client := beattest.NewChanClient(100) defer client.Close() events := client.Channel - cancel, input := run(t, config, client) + input, cancel := run(t, config, client) timeout := time.After(30 * time.Second) select { @@ -282,7 +282,7 @@ func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) { client := beattest.NewChanClient(100) defer client.Close() events := client.Channel - cancel, input := run(t, config, client) + input, cancel := run(t, config, client) timeout := time.After(30 * time.Second) for i := 0; i < 2; i++ { @@ -452,7 +452,7 @@ func writeToKafkaTopic( } } -func run(t *testing.T, cfg *common.Config, client *beattest.ChanClient) (func(), *kafkaInput) { +func run(t *testing.T, cfg *common.Config, client *beattest.ChanClient) (*kafkaInput, func()) { inp, err := Plugin().Manager.Create(cfg) if err != nil { t.Fatal(err) @@ -464,7 +464,7 @@ func run(t *testing.T, cfg *common.Config, client *beattest.ChanClient) (func(), pipeline := beattest.ConstClient(client) input := inp.(*kafkaInput) go input.Run(ctx, pipeline) - return cancel, input + return input, cancel } func newV2Context() (v2.Context, func()) { From cee8602e9be8de75d09f335118858ea97488da13 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Thu, 2 Sep 2021 12:55:12 +0200 Subject: [PATCH 8/9] Adding replacement for input v1 --- filebeat/input/kafka/input_test.go | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/filebeat/input/kafka/input_test.go b/filebeat/input/kafka/input_test.go index ed72dfc5fc6..f50de44019f 100644 --- a/filebeat/input/kafka/input_test.go +++ 
b/filebeat/input/kafka/input_test.go @@ -20,14 +20,32 @@ package kafka import ( + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/tests/resources" + "github.com/stretchr/testify/require" "testing" ) func TestNewInputDone(t *testing.T) { - //config := common.MapStr{ - // "hosts": "localhost:9092", - // "topics": "messages", - // "group_id": "filebeat", - //} - // TODO find v2 equivalent inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": "localhost:9092", + "topics": "messages", + "group_id": "filebeat", + }) + + AssertNotStartedInputCanBeDone(t, config) +} + +// AssertNotStartedInputCanBeDone checks that the context of an input can be +// done before starting the input, and it doesn't leak goroutines. This is +// important to confirm that leaks don't happen with CheckConfig. +func AssertNotStartedInputCanBeDone(t *testing.T, configMap *common.Config) { + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + config, err := common.NewConfigFrom(configMap) + require.NoError(t, err) + + _, err = Plugin().Manager.Create(config) + require.NoError(t, err) } From f4977e757610226500abfb449851aeba287ca4f1 Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Thu, 2 Sep 2021 15:53:09 +0200 Subject: [PATCH 9/9] lint :( --- filebeat/input/kafka/input_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/filebeat/input/kafka/input_test.go b/filebeat/input/kafka/input_test.go index f50de44019f..a239149cb48 100644 --- a/filebeat/input/kafka/input_test.go +++ b/filebeat/input/kafka/input_test.go @@ -20,10 +20,12 @@ package kafka import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/tests/resources" - "github.com/stretchr/testify/require" - "testing" ) func TestNewInputDone(t *testing.T) {