Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Config api 2 #9546

Closed
wants to merge 48 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
bc9d4df
config and config_test conflicts resolved
helenosheaa Jul 20, 2021
e5dd73e
config plugins, and testdata
helenosheaa Jul 20, 2021
839fdc0
agent refactor
ssoroka Jul 20, 2021
42a21e0
added docs
helenosheaa Jul 20, 2021
9cb1910
running aggregator, id
helenosheaa Jul 21, 2021
27759b9
config api pieces
ssoroka Jul 21, 2021
71f2e57
processor, state and plugin models
helenosheaa Jul 21, 2021
8595580
plugins and stuff
ssoroka Jul 21, 2021
714a809
update go mod
ssoroka Jul 21, 2021
fd12e02
running processor state
helenosheaa Jul 21, 2021
cb198c1
config.Duration, auto imports, missing bracket
helenosheaa Jul 21, 2021
7c9fa8a
fix naming in runningprocessor
helenosheaa Jul 21, 2021
3724225
more consistency with rp
helenosheaa Jul 21, 2021
99cf9f4
package imports
helenosheaa Jul 21, 2021
a749373
starlark test fix
helenosheaa Jul 21, 2021
3bd6a95
json v2 and starlark fixes
helenosheaa Jul 21, 2021
5a1bf41
fix agent, agent controller
helenosheaa Jul 21, 2021
a667daf
fix controller and ec2
helenosheaa Jul 21, 2021
58c966c
fix config tests
helenosheaa Jul 21, 2021
b33d5a0
fix panic test
helenosheaa Jul 21, 2021
6997228
test fix
ssoroka Jul 22, 2021
cc43b83
remove commented out config
helenosheaa Jul 22, 2021
7f77230
update dep list
ssoroka Jul 27, 2021
392bd96
fix linter issues
ssoroka Jul 27, 2021
6b9079a
test agent controller pulled out and refactored
helenosheaa Jul 27, 2021
f75c7f5
Update docs/CONFIG_API.md
sspaink Jul 27, 2021
3a4b44e
rename to agent helper, add comment, remove fanout and unused func in …
helenosheaa Jul 30, 2021
9f03f38
fix ec2 tests to be init
helenosheaa Aug 2, 2021
f32b3dd
remove skip
helenosheaa Aug 2, 2021
34d5dc2
move k.done
helenosheaa Aug 2, 2021
8b593be
add delete/update plugin
helenosheaa Aug 3, 2021
9794b45
resolve agent feedback around potential race condition
ssoroka Aug 4, 2021
efa4eec
resolves feedback
ssoroka Aug 4, 2021
0ed2b60
resolves feedback
ssoroka Aug 4, 2021
cf7ced4
add allowed methods to mux router
helenosheaa Aug 4, 2021
06d4d3f
update config api docs location
ssoroka Aug 4, 2021
6730cc6
stop plugin test
helenosheaa Aug 6, 2021
84ef98a
linter fix
helenosheaa Aug 6, 2021
9992d86
Add shell script to test closing on sigint
reimda Aug 9, 2021
a6f3f4c
resolve some TODOs
ssoroka Aug 6, 2021
034e6e7
fix outputs not shutting down properly
ssoroka Aug 10, 2021
5d95894
log write errors
helenosheaa Aug 10, 2021
c1f904a
more log write errors
helenosheaa Aug 10, 2021
ffa54db
resolve feedback
ssoroka Aug 10, 2021
1cdd3d8
status code handling, refactor field names to be snake case
helenosheaa Aug 18, 2021
f920061
resolve linter
helenosheaa Aug 20, 2021
196645d
resolve linter
helenosheaa Aug 20, 2021
d905b9a
finish hooking up storage. add jsonfile storage module for easy testing
ssoroka Aug 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
config api pieces
ssoroka committed Jul 27, 2021
commit 27759b9c90249e520a9517010bcd6544172f0a18
259 changes: 259 additions & 0 deletions agent/agentcontrol_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package agent

