Skip to content

Commit

Permalink
chore(serializers)!: Remove old-style creation
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Oct 2, 2024
1 parent 2b307c7 commit d4bd6c3
Show file tree
Hide file tree
Showing 41 changed files with 70 additions and 486 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
delivery state update of un-parseable messages from `ACK` to `NACK` without
requeueing. This way, those messages are not lost and can optionally be
handled using a dead-letter exchange by other means.
- Removal of old-style serializer creation. This should not directly affect
users as it is an API change. All serializers in Telegraf are already ported
to the new framework. If you experience any issues with not being able to
create serializers let us know!

## v1.32.0 [2024-09-09]

Expand Down
9 changes: 0 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,15 +1185,6 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return err
}
t.SetSerializer(serializer)
} else if t, ok := output.(serializers.SerializerOutput); ok {
// Keep the old interface for backward compatibility
// DEPRECATED: Please switch your plugin to telegraf.Serializers
missThreshold = 1
serializer, err := c.addSerializer(name, table)
if err != nil {
return err
}
t.SetSerializer(serializer)
}

outputConfig, err := c.buildOutput(name, table)
Expand Down
125 changes: 0 additions & 125 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,28 +629,19 @@ func TestConfig_SerializerInterfaceNewFormat(t *testing.T) {
require.NoError(t, c.LoadConfig("./testdata/serializers_new.toml"))
require.Len(t, c.Outputs, len(formats))

cfg := serializers.Config{}
override := map[string]struct {
param map[string]interface{}
mask []string
}{}

expected := make([]telegraf.Serializer, 0, len(formats))
for _, format := range formats {
formatCfg := &cfg
formatCfg.DataFormat = format

logger := logging.New("serializers", format, "test")

var serializer telegraf.Serializer
if creator, found := serializers.Serializers[format]; found {
t.Logf("new-style %q", format)
serializer = creator()
} else {
t.Logf("old-style %q", format)
var err error
serializer, err = serializers.NewSerializer(formatCfg)
require.NoErrorf(t, err, "No serializer for format %q", format)
}

if settings, found := override[format]; found {
Expand Down Expand Up @@ -702,98 +693,6 @@ func TestConfig_SerializerInterfaceNewFormat(t *testing.T) {
}
}

func TestConfig_SerializerInterfaceOldFormat(t *testing.T) {
formats := []string{
"carbon2",
"csv",
"graphite",
"influx",
"json",
"msgpack",
"nowmetric",
"prometheus",
"prometheusremotewrite",
"splunkmetric",
"wavefront",
}

c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/serializers_old.toml"))
require.Len(t, c.Outputs, len(formats))

cfg := serializers.Config{}
override := map[string]struct {
param map[string]interface{}
mask []string
}{}

expected := make([]telegraf.Serializer, 0, len(formats))
for _, format := range formats {
formatCfg := &cfg
formatCfg.DataFormat = format

logger := logging.New("serializers", format, "test")

var serializer serializers.Serializer
if creator, found := serializers.Serializers[format]; found {
t.Logf("new-style %q", format)
serializer = creator()
} else {
t.Logf("old-style %q", format)
var err error
serializer, err = serializers.NewSerializer(formatCfg)
require.NoErrorf(t, err, "No serializer for format %q", format)
}

if settings, found := override[format]; found {
s := reflect.Indirect(reflect.ValueOf(serializer))
for key, value := range settings.param {
v := reflect.ValueOf(value)
s.FieldByName(key).Set(v)
}
}
models.SetLoggerOnPlugin(serializer, logger)
if s, ok := serializer.(telegraf.Initializer); ok {
require.NoError(t, s.Init())
}
expected = append(expected, serializer)
}
require.Len(t, expected, len(formats))

actual := make([]interface{}, 0)
for _, plugin := range c.Outputs {
output, ok := plugin.Output.(*MockupOutputPluginSerializerOld)
require.True(t, ok)
// Get the parser set with 'SetParser()'
if p, ok := output.Serializer.(*models.RunningSerializer); ok {
actual = append(actual, p.Serializer)
} else {
actual = append(actual, output.Serializer)
}
}
require.Len(t, actual, len(formats))

for i, format := range formats {
// Determine the underlying type of the serializer
stype := reflect.Indirect(reflect.ValueOf(expected[i])).Interface()
// Ignore all unexported fields and fields not relevant for functionality
options := []cmp.Option{
cmpopts.IgnoreUnexported(stype),
cmpopts.IgnoreUnexported(reflect.Indirect(reflect.ValueOf(serializers_prometheus.MetricTypes{})).Interface()),
cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}),
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
}
if settings, found := override[format]; found {
options = append(options, cmpopts.IgnoreFields(stype, settings.mask...))
}

// Do a manual comparison as require.EqualValues will also work on unexported fields
// that cannot be cleared or ignored.
diff := cmp.Diff(expected[i], actual[i], options...)
require.Emptyf(t, diff, "Difference in SetSerializer() for %q", format)
}
}

