Skip to content

Commit

Permalink
Fix flakyness of TestFilestreamEmptyLine (elastic#27705)
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch authored and wiwen committed Nov 1, 2021
1 parent 648db54 commit 310728e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
11 changes: 8 additions & 3 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/go-concert/unison"
)

type inputTestingEnvironment struct {
Expand All @@ -50,7 +51,8 @@ type inputTestingEnvironment struct {
pluginInitOnce sync.Once
plugin v2.Plugin

wg sync.WaitGroup
wg sync.WaitGroup
grp unison.TaskGroup
}

type registryEntry struct {
Expand All @@ -70,7 +72,9 @@ func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
}

func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) v2.Input {
e.grp = unison.TaskGroup{}
manager := e.getManager()
manager.Init(&e.grp, v2.ModeRun)
c := common.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
Expand All @@ -88,12 +92,13 @@ func (e *inputTestingEnvironment) getManager() v2.InputManager {

func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) {
e.wg.Add(1)
go func(wg *sync.WaitGroup) {
go func(wg *sync.WaitGroup, grp *unison.TaskGroup) {
defer wg.Done()
defer grp.Stop()

inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx}
inp.Run(inputCtx, e.pipeline)
}(&e.wg)
}(&e.wg, &e.grp)
}

func (e *inputTestingEnvironment) waitUntilInputStops() {
Expand Down
1 change: 0 additions & 1 deletion filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ func TestFilestreamCloseEOF(t *testing.T) {

// test_empty_lines from test_harvester.py
func TestFilestreamEmptyLine(t *testing.T) {
t.Skip("Flaky test https://github.com/elastic/beats/issues/27585")
env := newInputTestingEnvironment(t)

testlogName := "test.log"
Expand Down

0 comments on commit 310728e

Please sign in to comment.