import (
"context"
"sync"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

// now is the fixed timestamp (2021-04-09 00:00:00 UTC) stamped on every metric
// built by the tests in this file, so expected and received metrics compare
// equal regardless of when the tests run.
var now = time.Date(2021, time.April, 9, 0, 0, 0, 0, time.UTC)

// TestAgentPluginControllerLifecycle wires one input, one processor and one
// output into an agent, injects a single metric, cancels the agent context and
// then checks that every plugin reports "dead" and that the output received
// the metric with the processor's "capital" field added.
func TestAgentPluginControllerLifecycle(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// cancel is also called explicitly below; the defer only matters when an
	// assertion aborts the test early, so the agent context is still released.
	defer cancel()

	cfg := config.NewConfig()
	a := NewAgent(ctx, cfg)
	// NOTE(review): the call below was left commented out in the original;
	// confirm whether the config needs a reference to the agent here.
	// cfg.SetAgent(a)
	inp := &testInputPlugin{}
	require.NoError(t, inp.Init())
	ri := models.NewRunningInput(inp, &models.InputConfig{Name: "in"})
	a.AddInput(ri)
	go a.RunInput(ri, time.Now())

	m, err := metric.New("testing",
		map[string]string{
			"country": "canada",
		},
		map[string]interface{}{
			"population": 37_590_000,
		},
		now)
	require.NoError(t, err)

	rp := models.NewRunningProcessor(&testProcessorPlugin{}, &models.ProcessorConfig{Name: "proc"})
	a.AddProcessor(rp)
	go a.RunProcessor(rp)

	// The output runs on its own context, which is handed to RunWithAPI below.
	outputCtx, outputCancel := context.WithCancel(context.Background())
	defer outputCancel()
	o := &testOutputPlugin{}
	require.NoError(t, o.Init())
	ro := models.NewRunningOutput(o, &models.OutputConfig{Name: "out"}, 100, 100)
	a.AddOutput(ro)
	go a.RunOutput(outputCtx, ro)

	go a.RunWithAPI(outputCancel)

	// injectMetric blocks until the input plugin's Start has been called.
	inp.injectMetric(m)

	waitForStatus(t, ri, "running", 1*time.Second)
	waitForStatus(t, rp, "running", 1*time.Second)
	waitForStatus(t, ro, "running", 1*time.Second)

	cancel()

	waitForStatus(t, ri, "dead", 1*time.Second)
	waitForStatus(t, rp, "dead", 1*time.Second)
	waitForStatus(t, ro, "dead", 1*time.Second)

	require.Len(t, o.receivedMetrics, 1)
	expected := testutil.MustMetric("testing",
		map[string]string{
			"country": "canada",
		},
		map[string]interface{}{
			"population": 37_590_000,
			"capital":    "Ottawa",
		},
		now)
	testutil.RequireMetricEqual(t, expected, o.receivedMetrics[0])
}

// TestAgentPluginConnectionsAfterAddAndRemoveProcessor checks that metrics keep
// flowing from the input to the output while a processor is added to, and later
// removed from, a running agent: without the processor the metric arrives
// unchanged, with it the metric carries the extra "capital" field.
func TestAgentPluginConnectionsAfterAddAndRemoveProcessor(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// cancel is also called explicitly at the end; the defer only matters when
	// an assertion aborts the test early.
	defer cancel()

	cfg := config.NewConfig()
	a := NewAgent(ctx, cfg)
	// NOTE(review): the call below was left commented out in the original;
	// confirm whether the config needs a reference to the agent here.
	// cfg.SetAgent(a)

	// start an input
	inp := &testInputPlugin{}
	require.NoError(t, inp.Init())
	ri := models.NewRunningInput(inp, &models.InputConfig{Name: "in"})
	a.AddInput(ri)
	go a.RunInput(ri, time.Now())

	// start output
	outputCtx, outputCancel := context.WithCancel(context.Background())
	defer outputCancel()
	o := &testOutputPlugin{}
	require.NoError(t, o.Init())
	ro := models.NewRunningOutput(o, &models.OutputConfig{Name: "out"}, 100, 100)
	a.AddOutput(ro)
	go a.RunOutput(outputCtx, ro)

	// Run agent
	go a.RunWithAPI(outputCancel)

	// wait for plugins to start
	waitForStatus(t, ri, "running", 1*time.Second)
	waitForStatus(t, ro, "running", 1*time.Second)

	// inject a metric into the input plugin as if it collected it
	m, err := metric.New("mojo", nil, map[string]interface{}{"jenkins": "leroy"}, now)
	require.NoError(t, err)
	inp.injectMetric(m)

	// wait for the output to get it
	o.wait(1)
	testutil.RequireMetricEqual(t, m, o.receivedMetrics[0])
	o.clear()

	// spin up new processor
	rp := models.NewRunningProcessor(&testProcessorPlugin{}, &models.ProcessorConfig{Name: "proc"})
	a.AddProcessor(rp)
	go a.RunProcessor(rp)

	// wait for the processor to start
	waitForStatus(t, rp, "running", 5*time.Second)

	// inject a metric into the input
	inp.injectMetric(m)
	// wait for it to arrive
	o.wait(1)

	// create the expected output for comparison
	expected := m.Copy()
	expected.AddField("capital", "Ottawa")

	testutil.RequireMetricEqual(t, expected, o.receivedMetrics[0])

	o.clear()

	// stop processor and wait for it to stop
	a.StopProcessor(rp)
	waitForStatus(t, rp, "dead", 5*time.Second)

	// inject a new metric
	inp.injectMetric(m)

	// wait for the output to get it
	o.wait(1)
	testutil.RequireMetricEqual(t, m, o.receivedMetrics[0])
	o.clear()

	// cancel the app's context
	cancel()

	// wait for plugins to stop
	waitForStatus(t, ri, "dead", 5*time.Second)
	waitForStatus(t, ro, "dead", 5*time.Second)
}

// hasState is satisfied by any value that can report its current plugin state.
// waitForStatus accepts it so the same helper can poll whatever the tests pass in.
type hasState interface {
	GetState() models.PluginState
}

// waitForStatus polls stateable every 10ms until the String() form of its state
// equals waitStatus. If that has not happened by the time timeout has elapsed,
// the test is failed immediately.
func waitForStatus(t *testing.T, stateable hasState, waitStatus string, timeout time.Duration) {
	// Attribute a failure to the calling test line instead of this helper.
	t.Helper()

	timeoutAt := time.Now().Add(timeout)
	for timeoutAt.After(time.Now()) {
		if stateable.GetState().String() == waitStatus {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	// Include the state seen at timeout so a failure shows where the plugin got stuck.
	require.FailNow(t, "timed out waiting for status "+waitStatus,
		"last observed status: %s", stateable.GetState().String())
}

// testInputPlugin is a minimal input plugin for these tests. It does not gather
// anything itself; tests push metrics through it with injectMetric.
type testInputPlugin struct {
	// Mutex guards started and acc; Cond (created in Init on top of Mutex)
	// lets injectMetric block until Start has run.
	sync.Mutex
	*sync.Cond

	started bool                 // set to true by Start
	acc     telegraf.Accumulator // accumulator handed to Start
}

// Init builds the condition variable on top of the plugin's own mutex. Start
// and injectMetric both use that Cond, so Init has to run before either.
// It always returns nil.
func (p *testInputPlugin) Init() error {
	cond := sync.NewCond(&p.Mutex)
	p.Cond = cond
	return nil
}
// SampleConfig returns an empty string; the test plugin has no configuration.
func (p *testInputPlugin) SampleConfig() string {
	return ""
}

// Description returns the plugin's name.
func (p *testInputPlugin) Description() string {
	return "testInputPlugin"
}

// Gather does nothing and returns nil; metrics are supplied via injectMetric.
func (p *testInputPlugin) Gather(a telegraf.Accumulator) error {
	return nil
}
// Start stores the accumulator, marks the plugin as started and wakes every
// goroutine blocked in injectMetric. It always returns nil.
func (p *testInputPlugin) Start(a telegraf.Accumulator) error {
	p.Lock()
	defer p.Unlock()
	p.acc = a
	p.started = true
	// Release any injectMetric callers waiting for the plugin to start.
	p.Cond.Broadcast()
	return nil
}
// Stop has no state to tear down. It still takes and releases the plugin lock,
// so it does not return while another method is holding that lock.
func (p *testInputPlugin) Stop() {
	p.Lock()
	defer p.Unlock()
}
// injectMetric hands m to the accumulator received in Start, as if the plugin
// had collected it. It blocks until Start has been called.
func (p *testInputPlugin) injectMetric(m telegraf.Metric) {
	p.Lock()
	defer p.Unlock()

	// Cond.Wait releases the lock while sleeping and re-acquires it on wake-up,
	// so started is always re-checked under the lock.
	for {
		if p.started {
			break
		}
		p.Cond.Wait()
	}

	p.acc.AddMetric(m)
}

// testProcessorPlugin is a stateless processor for these tests; its Add method
// tags every metric with a "capital" field.
type testProcessorPlugin struct{}

// Init has nothing to set up and returns nil.
func (p *testProcessorPlugin) Init() error {
	return nil
}

// SampleConfig returns an empty string; the test plugin has no configuration.
func (p *testProcessorPlugin) SampleConfig() string {
	return ""
}

// Description returns the plugin's name.
func (p *testProcessorPlugin) Description() string {
	return "testProcessorPlugin"
}

// Start ignores the accumulator and returns nil.
func (p *testProcessorPlugin) Start(acc telegraf.Accumulator) error {
	return nil
}
// Add sets the field "capital" to "Ottawa" on the incoming metric (modifying it
// in place) and forwards it to acc. It always returns nil.
//
// The parameter is named m rather than metric so it does not shadow the
// imported metric package.
func (p *testProcessorPlugin) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
	m.AddField("capital", "Ottawa")
	acc.AddMetric(m)
	return nil
}
// Stop has nothing to tear down and returns nil.
func (p *testProcessorPlugin) Stop() error {
	return nil
}

// testOutputPlugin is an output plugin that records every metric written to it
// so tests can wait for and inspect what reached the end of the pipeline.
type testOutputPlugin struct {
	// Mutex guards receivedMetrics; Cond (created in Init on top of Mutex) is
	// broadcast on every Write so wait can wake up.
	sync.Mutex
	*sync.Cond

	receivedMetrics []telegraf.Metric // metrics appended by Write, reset by clear
}

// Init builds the condition variable on top of the plugin's own mutex. Write
// and wait both use that Cond, so Init has to run before either.
// It always returns nil.
func (p *testOutputPlugin) Init() error {
	cond := sync.NewCond(&p.Mutex)
	p.Cond = cond
	return nil
}
// SampleConfig returns an empty string; the test plugin has no configuration.
func (p *testOutputPlugin) SampleConfig() string {
	return ""
}

// Description returns the plugin's name.
func (p *testOutputPlugin) Description() string {
	return "testOutputPlugin"
}

// Connect has nothing to connect to and returns nil.
func (p *testOutputPlugin) Connect() error {
	return nil
}

// Close has nothing to release and returns nil.
func (p *testOutputPlugin) Close() error {
	return nil
}
// Write appends the batch to receivedMetrics under the lock and wakes every
// goroutine blocked in wait. It always returns nil.
func (p *testOutputPlugin) Write(metrics []telegraf.Metric) error {
	p.Mutex.Lock()
	defer p.Mutex.Unlock()

	p.receivedMetrics = append(p.receivedMetrics, metrics...)
	p.Cond.Broadcast()
	return nil
}

// wait blocks until at least n metrics have been recorded by Write.
// There is no timeout: it keeps waiting for as long as fewer than n metrics
// have arrived.
func (p *testOutputPlugin) wait(n int) {
	p.Lock()
	defer p.Unlock()

	for {
		if len(p.receivedMetrics) >= n {
			return
		}
		// Releases the lock while sleeping; Write broadcasts after each append.
		p.Cond.Wait()
	}
}

// clear discards every metric recorded so far, so a following wait only counts
// metrics written after this call.
func (p *testOutputPlugin) clear() {
	p.Lock()
	p.receivedMetrics = nil
	p.Unlock()
}
39 changes: 19 additions & 20 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
@@ -23,11 +23,15 @@ import (
"github.com/influxdata/telegraf/internal/goplugin"
"github.com/influxdata/telegraf/logger"
_ "github.com/influxdata/telegraf/plugins/aggregators/all"
_ "github.com/influxdata/telegraf/plugins/configs"
_ "github.com/influxdata/telegraf/plugins/configs/all"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
_ "github.com/influxdata/telegraf/plugins/storage"
_ "github.com/influxdata/telegraf/plugins/storage/all"
"gopkg.in/tomb.v1"
)

@@ -197,35 +201,34 @@ func runAgent(ctx context.Context,
c := config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters

ag := agent.NewAgent(ctx, c)
c.SetAgent(ag)
outputCtx, outputCancel := context.WithCancel(context.Background())
defer outputCancel()

var err error
// providing no "config" flag should load default config
if len(fConfigs) == 0 {
err = c.LoadConfig("")
err = c.LoadConfig(ctx, outputCtx, "")
if err != nil {
return err
}
}
for _, fConfig := range fConfigs {
err = c.LoadConfig(fConfig)
err = c.LoadConfig(ctx, outputCtx, fConfig)
if err != nil {
return err
}
}

for _, fConfigDirectory := range fConfigDirs {
err = c.LoadDirectory(fConfigDirectory)
err = c.LoadDirectory(ctx, outputCtx, fConfigDirectory)
if err != nil {
return err
}
}

if !*fTest && len(c.Outputs) == 0 {
return errors.New("Error: no outputs found, did you provide a valid config file?")
}
if *fPlugins == "" && len(c.Inputs) == 0 {
return errors.New("Error: no inputs found, did you provide a valid config file?")
}

if int64(c.Agent.Interval) <= 0 {
return fmt.Errorf("Agent interval must be positive, found %v", c.Agent.Interval)
}
@@ -234,11 +237,6 @@ func runAgent(ctx context.Context,
return fmt.Errorf("Agent flush_interval must be positive; found %v", c.Agent.Interval)
}

ag, err := agent.NewAgent(c)
if err != nil {
return err
}

// Setup logging as configured.
telegraf.Debug = ag.Config.Agent.Debug || *fDebug
logConfig := logger.LogConfig{
@@ -255,13 +253,13 @@ func runAgent(ctx context.Context,
logger.SetupLogging(logConfig)

if *fRunOnce {
wait := time.Duration(*fTestWait) * time.Second
return ag.Once(ctx, wait)
// wait := time.Duration(*fTestWait) * time.Second
// return ag.Once(ctx, wait)
}

if *fTest || *fTestWait != 0 {
wait := time.Duration(*fTestWait) * time.Second
return ag.Test(ctx, wait)
// wait := time.Duration(*fTestWait) * time.Second
// return ag.Test(ctx, wait)
}

log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " "))
@@ -288,7 +286,8 @@ func runAgent(ctx context.Context,
}
}

return ag.Run(ctx)
ag.RunWithAPI(outputCancel)
return nil
}

func usageExit(rc int) {
1 change: 1 addition & 0 deletions internal/channel/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package channel
helenosheaa marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions internal/channel/fanout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package channel
Loading