From 32ba7325b163927a3c81883eb67df1095e25b37e Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Tue, 13 Aug 2024 22:35:50 +0200 Subject: [PATCH] Implement partial successful write handling --- internal/errors.go | 9 +- models/running_output.go | 72 +++++- models/running_output_test.go | 221 ++++++++++++++++-- plugins/common/limited/serializers.go | 13 +- plugins/common/limited/serializers_test.go | 9 +- plugins/outputs/influxdb/influxdb.go | 3 +- plugins/outputs/influxdb_v2/http.go | 83 ++++--- plugins/outputs/influxdb_v2/influxdb_v2.go | 4 + .../outputs/influxdb_v2/influxdb_v2_test.go | 113 +++++++++ 9 files changed, 450 insertions(+), 77 deletions(-) diff --git a/internal/errors.go b/internal/errors.go index 429225a118870..1c23a6312d9ef 100644 --- a/internal/errors.go +++ b/internal/errors.go @@ -46,11 +46,10 @@ func (e *FatalError) Unwrap() error { // WriteError type WriteError struct { - Err error - MetricsErrors []error - MetricsSuccess []int - MetricsFailRetry []int - MetricsFailFatal []int + Err error + MetricsErrors []error + MetricsSuccess []int + MetricsFatal []int } func (e *WriteError) Error() string { diff --git a/models/running_output.go b/models/running_output.go index 4cdd4d15abf83..fe1af7aaacb1f 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -284,7 +284,7 @@ func (r *RunningOutput) Write() error { atomic.StoreInt64(&r.newMetricsCount, 0) - // Only process the metrics in the buffer now. Metrics added while we are + // Only process the metrics in the buffer now. Metrics added while we are // writing will be sent on the next call. nBuffer := r.buffer.Len() nBatches := nBuffer/r.MetricBatchSize + 1 @@ -294,9 +294,38 @@ func (r *RunningOutput) Write() error { break } - err := r.writeMetrics(batch) - if err != nil { - r.buffer.Reject(batch) + if err := r.writeMetrics(batch); err != nil { + var writeErr *internal.WriteError + if errors.As(err, &writeErr) { + // Translate the indices of the write error back to metrics + accept := make([]telegraf.Metric, 0, len(writeErr.MetricsSuccess)) + dropped := make([]telegraf.Metric, 0, len(writeErr.MetricsFatal)) + keep := make([]telegraf.Metric, 0, len(batch)-len(writeErr.MetricsSuccess)-len(writeErr.MetricsFatal)) + used := make([]bool, len(batch)) + for _, idx := range writeErr.MetricsSuccess { + accept = append(accept, batch[idx]) + used[idx] = true + } + for _, idx := range writeErr.MetricsFatal { + dropped = append(dropped, batch[idx]) + used[idx] = true + } + for i, m := range batch { + if !used[i] { + keep = append(keep, m) + } + } + + // Notify the buffer on what to do + r.buffer.Accept(accept) + r.buffer.Accept(dropped) // TODO: There should be a way to mark those as lost in the stats + r.buffer.Reject(keep) + if !errors.Is(err, internal.ErrSizeLimitReached) { + continue + } + } else { + r.buffer.Reject(batch) + } return err } r.buffer.Accept(batch) @@ -322,9 +351,38 @@ func (r *RunningOutput) WriteBatch() error { return nil } - err := r.writeMetrics(batch) - if err != nil { - r.buffer.Reject(batch) + if err := r.writeMetrics(batch); err != nil { + var writeErr *internal.WriteError + if errors.As(err, &writeErr) { + // Translate the indices of the write error back to metrics + accept := make([]telegraf.Metric, 0, len(writeErr.MetricsSuccess)) + dropped := make([]telegraf.Metric, 0, len(writeErr.MetricsFatal)) + keep := make([]telegraf.Metric, 0, len(batch)-len(writeErr.MetricsSuccess)-len(writeErr.MetricsFatal)) + used := make([]bool, len(batch)) + for _, idx := range writeErr.MetricsSuccess { + accept = 
append(accept, batch[idx]) + used[idx] = true + } + for _, idx := range writeErr.MetricsFatal { + dropped = append(dropped, batch[idx]) + used[idx] = true + } + for i, m := range batch { + if !used[i] { + keep = append(keep, m) + } + } + + // Notify the buffer on what to do + r.buffer.Accept(accept) + r.buffer.Accept(dropped) // TODO: There should be a way to mark those as lost in the stats + r.buffer.Reject(keep) + if !errors.Is(err, internal.ErrSizeLimitReached) { + return nil + } + } else { + r.buffer.Reject(batch) + } return err } r.buffer.Accept(batch) diff --git a/models/running_output_test.go b/models/running_output_test.go index b043d5b0cd00f..385d86733c972 100644 --- a/models/running_output_test.go +++ b/models/running_output_test.go @@ -2,7 +2,6 @@ package models import ( "errors" - "fmt" "sync" "testing" "time" @@ -294,7 +293,7 @@ func TestRunningOutputWriteFail(t *testing.T) { } m := &mockOutput{} - m.failWrite = true + m.batchAcceptSize = -1 ro := NewRunningOutput(m, conf, 4, 12) // Fill buffer to limit twice @@ -313,7 +312,7 @@ func TestRunningOutputWriteFail(t *testing.T) { // no successful flush yet require.Empty(t, m.Metrics()) - m.failWrite = false + m.batchAcceptSize = 0 err = ro.Write() require.NoError(t, err) @@ -327,7 +326,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) { } m := &mockOutput{} - m.failWrite = true + m.batchAcceptSize = -1 ro := NewRunningOutput(m, conf, 100, 1000) // add 5 metrics @@ -343,7 +342,8 @@ func TestRunningOutputWriteFailOrder(t *testing.T) { // no successful flush yet require.Empty(t, m.Metrics()) - m.failWrite = false + m.batchAcceptSize = 0 + // add 5 more metrics for _, metric := range next5 { ro.AddMetric(metric) @@ -365,7 +365,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) { } m := &mockOutput{} - m.failWrite = true + m.batchAcceptSize = -1 ro := NewRunningOutput(m, conf, 5, 100) // add 5 metrics @@ -408,7 +408,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) { // no successful flush yet require.Empty(t, m.Metrics()) - m.failWrite = false + m.batchAcceptSize = 0 err = ro.Write() require.NoError(t, err) @@ -429,7 +429,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) { } m := &mockOutput{} - m.failWrite = true + m.batchAcceptSize = -1 ro := NewRunningOutput(m, conf, 5, 1000) // add 5 metrics @@ -451,7 +451,8 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) { require.Error(t, err) // unset fail and write metrics - m.failWrite = false + m.batchAcceptSize = 0 + err = ro.Write() require.NoError(t, err) @@ -671,7 +672,7 @@ func TestNonRetryableStartupBehaviorDefault(t *testing.T) { } } -func TestUntypedtartupBehaviorIgnore(t *testing.T) { +func TestUntypedStartupBehaviorIgnore(t *testing.T) { serr := errors.New("untyped err") for _, behavior := range []string{"", "error", "retry", "ignore"} { @@ -743,14 +744,186 @@ func TestPartiallyStarted(t *testing.T) { require.Equal(t, 3, mo.writes) } +func TestRunningOutputWritePartialSuccess(t *testing.T) { + plugin := &mockOutput{ + batchAcceptSize: 4, + } + model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10) + require.NoError(t, model.Init()) + require.NoError(t, model.Connect()) + defer model.Close() + + // Fill buffer completely + for _, metric := range first5 { + model.AddMetric(metric) + } + for _, metric := range next5 { + model.AddMetric(metric) + } + + // We no not expect any successful flush yet + require.Empty(t, plugin.Metrics()) + require.Equal(t, 10, model.buffer.Len()) + + // Write to the output. 
This should only partially succeed with the first + // few metrics removed from buffer + require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 4) + require.Equal(t, 6, model.buffer.Len()) + + // The next write should remove the next metrics from the buffer + require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 8) + require.Equal(t, 2, model.buffer.Len()) + + // The last write should succeed straight away and all metrics should have + // been received by the output + require.NoError(t, model.Write()) + testutil.RequireMetricsEqual(t, append(first5, next5...), plugin.metrics) + require.Zero(t, model.buffer.Len()) +} + +func TestRunningOutputWritePartialSuccessAndLoss(t *testing.T) { + lost := 0 + plugin := &mockOutput{ + batchAcceptSize: 4, + metricFatalIndex: &lost, + } + model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10) + require.NoError(t, model.Init()) + require.NoError(t, model.Connect()) + defer model.Close() + + // Fill buffer completely + for _, metric := range first5 { + model.AddMetric(metric) + } + for _, metric := range next5 { + model.AddMetric(metric) + } + expected := []telegraf.Metric{ + /*fatal,*/ first5[1], first5[2], first5[3], + /*fatal,*/ next5[0], next5[1], next5[2], + next5[3], next5[4], + } + + // We no not expect any successful flush yet + require.Empty(t, plugin.Metrics()) + require.Equal(t, 10, model.buffer.Len()) + + // Write to the output. This should only partially succeed with the first + // few metrics removed from buffer + require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 3) + require.Equal(t, 6, model.buffer.Len()) + + // The next write should remove the next metrics from the buffer + require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 6) + require.Equal(t, 2, model.buffer.Len()) + + // The last write should succeed straight away and all metrics should have + // been received by the output + require.NoError(t, model.Write()) + testutil.RequireMetricsEqual(t, expected, plugin.metrics) + require.Zero(t, model.buffer.Len()) +} + +func TestRunningOutputWriteBatchPartialSuccess(t *testing.T) { + plugin := &mockOutput{ + batchAcceptSize: 4, + } + model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10) + require.NoError(t, model.Init()) + require.NoError(t, model.Connect()) + defer model.Close() + + // Fill buffer completely + for _, metric := range first5 { + model.AddMetric(metric) + } + for _, metric := range next5 { + model.AddMetric(metric) + } + + // We no not expect any successful flush yet + require.Empty(t, plugin.Metrics()) + require.Equal(t, 10, model.buffer.Len()) + + // Write to the output. 
This should only partially succeed with the first + // few metrics removed from buffer + require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 4) + require.Equal(t, 6, model.buffer.Len()) + + // The next write should remove the next metrics from the buffer + require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 8) + require.Equal(t, 2, model.buffer.Len()) + + // The last write should succeed straight away and all metrics should have + // been received by the output + require.NoError(t, model.WriteBatch()) + testutil.RequireMetricsEqual(t, append(first5, next5...), plugin.metrics) + require.Zero(t, model.buffer.Len()) +} + +func TestRunningOutputWriteBatchPartialSuccessAndLoss(t *testing.T) { + lost := 0 + plugin := &mockOutput{ + batchAcceptSize: 4, + metricFatalIndex: &lost, + } + model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10) + require.NoError(t, model.Init()) + require.NoError(t, model.Connect()) + defer model.Close() + + // Fill buffer completely + for _, metric := range first5 { + model.AddMetric(metric) + } + for _, metric := range next5 { + model.AddMetric(metric) + } + expected := []telegraf.Metric{ + /*fatal,*/ first5[1], first5[2], first5[3], + /*fatal,*/ next5[0], next5[1], next5[2], + next5[3], next5[4], + } + + // We no not expect any successful flush yet + require.Empty(t, plugin.Metrics()) + require.Equal(t, 10, model.buffer.Len()) + + // Write to the output. This should only partially succeed with the first + // few metrics removed from buffer + require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 3) + require.Equal(t, 6, model.buffer.Len()) + + // The next write should remove the next metrics from the buffer + require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached) + require.Len(t, plugin.metrics, 6) + require.Equal(t, 2, model.buffer.Len()) + + // The last write should succeed straight away and all metrics should have + // been received by the output + require.NoError(t, model.WriteBatch()) + testutil.RequireMetricsEqual(t, expected, plugin.metrics) + require.Zero(t, model.buffer.Len()) +} + type mockOutput struct { sync.Mutex metrics []telegraf.Metric - // if true, mock write failure - failWrite bool + // Failing output simulation + batchAcceptSize int + metricFatalIndex *int + // Startup error simulation startupError error startupErrorCount int writes int @@ -779,21 +952,33 @@ func (m *mockOutput) SampleConfig() string { } func (m *mockOutput) Write(metrics []telegraf.Metric) error { - fmt.Println("writing") m.writes++ m.Lock() defer m.Unlock() - if m.failWrite { + + // Simulate a failed write + if m.batchAcceptSize < 0 { return errors.New("failed write") } - if m.metrics == nil { - m.metrics = []telegraf.Metric{} + // Simulate a successful write + if m.batchAcceptSize == 0 || len(metrics) <= m.batchAcceptSize { + m.metrics = append(m.metrics, metrics...) + return nil } - m.metrics = append(m.metrics, metrics...) 
- return nil + // Simulate a partially successful write + werr := &internal.WriteError{Err: internal.ErrSizeLimitReached} + for i, x := range metrics { + if m.metricFatalIndex != nil && i == *m.metricFatalIndex { + werr.MetricsFatal = append(werr.MetricsFatal, i) + } else if i < m.batchAcceptSize { + m.metrics = append(m.metrics, x) + werr.MetricsSuccess = append(werr.MetricsSuccess, i) + } + } + return werr } func (m *mockOutput) Metrics() []telegraf.Metric { diff --git a/plugins/common/limited/serializers.go b/plugins/common/limited/serializers.go index 2b7eab2a09281..42d0cc7d46af2 100644 --- a/plugins/common/limited/serializers.go +++ b/plugins/common/limited/serializers.go @@ -39,7 +39,7 @@ func (s *IndividualSerializer) Serialize(metric telegraf.Metric, limit int64) ([ } // The serialized metric fits into the limit, so output it - if buflen := int64(len(buf)); buflen <= limit || limit == 0 { + if buflen := int64(len(buf)); buflen <= limit { return buf, nil } @@ -51,8 +51,8 @@ func (s *IndividualSerializer) SerializeBatch(metrics []telegraf.Metric, limit i // Grow the buffer so it can hold at least the required size. This will // save us from reallocate often s.buffer.Reset() - if limit > 0 && limit < math.MaxInt64 { - s.buffer.Grow(int(min(int64(math.MaxInt), limit))) + if limit > 0 && limit < int64(math.MaxInt) { + s.buffer.Grow(int(limit)) } // Prepare a potential write error and be optimistic @@ -69,12 +69,12 @@ func (s *IndividualSerializer) SerializeBatch(metrics []telegraf.Metric, limit i // Failing serialization is a fatal error so mark the metric as such werr.Err = internal.ErrSerialization werr.MetricsErrors = append(werr.MetricsErrors, err) - werr.MetricsFailFatal = append(werr.MetricsFailFatal, i) + werr.MetricsFatal = append(werr.MetricsFatal, i) continue } // The serialized metric fits into the limit, so add it to the output - if usedAdded := used + int64(len(buf)); usedAdded <= limit || limit == 0 { + if usedAdded := used + int64(len(buf)); usedAdded <= limit { if _, err := s.buffer.Write(buf); err != nil { return nil, err } @@ -91,9 +91,6 @@ func (s *IndividualSerializer) SerializeBatch(metrics []telegraf.Metric, limit i // Adding the serialized metric would exceed the limit so exit with an // WriteError and fill in the required information werr.Err = internal.ErrSizeLimitReached - for j := i; j < len(metrics); j++ { - werr.MetricsFailRetry = append(werr.MetricsFailRetry, j) - } break } if werr.Err != nil { diff --git a/plugins/common/limited/serializers_test.go b/plugins/common/limited/serializers_test.go index 46224c313b8e8..94b6d1bf77582 100644 --- a/plugins/common/limited/serializers_test.go +++ b/plugins/common/limited/serializers_test.go @@ -1,6 +1,7 @@ package limited import ( + "math" "testing" "time" @@ -150,8 +151,7 @@ func TestIndividualSerializer(t *testing.T) { require.ErrorAs(t, err, &werr) require.ErrorIs(t, werr.Err, internal.ErrSizeLimitReached) require.EqualValues(t, []int{0, 1}, werr.MetricsSuccess) - require.Empty(t, werr.MetricsFailFatal) - require.EqualValues(t, []int{2, 3, 4, 5, 6, 7}, werr.MetricsFailRetry) + require.Empty(t, werr.MetricsFatal) require.Equal(t, expected[0], string(buf)) // Run again with the successful metrics removed @@ -159,8 +159,7 @@ func TestIndividualSerializer(t *testing.T) { require.ErrorAs(t, err, &werr) require.ErrorIs(t, werr.Err, internal.ErrSizeLimitReached) require.EqualValues(t, []int{0, 1, 2}, werr.MetricsSuccess) - require.Empty(t, werr.MetricsFailFatal) - require.EqualValues(t, []int{3, 4, 5}, 
werr.MetricsFailRetry)
+	require.Empty(t, werr.MetricsFatal)
 	require.Equal(t, expected[1], string(buf))
 
 	// Final run with the successful metrics removed
