Skip to content

Commit

Permalink
chore(processors): Convert processors to ParserPlugins (influxdata#…
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Sep 26, 2022
1 parent d982ed9 commit 8a9c2ee
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 123 deletions.
95 changes: 63 additions & 32 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,12 +687,12 @@ func (c *Config) probeParser(table *ast.Table) bool {
return ok
}

func (c *Config) addParser(parentname string, table *ast.Table) (*models.RunningParser, error) {
func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)

if dataformat == "" {
if parentname == "exec" {
if parentcategory == "inputs" && parentname == "exec" {
// Legacy support, exec plugin originally parsed JSON by default.
dataformat = "json"
} else {
Expand Down Expand Up @@ -735,51 +735,82 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
}
return fmt.Errorf("Undefined but requested processor: %s", name)
}
streamingProcessor := creator()

// For processors with parsers we need to compute the set of
// options that is not covered by both, the parser and the processor.
// We achieve this by keeping a local book of missing entries
// that counts the number of misses. In case we have a parser
// for the input both need to miss the entry. We count the
// missing entries at the end.
missCount := make(map[string]int)
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()

processorConfig, err := c.buildProcessor(name, table)
if err != nil {
return err
}

rf, err := c.newRunningProcessor(creator, processorConfig, table)
if err != nil {
var processor interface{}
processor = streamingProcessor
if p, ok := streamingProcessor.(unwrappable); ok {
processor = p.Unwrap()
}

// If the (underlying) processor has a SetParser or SetParserFunc function,
// it can accept arbitrary data-formats, so build the requested parser and
// set it.
if t, ok := processor.(telegraf.ParserPlugin); ok {
parser, err := c.addParser("processors", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}

if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.addParser("processors", name, table)
})
}

// Setup the processor
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
return err
}

rf := models.NewRunningProcessor(streamingProcessor, processorConfig)
c.Processors = append(c.Processors, rf)

// save a copy for the aggregator
rf, err = c.newRunningProcessor(creator, processorConfig, table)
if err != nil {
// Save a copy for the aggregator
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
return err
}

rf = models.NewRunningProcessor(streamingProcessor, processorConfig)
c.AggProcessors = append(c.AggProcessors, rf)

return nil
}

