Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Config api 2 #9546

Closed
wants to merge 48 commits into from
Closed
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
bc9d4df
config and config_test conflicts resolved
helenosheaa Jul 20, 2021
e5dd73e
config plugins, and testdata
helenosheaa Jul 20, 2021
839fdc0
agent refactor
ssoroka Jul 20, 2021
42a21e0
added docs
helenosheaa Jul 20, 2021
9cb1910
running aggregator, id
helenosheaa Jul 21, 2021
27759b9
config api pieces
ssoroka Jul 21, 2021
71f2e57
processor, state and plugin models
helenosheaa Jul 21, 2021
8595580
plugins and stuff
ssoroka Jul 21, 2021
714a809
update go mod
ssoroka Jul 21, 2021
fd12e02
running processor state
helenosheaa Jul 21, 2021
cb198c1
config.Duration, auto imports, missing bracket
helenosheaa Jul 21, 2021
7c9fa8a
fix naming in runningprocessor
helenosheaa Jul 21, 2021
3724225
more consistency with rp
helenosheaa Jul 21, 2021
99cf9f4
package imports
helenosheaa Jul 21, 2021
a749373
starlark test fix
helenosheaa Jul 21, 2021
3bd6a95
json v2 and starlark fixes
helenosheaa Jul 21, 2021
5a1bf41
fix agent, agent controller
helenosheaa Jul 21, 2021
a667daf
fix controller and ec2
helenosheaa Jul 21, 2021
58c966c
fix config tests
helenosheaa Jul 21, 2021
b33d5a0
fix panic test
helenosheaa Jul 21, 2021
6997228
test fix
ssoroka Jul 22, 2021
cc43b83
remove commented out config
helenosheaa Jul 22, 2021
7f77230
update dep list
ssoroka Jul 27, 2021
392bd96
fix linter issues
ssoroka Jul 27, 2021
6b9079a
test agent controller pulled out and refactored
helenosheaa Jul 27, 2021
f75c7f5
Update docs/CONFIG_API.md
sspaink Jul 27, 2021
3a4b44e
rename to agent helper, add comment, remove fanout and unsed func in …
helenosheaa Jul 30, 2021
9f03f38
fix ec2 tests to be init
helenosheaa Aug 2, 2021
f32b3dd
remove skip
helenosheaa Aug 2, 2021
34d5dc2
move k.done
helenosheaa Aug 2, 2021
8b593be
add delete/update plugin
helenosheaa Aug 3, 2021
9794b45
resolve agent feedback around potential race condition
ssoroka Aug 4, 2021
efa4eec
resolves feedback
ssoroka Aug 4, 2021
0ed2b60
resolves feedback
ssoroka Aug 4, 2021
cf7ced4
add allowed methods to mux router
helenosheaa Aug 4, 2021
06d4d3f
update config api docs location
ssoroka Aug 4, 2021
6730cc6
stop plugin test
helenosheaa Aug 6, 2021
84ef98a
linter fix
helenosheaa Aug 6, 2021
9992d86
Add shell script to test closing on sigint
reimda Aug 9, 2021
a6f3f4c
resolve some TODOs
ssoroka Aug 6, 2021
034e6e7
fix outputs not shutting down properly
ssoroka Aug 10, 2021
5d95894
log write errors
helenosheaa Aug 10, 2021
c1f904a
more log write errors
helenosheaa Aug 10, 2021
ffa54db
resolve feedback
ssoroka Aug 10, 2021
1cdd3d8
status code handling, refactor field names to be snake case
helenosheaa Aug 18, 2021
f920061
resolve linter
helenosheaa Aug 20, 2021
196645d
resolve linter
helenosheaa Aug 20, 2021
d905b9a
finish hooking up storage. add jsonfile storage module for easy testing
ssoroka Aug 27, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
.DS_Store
process.yml
/.vscode
testdb.db
2 changes: 2 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Accumulator interface {
// Upgrade to a TrackingAccumulator with space for maxTracked
// metrics/batches.
WithTracking(maxTracked int) TrackingAccumulator

WithNewMetricMaker(logName string, logger Logger, f func(metric Metric) Metric) Accumulator
}

// TrackingID uniquely identifies a tracked metric group
Expand Down
49 changes: 49 additions & 0 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"sync"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -14,6 +15,8 @@ type MetricMaker interface {
}

type accumulator struct {
sync.Mutex
sync.Cond
maker MetricMaker
metrics chan<- telegraf.Metric
precision time.Duration
Expand All @@ -28,6 +31,8 @@ func NewAccumulator(
metrics: metrics,
precision: time.Nanosecond,
}
acc.Cond.L = &acc.Mutex

return &acc
}

Expand Down Expand Up @@ -77,8 +82,13 @@ func (ac *accumulator) AddHistogram(
}

func (ac *accumulator) AddMetric(m telegraf.Metric) {
ac.Lock()
defer ac.Unlock()
m.SetTime(m.Time().Round(ac.precision))
if m := ac.maker.MakeMetric(m); m != nil {
if ac.metrics == nil {
ac.Cond.Wait() // unlock and wait for metrics to be set.
}
ac.metrics <- m
}
}
Expand All @@ -91,11 +101,24 @@ func (ac *accumulator) addFields(
t ...time.Time,
) {
m := metric.New(measurement, tags, fields, ac.getTime(t), tp)
ac.Lock()
defer ac.Unlock()
if m := ac.maker.MakeMetric(m); m != nil {
if ac.metrics == nil {
ac.Cond.Wait() // unlock and wait for metrics to be set.
}
ac.metrics <- m
}
}

// setOutput changes the destination of the accumulator
func (ac *accumulator) setOutput(outCh chan<- telegraf.Metric) {
ac.Lock()
defer ac.Unlock()
ac.metrics = outCh
ac.Cond.Broadcast()
}

// AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log.
func (ac *accumulator) AddError(err error) {
Expand Down Expand Up @@ -126,6 +149,32 @@ func (ac *accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator
}
}

func (ac *accumulator) WithNewMetricMaker(logName string, logger telegraf.Logger, f func(m telegraf.Metric) telegraf.Metric) telegraf.Accumulator {
return &metricMakerAccumulator{
Accumulator: ac,
makerFunc: f,
logName: logName,
logger: logger,
}
}

type metricMakerAccumulator struct {
telegraf.Accumulator
logName string
logger telegraf.Logger
makerFunc func(m telegraf.Metric) telegraf.Metric
}

func (ma *metricMakerAccumulator) LogName() string {
return ma.logName
}
func (ma *metricMakerAccumulator) Log() telegraf.Logger {
return ma.logger
}
func (ma *metricMakerAccumulator) MakeMetric(m telegraf.Metric) telegraf.Metric {
return ma.makerFunc(m)
}

type trackingAccumulator struct {
telegraf.Accumulator
delivered chan telegraf.DeliveryInfo
Expand Down
Loading