diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 03513028cb8..c1d39cb2d03 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix default index pattern in IBM MQ filebeat dashboard. {pull}17146[17146] - Fix `elasticsearch.gc` fileset to not collect _all_ logs when Elasticsearch is running in Docker. {issue}13164[13164] {issue}16583[16583] {pull}17164[17164] - Fixed a mapping exception when ingesting CEF logs that used the spriv or dpriv extensions. {issue}17216[17216] {pull}17220[17220] +- Fix issue 17734 to retry on rate-limit error in the Filebeat httpjson input. {issue}17734[17734] {pull}17735[17735] *Heartbeat* diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 107cc3778a6..4faa190544e 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -12,8 +12,10 @@ import ( "net/http" "net/http/httptest" "regexp" + "strconv" "sync" "testing" + "time" "golang.org/x/sync/errgroup" @@ -36,14 +38,8 @@ func testSetup(t *testing.T) { }) } -func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) { - testSetup(t) - // Create an http test server according to whether TLS is used - var newServer = httptest.NewServer - if isTLS { - newServer = httptest.NewTLSServer - } - ts := newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +func createServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server { + return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodPost { req, err := ioutil.ReadAll(r.Body) defer r.Body.Close() @@ -72,6 +68,44 @@ func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input w.Write(b) } })) +} + +func createCustomServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server { + var isRetry bool + return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if !isRetry { + w.Header().Set("X-Rate-Limit-Limit", "0") + w.Header().Set("X-Rate-Limit-Remaining", "0") + w.Header().Set("X-Rate-Limit-Reset", strconv.FormatInt(time.Now().Unix(), 10)) + w.WriteHeader(http.StatusTooManyRequests) + w.Write([]byte{}) + isRetry = true + } else { + message := map[string]interface{}{ + "hello": "world", + "embedded": map[string]string{ + "hello": "world", + }, + } + b, _ := json.Marshal(message) + w.WriteHeader(http.StatusOK) + w.Write(b) + } + })) +} + +func runTest(t *testing.T, isTLS bool, testRateLimitRetry bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) { + testSetup(t) + // Create an http test server according to whether TLS is used + var newServer = httptest.NewServer + if isTLS { + newServer = httptest.NewTLSServer + } + ts := createServer(newServer) + if testRateLimitRetry { + ts = createCustomServer(newServer) + } defer ts.Close() m["url"] = ts.URL cfg := common.MustNewConfigFrom(m) @@ -337,7 +371,7 @@ func TestGET(t *testing.T) { "http_method": "GET", "interval": 0, } - runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -359,7 +393,28 @@ func TestGetHTTPS(t *testing.T) { "interval": 0, "ssl.verification_mode": "none", } - runTest(t, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, true, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(1) + if !ok { + t.Fatalf("Expected 1 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +} + +func TestRateLimitRetry(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "interval": 0, + } + runTest(t, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -381,7 +436,7 @@ func TestPOST(t *testing.T) { "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, "interval": 0, } - runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -403,7 +458,7 @@ func TestRepeatedPOST(t *testing.T) { "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, "interval": 10 ^ 9, } - runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -424,7 +479,7 @@ func TestRunStop(t *testing.T) { "http_method": "GET", "interval": 0, } - runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { input.Run() input.Stop() input.Run() diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index c7a67a6e1a4..4801223c583 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -300,6 +300,12 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl } if msg.StatusCode != http.StatusOK { in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData)) + if msg.StatusCode == http.StatusTooManyRequests { + if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil { + return err + } + continue + } return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode) } var m, v interface{}