func (c *Config) newRunningProcessor(
creator processors.StreamingCreator,
processorConfig *models.ProcessorConfig,
table *ast.Table,
) (*models.RunningProcessor, error) {
processor := creator()

func (c *Config) setupProcessorOptions(name string, processor telegraf.StreamingProcessor, table *ast.Table) error {
if p, ok := processor.(unwrappable); ok {
if err := c.toml.UnmarshalTable(table, p.Unwrap()); err != nil {
return nil, err
}
} else {
if err := c.toml.UnmarshalTable(table, processor); err != nil {
return nil, err
unwrapped := p.Unwrap()
if err := c.toml.UnmarshalTable(table, unwrapped); err != nil {
return fmt.Errorf("unmarshalling unwrappable failed: %w", err)
}
return c.printUserDeprecation("processors", name, unwrapped)
}

if err := c.printUserDeprecation("processors", processorConfig.Name, processor); err != nil {
return nil, err
if err := c.toml.UnmarshalTable(table, processor); err != nil {
return fmt.Errorf("unmarshalling failed: %w", err)
}

rf := models.NewRunningProcessor(processor, processorConfig)
return rf, nil
return c.printUserDeprecation("processors", name, processor)
}

func (c *Config) addOutput(name string, table *ast.Table) error {
Expand Down Expand Up @@ -860,8 +891,8 @@ func (c *Config) addInput(name string, table *ast.Table) error {

// If the input has a SetParser or SetParserFunc function, it can accept
// arbitrary data-formats, so build the requested parser and set it.
if t, ok := input.(telegraf.ParserInput); ok {
parser, err := c.addParser(name, table)
if t, ok := input.(telegraf.ParserPlugin); ok {
parser, err := c.addParser("inputs", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
Expand All @@ -870,30 +901,30 @@ func (c *Config) addInput(name string, table *ast.Table) error {

// Keep the old interface for backward compatibility
if t, ok := input.(parsers.ParserInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserInput.
parser, err := c.addParser(name, table)
// DEPRECATED: Please switch your plugin to telegraf.ParserPlugin.
parser, err := c.addParser("inputs", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}

if t, ok := input.(telegraf.ParserFuncInput); ok {
if t, ok := input.(telegraf.ParserFuncPlugin); ok {
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.addParser(name, table)
return c.addParser("inputs", name, table)
})
}

if t, ok := input.(parsers.ParserFuncInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput.
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncPlugin.
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (parsers.Parser, error) {
return c.addParser(name, table)
return c.addParser("inputs", name, table)
})
}

Expand Down
144 changes: 144 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
_ "github.com/influxdata/telegraf/plugins/parsers/all" // Blank import to have all parsers for testing
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/processors"
)

func TestReadBinaryFile(t *testing.T) {
Expand Down Expand Up @@ -654,6 +655,128 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
}
}

func TestConfig_ProcessorsWithParsers(t *testing.T) {
formats := []string{
"collectd",
"csv",
"dropwizard",
"form_urlencoded",
"graphite",
"grok",
"influx",
"json",
"json_v2",
"logfmt",
"nagios",
"prometheus",
"prometheusremotewrite",
"value",
"wavefront",
"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf",
}

c := NewConfig()
require.NoError(t, c.LoadConfig("./testdata/processors_with_parsers.toml"))
require.Len(t, c.Processors, len(formats))

override := map[string]struct {
param map[string]interface{}
mask []string
}{
"csv": {
param: map[string]interface{}{
"HeaderRowCount": 42,
},
mask: []string{"TimeFunc", "ResetMode"},
},
"xpath_protobuf": {
param: map[string]interface{}{
"ProtobufMessageDef": "testdata/addressbook.proto",
"ProtobufMessageType": "addressbook.AddressBook",
},
},
}

expected := make([]telegraf.Parser, 0, len(formats))
for _, format := range formats {
logger := models.NewLogger("parsers", format, "processors_with_parsers")

creator, found := parsers.Parsers[format]
require.Truef(t, found, "No parser for format %q", format)

parser := creator("parser_test")
if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(parser))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
models.SetLoggerOnPlugin(parser, logger)
if p, ok := parser.(telegraf.Initializer); ok {
require.NoError(t, p.Init())
}
expected = append(expected, parser)
}
require.Len(t, expected, len(formats))

actual := make([]interface{}, 0)
generated := make([]interface{}, 0)
for _, plugin := range c.Processors {
var processorIF telegraf.Processor
if p, ok := plugin.Processor.(unwrappable); ok {
processorIF = p.Unwrap()
} else {
processorIF = plugin.Processor.(telegraf.Processor)
}
require.NotNil(t, processorIF)

processor, ok := processorIF.(*MockupProcessorPluginParser)
require.True(t, ok)

// Get the parser set with 'SetParser()'
if p, ok := processor.Parser.(*models.RunningParser); ok {
actual = append(actual, p.Parser)
} else {
actual = append(actual, processor.Parser)
}
// Get the parser set with 'SetParserFunc()'
if processor.ParserFunc != nil {
g, err := processor.ParserFunc()
require.NoError(t, err)
if rp, ok := g.(*models.RunningParser); ok {
generated = append(generated, rp.Parser)
} else {
generated = append(generated, g)
}
} else {
generated = append(generated, nil)
}
}
require.Len(t, actual, len(formats))

for i, format := range formats {
// Determine the underlying type of the parser
stype := reflect.Indirect(reflect.ValueOf(expected[i])).Interface()
// Ignore all unexported fields and fields not relevant for functionality
options := []cmp.Option{
cmpopts.IgnoreUnexported(stype),
cmpopts.IgnoreTypes(sync.Mutex{}),
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
}
if settings, found := override[format]; found {
options = append(options, cmpopts.IgnoreFields(stype, settings.mask...))
}

// Do a manual comparision as require.EqualValues will also work on unexported fields
// that cannot be cleared or ignored.
diff := cmp.Diff(expected[i], actual[i], options...)
require.Emptyf(t, diff, "Difference in SetParser() for %q", format)
diff = cmp.Diff(expected[i], generated[i], options...)
require.Emptyf(t, diff, "Difference in SetParserFunc() for %q", format)
}
}

/*** Mockup INPUT plugin for (old) parser testing to avoid cyclic dependencies ***/
type MockupInputPluginParserOld struct {
Parser parsers.Parser
Expand Down Expand Up @@ -698,6 +821,24 @@ func (m *MockupInputPlugin) SampleConfig() string { return "Moc
func (m *MockupInputPlugin) Gather(acc telegraf.Accumulator) error { return nil }
func (m *MockupInputPlugin) SetParser(parser telegraf.Parser) { m.parser = parser }

/*** Mockup PROCESSOR plugin for testing to avoid cyclic dependencies ***/
type MockupProcessorPluginParser struct {
Parser telegraf.Parser
ParserFunc telegraf.ParserFunc
}

func (m *MockupProcessorPluginParser) Start(acc telegraf.Accumulator) error { return nil }
func (m *MockupProcessorPluginParser) Stop() error { return nil }
func (m *MockupProcessorPluginParser) SampleConfig() string {
return "Mockup test processor plugin with parser"
}
func (m *MockupProcessorPluginParser) Apply(in ...telegraf.Metric) []telegraf.Metric { return nil }
func (m *MockupProcessorPluginParser) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
return nil
}
func (m *MockupProcessorPluginParser) SetParser(parser telegraf.Parser) { m.Parser = parser }
func (m *MockupProcessorPluginParser) SetParserFunc(f telegraf.ParserFunc) { m.ParserFunc = f }

/*** Mockup OUTPUT plugin for testing to avoid cyclic dependencies ***/
type MockupOuputPlugin struct {
URL string `toml:"url"`
Expand All @@ -723,6 +864,9 @@ func init() {
inputs.Add("memcached", func() telegraf.Input { return &MockupInputPlugin{} })
inputs.Add("procstat", func() telegraf.Input { return &MockupInputPlugin{} })

// Register the mockup output plugin for the required names
processors.Add("parser_test", func() telegraf.Processor { return &MockupProcessorPluginParser{} })

// Register the mockup output plugin for the required names
outputs.Add("azure_monitor", func() telegraf.Output { return &MockupOuputPlugin{NamespacePrefix: "Telegraf/"} })
outputs.Add("http", func() telegraf.Output { return &MockupOuputPlugin{} })
Expand Down
Loading

0 comments on commit 8a9c2ee

Please sign in to comment.