Skip to content

Commit

Permalink
fix panic on streaming processers using logging (#8176)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka committed Oct 9, 2020
1 parent fac8181 commit d71d090
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 23 deletions.
5 changes: 4 additions & 1 deletion models/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func logName(pluginType, name, alias string) string {
return pluginType + "." + name + "::" + alias
}

func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
func SetLoggerOnPlugin(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)

if valI.Type().Kind() != reflect.Ptr {
Expand All @@ -96,6 +96,9 @@ func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
if field.CanSet() {
field.Set(reflect.ValueOf(log))
}
default:
log.Debugf("Plugin %q defines a 'Log' field on its struct of an unexpected type %q. Expected telegraf.Logger",
valI.Type().Name(), field.Type().String())
}

return
Expand Down
2 changes: 1 addition & 1 deletion models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
aggErrorsRegister.Incr(1)
})

setLoggerOnPlugin(aggregator, logger)
SetLoggerOnPlugin(aggregator, logger)

return &RunningAggregator{
Aggregator: aggregator,
Expand Down
2 changes: 1 addition & 1 deletion models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
setLoggerOnPlugin(input, logger)
SetLoggerOnPlugin(input, logger)

return &RunningInput{
Input: input,
Expand Down
2 changes: 1 addition & 1 deletion models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewRunningOutput(
logger.OnErr(func() {
writeErrorsRegister.Incr(1)
})
setLoggerOnPlugin(output, logger)
SetLoggerOnPlugin(output, logger)

if config.MetricBufferLimit > 0 {
bufferLimit = config.MetricBufferLimit
Expand Down
2 changes: 1 addition & 1 deletion models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
logger.OnErr(func() {
processErrorsRegister.Incr(1)
})
setLoggerOnPlugin(processor, logger)
SetLoggerOnPlugin(processor, logger)

return &RunningProcessor{
Processor: processor,
Expand Down
37 changes: 19 additions & 18 deletions models/running_processor_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package models
package models_test

import (
"sort"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -52,7 +53,7 @@ func (p *MockProcessorToInit) Init() error {

func TestRunningProcessor_Init(t *testing.T) {
mock := MockProcessorToInit{}
rp := &RunningProcessor{
rp := &models.RunningProcessor{
Processor: processors.NewStreamingProcessorFromProcessor(&mock),
}
rp.Init()
Expand All @@ -75,7 +76,7 @@ func TagProcessor(key, value string) *MockProcessor {
func TestRunningProcessor_Apply(t *testing.T) {
type args struct {
Processor telegraf.StreamingProcessor
Config *ProcessorConfig
Config *models.ProcessorConfig
}

tests := []struct {
Expand All @@ -88,8 +89,8 @@ func TestRunningProcessor_Apply(t *testing.T) {
name: "inactive filter applies metrics",
args: args{
Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{
Filter: Filter{},
Config: &models.ProcessorConfig{
Filter: models.Filter{},
},
},
input: []telegraf.Metric{
Expand Down Expand Up @@ -119,8 +120,8 @@ func TestRunningProcessor_Apply(t *testing.T) {
name: "filter applies",
args: args{
Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{
Filter: Filter{
Config: &models.ProcessorConfig{
Filter: models.Filter{
NamePass: []string{"cpu"},
},
},
Expand Down Expand Up @@ -152,8 +153,8 @@ func TestRunningProcessor_Apply(t *testing.T) {
name: "filter doesn't apply",
args: args{
Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{
Filter: Filter{
Config: &models.ProcessorConfig{
Filter: models.Filter{
NameDrop: []string{"cpu"},
},
},
Expand Down Expand Up @@ -183,7 +184,7 @@ func TestRunningProcessor_Apply(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rp := &RunningProcessor{
rp := &models.RunningProcessor{
Processor: tt.args.Processor,
Config: tt.args.Config,
}
Expand All @@ -204,25 +205,25 @@ func TestRunningProcessor_Apply(t *testing.T) {
}

func TestRunningProcessor_Order(t *testing.T) {
rp1 := &RunningProcessor{
Config: &ProcessorConfig{
rp1 := &models.RunningProcessor{
Config: &models.ProcessorConfig{
Order: 1,
},
}
rp2 := &RunningProcessor{
Config: &ProcessorConfig{
rp2 := &models.RunningProcessor{
Config: &models.ProcessorConfig{
Order: 2,
},
}
rp3 := &RunningProcessor{
Config: &ProcessorConfig{
rp3 := &models.RunningProcessor{
Config: &models.ProcessorConfig{
Order: 3,
},
}

procs := RunningProcessors{rp2, rp3, rp1}
procs := models.RunningProcessors{rp2, rp3, rp1}
sort.Sort(procs)
require.Equal(t,
RunningProcessors{rp1, rp2, rp3},
models.RunningProcessors{rp1, rp2, rp3},
procs)
}
3 changes: 3 additions & 0 deletions plugins/processors/streamingprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package processors

import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
)

// NewStreamingProcessorFromProcessor is a converter that turns a standard
Expand All @@ -16,6 +17,7 @@ func NewStreamingProcessorFromProcessor(p telegraf.Processor) telegraf.Streaming
type streamingProcessor struct {
processor telegraf.Processor
acc telegraf.Accumulator
Log telegraf.Logger
}

func (sp *streamingProcessor) SampleConfig() string {
Expand Down Expand Up @@ -46,6 +48,7 @@ func (sp *streamingProcessor) Stop() error {
// to call the Init method of the wrapped processor if
// needed
func (sp *streamingProcessor) Init() error {
models.SetLoggerOnPlugin(sp.processor, sp.Log)
if p, ok := sp.processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand Down

0 comments on commit d71d090

Please sign in to comment.