Skip to content

Commit

Permalink
Fix a connection error in httpjson input (elastic#16123)
Browse files Browse the repository at this point in the history
* Fix a connection error in httpjson input
  • Loading branch information
Lei Qiu authored Feb 7, 2020
1 parent adcd962 commit 7325028
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improve `elasticsearch/audit` fileset to handle timestamps correctly. {pull}15942[15942]
- Prevent Elasticsearch from spewing log warnings about redundant wildcards when setting up ingest pipelines for the `elasticsearch` module. {issue}15840[15840] {pull}15900[15900]
- Fix mapping error for cloudtrail additionalEventData field {pull}16088[16088]
- Fix a connection error in httpjson input. {pull}16123[16123]

*Heartbeat*

Expand Down
3 changes: 0 additions & 3 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type Pagination struct {
}

func (c *config) Validate() error {
if c.Interval < 3600*time.Second && c.Interval != 0 {
return errors.New("httpjson input: interval must not be less than 3600 seconds - ")
}
switch strings.ToUpper(c.HTTPMethod) {
case "GET":
break
Expand Down
22 changes: 22 additions & 0 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,28 @@ func TestPOST(t *testing.T) {
})
}

func TestRepeatedPOST(t *testing.T) {
m := map[string]interface{}{
"http_method": "POST",
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 10 ^ 9,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(3)
if !ok {
t.Fatalf("Expected 3 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestRunStop(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
Expand Down
19 changes: 8 additions & 11 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo)
}

// processHTTPRequest processes HTTP request, and handles pagination if enabled
func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, req *http.Request, ri *requestInfo) error {
func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *requestInfo) error {
for {
req, err := in.createHTTPRequest(ctx, ri)
if err != nil {
return err
}
msg, err := client.Do(req)
if err != nil {
return errors.New("failed to do http request. Stopping input worker - ")
Expand Down Expand Up @@ -224,10 +228,6 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
if in.config.Pagination.ExtraBodyContent != nil {
ri.ContentMap.Update(common.MapStr(in.config.Pagination.ExtraBodyContent))
}
req, err = in.createHTTPRequest(ctx, ri)
if err != nil {
return err
}
continue
}
return nil
Expand Down Expand Up @@ -274,11 +274,7 @@ func (in *httpjsonInput) run() error {
if in.config.HTTPMethod == "POST" && in.config.HTTPRequestBody != nil {
ri.ContentMap.Update(common.MapStr(in.config.HTTPRequestBody))
}
req, err := in.createHTTPRequest(ctx, ri)
if err != nil {
return err
}
err = in.processHTTPRequest(ctx, client, req, ri)
err = in.processHTTPRequest(ctx, client, ri)
if err == nil && in.Interval > 0 {
ticker := time.NewTicker(in.Interval)
defer ticker.Stop()
Expand All @@ -288,7 +284,8 @@ func (in *httpjsonInput) run() error {
in.log.Info("Context done.")
return nil
case <-ticker.C:
err = in.processHTTPRequest(ctx, client, req, ri)
in.log.Info("Process another repeated request.")
err = in.processHTTPRequest(ctx, client, ri)
if err != nil {
return err
}
Expand Down

0 comments on commit 7325028

Please sign in to comment.