Skip to content

Commit

Permalink
more addressing
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Mar 5, 2021
1 parent ecf73e8 commit 16db6e9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (r *readerGroup) remove(id string) {
delete(r.table, id)
}

func (r *readerGroup) isIDAdded(id string) bool {
func (r *readerGroup) hasID(id string) bool {
r.mu.Lock()
defer r.mu.Unlock()

Expand Down
56 changes: 25 additions & 31 deletions filebeat/input/filestream/internal/input-logfile/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ func TestDefaultHarvesterGroup(t *testing.T) {
source := &testSource{"/path/to/test"}

requireSourceAddedToBookkeeper := func(t *testing.T, hg *defaultHarvesterGroup, s Source) {
require.True(t, hg.readers.isIDAdded(s.Name()))
require.True(t, hg.readers.hasID(s.Name()))
}

requireSourceRemovedFromBookkeeper := func(t *testing.T, hg *defaultHarvesterGroup, s Source) {
require.False(t, hg.readers.isIDAdded(s.Name()))
require.False(t, hg.readers.hasID(s.Name()))
}

t.Run("assert a harvester is started in a goroutine", func(t *testing.T) {
Expand Down Expand Up @@ -129,16 +129,14 @@ func TestDefaultHarvesterGroup(t *testing.T) {

hg.Start(input.Context{Logger: logp.L(), Cancelation: context.Background()}, source)

// run commands while harvester is running
gorountineChecker.RunFuncWhenNewGoroutinesAreStarted(func() {
// wait until harvester is started
if mockHarvester.getRunCount() == 1 {
requireSourceAddedToBookkeeper(t, hg, source)
// after started, stop it
hg.Stop(source)
gorountineChecker.WaitUntilOriginalCount()
}
})
gorountineChecker.WaitUntilIncreased(1)
// wait until harvester is started
if mockHarvester.getRunCount() == 1 {
requireSourceAddedToBookkeeper(t, hg, source)
// after started, stop it
hg.Stop(source)
gorountineChecker.WaitUntilOriginalCount()
}

requireSourceRemovedFromBookkeeper(t, hg, source)
})
Expand All @@ -154,12 +152,11 @@ func TestDefaultHarvesterGroup(t *testing.T) {
hg.Start(inputCtx, source)
hg.Start(inputCtx, source)

gorountineChecker.RunFuncWhenNewGoroutinesAreStarted(func() {
// error is expected as a harvester group was expected to start twice for the same source
for !hg.readers.isIDAdded(source.Name()) {
}
time.Sleep(3 * time.Millisecond)
})
gorountineChecker.WaitUntilIncreased(2)
// error is expected as a harvester group was expected to start twice for the same source
for !hg.readers.hasID(source.Name()) {
}
time.Sleep(3 * time.Millisecond)

hg.Stop(source)

Expand Down Expand Up @@ -225,16 +222,15 @@ func TestDefaultHarvesterGroup(t *testing.T) {
wg.Add(1)
hg.Start(inputCtx, source)

gorountineChecker.RunFuncWhenNewGoroutinesAreStarted(func() {
ok := false
for !ok {
// wait until harvester is added to the bookeeper
ok = hg.readers.isIDAdded(source.Name())
if ok {
releaseResource(r)
}
gorountineChecker.WaitUntilIncreased(1)
ok := false
for !ok {
// wait until harvester is added to the bookeeper
ok = hg.readers.hasID(source.Name())
if ok {
releaseResource(r)
}
})
}

// wait until harvester.Run is done
wg.Wait()
Expand All @@ -260,10 +256,8 @@ func TestDefaultHarvesterGroup(t *testing.T) {

hg.Start(inputCtx, source)

gorountineChecker.RunFuncWhenNewGoroutinesAreStarted(func() {
err := hg.StopGroup()
require.Error(t, err)
})
gorountineChecker.WaitUntilIncreased(1)
require.Error(t, hg.StopGroup())

require.Equal(t, 0, mockHarvester.getRunCount())
})
Expand Down
8 changes: 0 additions & 8 deletions libbeat/tests/resources/goroutines.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,8 @@ func (c GoroutinesChecker) WaitUntilOriginalCount() int {
return after
}

func (c *GoroutinesChecker) RunFuncWhenNewGoroutinesAreStarted(f func()) {
for runtime.NumGoroutine() == c.before {
time.Sleep(10 * time.Millisecond)
}
f()
}

func (c *GoroutinesChecker) WaitUntilIncreased(n int) {
for runtime.NumGoroutine() < c.before+n {
fmt.Println("before", c.before+n, "now", runtime.NumGoroutine())
time.Sleep(10 * time.Millisecond)
}
}

0 comments on commit 16db6e9

Please sign in to comment.