Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shim logger improvements #7865

Merged
merged 2 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion models/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func logName(pluginType, name, alias string) string {
return pluginType + "." + name + "::" + alias
}

func setLogIfExist(i interface{}, log telegraf.Logger) {
func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)

if valI.Type().Kind() != reflect.Ptr {
Expand Down
2 changes: 1 addition & 1 deletion models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
aggErrorsRegister.Incr(1)
})

setLogIfExist(aggregator, logger)
setLoggerOnPlugin(aggregator, logger)

return &RunningAggregator{
Aggregator: aggregator,
Expand Down
2 changes: 1 addition & 1 deletion models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
setLogIfExist(input, logger)
setLoggerOnPlugin(input, logger)

return &RunningInput{
Input: input,
Expand Down
2 changes: 1 addition & 1 deletion models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewRunningOutput(
logger.OnErr(func() {
writeErrorsRegister.Incr(1)
})
setLogIfExist(output, logger)
setLoggerOnPlugin(output, logger)

if config.MetricBufferLimit > 0 {
bufferLimit = config.MetricBufferLimit
Expand Down
2 changes: 1 addition & 1 deletion models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
logger.OnErr(func() {
processErrorsRegister.Incr(1)
})
setLogIfExist(processor, logger)
setLoggerOnPlugin(processor, logger)

return &RunningProcessor{
Processor: processor,
Expand Down
4 changes: 4 additions & 0 deletions plugins/common/shim/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"log"
"os"

"github.com/BurntSushi/toml"
Expand Down Expand Up @@ -152,14 +153,17 @@ func DefaultImportedPlugins() (config, error) {
Outputs: map[string][]toml.Primitive{},
}
for name := range inputs.Inputs {
log.Println("No config found. Loading default config for plugin", name)
conf.Inputs[name] = []toml.Primitive{}
return conf, nil
}
for name := range processors.Processors {
log.Println("No config found. Loading default config for plugin", name)
conf.Processors[name] = []toml.Primitive{}
return conf, nil
}
for name := range outputs.Outputs {
log.Println("No config found. Loading default config for plugin", name)
conf.Outputs[name] = []toml.Primitive{}
return conf, nil
}
Expand Down
16 changes: 10 additions & 6 deletions plugins/common/shim/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

// AddInput adds the input to the shim. Later calls to Run() will run this input.
func (s *Shim) AddInput(input telegraf.Input) error {
setLoggerOnPlugin(input, NewLogger())
if p, ok := input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand Down Expand Up @@ -57,13 +58,16 @@ func (s *Shim) RunInput(pollInterval time.Duration) error {
wg.Done()
}()

scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
// push a non-blocking message to trigger metric collection.
s.pushCollectMetricsRequest()
}
go func() {
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
// push a non-blocking message to trigger metric collection.
s.pushCollectMetricsRequest()
}

cancel() // cancel gracefully stops gathering
}()

cancel() // cancel gracefully stops gathering
wg.Wait() // wait for writing to stdout to finish
return nil
}
Expand Down
89 changes: 89 additions & 0 deletions plugins/common/shim/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package shim

import (
"fmt"
"log"
"os"
"reflect"

"github.com/influxdata/telegraf"
)

func init() {
log.SetOutput(os.Stderr)
}

// Logger defines a logging structure for plugins.
// external plugins can only ever write to stderr and writing to stdout
// would interfere with input/processor writing out of metrics.
type Logger struct{}

// NewLogger creates a new logger instance
func NewLogger() *Logger {
return &Logger{}
}

// Errorf logs an error message, patterned after log.Printf.
func (l *Logger) Errorf(format string, args ...interface{}) {
log.Printf("E! "+format, args...)
}

// Error logs an error message, patterned after log.Print.
func (l *Logger) Error(args ...interface{}) {
log.Print("E! ", fmt.Sprint(args...))
}

// Debugf logs a debug message, patterned after log.Printf.
func (l *Logger) Debugf(format string, args ...interface{}) {
log.Printf("D! "+format, args...)
}

// Debug logs a debug message, patterned after log.Print.
func (l *Logger) Debug(args ...interface{}) {
log.Print("D! ", fmt.Sprint(args...))
}

// Warnf logs a warning message, patterned after log.Printf.
func (l *Logger) Warnf(format string, args ...interface{}) {
log.Printf("W! "+format, args...)
}

// Warn logs a warning message, patterned after log.Print.
func (l *Logger) Warn(args ...interface{}) {
log.Print("W! ", fmt.Sprint(args...))
}

// Infof logs an information message, patterned after log.Printf.
func (l *Logger) Infof(format string, args ...interface{}) {
log.Printf("I! "+format, args...)
}

// Info logs an information message, patterned after log.Print.
func (l *Logger) Info(args ...interface{}) {
log.Print("I! ", fmt.Sprint(args...))
}

// setLoggerOnPlugin injects the logger into the plugin,
// if it defines Log telegraf.Logger. This is sort of like SetLogger but using
// reflection instead of forcing the plugin author to define the function for it
func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why rename this when it's copied verbatim from models/log.go? I don't particularly like the name setLogIfExist but it should match models/log.go. If you're going to rename it, change both places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this suggestion.

valI := reflect.ValueOf(i)

if valI.Type().Kind() != reflect.Ptr {
valI = reflect.New(reflect.TypeOf(i))
}

field := valI.Elem().FieldByName("Log")
if !field.IsValid() {
return
}

switch field.Type().String() {
case "telegraf.Logger":
if field.CanSet() {
field.Set(reflect.ValueOf(log))
}
}

return
}
1 change: 1 addition & 0 deletions plugins/common/shim/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

// AddOutput adds the input to the shim. Later calls to Run() will run this.
func (s *Shim) AddOutput(output telegraf.Output) error {
setLoggerOnPlugin(output, NewLogger())
if p, ok := output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions plugins/common/shim/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (

// AddProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddProcessor(processor telegraf.Processor) error {
setLoggerOnPlugin(processor, NewLogger())
p := processors.NewStreamingProcessorFromProcessor(processor)
return s.AddStreamingProcessor(p)
}

// AddStreamingProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddStreamingProcessor(processor telegraf.StreamingProcessor) error {
setLoggerOnPlugin(processor, NewLogger())
if p, ok := processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/execd/shim/goshim.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,15 @@ type Shim struct {
stderr io.Writer
}

var (
oldpkg = "github.com/influxdata/telegraf/plugins/inputs/execd/shim"
newpkg = "github.com/influxdata/telegraf/plugins/common/shim"
)

// New creates a new shim interface
func New() *Shim {
fmt.Fprintf(os.Stderr, "%s is deprecated; please change your import to %s\n",
oldpkg, newpkg)
return &Shim{
stdin: os.Stdin,
stdout: os.Stdout,
Expand Down