Skip to content

Commit

Permalink
Cherry-pick #16315 to 7.x: Add a TLS test and more debug output to ht…
Browse files Browse the repository at this point in the history
…tpjson input (#16350)

* Add a TLS test and more debug output to httpjson input (#16315)
  • Loading branch information
Lei Qiu authored Feb 14, 2020
1 parent b95e145 commit d099376
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- move create-[module,fileset,fields] to mage and enable in x-pack/filebeat {pull}15836[15836]
- Add ECS tls and categorization fields to apache module. {issue}16032[16032] {pull}16121[16121]
- Add MQTT input. {issue}15602[15602] {pull}16204[16204]
- Add a TLS test and more debug output to httpjson input {pull}16315[16315]

*Heartbeat*

Expand Down
41 changes: 33 additions & 8 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ func testSetup(t *testing.T) {
})
}

func runTest(t *testing.T, m map[string]interface{}, run func(input *httpjsonInput, out *stubOutleter, t *testing.T)) {
// Setup httpbin environment
func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *httpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create test http server
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
req, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
Expand Down Expand Up @@ -154,7 +157,29 @@ func TestGET(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(1)
if !ok {
t.Fatalf("Expected 1 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestGetHTTPS(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"interval": 0,
"ssl.verification_mode": "none",
}
runTest(t, true, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -176,7 +201,7 @@ func TestPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 0,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -198,7 +223,7 @@ func TestRepeatedPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 10 ^ 9,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -219,7 +244,7 @@ func TestRunStop(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
input.Run()
input.Stop()
input.Run()
Expand Down
28 changes: 15 additions & 13 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,31 +154,32 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
for {
req, err := in.createHTTPRequest(ctx, ri)
if err != nil {
return err
return errors.Wrapf(err, "failed to create http request")
}
msg, err := client.Do(req)
if err != nil {
return errors.New("failed to do http request. Stopping input worker - ")
}
if msg.StatusCode != http.StatusOK {
return errors.Errorf("return HTTP status is %s - ", msg.Status)
return errors.Wrapf(err, "failed to execute http client.Do")
}
responseData, err := ioutil.ReadAll(msg.Body)
defer msg.Body.Close()
msg.Body.Close()
if err != nil {
return err
return errors.Wrapf(err, "failed to read http.response.body")
}
if msg.StatusCode != http.StatusOK {
in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData))
return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode)
}
var m, v interface{}
err = json.Unmarshal(responseData, &m)
if err != nil {
return err
return errors.Wrapf(err, "failed to unmarshal http.response.body")
}
switch mmap := m.(type) {
case map[string]interface{}:
if in.config.JSONObjects == "" {
ok := in.outlet.OnEvent(makeEvent(string(responseData)))
if !ok {
return errors.New("function OnEvent returned false - ")
return errors.New("function OnEvent returned false")
}
} else {
v, err = common.MapStr(mmap).GetValue(in.config.JSONObjects)
Expand All @@ -192,11 +193,11 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
case map[string]interface{}:
d, err := json.Marshal(tv)
if err != nil {
return errors.New("failed to process http response data - ")
return errors.Wrapf(err, "failed to marshal json_objects_array")
}
ok := in.outlet.OnEvent(makeEvent(string(d)))
if !ok {
return errors.New("OnEvent returned false - ")
return errors.New("function OnEvent returned false")
}
default:
return errors.New("invalid json_objects_array configuration")
Expand All @@ -222,7 +223,7 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
case string:
ri.URL = v.(string)
default:
return errors.New("pagination ID is not string, which is required for URL - ")
return errors.New("pagination ID is not of string type")
}
}
if in.config.Pagination.ExtraBodyContent != nil {
Expand All @@ -232,7 +233,8 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
}
return nil
default:
return errors.New("response is not valid JSON - ")
in.log.Debugw("http.response.body is not valid JSON", string(responseData))
return errors.New("http.response.body is not valid JSON")
}
}
}
Expand Down

0 comments on commit d099376

Please sign in to comment.