diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index f75bb1624ef..db3c95c9760 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -213,6 +213,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
 - Made Azure Blob Storage input GA and updated docs accordingly. {pull}37128[37128]
 - Add request trace logging to http_endpoint input. {issue}36951[36951] {pull}36957[36957]
 - Made GCS input GA and updated docs accordingly. {pull}37127[37127]
+- Suppress and log max HTTP request retry errors in CEL input. {pull}37160[37160]
 
 *Auditbeat*
 
diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go
index b65f0ae8d10..50313a08e7a 100644
--- a/x-pack/filebeat/input/cel/input.go
+++ b/x-pack/filebeat/input/cel/input.go
@@ -730,14 +730,16 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
 	c.CheckRedirect = checkRedirect(cfg.Resource, log)
 
 	if cfg.Resource.Retry.getMaxAttempts() > 1 {
+		maxAttempts := cfg.Resource.Retry.getMaxAttempts()
 		c = (&retryablehttp.Client{
 			HTTPClient:   c,
 			Logger:       newRetryLog(log),
 			RetryWaitMin: cfg.Resource.Retry.getWaitMin(),
 			RetryWaitMax: cfg.Resource.Retry.getWaitMax(),
-			RetryMax:     cfg.Resource.Retry.getMaxAttempts(),
+			RetryMax:     maxAttempts,
 			CheckRetry:   retryablehttp.DefaultRetryPolicy,
 			Backoff:      retryablehttp.DefaultBackoff,
+			ErrorHandler: retryErrorHandler(maxAttempts, log),
 		}).StandardClient()
 	}
 
@@ -831,6 +833,17 @@ func checkRedirect(cfg *ResourceConfig, log *logp.Logger) func(*http.Request, []
 	}
 }
 
+// retryErrorHandler returns a retryablehttp.ErrorHandler that will log retry resignation
+// but return the last retry attempt's response and a nil error so that the CEL code
+// can evaluate the response status itself. Any error passed to the retryablehttp.ErrorHandler
+// is returned unaltered.
+func retryErrorHandler(max int, log *logp.Logger) retryablehttp.ErrorHandler {
+	return func(resp *http.Response, err error, numTries int) (*http.Response, error) {
+		log.Warnw("giving up retries", "method", resp.Request.Method, "url", resp.Request.URL, "retries", max+1)
+		return resp, err
+	}
+}
+
 func newRateLimiterFromConfig(cfg *ResourceConfig) *rate.Limiter {
 	r := rate.Inf
 	b := 1
diff --git a/x-pack/filebeat/input/cel/input_test.go b/x-pack/filebeat/input/cel/input_test.go
index 1a0a7b44211..bc2e8828be5 100644
--- a/x-pack/filebeat/input/cel/input_test.go
+++ b/x-pack/filebeat/input/cel/input_test.go
@@ -683,6 +683,37 @@ var inputTests = []struct {
 			{"hello": "world"},
 		},
 	},
+	{
+		name:   "retry_failure_no_success",
+		server: newTestServer(httptest.NewServer),
+		config: map[string]interface{}{
+			"interval": 1,
+			"resource": map[string]interface{}{
+				"retry": map[string]interface{}{
+					"max_attempts": 2,
+				},
+			},
+			"program": `
+				get(state.url).as(resp, {
+					"url": state.url,
+					"events": [
+						bytes(resp.Body).decode_json(),
+						{"status": resp.StatusCode},
+					],
+				})
+			`,
+		},
+		handler: func(w http.ResponseWriter, r *http.Request) {
+			w.Header().Set("content-type", "application/json")
+			w.WriteHeader(http.StatusGatewayTimeout)
+			//nolint:errcheck // No point checking errors in test server.
+			w.Write([]byte(`{"error":"we were too slow"}`))
+		},
+		want: []map[string]interface{}{
+			{"error": "we were too slow"},
+			{"status": float64(504)}, // Float because of JSON.
+		},
+	},
 	{
 		name: "POST_request",
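
Context for the ErrorHandler change: when its retry budget is exhausted, go-retryablehttp by default returns a nil *http.Response and a "giving up after N attempt(s)" error, so the CEL program would see a hard failure instead of a status code it can branch on. The handler added above hands back the final response instead. Below is a minimal standalone sketch of that pattern; the endpoint URL, retry budget, and log wording are hypothetical rather than taken from the input, and the sketch adds a nil-response guard, since resp can be nil when the last attempt failed at the transport level rather than with an HTTP status.

    package main

    import (
        "fmt"
        "log"
        "net/http"

        "github.com/hashicorp/go-retryablehttp"
    )

    func main() {
        client := retryablehttp.NewClient()
        client.RetryMax = 2 // hypothetical retry budget, mirroring max_attempts: 2 in the test

        // Default behavior on exhaustion is (nil, err), which discards the final
        // response. Returning resp instead lets the caller read the terminal
        // status code, as the retry_failure_no_success test asserts for 504.
        client.ErrorHandler = func(resp *http.Response, err error, numTries int) (*http.Response, error) {
            if resp != nil && resp.Request != nil { // resp is nil on transport-level failure
                log.Printf("giving up: method=%s url=%s tries=%d", resp.Request.Method, resp.Request.URL, numTries)
            }
            return resp, err
        }

        std := client.StandardClient()                      // plain *http.Client backed by the retrying transport
        resp, err := std.Get("http://localhost:8080/flaky") // hypothetical endpoint
        if err != nil {
            log.Fatal(err) // transport error: no response survived the retries
        }
        defer resp.Body.Close()
        fmt.Println("terminal status:", resp.StatusCode)
    }

Returning the last response as data rather than swallowing it is what lets the CEL program above emit both the decoded error body and {"status": 504} as ordinary events once retries are spent.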