@@ -346,7 +345,7 @@ func TestIndividualSerializerUnlimited(t *testing.T) {
 	serializer := NewIndividualSerializer(s)
 
 	// Do the first serialization runs with all metrics
-	buf, err := serializer.SerializeBatch(input, 0)
+	buf, err := serializer.SerializeBatch(input, math.MaxInt64)
 	require.NoError(t, err)
 	require.Equal(t, expected, string(buf))
 }
diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go
index cc6ee06aba468..3ae4741fb00cd 100644
--- a/plugins/outputs/influxdb/influxdb.go
+++ b/plugins/outputs/influxdb/influxdb.go
@@ -172,7 +172,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
 	p := rand.Perm(len(i.clients))
 	for _, n := range p {
 		client := i.clients[n]
-		if err := client.Write(ctx, metrics); err == nil {
+		err = client.Write(ctx, metrics)
+		if err == nil {
 			return nil
 		}
 
diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go
index 715ca65296e1d..4472b43fb206d 100644
--- a/plugins/outputs/influxdb_v2/http.go
+++ b/plugins/outputs/influxdb_v2/http.go
@@ -161,52 +161,69 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
 	}
 
 	batches := make(map[string][]telegraf.Metric)
+	batchIndices := make(map[string][]int)
 	if c.BucketTag == "" {
-		err := c.writeBatch(ctx, c.Bucket, metrics)
-		if err != nil {
-			var apiErr *APIError
-			if errors.As(err, &apiErr) {
-				if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
-					return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
-				}
-			}
-
-			return err
+		batches[c.Bucket] = metrics
+		batchIndices[c.Bucket] = make([]int, len(metrics))
+		for i := range metrics {
+			batchIndices[c.Bucket][i] = i
 		}
 	} else {
-		for _, metric := range metrics {
+		for i, metric := range metrics {
 			bucket, ok := metric.GetTag(c.BucketTag)
 			if !ok {
 				bucket = c.Bucket
-			}
-
-			if _, ok := batches[bucket]; !ok {
-				batches[bucket] = make([]telegraf.Metric, 0)
-			}
-
-			if c.ExcludeBucketTag {
-				// Avoid modifying the metric in case we need to retry the request.
+			} else if c.ExcludeBucketTag {
+				// Avoid modifying the metric if we do remove the tag
 				metric = metric.Copy()
 				metric.Accept()
 				metric.RemoveTag(c.BucketTag)
 			}
 
 			batches[bucket] = append(batches[bucket], metric)
+			batchIndices[bucket] = append(batchIndices[bucket], i)
 		}
+	}
 
-		for bucket, batch := range batches {
-			err := c.writeBatch(ctx, bucket, batch)
-			if err != nil {
-				var apiErr *APIError
-				if errors.As(err, &apiErr) {
-					if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
-						return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
-					}
-				}
-
-				return err
+	var wErr internal.WriteError
+	for bucket, batch := range batches {
+		err := c.writeBatch(ctx, bucket, batch)
+		if err == nil {
+			wErr.MetricsSuccess = append(wErr.MetricsSuccess, batchIndices[bucket]...)
+			continue
+		}
+
+		// Check if the request was too large and split it
+		var apiErr *APIError
+		if errors.As(err, &apiErr) {
+			if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
+				return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
+			}
+			wErr.Err = err
+			wErr.MetricsFatal = append(wErr.MetricsFatal, batchIndices[bucket]...)
+ return &wErr + } + + // Check if we got a write error and if so, translate the returned + // metric indices to return the original indices in case of bucketing + var writeErr *internal.WriteError + if errors.As(err, &writeErr) { + wErr.Err = writeErr.Err + for _, idx := range writeErr.MetricsSuccess { + wErr.MetricsSuccess = append(wErr.MetricsSuccess, batchIndices[bucket][idx]) } + for _, idx := range writeErr.MetricsFatal { + wErr.MetricsFatal = append(wErr.MetricsFatal, batchIndices[bucket][idx]) + } + if !errors.Is(writeErr.Err, internal.ErrSizeLimitReached) { + continue + } + return &wErr } + + // Return the error without special treatment + wErr.Err = err + return &wErr } return nil } @@ -276,7 +293,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te http.StatusMultiStatus, http.StatusAlreadyReported: c.retryCount = 0 - return nil + return werr } // We got an error and now try to decode further @@ -305,7 +322,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te http.StatusUnprocessableEntity, http.StatusNotAcceptable: c.Log.Errorf("Failed to write metric to %s (will be dropped: %s): %s\n", bucket, resp.Status, desc) - return nil + return nil // TODO: Return error with the failed metric being masked case http.StatusUnauthorized, http.StatusForbidden: return fmt.Errorf("failed to write metric to %s (%s): %s", bucket, resp.Status, desc) case http.StatusTooManyRequests, @@ -324,7 +341,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te // retrying will not make the request magically work. if len(resp.Status) > 0 && resp.Status[0] == '4' { c.Log.Errorf("Failed to write metric to %s (will be dropped: %s): %s\n", bucket, resp.Status, desc) - return nil + return nil // TODO: Return error with the failed metric being masked } // This is only until platform spec is fully implemented. 
As of the diff --git a/plugins/outputs/influxdb_v2/influxdb_v2.go b/plugins/outputs/influxdb_v2/influxdb_v2.go index df4fe9c88d13f..5c69ccc9f0a82 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2.go @@ -194,6 +194,10 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { for _, n := range rand.Perm(len(i.clients)) { client := i.clients[n] if err := client.Write(ctx, metrics); err != nil { + var werr *internal.WriteError + if errors.As(err, &werr) || errors.Is(err, internal.ErrSizeLimitReached) { + return err + } i.Log.Errorf("When writing to [%s]: %v", client.URL, err) continue } diff --git a/plugins/outputs/influxdb_v2/influxdb_v2_test.go b/plugins/outputs/influxdb_v2/influxdb_v2_test.go index af9fed082ba69..f4677750166c9 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2_test.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2_test.go @@ -5,12 +5,15 @@ import ( "net" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/common/ratelimiter" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" @@ -330,3 +333,113 @@ func TestTooLargeWriteRetry(t *testing.T) { } require.Error(t, plugin.Write(hugeMetrics)) } + +func TestRateLimit(t *testing.T) { + // Setup a test server + var received atomic.Uint64 + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/v2/write": + if err := r.ParseForm(); err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + received.Add(uint64(len(body))) + + w.WriteHeader(http.StatusNoContent) + + return + default: + w.WriteHeader(http.StatusNotFound) + return + } + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Bucket: "telegraf", + ContentEncoding: "identity", + RateLimitConfig: ratelimiter.RateLimitConfig{ + Limit: 50, + Period: config.Duration(time.Second), + }, + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Together the metric batch size is too big, split up, we get success + metrics := []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 1), + ), + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 99.0, + }, + time.Unix(0, 2), + ), + metric.New( + "operating_hours", + map[string]string{ + "machine": "A", + }, + map[string]interface{}{ + "value": 123.456, + }, + time.Unix(0, 3), + ), + metric.New( + "status", + map[string]string{ + "machine": "B", + }, + map[string]interface{}{ + "temp": 48.235, + "remaining": 999.999, + }, + time.Unix(0, 4), + ), + } + + // Write the metrics the first time. Only the first two metrics should be + // received by the server due to the rate limit. 
+	require.ErrorIs(t, plugin.Write(metrics), internal.ErrSizeLimitReached)
+	require.LessOrEqual(t, received.Load(), uint64(30))
+
+	// A direct follow-up write attempt with the remaining metrics should fail
+	// due to the rate limit being reached
+	require.ErrorIs(t, plugin.Write(metrics[2:]), internal.ErrSizeLimitReached)
+	require.LessOrEqual(t, received.Load(), uint64(30))
+
+	// Wait for at least the period (plus some safety margin) to write the third metric
+	time.Sleep(time.Duration(plugin.RateLimitConfig.Period) + 100*time.Millisecond)
+	require.ErrorIs(t, plugin.Write(metrics[2:]), internal.ErrSizeLimitReached)
+	require.Greater(t, received.Load(), uint64(30))
+	require.LessOrEqual(t, received.Load(), uint64(72))
+
+	// Wait again for at least the period (plus some safety margin)
+	// to write the last metric. This should finally succeed as all metrics
+	// are written.
+	time.Sleep(time.Duration(plugin.RateLimitConfig.Period) + 100*time.Millisecond)
+	require.NoError(t, plugin.Write(metrics[3:]))
+	require.Equal(t, uint64(121), received.Load())
+}