diff --git a/filebeat/input/kafka/input_test.go b/filebeat/input/kafka/input_test.go index ed72dfc5fc6..f50de44019f 100644 --- a/filebeat/input/kafka/input_test.go +++ b/filebeat/input/kafka/input_test.go @@ -20,14 +20,32 @@ package kafka import ( + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/tests/resources" + "github.com/stretchr/testify/require" "testing" ) func TestNewInputDone(t *testing.T) { - //config := common.MapStr{ - // "hosts": "localhost:9092", - // "topics": "messages", - // "group_id": "filebeat", - //} - // TODO find v2 equivalent inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": "localhost:9092", + "topics": "messages", + "group_id": "filebeat", + }) + + AssertNotStartedInputCanBeDone(t, config) +} + +// AssertNotStartedInputCanBeDone checks that the context of an input can be +// done before starting the input, and it doesn't leak goroutines. This is +// important to confirm that leaks don't happen with CheckConfig. +func AssertNotStartedInputCanBeDone(t *testing.T, configMap *common.Config) { + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + config, err := common.NewConfigFrom(configMap) + require.NoError(t, err) + + _, err = Plugin().Manager.Create(config) + require.NoError(t, err) }