From 4e9f33b2199bcac0d0fb8f7210ff7634d7abfe8a Mon Sep 17 00:00:00 2001 From: Jaipradeesh Date: Tue, 13 Mar 2018 15:30:03 +0530 Subject: [PATCH] Metricbeat/HTTP: Support array in http/json metricset (#6480) Currently (before this commit) the http/json metricset in Metricbeat only can query information from http endpoints which expose map[string]interface{}. For endpoints which expose an array on the root level, the json metricset does not work. A config option is added `json.is_array | bool`. If someone configures array but a non array json object is returned, an error is logged. --- CHANGELOG.asciidoc | 1 + metricbeat/docs/modules/http.asciidoc | 1 + metricbeat/metricbeat.reference.yml | 1 + metricbeat/module/http/_meta/config.yml | 1 + metricbeat/module/http/_meta/test/main.go | 11 ++- metricbeat/module/http/json/_meta/data.json | 5 +- .../module/http/json/_meta/test/config.yml | 1 + metricbeat/module/http/json/json.go | 76 +++++++++++++------ .../module/http/json/json_integration_test.go | 44 ++++++++--- metricbeat/modules.d/http.yml.disabled | 1 + 10 files changed, 102 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 15cf0afbe707..5ed49c2c0921 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -239,6 +239,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Refactor prometheus endpoint parsing to look similar to upstream prometheus {pull}6332[6332] - Update prometheus dependencies to latest {pull}6333[6333] - Making the http/json metricset GA. {pull}6471[6471] +- Add support for array in http/json metricset. {pull}6480[6480] *Packetbeat* diff --git a/metricbeat/docs/modules/http.asciidoc b/metricbeat/docs/modules/http.asciidoc index 2cb0e680a020..d23e3b3dee2d 100644 --- a/metricbeat/docs/modules/http.asciidoc +++ b/metricbeat/docs/modules/http.asciidoc @@ -35,6 +35,7 @@ metricbeat.modules: #method: "GET" #request.enabled: false #response.enabled: false + #json.is_array: false #dedot.enabled: false - module: http diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 53a8619eb362..b14d5ad3c0a6 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -222,6 +222,7 @@ metricbeat.modules: #method: "GET" #request.enabled: false #response.enabled: false + #json.is_array: false #dedot.enabled: false - module: http diff --git a/metricbeat/module/http/_meta/config.yml b/metricbeat/module/http/_meta/config.yml index 02c717785e6a..286bddb897ba 100644 --- a/metricbeat/module/http/_meta/config.yml +++ b/metricbeat/module/http/_meta/config.yml @@ -8,6 +8,7 @@ #method: "GET" #request.enabled: false #response.enabled: false + #json.is_array: false #dedot.enabled: false - module: http diff --git a/metricbeat/module/http/_meta/test/main.go b/metricbeat/module/http/_meta/test/main.go index 9dc520f6e1a7..40e02deeaf30 100644 --- a/metricbeat/module/http/_meta/test/main.go +++ b/metricbeat/module/http/_meta/test/main.go @@ -7,13 +7,20 @@ import ( ) func main() { - http.HandleFunc("/", serve) + http.HandleFunc("/jsonarr", serveJSONArr) + http.HandleFunc("/jsonobj", serveJSONObj) + http.HandleFunc("/", serveJSONObj) + err := http.ListenAndServe(":8080", nil) if err != nil { log.Fatal("ListenAndServe: ", err) } } -func serve(w http.ResponseWriter, r *http.Request) { +func serveJSONArr(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, `[{"hello1":"world1"}, {"hello2": "world2"}]`) +} + +func serveJSONObj(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, `{"hello":"world"}`) } diff --git a/metricbeat/module/http/json/_meta/data.json b/metricbeat/module/http/json/_meta/data.json index c6c865790282..af93a154c81d 100644 --- a/metricbeat/module/http/json/_meta/data.json +++ b/metricbeat/module/http/json/_meta/data.json @@ -5,15 +5,14 @@ "name": "host.example.com" }, "http": { - "testnamespace": { + "json": { "hello": "world" } }, "metricset": { - "host": "http:8080", + "host": "127.0.0.1:8080", "module": "http", "name": "json", - "namespace": "testnamespace", "rtt": 115 } } \ No newline at end of file diff --git a/metricbeat/module/http/json/_meta/test/config.yml b/metricbeat/module/http/json/_meta/test/config.yml index bec83bfa1d4a..57eaafb407d6 100644 --- a/metricbeat/module/http/json/_meta/test/config.yml +++ b/metricbeat/module/http/json/_meta/test/config.yml @@ -13,6 +13,7 @@ metricbeat.modules: headers: Accept: application/json request.enabled: true + json.is_array: false response.enabled: true #================================ Outputs ===================================== diff --git a/metricbeat/module/http/json/json.go b/metricbeat/module/http/json/json.go index 54a0d24b9d14..532fa493a205 100644 --- a/metricbeat/module/http/json/json.go +++ b/metricbeat/module/http/json/json.go @@ -49,6 +49,7 @@ type MetricSet struct { body string requestEnabled bool responseEnabled bool + jsonIsArray bool deDotEnabled bool } @@ -63,12 +64,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { Body string `config:"body"` RequestEnabled bool `config:"request.enabled"` ResponseEnabled bool `config:"response.enabled"` + JSONIsArray bool `config:"json.is_array"` DeDotEnabled bool `config:"dedot.enabled"` }{ Method: "GET", Body: "", RequestEnabled: false, ResponseEnabled: false, + JSONIsArray: false, DeDotEnabled: false, } @@ -91,37 +94,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http: http, requestEnabled: config.RequestEnabled, responseEnabled: config.ResponseEnabled, + jsonIsArray: config.JSONIsArray, deDotEnabled: config.DeDotEnabled, }, nil } -// Fetch methods implements the data gathering and data conversion to the right format -// It returns the event which is then forward to the output. In case of an error, a -// descriptive error must be returned. -func (m *MetricSet) Fetch() (common.MapStr, error) { - response, err := m.http.FetchResponse() - if err != nil { - return nil, err - } - defer response.Body.Close() - - var jsonBody map[string]interface{} - var event map[string]interface{} - - body, err := ioutil.ReadAll(response.Body) - if err != nil { - return nil, err - } - - err = json.Unmarshal(body, &jsonBody) - if err != nil { - return nil, err - } +func (m *MetricSet) processBody(response *http.Response, jsonBody interface{}) common.MapStr { + var event common.MapStr if m.deDotEnabled { - event = common.DeDotJSON(jsonBody).(map[string]interface{}) + event = common.DeDotJSON(jsonBody).(common.MapStr) } else { - event = jsonBody + event = jsonBody.(common.MapStr) } if m.requestEnabled { @@ -148,7 +132,49 @@ func (m *MetricSet) Fetch() (common.MapStr, error) { // Set dynamic namespace event["_namespace"] = m.namespace - return event, nil + return event +} + +// Fetch methods implements the data gathering and data conversion to the right format +// It returns the event which is then forward to the output. In case of an error, a +// descriptive error must be returned. +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + response, err := m.http.FetchResponse() + if err != nil { + return nil, err + } + defer response.Body.Close() + + var jsonBody common.MapStr + var jsonBodyArr []common.MapStr + var events []common.MapStr + + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil, err + } + + if m.jsonIsArray { + err = json.Unmarshal(body, &jsonBodyArr) + if err != nil { + return nil, err + } + + for _, obj := range jsonBodyArr { + event := m.processBody(response, obj) + events = append(events, event) + } + } else { + err = json.Unmarshal(body, &jsonBody) + if err != nil { + return nil, err + } + + event := m.processBody(response, jsonBody) + events = append(events, event) + } + + return events, nil } func (m *MetricSet) getHeaders(header http.Header) map[string]string { diff --git a/metricbeat/module/http/json/json_integration_test.go b/metricbeat/module/http/json/json_integration_test.go index 30c7361e1733..44e19ae4b1c4 100644 --- a/metricbeat/module/http/json/json_integration_test.go +++ b/metricbeat/module/http/json/json_integration_test.go @@ -12,10 +12,10 @@ import ( mbtest "github.com/elastic/beats/metricbeat/mb/testing" ) -func TestFetch(t *testing.T) { +func TestFetchObject(t *testing.T) { compose.EnsureUp(t, "http") - f := mbtest.NewEventFetcher(t, getConfig()) + f := mbtest.NewEventsFetcher(t, getConfig("object")) event, err := f.Fetch() if !assert.NoError(t, err) { t.FailNow() @@ -24,23 +24,47 @@ func TestFetch(t *testing.T) { t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) } +func TestFetchArray(t *testing.T) { + compose.EnsureUp(t, "http") + + f := mbtest.NewEventsFetcher(t, getConfig("array")) + event, err := f.Fetch() + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) +} func TestData(t *testing.T) { compose.EnsureUp(t, "http") - f := mbtest.NewEventFetcher(t, getConfig()) - err := mbtest.WriteEvent(f, t) + f := mbtest.NewEventsFetcher(t, getConfig("object")) + err := mbtest.WriteEvents(f, t) if err != nil { t.Fatal("write", err) } + } -func getConfig() map[string]interface{} { +func getConfig(jsonType string) map[string]interface{} { + var path string + var responseIsArray bool + switch jsonType { + case "object": + path = "/jsonobj" + responseIsArray = false + case "array": + path = "/jsonarr" + responseIsArray = true + } + return map[string]interface{}{ - "module": "http", - "metricsets": []string{"json"}, - "hosts": []string{getEnvHost() + ":" + getEnvPort()}, - "path": "/", - "namespace": "testnamespace", + "module": "http", + "metricsets": []string{"json"}, + "hosts": []string{getEnvHost() + ":" + getEnvPort()}, + "path": path, + "namespace": "testnamespace", + "json.is_array": responseIsArray, } } diff --git a/metricbeat/modules.d/http.yml.disabled b/metricbeat/modules.d/http.yml.disabled index 02c717785e6a..286bddb897ba 100644 --- a/metricbeat/modules.d/http.yml.disabled +++ b/metricbeat/modules.d/http.yml.disabled @@ -8,6 +8,7 @@ #method: "GET" #request.enabled: false #response.enabled: false + #json.is_array: false #dedot.enabled: false - module: http