Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix panic on streaming processers using logging #8176

Merged
merged 2 commits into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion models/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func logName(pluginType, name, alias string) string {
return pluginType + "." + name + "::" + alias
}

func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
func SetLoggerOnPlugin(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)

if valI.Type().Kind() != reflect.Ptr {
Expand All @@ -96,6 +96,9 @@ func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
if field.CanSet() {
field.Set(reflect.ValueOf(log))
}
default:
log.Debugf("Plugin %q defines a 'Log' field on its struct of an unexpected type %q. Expected telegraf.Logger",
valI.Type().Name(), field.Type().String())
}

return
Expand Down
2 changes: 1 addition & 1 deletion models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
aggErrorsRegister.Incr(1)
})

setLoggerOnPlugin(aggregator, logger)
SetLoggerOnPlugin(aggregator, logger)

return &RunningAggregator{
Aggregator: aggregator,
Expand Down
2 changes: 1 addition & 1 deletion models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
setLoggerOnPlugin(input, logger)
SetLoggerOnPlugin(input, logger)

return &RunningInput{
Input: input,
Expand Down
2 changes: 1 addition & 1 deletion models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewRunningOutput(
logger.OnErr(func() {
writeErrorsRegister.Incr(1)
})
setLoggerOnPlugin(output, logger)
SetLoggerOnPlugin(output, logger)

if config.MetricBufferLimit > 0 {
bufferLimit = config.MetricBufferLimit
Expand Down
2 changes: 1 addition & 1 deletion models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
logger.OnErr(func() {
processErrorsRegister.Incr(1)
})
setLoggerOnPlugin(processor, logger)
SetLoggerOnPlugin(processor, logger)

return &RunningProcessor{
Processor: processor,
Expand Down
37 changes: 19 additions & 18 deletions models/running_processor_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package models
package models_test

import (
"sort"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -52,7 +53,7 @@ func (p *MockProcessorToInit) Init() error {

func TestRunningProcessor_Init(t *testing.T) {
mock := MockProcessorToInit{}
rp := &RunningProcessor{
rp := &models.RunningProcessor{
Processor: processors.NewStreamingProcessorFromProcessor(&mock),
}
rp.Init()
Expand All @@ -75,7 +76,7 @@ func TagProcessor(key, value string) *MockProcessor {
func TestRunningProcessor_Apply(t *testing.T) {
type args struct {
Processor telegraf.StreamingProcessor
Config *ProcessorConfig
Config *models.ProcessorConfig
}

tests := []struct {
Expand All @@ -88,8 +89,8 @@ func TestRunningProcessor_Apply(t *testing.T) {
name: "inactive filter applies metrics",
args: args{
Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{
Filter: Filter{},
Config: &models.ProcessorConfig{
Filter: models.Filter{},
},
},
input: []telegraf.Metric{
Expand Down Expand Up @@ -119,8 +120,8 @@ func TestRunningProcessor_Apply(t *testing.T) {
name: "filter applies",
args: args{
Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{
Filter: Filter{
Config: &models.ProcessorConfig{
Filter: models.Filter{
NamePass: []string{"cpu"},
},
},
Expand Down Expand Up @@ -152,8 +153,8 @@ func TestRunningProcessor_Apply(t *testing.T) {
name: "filter doesn't apply",
args: args{
Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{
Filter: Filter{
Config: &models.ProcessorConfig{
Filter: models.Filter{
NameDrop: []string{"cpu"},
},
},
Expand Down Expand Up @@ -183,7 +184,7 @@ func TestRunningProcessor_Apply(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rp := &RunningProcessor{
rp := &models.RunningProcessor{
Processor: tt.args.Processor,
Config: tt.args.Config,
}
Expand All @@ -204,25 +205,25 @@ func TestRunningProcessor_Apply(t *testing.T) {
}

func TestRunningProcessor_Order(t *testing.T) {
rp1 := &RunningProcessor{
Config: &ProcessorConfig{
rp1 := &models.RunningProcessor{
Config: &models.ProcessorConfig{
Order: 1,
},
}
rp2 := &RunningProcessor{
Config: &ProcessorConfig{
rp2 := &models.RunningProcessor{
Config: &models.ProcessorConfig{
Order: 2,
},
}
rp3 := &RunningProcessor{
Config: &ProcessorConfig{
rp3 := &models.RunningProcessor{
Config: &models.ProcessorConfig{
Order: 3,
},
}

procs := RunningProcessors{rp2, rp3, rp1}
procs := models.RunningProcessors{rp2, rp3, rp1}
sort.Sort(procs)
require.Equal(t,
RunningProcessors{rp1, rp2, rp3},
models.RunningProcessors{rp1, rp2, rp3},
procs)
}
3 changes: 3 additions & 0 deletions plugins/processors/streamingprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package processors

import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
)

// NewStreamingProcessorFromProcessor is a converter that turns a standard
Expand All @@ -16,6 +17,7 @@ func NewStreamingProcessorFromProcessor(p telegraf.Processor) telegraf.Streaming
type streamingProcessor struct {
processor telegraf.Processor
acc telegraf.Accumulator
Log telegraf.Logger
}

func (sp *streamingProcessor) SampleConfig() string {
Expand Down Expand Up @@ -46,6 +48,7 @@ func (sp *streamingProcessor) Stop() error {
// to call the Init method of the wrapped processor if
// needed
func (sp *streamingProcessor) Init() error {
models.SetLoggerOnPlugin(sp.processor, sp.Log)
if p, ok := sp.processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand Down