Skip to content

Commit

Permalink
Fix flakyness of TestFilestreamEmptyLine (#27705)
Browse files Browse the repository at this point in the history
Closes #27585

(cherry picked from commit 20be506)
  • Loading branch information
kvch authored and mergify-bot committed Sep 6, 2021
1 parent 0f477da commit 64916f6
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/go-concert/unison"
)

type inputTestingEnvironment struct {
Expand All @@ -50,7 +51,8 @@ type inputTestingEnvironment struct {
pluginInitOnce sync.Once
plugin v2.Plugin

wg sync.WaitGroup
wg sync.WaitGroup
grp unison.TaskGroup
}

type registryEntry struct {
Expand All @@ -70,7 +72,9 @@ func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
}

func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) v2.Input {
e.grp = unison.TaskGroup{}
manager := e.getManager()
manager.Init(&e.grp, v2.ModeRun)
c := common.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
Expand All @@ -88,12 +92,13 @@ func (e *inputTestingEnvironment) getManager() v2.InputManager {

func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) {
e.wg.Add(1)
go func(wg *sync.WaitGroup) {
go func(wg *sync.WaitGroup, grp *unison.TaskGroup) {
defer wg.Done()
defer grp.Stop()

inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx}
inp.Run(inputCtx, e.pipeline)
}(&e.wg)
}(&e.wg, &e.grp)
}

func (e *inputTestingEnvironment) waitUntilInputStops() {
Expand Down

0 comments on commit 64916f6

Please sign in to comment.