diff --git a/internal/errors.go b/internal/errors.go index d1e098ea441ce..a36bda794932c 100644 --- a/internal/errors.go +++ b/internal/errors.go @@ -2,7 +2,11 @@ package internal import "errors" -var ErrNotConnected = errors.New("not connected") +var ( + ErrNotConnected = errors.New("not connected") + ErrSerialization = errors.New("serialization of metric(s) failed") + ErrSizeLimitReached = errors.New("size limit reached") +) // StartupError indicates an error that occurred during startup of a plugin // e.g. due to connectivity issues or resources being not yet available. diff --git a/models/running_output_test.go b/models/running_output_test.go index 3c8b9e5951e1a..9a60481d52fa6 100644 --- a/models/running_output_test.go +++ b/models/running_output_test.go @@ -245,7 +245,7 @@ func TestRunningOutputWriteFail(t *testing.T) { Filter: Filter{}, } - m := &mockOutput{failWrite: true} + m := &mockOutput{batchAcceptSize: -1} ro := NewRunningOutput(m, conf, 4, 12) // Fill buffer to limit twice @@ -264,7 +264,7 @@ func TestRunningOutputWriteFail(t *testing.T) { // no successful flush yet require.Empty(t, m.Metrics()) - m.failWrite = false + m.batchAcceptSize = 0 err = ro.Write() require.NoError(t, err) @@ -277,7 +277,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) { Filter: Filter{}, } - m := &mockOutput{failWrite: true} + m := &mockOutput{batchAcceptSize: -1} ro := NewRunningOutput(m, conf, 100, 1000) // add 5 metrics @@ -293,7 +293,8 @@ func TestRunningOutputWriteFailOrder(t *testing.T) { // no successful flush yet require.Empty(t, m.Metrics()) - m.failWrite = false + m.batchAcceptSize = 0 + // add 5 more metrics for _, metric := range next5 { ro.AddMetric(metric) @@ -314,7 +315,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) { Filter: Filter{}, } - m := &mockOutput{failWrite: true} + m := &mockOutput{batchAcceptSize: -1} ro := NewRunningOutput(m, conf, 5, 100) // add 5 metrics @@ -357,7 +358,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) { // no successful flush yet require.Empty(t, m.Metrics()) - m.failWrite = false + m.batchAcceptSize = 0 err = ro.Write() require.NoError(t, err) @@ -377,7 +378,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) { Filter: Filter{}, } - m := &mockOutput{failWrite: true} + m := &mockOutput{batchAcceptSize: -1} ro := NewRunningOutput(m, conf, 5, 1000) // add 5 metrics @@ -399,7 +400,8 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) { require.Error(t, err) // unset fail and write metrics - m.failWrite = false + m.batchAcceptSize = 0 + err = ro.Write() require.NoError(t, err) @@ -620,7 +622,7 @@ func TestRunningOutputNonRetryableStartupBehaviorDefault(t *testing.T) { } } -func TestRunningOutputUntypedtartupBehaviorIgnore(t *testing.T) { +func TestRunningOutputUntypedStartupBehaviorIgnore(t *testing.T) { serr := errors.New("untyped err") for _, behavior := range []string{"", "error", "retry", "ignore"} { @@ -692,12 +694,181 @@ func TestRunningOutputPartiallyStarted(t *testing.T) { require.Equal(t, 3, mo.writes) } +func TestRunningOutputWritePartialSuccess(t *testing.T) { + plugin := &mockOutput{ + batchAcceptSize: 4, + } + model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10) + require.NoError(t, model.Init()) + require.NoError(t, model.Connect()) + defer model.Close() + + // Fill buffer completely + for _, metric := range first5 { + model.AddMetric(metric) + } + for _, metric := range next5 { + model.AddMetric(metric) + } + + // We no not expect any successful flush yet + require.Empty(t, plugin.Metrics()) + require.Equal(t, 10, model.buffer.Len()) + + // Write to the output. This should only partially succeed with the first + // few metrics removed from buffer + require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 4) + require.Equal(t, 6, model.buffer.Len()) + + // The next write should remove the next metrics from the buffer + require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 8) + require.Equal(t, 2, model.buffer.Len()) + + // The last write should succeed straight away and all metrics should have + // been received by the output + require.NoError(t, model.Write()) + testutil.RequireMetricsEqual(t, append(first5, next5...), plugin.metrics) + require.Zero(t, model.buffer.Len()) +} + +func TestRunningOutputWritePartialSuccessAndLoss(t *testing.T) { + lost := 0 + plugin := &mockOutput{ + batchAcceptSize: 4, + metricFatalIndex: &lost, + } + model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10) + require.NoError(t, model.Init()) + require.NoError(t, model.Connect()) + defer model.Close() + + // Fill buffer completely + for _, metric := range first5 { + model.AddMetric(metric) + } + for _, metric := range next5 { + model.AddMetric(metric) + } + expected := []telegraf.Metric{ + /* fatal, */ first5[1], first5[2], first5[3], + /* fatal, */ next5[0], next5[1], next5[2], + next5[3], next5[4], + } + + // We no not expect any successful flush yet + require.Empty(t, plugin.Metrics()) + require.Equal(t, 10, model.buffer.Len()) + + // Write to the output. This should only partially succeed with the first + // few metrics removed from buffer + require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 3) + require.Equal(t, 6, model.buffer.Len()) + + // The next write should remove the next metrics from the buffer + require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 6) + require.Equal(t, 2, model.buffer.Len()) + + // The last write should succeed straight away and all metrics should have + // been received by the output + require.NoError(t, model.Write()) + testutil.RequireMetricsEqual(t, expected, plugin.metrics) + require.Zero(t, model.buffer.Len()) +} + +func TestRunningOutputWriteBatchPartialSuccess(t *testing.T) { + plugin := &mockOutput{ + batchAcceptSize: 4, + } + model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10) + require.NoError(t, model.Init()) + require.NoError(t, model.Connect()) + defer model.Close() + + // Fill buffer completely + for _, metric := range first5 { + model.AddMetric(metric) + } + for _, metric := range next5 { + model.AddMetric(metric) + } + + // We no not expect any successful flush yet + require.Empty(t, plugin.Metrics()) + require.Equal(t, 10, model.buffer.Len()) + + // Write to the output. This should only partially succeed with the first + // few metrics removed from buffer + require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 4) + require.Equal(t, 6, model.buffer.Len()) + + // The next write should remove the next metrics from the buffer + require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 8) + require.Equal(t, 2, model.buffer.Len()) + + // The last write should succeed straight away and all metrics should have + // been received by the output + require.NoError(t, model.WriteBatch()) + testutil.RequireMetricsEqual(t, append(first5, next5...), plugin.metrics) + require.Zero(t, model.buffer.Len()) +} + +func TestRunningOutputWriteBatchPartialSuccessAndLoss(t *testing.T) { + lost := 0 + plugin := &mockOutput{ + batchAcceptSize: 4, + metricFatalIndex: &lost, + } + model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10) + require.NoError(t, model.Init()) + require.NoError(t, model.Connect()) + defer model.Close() + + // Fill buffer completely + for _, metric := range first5 { + model.AddMetric(metric) + } + for _, metric := range next5 { + model.AddMetric(metric) + } + expected := []telegraf.Metric{ + /* fatal, */ first5[1], first5[2], first5[3], + /* fatal, */ next5[0], next5[1], next5[2], + next5[3], next5[4], + } + + // We no not expect any successful flush yet + require.Empty(t, plugin.Metrics()) + require.Equal(t, 10, model.buffer.Len()) + + // Write to the output. This should only partially succeed with the first + // few metrics removed from buffer + require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 3) + require.Equal(t, 6, model.buffer.Len()) + + // The next write should remove the next metrics from the buffer + require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 6) + require.Equal(t, 2, model.buffer.Len()) + + // The last write should succeed straight away and all metrics should have + // been received by the output + require.NoError(t, model.WriteBatch()) + testutil.RequireMetricsEqual(t, expected, plugin.metrics) + require.Zero(t, model.buffer.Len()) +} + // Benchmark adding metrics. func BenchmarkRunningOutputAddWrite(b *testing.B) { conf := &OutputConfig{ Filter: Filter{}, } - m := &perfOutput{} ro := NewRunningOutput(m, conf, 1000, 10000) @@ -712,7 +883,6 @@ func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) { conf := &OutputConfig{ Filter: Filter{}, } - m := &perfOutput{} ro := NewRunningOutput(m, conf, 1000, 10000) @@ -729,10 +899,8 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) { conf := &OutputConfig{ Filter: Filter{}, } - m := &perfOutput{failWrite: true} ro := NewRunningOutput(m, conf, 1000, 10000) - for n := 0; n < b.N; n++ { ro.AddMetric(testutil.TestMetric(101, "metric1")) } @@ -743,9 +911,11 @@ type mockOutput struct { metrics []telegraf.Metric - // if true, mock write failure - failWrite bool + // Failing output simulation + batchAcceptSize int + metricFatalIndex *int + // Startup error simulation startupError error startupErrorCount int writes int @@ -761,11 +931,11 @@ func (m *mockOutput) Connect() error { return m.startupError } -func (m *mockOutput) Close() error { +func (*mockOutput) Close() error { return nil } -func (m *mockOutput) SampleConfig() string { +func (*mockOutput) SampleConfig() string { return "" } @@ -774,12 +944,29 @@ func (m *mockOutput) Write(metrics []telegraf.Metric) error { m.Lock() defer m.Unlock() - if m.failWrite { + + // Simulate a failed write + if m.batchAcceptSize < 0 { return errors.New("failed write") } - m.metrics = append(m.metrics, metrics...) - return nil + // Simulate a successful write + if m.batchAcceptSize == 0 || len(metrics) <= m.batchAcceptSize { + m.metrics = append(m.metrics, metrics...) + return nil + } + + // Simulate a partially successful write + werr := &internal.PartialWriteError{Err: internal.ErrSizeLimitReached} + for i, x := range metrics { + if m.metricFatalIndex != nil && i == *m.metricFatalIndex { + werr.MetricsReject = append(werr.MetricsReject, i) + } else if i < m.batchAcceptSize { + m.metrics = append(m.metrics, x) + werr.MetricsAccept = append(werr.MetricsAccept, i) + } + } + return werr } func (m *mockOutput) Metrics() []telegraf.Metric { diff --git a/plugins/common/ratelimiter/config.go b/plugins/common/ratelimiter/config.go new file mode 100644 index 0000000000000..a2ca077c05f59 --- /dev/null +++ b/plugins/common/ratelimiter/config.go @@ -0,0 +1,19 @@ +package ratelimiter + +import ( + "time" + + "github.com/influxdata/telegraf/config" +) + +type RateLimitConfig struct { + Limit config.Size `toml:"rate_limit"` + Period config.Duration `toml:"rate_limit_period"` +} + +func (cfg *RateLimitConfig) CreateRateLimiter() *RateLimiter { + return &RateLimiter{ + limit: int64(cfg.Limit), + period: time.Duration(cfg.Period), + } +} diff --git a/plugins/common/ratelimiter/limiters.go b/plugins/common/ratelimiter/limiters.go new file mode 100644 index 0000000000000..f24d08b6239f1 --- /dev/null +++ b/plugins/common/ratelimiter/limiters.go @@ -0,0 +1,66 @@ +package ratelimiter + +import ( + "errors" + "math" + "time" +) + +var ( + ErrLimitExceeded = errors.New("not enough tokens") +) + +type RateLimiter struct { + limit int64 + period time.Duration + periodStart time.Time + remaining int64 +} + +func (r *RateLimiter) Remaining(t time.Time) int64 { + if r.limit == 0 { + return math.MaxInt64 + } + + // Check for corner case + if !r.periodStart.Before(t) { + return 0 + } + + // We are in a new period, so the complete size is available + deltat := t.Sub(r.periodStart) + if deltat >= r.period { + return r.limit + } + + return r.remaining +} + +func (r *RateLimiter) Accept(t time.Time, used int64) { + if r.limit == 0 || r.periodStart.After(t) { + return + } + + // Remember the first query and reset if we are in a new period + if r.periodStart.IsZero() { + r.periodStart = t + r.remaining = r.limit + } else if deltat := t.Sub(r.periodStart); deltat >= r.period { + r.periodStart = r.periodStart.Add(deltat.Truncate(r.period)) + r.remaining = r.limit + } + + // Update the state + r.remaining = max(r.remaining-used, 0) +} + +func (r *RateLimiter) Undo(t time.Time, used int64) { + // Do nothing if we are not in the current period or unlimited because we + // already reset the limit on a new window. + if r.limit == 0 || r.periodStart.IsZero() || r.periodStart.After(t) || t.Sub(r.periodStart) >= r.period { + return + } + + // Undo the state update + r.remaining = min(r.remaining+used, r.limit) +} diff --git a/plugins/common/ratelimiter/limiters_test.go b/plugins/common/ratelimiter/limiters_test.go new file mode 100644 index 0000000000000..e886b1cc80221 --- /dev/null +++ b/plugins/common/ratelimiter/limiters_test.go @@ -0,0 +1,176 @@ +package ratelimiter + +import ( + "math" + "testing" + "time" + + "github.com/influxdata/telegraf/config" + "github.com/stretchr/testify/require" +) + +func TestUnlimited(t *testing.T) { + cfg := &RateLimitConfig{} + limiter := cfg.CreateRateLimiter() + + start := time.Now() + end := start.Add(30 * time.Minute) + for ts := start; ts.Before(end); ts = ts.Add(1 * time.Minute) { + require.EqualValues(t, int64(math.MaxInt64), limiter.Remaining(ts)) + } +} + +func TestUnlimitedWithPeriod(t *testing.T) { + cfg := &RateLimitConfig{ + Period: config.Duration(5 * time.Minute), + } + limiter := cfg.CreateRateLimiter() + + start := time.Now() + end := start.Add(30 * time.Minute) + for ts := start; ts.Before(end); ts = ts.Add(1 * time.Minute) { + require.EqualValues(t, int64(math.MaxInt64), limiter.Remaining(ts)) + } +} + +func TestLimited(t *testing.T) { + tests := []struct { + name string + cfg *RateLimitConfig + step time.Duration + request []int64 + expected []int64 + }{ + { + name: "constant usage", + cfg: &RateLimitConfig{ + Limit: config.Size(1024), + Period: config.Duration(5 * time.Minute), + }, + step: time.Minute, + request: []int64{300}, + expected: []int64{1024, 724, 424, 124, 0, 1024, 724, 424, 124, 0}, + }, + { + name: "variable usage", + cfg: &RateLimitConfig{ + Limit: config.Size(1024), + Period: config.Duration(5 * time.Minute), + }, + step: time.Minute, + request: []int64{256, 128, 512, 64, 64, 1024, 0, 0, 0, 0, 128, 4096, 4096, 4096, 4096, 4096}, + expected: []int64{1024, 768, 640, 128, 64, 1024, 0, 0, 0, 0, 1024, 896, 0, 0, 0, 1024}, + }, + } + + // Run the test with an offset of period multiples + for _, tt := range tests { + t.Run(tt.name+" at period", func(t *testing.T) { + // Setup the limiter + limiter := tt.cfg.CreateRateLimiter() + + // Compute the actual values + start := time.Now().Truncate(tt.step) + for i, expected := range tt.expected { + ts := start.Add(time.Duration(i) * tt.step) + remaining := limiter.Remaining(ts) + use := min(remaining, tt.request[i%len(tt.request)]) + require.Equalf(t, expected, remaining, "mismatch at index %d", i) + limiter.Accept(ts, use) + } + }) + } + + // Run the test at a time of period multiples + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup the limiter + limiter := tt.cfg.CreateRateLimiter() + + // Compute the actual values + start := time.Now().Truncate(tt.step).Add(1 * time.Second) + for i, expected := range tt.expected { + ts := start.Add(time.Duration(i) * tt.step) + remaining := limiter.Remaining(ts) + use := min(remaining, tt.request[i%len(tt.request)]) + require.Equalf(t, expected, remaining, "mismatch at index %d", i) + limiter.Accept(ts, use) + } + }) + } +} + +func TestUndo(t *testing.T) { + tests := []struct { + name string + cfg *RateLimitConfig + step time.Duration + request []int64 + expected []int64 + }{ + { + name: "constant usage", + cfg: &RateLimitConfig{ + Limit: config.Size(1024), + Period: config.Duration(5 * time.Minute), + }, + step: time.Minute, + request: []int64{300}, + expected: []int64{1024, 724, 424, 124, 124, 1024, 724, 424, 124, 124}, + }, + { + name: "variable usage", + cfg: &RateLimitConfig{ + Limit: config.Size(1024), + Period: config.Duration(5 * time.Minute), + }, + step: time.Minute, + request: []int64{256, 128, 512, 64, 64, 1024, 0, 0, 0, 0, 128, 4096, 4096, 4096, 4096, 4096}, + expected: []int64{1024, 768, 640, 128, 64, 1024, 0, 0, 0, 0, 1024, 896, 896, 896, 896, 1024}, + }, + } + + // Run the test with an offset of period multiples + for _, tt := range tests { + t.Run(tt.name+" at period", func(t *testing.T) { + // Setup the limiter + limiter := tt.cfg.CreateRateLimiter() + + // Compute the actual values + start := time.Now().Truncate(tt.step) + for i, expected := range tt.expected { + ts := start.Add(time.Duration(i) * tt.step) + remaining := limiter.Remaining(ts) + use := min(remaining, tt.request[i%len(tt.request)]) + require.Equalf(t, expected, remaining, "mismatch at index %d", i) + limiter.Accept(ts, use) + // Undo too large operations + if tt.request[i%len(tt.request)] > remaining { + limiter.Undo(ts, use) + } + } + }) + } + + // Run the test at a time of period multiples + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup the limiter + limiter := tt.cfg.CreateRateLimiter() + + // Compute the actual values + start := time.Now().Truncate(tt.step).Add(1 * time.Second) + for i, expected := range tt.expected { + ts := start.Add(time.Duration(i) * tt.step) + remaining := limiter.Remaining(ts) + use := min(remaining, tt.request[i%len(tt.request)]) + require.Equalf(t, expected, remaining, "mismatch at index %d", i) + limiter.Accept(ts, use) + // Undo too large operations + if tt.request[i%len(tt.request)] > remaining { + limiter.Undo(ts, use) + } + } + }) + } +} diff --git a/plugins/common/ratelimiter/serializers.go b/plugins/common/ratelimiter/serializers.go new file mode 100644 index 0000000000000..6bd6ce78e0ff9 --- /dev/null +++ b/plugins/common/ratelimiter/serializers.go @@ -0,0 +1,100 @@ +package ratelimiter + +import ( + "bytes" + "math" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" +) + +// Serializer interface abstracting the different implementations of a +// limited-size serializer +type Serializer interface { + Serialize(metric telegraf.Metric, limit int64) ([]byte, error) + SerializeBatch(metrics []telegraf.Metric, limit int64) ([]byte, error) +} + +// Individual serializers do serialize each metric individually using the +// serializer's Serialize() function and add the resulting output to the buffer +// until the limit is reached. This only works for serializers NOT requiring +// the serialization of a batch as-a-whole. +type IndividualSerializer struct { + serializer telegraf.Serializer + buffer *bytes.Buffer +} + +func NewIndividualSerializer(s telegraf.Serializer) *IndividualSerializer { + return &IndividualSerializer{ + serializer: s, + buffer: &bytes.Buffer{}, + } +} + +func (s *IndividualSerializer) Serialize(metric telegraf.Metric, limit int64) ([]byte, error) { + // Do the serialization + buf, err := s.serializer.Serialize(metric) + if err != nil { + return nil, err + } + + // The serialized metric fits into the limit, so output it + if buflen := int64(len(buf)); buflen <= limit { + return buf, nil + } + + // The serialized metric exceeds the limit + return nil, internal.ErrSizeLimitReached +} + +func (s *IndividualSerializer) SerializeBatch(metrics []telegraf.Metric, limit int64) ([]byte, error) { + // Grow the buffer so it can hold at least the required size. This will + // save us from reallocate often + s.buffer.Reset() + if limit > 0 && limit < int64(math.MaxInt) { + s.buffer.Grow(int(limit)) + } + + // Prepare a potential write error and be optimistic + werr := &internal.PartialWriteError{ + MetricsAccept: make([]int, 0, len(metrics)), + } + + // Iterate through the metrics, serialize them and add them to the output + // buffer if they are within the size limit. + var used int64 + for i, m := range metrics { + buf, err := s.serializer.Serialize(m) + if err != nil { + // Failing serialization is a fatal error so mark the metric as such + werr.Err = internal.ErrSerialization + werr.MetricsReject = append(werr.MetricsReject, i) + werr.MetricsRejectErrors = append(werr.MetricsRejectErrors, err) + continue + } + + // The serialized metric fits into the limit, so add it to the output + if usedAdded := used + int64(len(buf)); usedAdded <= limit { + if _, err := s.buffer.Write(buf); err != nil { + return nil, err + } + werr.MetricsAccept = append(werr.MetricsAccept, i) + used = usedAdded + continue + } + + // Return only the size-limit-reached error if all metrics failed. + if used == 0 { + return nil, internal.ErrSizeLimitReached + } + + // Adding the serialized metric would exceed the limit so exit with an + // WriteError and fill in the required information + werr.Err = internal.ErrSizeLimitReached + break + } + if werr.Err != nil { + return s.buffer.Bytes(), werr + } + return s.buffer.Bytes(), nil +} diff --git a/plugins/common/ratelimiter/serializers_test.go b/plugins/common/ratelimiter/serializers_test.go new file mode 100644 index 0000000000000..06cc88a395674 --- /dev/null +++ b/plugins/common/ratelimiter/serializers_test.go @@ -0,0 +1,351 @@ +package ratelimiter + +import ( + "math" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/serializers/influx" + "github.com/stretchr/testify/require" +) + +func TestIndividualSerializer(t *testing.T) { + input := []telegraf.Metric{ + metric.New( + "serializer_test", + map[string]string{ + "source": "localhost", + "location": "factory_north", + "machine": "A", + "status": "ok", + }, + map[string]interface{}{ + "operating_hours": 123, + "temperature": 25.0, + "pressure": 1023.4, + }, + time.Unix(1722443551, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "localhost", + "location": "factory_north", + "machine": "B", + "status": "failed", + }, + map[string]interface{}{ + "operating_hours": 8430, + "temperature": 65.2, + "pressure": 985.9, + }, + time.Unix(1722443554, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "localhost", + "location": "factory_north", + "machine": "C", + "status": "warning", + }, + map[string]interface{}{ + "operating_hours": 6765, + "temperature": 42.5, + "pressure": 986.1, + }, + time.Unix(1722443555, 0), + ), + metric.New( + "device", + map[string]string{ + "source": "localhost", + "location": "factory_north", + }, + map[string]interface{}{ + "status": "ok", + }, + time.Unix(1722443556, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "gateway_af43e", + "location": "factory_south", + "machine": "A", + "status": "ok", + }, + map[string]interface{}{ + "operating_hours": 5544, + "temperature": 18.6, + "pressure": 1069.4, + }, + time.Unix(1722443552, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "gateway_af43e", + "location": "factory_south", + "machine": "B", + "status": "ok", + }, + map[string]interface{}{ + "operating_hours": 65, + "temperature": 29.7, + "pressure": 1101.2, + }, + time.Unix(1722443553, 0), + ), + metric.New( + "device", + map[string]string{ + "source": "gateway_af43e", + "location": "factory_south", + }, + map[string]interface{}{ + "status": "ok", + }, + time.Unix(1722443559, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "gateway_af43e", + "location": "factory_south", + "machine": "C", + "status": "off", + }, + map[string]interface{}{ + "operating_hours": 0, + "temperature": 0.0, + "pressure": 0.0, + }, + time.Unix(1722443562, 0), + ), + } + //nolint:lll // Resulting metrics should not be wrapped for readability + expected := []string{ + "serializer_test,location=factory_north,machine=A,source=localhost,status=ok operating_hours=123i,pressure=1023.4,temperature=25 1722443551000000000\n" + + "serializer_test,location=factory_north,machine=B,source=localhost,status=failed operating_hours=8430i,pressure=985.9,temperature=65.2 1722443554000000000\n", + "serializer_test,location=factory_north,machine=C,source=localhost,status=warning operating_hours=6765i,pressure=986.1,temperature=42.5 1722443555000000000\n" + + "device,location=factory_north,source=localhost status=\"ok\" 1722443556000000000\n" + + "serializer_test,location=factory_south,machine=A,source=gateway_af43e,status=ok operating_hours=5544i,pressure=1069.4,temperature=18.6 1722443552000000000\n", + "serializer_test,location=factory_south,machine=B,source=gateway_af43e,status=ok operating_hours=65i,pressure=1101.2,temperature=29.7 1722443553000000000\n" + + "device,location=factory_south,source=gateway_af43e status=\"ok\" 1722443559000000000\n" + + "serializer_test,location=factory_south,machine=C,source=gateway_af43e,status=off operating_hours=0i,pressure=0,temperature=0 1722443562000000000\n", + } + + // Setup the limited serializer + s := &influx.Serializer{SortFields: true} + require.NoError(t, s.Init()) + serializer := NewIndividualSerializer(s) + + var werr *internal.PartialWriteError + + // Do the first serialization runs with all metrics + buf, err := serializer.SerializeBatch(input, 400) + require.ErrorAs(t, err, &werr) + require.ErrorIs(t, werr.Err, internal.ErrSizeLimitReached) + require.EqualValues(t, []int{0, 1}, werr.MetricsAccept) + require.Empty(t, werr.MetricsReject) + require.Equal(t, expected[0], string(buf)) + + // Run again with the successful metrics removed + buf, err = serializer.SerializeBatch(input[2:], 400) + require.ErrorAs(t, err, &werr) + require.ErrorIs(t, werr.Err, internal.ErrSizeLimitReached) + require.EqualValues(t, []int{0, 1, 2}, werr.MetricsAccept) + require.Empty(t, werr.MetricsReject) + require.Equal(t, expected[1], string(buf)) + + // Final run with the successful metrics removed + buf, err = serializer.SerializeBatch(input[5:], 400) + require.NoError(t, err) + require.Equal(t, expected[2], string(buf)) +} + +func TestIndividualSerializerFirstTooBig(t *testing.T) { + input := []telegraf.Metric{ + metric.New( + "serializer_test", + map[string]string{ + "source": "localhost", + "location": "factory_north", + "machine": "A", + "status": "ok", + }, + map[string]interface{}{ + "operating_hours": 123, + "temperature": 25.0, + "pressure": 1023.4, + }, + time.Unix(1722443551, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "localhost", + "location": "factory_north", + "machine": "B", + "status": "failed", + }, + map[string]interface{}{ + "operating_hours": 8430, + "temperature": 65.2, + "pressure": 985.9, + }, + time.Unix(1722443554, 0), + ), + } + + // Setup the limited serializer + s := &influx.Serializer{SortFields: true} + require.NoError(t, s.Init()) + serializer := NewIndividualSerializer(s) + + // The first metric will already exceed the size so all metrics fail and + // we expect a shortcut error. + buf, err := serializer.SerializeBatch(input, 100) + require.ErrorIs(t, err, internal.ErrSizeLimitReached) + require.Empty(t, buf) +} + +func TestIndividualSerializerUnlimited(t *testing.T) { + input := []telegraf.Metric{ + metric.New( + "serializer_test", + map[string]string{ + "source": "localhost", + "location": "factory_north", + "machine": "A", + "status": "ok", + }, + map[string]interface{}{ + "operating_hours": 123, + "temperature": 25.0, + "pressure": 1023.4, + }, + time.Unix(1722443551, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "localhost", + "location": "factory_north", + "machine": "B", + "status": "failed", + }, + map[string]interface{}{ + "operating_hours": 8430, + "temperature": 65.2, + "pressure": 985.9, + }, + time.Unix(1722443554, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "localhost", + "location": "factory_north", + "machine": "C", + "status": "warning", + }, + map[string]interface{}{ + "operating_hours": 6765, + "temperature": 42.5, + "pressure": 986.1, + }, + time.Unix(1722443555, 0), + ), + metric.New( + "device", + map[string]string{ + "source": "localhost", + "location": "factory_north", + }, + map[string]interface{}{ + "status": "ok", + }, + time.Unix(1722443556, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "gateway_af43e", + "location": "factory_south", + "machine": "A", + "status": "ok", + }, + map[string]interface{}{ + "operating_hours": 5544, + "temperature": 18.6, + "pressure": 1069.4, + }, + time.Unix(1722443552, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "gateway_af43e", + "location": "factory_south", + "machine": "B", + "status": "ok", + }, + map[string]interface{}{ + "operating_hours": 65, + "temperature": 29.7, + "pressure": 1101.2, + }, + time.Unix(1722443553, 0), + ), + metric.New( + "device", + map[string]string{ + "source": "gateway_af43e", + "location": "factory_south", + }, + map[string]interface{}{ + "status": "ok", + }, + time.Unix(1722443559, 0), + ), + metric.New( + "serializer_test", + map[string]string{ + "source": "gateway_af43e", + "location": "factory_south", + "machine": "C", + "status": "off", + }, + map[string]interface{}{ + "operating_hours": 0, + "temperature": 0.0, + "pressure": 0.0, + }, + time.Unix(1722443562, 0), + ), + } + //nolint:lll // Resulting metrics should not be wrapped for readability + expected := "serializer_test,location=factory_north,machine=A,source=localhost,status=ok operating_hours=123i,pressure=1023.4,temperature=25 1722443551000000000\n" + + "serializer_test,location=factory_north,machine=B,source=localhost,status=failed operating_hours=8430i,pressure=985.9,temperature=65.2 1722443554000000000\n" + + "serializer_test,location=factory_north,machine=C,source=localhost,status=warning operating_hours=6765i,pressure=986.1,temperature=42.5 1722443555000000000\n" + + "device,location=factory_north,source=localhost status=\"ok\" 1722443556000000000\n" + + "serializer_test,location=factory_south,machine=A,source=gateway_af43e,status=ok operating_hours=5544i,pressure=1069.4,temperature=18.6 1722443552000000000\n" + + "serializer_test,location=factory_south,machine=B,source=gateway_af43e,status=ok operating_hours=65i,pressure=1101.2,temperature=29.7 1722443553000000000\n" + + "device,location=factory_south,source=gateway_af43e status=\"ok\" 1722443559000000000\n" + + "serializer_test,location=factory_south,machine=C,source=gateway_af43e,status=off operating_hours=0i,pressure=0,temperature=0 1722443562000000000\n" + + // Setup the limited serializer + s := &influx.Serializer{SortFields: true} + require.NoError(t, s.Init()) + serializer := NewIndividualSerializer(s) + + // Do the first serialization runs with all metrics + buf, err := serializer.SerializeBatch(input, math.MaxInt64) + require.NoError(t, err) + require.Equal(t, expected, string(buf)) +}