Skip to content

Commit

Permalink
changing order of return values
Browse files Browse the repository at this point in the history
  • Loading branch information
mjmbischoff committed Sep 1, 2021
1 parent 9b08843 commit ad521cd
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions filebeat/input/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestInput(t *testing.T) {
client := beattest.NewChanClient(100)
defer client.Close()
events := client.Channel
cancel, input := run(t, config, client)
input, cancel := run(t, config, client)

timeout := time.After(30 * time.Second)
for range messages {
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestInputWithMultipleEvents(t *testing.T) {
client := beattest.NewChanClient(100)
defer client.Close()
events := client.Channel
cancel, input := run(t, config, client)
input, cancel := run(t, config, client)

timeout := time.After(30 * time.Second)
select {
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestInputWithJsonPayload(t *testing.T) {
client := beattest.NewChanClient(100)
defer client.Close()
events := client.Channel
cancel, input := run(t, config, client)
input, cancel := run(t, config, client)

timeout := time.After(30 * time.Second)
select {
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) {
client := beattest.NewChanClient(100)
defer client.Close()
events := client.Channel
cancel, input := run(t, config, client)
input, cancel := run(t, config, client)

timeout := time.After(30 * time.Second)
for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -452,7 +452,7 @@ func writeToKafkaTopic(
}
}

func run(t *testing.T, cfg *common.Config, client *beattest.ChanClient) (func(), *kafkaInput) {
func run(t *testing.T, cfg *common.Config, client *beattest.ChanClient) (*kafkaInput, func()) {
inp, err := Plugin().Manager.Create(cfg)
if err != nil {
t.Fatal(err)
Expand All @@ -464,7 +464,7 @@ func run(t *testing.T, cfg *common.Config, client *beattest.ChanClient) (func(),
pipeline := beattest.ConstClient(client)
input := inp.(*kafkaInput)
go input.Run(ctx, pipeline)
return cancel, input
return input, cancel
}

func newV2Context() (v2.Context, func()) {
Expand Down

0 comments on commit ad521cd

Please sign in to comment.