Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check goroutines in places related to recent found leaks #12106

Merged
merged 20 commits into from
May 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Reduce idxmgmt.Supporter interface and rework export commands to reuse logic. {pull}11777[11777],{pull}12065[12065],{pull}12067[12067],{pull}12160[12160]
- Update urllib3 version to 1.24.2 {pull}11930[11930]
- Add libbeat/common/cleanup package. {pull}12134[12134]
- New helper to check for leaked goroutines on tests. {pull}12106[12106]
- Only Load minimal template if no fields are provided. {pull}12103[12103]
- Add new option `IgnoreAllErrors` to `libbeat.common.schema` for skipping fields that failed while converting. {pull}12089[12089]
- Deprecate setup cmds for `template` and `ilm-policy`. Add new setup cmd for `index-management`. {pull}12132[12132]
- Deprecate setup cmds for `template` and `ilm-policy`. Add new setup cmd for `index-management`. {pull}12132[12132]
86 changes: 86 additions & 0 deletions filebeat/channel/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package channel

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/tests/resources"
)

type dummyOutletter struct {
closed bool
c chan struct{}
}

func (o *dummyOutletter) OnEvent(event *util.Data) bool {
return true
}

func (o *dummyOutletter) Close() error {
o.closed = true
close(o.c)
return nil
}

func (o *dummyOutletter) Done() <-chan struct{} {
return o.c
}

func TestCloseOnSignal(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

o := &dummyOutletter{c: make(chan struct{})}
sig := make(chan struct{})
CloseOnSignal(o, sig)
close(sig)
}

func TestCloseOnSignalClosed(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

o := &dummyOutletter{c: make(chan struct{})}
sig := make(chan struct{})
c := CloseOnSignal(o, sig)
c.Close()
}

func TestSubOutlet(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

o := &dummyOutletter{c: make(chan struct{})}
so := SubOutlet(o)
so.Close()
assert.False(t, o.closed)
}

func TestCloseOnSignalSubOutlet(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

o := &dummyOutletter{c: make(chan struct{})}
c := CloseOnSignal(SubOutlet(o), make(chan struct{}))
o.Close()
c.Close()
}
10 changes: 1 addition & 9 deletions filebeat/input/log/input_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
// specific language governing permissions and limitations
// under the License.

// +build !windows
// +build !windows,!integration

package log

import (
"testing"

"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common/match"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -163,10 +162,3 @@ func TestInit(t *testing.T) {
assert.Equal(t, test.count, p.states.Count())
}
}

// TestOutlet is an empty outlet for testing
type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *util.Data) bool { return true }
func (o TestOutlet) Close() error { return nil }
func (o TestOutlet) Done() <-chan struct{} { return nil }
179 changes: 179 additions & 0 deletions filebeat/input/log/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@
package log

import (
"io/ioutil"
"os"
"path"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/tests/resources"
)

func TestInputFileExclude(t *testing.T) {
Expand Down Expand Up @@ -81,6 +89,139 @@ func TestIsCleanInactive(t *testing.T) {
}
}

func TestInputLifecycle(t *testing.T) {
cases := []struct {
title string
closer func(input.Context, *Input)
}{
{
title: "explicitly closed",
closer: func(_ input.Context, input *Input) {
input.Wait()
},
},
{
title: "context done",
closer: func(ctx input.Context, _ *Input) {
close(ctx.Done)
},
},
{
title: "beat context done",
closer: func(ctx input.Context, _ *Input) {
close(ctx.Done)
close(ctx.BeatDone)
},
},
}

for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
context := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}
testInputLifecycle(t, context, c.closer)
})
}
}

// TestInputLifecycle performs blackbock testing of the log input
func testInputLifecycle(t *testing.T, context input.Context, closer func(input.Context, *Input)) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

// Prepare a log file
tmpdir, err := ioutil.TempDir(os.TempDir(), "input-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
logs := []byte("some log line\nother log line\n")
err = ioutil.WriteFile(path.Join(tmpdir, "some.log"), logs, 0644)
assert.NoError(t, err)

// Setup the input
config, _ := common.NewConfigFrom(common.MapStr{
"paths": path.Join(tmpdir, "*.log"),
"close_eof": true,
})

events := make(chan *util.Data, 100)
defer close(events)
capturer := NewEventCapturer(events)
defer capturer.Close()
connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) {
return channel.SubOutlet(capturer), nil
}

input, err := NewInput(config, connector, context)
if err != nil {
t.Error(err)
return
}

// Run the input and wait for finalization
input.Run()

timeout := time.After(30 * time.Second)
done := make(chan struct{})
for {
select {
case event := <-events:
if state := event.GetState(); state.Finished {
assert.Equal(t, len(logs), int(state.Offset), "file has not been fully read")
go func() {
closer(context, input.(*Input))
close(done)
}()
}
case <-done:
return
case <-timeout:
t.Fatal("timeout waiting for closed state")
}
}
}

func TestNewInputDone(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

config, _ := common.NewConfigFrom(common.MapStr{
"paths": path.Join(os.TempDir(), "logs", "*.log"),
})

connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) {
return TestOutlet{}, nil
}

context := input.Context{
Done: make(chan struct{}),
}

_, err := NewInput(config, connector, context)
assert.NoError(t, err)

close(context.Done)
}

func TestNewInputError(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

config := common.NewConfig()

connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) {
return TestOutlet{}, nil
}

context := input.Context{}

_, err := NewInput(config, connector, context)
assert.Error(t, err)
}

func TestMatchesMeta(t *testing.T) {
tests := []struct {
Input *Input
Expand Down Expand Up @@ -146,3 +287,41 @@ func (t TestFileInfo) Mode() os.FileMode { return 0 }
func (t TestFileInfo) ModTime() time.Time { return t.time }
func (t TestFileInfo) IsDir() bool { return false }
func (t TestFileInfo) Sys() interface{} { return nil }

type eventCapturer struct {
closed bool
c chan struct{}
closeOnce sync.Once
events chan *util.Data
}

func NewEventCapturer(events chan *util.Data) channel.Outleter {
return &eventCapturer{
c: make(chan struct{}),
events: events,
}
}

func (o *eventCapturer) OnEvent(event *util.Data) bool {
o.events <- event
return true
}

func (o *eventCapturer) Close() error {
o.closeOnce.Do(func() {
o.closed = true
close(o.c)
})
return nil
}

func (o *eventCapturer) Done() <-chan struct{} {
return o.c
}

// TestOutlet is an empty outlet for testing
type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *util.Data) bool { return true }
func (o TestOutlet) Close() error { return nil }
func (o TestOutlet) Done() <-chan struct{} { return nil }
10 changes: 10 additions & 0 deletions libbeat/autodiscover/autodiscover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/tests/resources"
)

type mockRunner struct {
Expand Down Expand Up @@ -135,6 +136,9 @@ func TestNilAutodiscover(t *testing.T) {
}

func TestAutodiscover(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Expand Down Expand Up @@ -255,6 +259,9 @@ func TestAutodiscover(t *testing.T) {
}

func TestAutodiscoverHash(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)

Expand Down Expand Up @@ -319,6 +326,9 @@ func TestAutodiscoverHash(t *testing.T) {
}

func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Expand Down
Loading