From d1db38efc2c5ccab7b78dec168e27724e25d432c Mon Sep 17 00:00:00 2001 From: Marius Iversen Date: Tue, 9 Jun 2020 16:03:16 +0200 Subject: [PATCH] [Filebeat][HTTPJson Input]fixes issues with mapstring against JSONObject config (#19069) * fixes issues with mapstring against JSONobject, mm is always nil * added test for array response working --- .../filebeat/input/httpjson/httpjson_test.go | 54 ++++++++++++++++--- x-pack/filebeat/input/httpjson/input.go | 2 +- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 33643ac29ab..601e2d2dc8f 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -95,7 +95,22 @@ func createCustomServer(newServer func(handler http.Handler) *httptest.Server) * })) } -func runTest(t *testing.T, isTLS bool, testRateLimitRetry bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) { +func createCustomServerWithArrayResponse(newServer func(handler http.Handler) *httptest.Server) *httptest.Server { + return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + message := map[string]interface{}{ + "hello": []map[string]string{ + {"foo": "bar"}, + {"bar": "foo"}, + }, + } + b, _ := json.Marshal(message) + w.WriteHeader(http.StatusOK) + w.Write(b) + })) +} + +func runTest(t *testing.T, isTLS bool, testRateLimitRetry bool, testArrayResponse bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) { testSetup(t) // Create an http test server according to whether TLS is used var newServer = httptest.NewServer @@ -106,6 +121,9 @@ func runTest(t *testing.T, isTLS bool, testRateLimitRetry bool, m map[string]int if testRateLimitRetry { ts = createCustomServer(newServer) } + if testArrayResponse { + ts = createCustomServerWithArrayResponse(newServer) + } defer ts.Close() m["url"] = ts.URL cfg := common.MustNewConfigFrom(m) @@ -391,7 +409,7 @@ func TestGET(t *testing.T) { "http_method": "GET", "interval": 0, } - runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -413,7 +431,7 @@ func TestGetHTTPS(t *testing.T) { "interval": 0, "ssl.verification_mode": "none", } - runTest(t, true, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, true, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -434,7 +452,7 @@ func TestRateLimitRetry(t *testing.T) { "http_method": "GET", "interval": 0, } - runTest(t, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, true, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -450,13 +468,35 @@ func TestRateLimitRetry(t *testing.T) { }) } +func TestArrayResponse(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "json_objects_array": "hello", + "interval": 0, + } + runTest(t, false, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(2) + if !ok { + t.Fatalf("Expected 2 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +} + func TestPOST(t *testing.T) { m := map[string]interface{}{ "http_method": "POST", "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, "interval": 0, } - runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -478,7 +518,7 @@ func TestRepeatedPOST(t *testing.T) { "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, "interval": 10 ^ 9, } - runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -499,7 +539,7 @@ func TestRunStop(t *testing.T) { "http_method": "GET", "interval": 0, } - runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { input.Run() input.Stop() input.Run() diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index bcb2296c039..912261a58d8 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -337,7 +337,7 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl return err } } else { - v, err = common.MapStr(mm).GetValue(in.config.JSONObjects) + v, err = common.MapStr(obj).GetValue(in.config.JSONObjects) if err != nil { return err }