diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 03fb4fab87f..bb2e67f78a7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -66,6 +66,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Improve `elasticsearch/audit` fileset to handle timestamps correctly. {pull}15942[15942] - Prevent Elasticsearch from spewing log warnings about redundant wildcards when setting up ingest pipelines for the `elasticsearch` module. {issue}15840[15840] {pull}15900[15900] - Fix mapping error for cloudtrail additionalEventData field {pull}16088[16088] +- Fix a connection error in httpjson input. {pull}16123[16123] *Heartbeat* diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 5a95675f649..a1baece249f 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -38,9 +38,6 @@ type Pagination struct { } func (c *config) Validate() error { - if c.Interval < 3600*time.Second && c.Interval != 0 { - return errors.New("httpjson input: interval must not be less than 3600 seconds - ") - } switch strings.ToUpper(c.HTTPMethod) { case "GET": break diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 77cd867494f..6c8ad1a98b6 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -192,6 +192,28 @@ func TestPOST(t *testing.T) { }) } +func TestRepeatedPOST(t *testing.T) { + m := map[string]interface{}{ + "http_method": "POST", + "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, + "interval": 10 ^ 9, + } + runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(3) + if !ok { + t.Fatalf("Expected 3 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +} + func TestRunStop(t *testing.T) { m := map[string]interface{}{ "http_method": "GET", diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 8ba0c6550de..e733648b32d 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -150,8 +150,12 @@ func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo) } // processHTTPRequest processes HTTP request, and handles pagination if enabled -func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, req *http.Request, ri *requestInfo) error { +func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *requestInfo) error { for { + req, err := in.createHTTPRequest(ctx, ri) + if err != nil { + return err + } msg, err := client.Do(req) if err != nil { return errors.New("failed to do http request. Stopping input worker - ") @@ -224,10 +228,6 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl if in.config.Pagination.ExtraBodyContent != nil { ri.ContentMap.Update(common.MapStr(in.config.Pagination.ExtraBodyContent)) } - req, err = in.createHTTPRequest(ctx, ri) - if err != nil { - return err - } continue } return nil @@ -274,11 +274,7 @@ func (in *httpjsonInput) run() error { if in.config.HTTPMethod == "POST" && in.config.HTTPRequestBody != nil { ri.ContentMap.Update(common.MapStr(in.config.HTTPRequestBody)) } - req, err := in.createHTTPRequest(ctx, ri) - if err != nil { - return err - } - err = in.processHTTPRequest(ctx, client, req, ri) + err = in.processHTTPRequest(ctx, client, ri) if err == nil && in.Interval > 0 { ticker := time.NewTicker(in.Interval) defer ticker.Stop() @@ -288,7 +284,8 @@ func (in *httpjsonInput) run() error { in.log.Info("Context done.") return nil case <-ticker.C: - err = in.processHTTPRequest(ctx, client, req, ri) + in.log.Info("Process another repeated request.") + err = in.processHTTPRequest(ctx, client, ri) if err != nil { return err }