func TestConfig_ParserInterface(t *testing.T) {
formats := []string{
"collectd",
Expand Down Expand Up @@ -1502,27 +1401,6 @@ func (m *MockupOutputPlugin) Write(_ []telegraf.Metric) error {
return nil
}

// Mockup OUTPUT plugin for serializer testing to avoid cyclic dependencies
type MockupOutputPluginSerializerOld struct {
Serializer serializers.Serializer
}

func (m *MockupOutputPluginSerializerOld) SetSerializer(s serializers.Serializer) {
m.Serializer = s
}
func (*MockupOutputPluginSerializerOld) Connect() error {
return nil
}
func (*MockupOutputPluginSerializerOld) Close() error {
return nil
}
func (*MockupOutputPluginSerializerOld) SampleConfig() string {
return "Mockup test output plugin"
}
func (*MockupOutputPluginSerializerOld) Write(_ []telegraf.Metric) error {
return nil
}

type MockupOutputPluginSerializerNew struct {
Serializer telegraf.Serializer
}
Expand Down Expand Up @@ -1662,7 +1540,4 @@ func init() {
outputs.Add("serializer_test_new", func() telegraf.Output {
return &MockupOutputPluginSerializerNew{}
})
outputs.Add("serializer_test_old", func() telegraf.Output {
return &MockupOutputPluginSerializerOld{}
})
}
5 changes: 2 additions & 3 deletions models/running_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/influxdata/telegraf"
logging "github.com/influxdata/telegraf/logger"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/selfstat"
)

Expand All @@ -19,7 +18,7 @@ type SerializerConfig struct {
}

type RunningSerializer struct {
Serializer serializers.Serializer
Serializer telegraf.Serializer
Config *SerializerConfig
log telegraf.Logger

Expand All @@ -28,7 +27,7 @@ type RunningSerializer struct {
SerializationTime selfstat.Stat
}

func NewRunningSerializer(serializer serializers.Serializer, config *SerializerConfig) *RunningSerializer {
func NewRunningSerializer(serializer telegraf.Serializer, config *SerializerConfig) *RunningSerializer {
tags := map[string]string{"type": config.DataFormat}
if config.Alias != "" {
tags["alias"] = config.Alias
Expand Down
5 changes: 2 additions & 3 deletions plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/influxdata/telegraf/plugins/common/proxy"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

//go:embed sample.conf
Expand Down Expand Up @@ -67,7 +66,7 @@ type AMQP struct {
tls.ClientConfig
proxy.TCPProxy

serializer serializers.Serializer
serializer telegraf.Serializer
connect func(*ClientConfig) (Client, error)
client Client
config *ClientConfig
Expand All @@ -84,7 +83,7 @@ func (*AMQP) SampleConfig() string {
return sampleConfig
}

func (q *AMQP) SetSerializer(serializer serializers.Serializer) {
func (q *AMQP) SetSerializer(serializer telegraf.Serializer) {
q.serializer = serializer
}

Expand Down
3 changes: 1 addition & 2 deletions plugins/outputs/azure_data_explorer/azure_data_explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/json"
)

Expand All @@ -36,7 +35,7 @@ type AzureDataExplorer struct {
TableName string `toml:"table_name"`
CreateTables bool `toml:"create_tables"`
IngestionType string `toml:"ingestion_type"`
serializer serializers.Serializer
serializer telegraf.Serializer
kustoClient *kusto.Client
metricIngestors map[string]ingest.Ingestor
}
Expand Down
5 changes: 2 additions & 3 deletions plugins/outputs/cloud_pubsub/cloud_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

//go:embed sample.conf
Expand All @@ -45,7 +44,7 @@ type PubSub struct {

stubTopic func(id string) topic

serializer serializers.Serializer
serializer telegraf.Serializer
publishResults []publishResult
encoder internal.ContentEncoder
}
Expand All @@ -54,7 +53,7 @@ func (*PubSub) SampleConfig() string {
return sampleConfig
}

func (ps *PubSub) SetSerializer(serializer serializers.Serializer) {
func (ps *PubSub) SetSerializer(serializer telegraf.Serializer) {
ps.serializer = serializer
}

Expand Down
5 changes: 2 additions & 3 deletions plugins/outputs/event_hubs/event_hubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

//go:embed sample.conf
Expand Down Expand Up @@ -62,7 +61,7 @@ type EventHubs struct {

Hub EventHubInterface
batchOptions []eventhub.BatchOption
serializer serializers.Serializer
serializer telegraf.Serializer
}

const (
Expand Down Expand Up @@ -104,7 +103,7 @@ func (e *EventHubs) Close() error {
return nil
}

func (e *EventHubs) SetSerializer(serializer serializers.Serializer) {
func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) {
e.serializer = serializer
}

Expand Down
5 changes: 2 additions & 3 deletions plugins/outputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

//go:embed sample.conf
Expand All @@ -33,7 +32,7 @@ type Exec struct {
Log telegraf.Logger `toml:"-"`

runner Runner
serializer serializers.Serializer
serializer telegraf.Serializer
}

func (*Exec) SampleConfig() string {
Expand All @@ -47,7 +46,7 @@ func (e *Exec) Init() error {
}

// SetSerializer sets the serializer for the output.
func (e *Exec) SetSerializer(serializer serializers.Serializer) {
func (e *Exec) SetSerializer(serializer telegraf.Serializer) {
e.serializer = serializer
}

Expand Down
5 changes: 2 additions & 3 deletions plugins/outputs/execd/execd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

//go:embed sample.conf
Expand All @@ -29,14 +28,14 @@ type Execd struct {
Log telegraf.Logger

process *process.Process
serializer serializers.Serializer
serializer telegraf.Serializer
}

func (*Execd) SampleConfig() string {
return sampleConfig
}

func (e *Execd) SetSerializer(s serializers.Serializer) {
func (e *Execd) SetSerializer(s telegraf.Serializer) {
e.serializer = s
}

Expand Down
5 changes: 2 additions & 3 deletions plugins/outputs/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/rotate"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

//go:embed sample.conf
Expand All @@ -32,14 +31,14 @@ type File struct {
encoder internal.ContentEncoder
writer io.Writer
closers []io.Closer
serializer serializers.Serializer
serializer telegraf.Serializer
}

func (*File) SampleConfig() string {
return sampleConfig
}

func (f *File) SetSerializer(serializer serializers.Serializer) {
func (f *File) SetSerializer(serializer telegraf.Serializer) {
f.serializer = serializer
}

Expand Down
Loading

0 comments on commit d4bd6c3

Please sign in to comment.