From a009078919b08326cd93452cd6825aee28989b99 Mon Sep 17 00:00:00 2001 From: KS Chan Date: Sun, 21 Aug 2016 11:09:44 +0800 Subject: [PATCH] WIP --- metricbeat/include/list.go | 5 + .../module/nginx/pluscache/_beat/data.json | 0 .../nginx/pluscache/_beat/docs.asciidoc | 1 + .../module/nginx/pluscache/_beat/fields.yml | 9 ++ metricbeat/module/nginx/pluscache/data.go | 42 +++++++ .../module/nginx/pluscache/pluscache.go | 117 ++++++++++++++++++ .../pluscache/pluscache_integration_test.go | 42 +++++++ .../nginx/plustcpupstream/_beat/data.json | 0 .../nginx/plustcpupstream/_beat/docs.asciidoc | 1 + .../nginx/plustcpupstream/_beat/fields.yml | 9 ++ .../module/nginx/plustcpupstream/data.go | 42 +++++++ .../nginx/plustcpupstream/plustcpupstream.go | 117 ++++++++++++++++++ .../plustcpupstream_integration_test.go | 42 +++++++ .../module/nginx/plustcpzone/_beat/data.json | 0 .../nginx/plustcpzone/_beat/docs.asciidoc | 1 + .../module/nginx/plustcpzone/_beat/fields.yml | 9 ++ metricbeat/module/nginx/plustcpzone/data.go | 42 +++++++ .../module/nginx/plustcpzone/plustcpzone.go | 117 ++++++++++++++++++ .../plustcpzone_integration_test.go | 42 +++++++ .../module/nginx/plusupstream/_beat/data.json | 0 .../nginx/plusupstream/_beat/docs.asciidoc | 1 + .../nginx/plusupstream/_beat/fields.yml | 9 ++ metricbeat/module/nginx/plusupstream/data.go | 42 +++++++ .../module/nginx/plusupstream/plusupstream.go | 117 ++++++++++++++++++ .../plusupstream_integration_test.go | 42 +++++++ .../module/nginx/pluszone/_beat/data.json | 0 .../module/nginx/pluszone/_beat/docs.asciidoc | 1 + .../module/nginx/pluszone/_beat/fields.yml | 9 ++ metricbeat/module/nginx/pluszone/data.go | 42 +++++++ metricbeat/module/nginx/pluszone/pluszone.go | 117 ++++++++++++++++++ .../pluszone/pluszone_integration_test.go | 42 +++++++ metricbeat/module/nginx/util.go | 40 ++++++ 32 files changed, 1100 insertions(+) create mode 100644 metricbeat/module/nginx/pluscache/_beat/data.json create mode 100644 metricbeat/module/nginx/pluscache/_beat/docs.asciidoc create mode 100644 metricbeat/module/nginx/pluscache/_beat/fields.yml create mode 100644 metricbeat/module/nginx/pluscache/data.go create mode 100644 metricbeat/module/nginx/pluscache/pluscache.go create mode 100644 metricbeat/module/nginx/pluscache/pluscache_integration_test.go create mode 100644 metricbeat/module/nginx/plustcpupstream/_beat/data.json create mode 100644 metricbeat/module/nginx/plustcpupstream/_beat/docs.asciidoc create mode 100644 metricbeat/module/nginx/plustcpupstream/_beat/fields.yml create mode 100644 metricbeat/module/nginx/plustcpupstream/data.go create mode 100644 metricbeat/module/nginx/plustcpupstream/plustcpupstream.go create mode 100644 metricbeat/module/nginx/plustcpupstream/plustcpupstream_integration_test.go create mode 100644 metricbeat/module/nginx/plustcpzone/_beat/data.json create mode 100644 metricbeat/module/nginx/plustcpzone/_beat/docs.asciidoc create mode 100644 metricbeat/module/nginx/plustcpzone/_beat/fields.yml create mode 100644 metricbeat/module/nginx/plustcpzone/data.go create mode 100644 metricbeat/module/nginx/plustcpzone/plustcpzone.go create mode 100644 metricbeat/module/nginx/plustcpzone/plustcpzone_integration_test.go create mode 100644 metricbeat/module/nginx/plusupstream/_beat/data.json create mode 100644 metricbeat/module/nginx/plusupstream/_beat/docs.asciidoc create mode 100644 metricbeat/module/nginx/plusupstream/_beat/fields.yml create mode 100644 metricbeat/module/nginx/plusupstream/data.go create mode 100644 metricbeat/module/nginx/plusupstream/plusupstream.go create mode 100644 metricbeat/module/nginx/plusupstream/plusupstream_integration_test.go create mode 100644 metricbeat/module/nginx/pluszone/_beat/data.json create mode 100644 metricbeat/module/nginx/pluszone/_beat/docs.asciidoc create mode 100644 metricbeat/module/nginx/pluszone/_beat/fields.yml create mode 100644 metricbeat/module/nginx/pluszone/data.go create mode 100644 metricbeat/module/nginx/pluszone/pluszone.go create mode 100644 metricbeat/module/nginx/pluszone/pluszone_integration_test.go create mode 100644 metricbeat/module/nginx/util.go diff --git a/metricbeat/include/list.go b/metricbeat/include/list.go index 2ba265d37c3..526a46901b4 100644 --- a/metricbeat/include/list.go +++ b/metricbeat/include/list.go @@ -21,6 +21,11 @@ import ( _ "github.com/elastic/beats/metricbeat/module/mysql" _ "github.com/elastic/beats/metricbeat/module/mysql/status" _ "github.com/elastic/beats/metricbeat/module/nginx" + _ "github.com/elastic/beats/metricbeat/module/nginx/pluscache" + _ "github.com/elastic/beats/metricbeat/module/nginx/plustcpupstream" + _ "github.com/elastic/beats/metricbeat/module/nginx/plustcpzone" + _ "github.com/elastic/beats/metricbeat/module/nginx/plusupstream" + _ "github.com/elastic/beats/metricbeat/module/nginx/pluszone" _ "github.com/elastic/beats/metricbeat/module/nginx/stubstatus" _ "github.com/elastic/beats/metricbeat/module/postgresql" _ "github.com/elastic/beats/metricbeat/module/postgresql/activity" diff --git a/metricbeat/module/nginx/pluscache/_beat/data.json b/metricbeat/module/nginx/pluscache/_beat/data.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/metricbeat/module/nginx/pluscache/_beat/docs.asciidoc b/metricbeat/module/nginx/pluscache/_beat/docs.asciidoc new file mode 100644 index 00000000000..61738c0041c --- /dev/null +++ b/metricbeat/module/nginx/pluscache/_beat/docs.asciidoc @@ -0,0 +1 @@ +=== Nginx PlusCache MetricSet diff --git a/metricbeat/module/nginx/pluscache/_beat/fields.yml b/metricbeat/module/nginx/pluscache/_beat/fields.yml new file mode 100644 index 00000000000..32d5f293a34 --- /dev/null +++ b/metricbeat/module/nginx/pluscache/_beat/fields.yml @@ -0,0 +1,9 @@ +- name: pluscache + type: group + description: > + `pluscache` reads server cache status from Nginx ngx_http_status_module. + fields: + - name: hostname + type: keyword + description: > + Nginx hostname diff --git a/metricbeat/module/nginx/pluscache/data.go b/metricbeat/module/nginx/pluscache/data.go new file mode 100644 index 00000000000..391f801ca79 --- /dev/null +++ b/metricbeat/module/nginx/pluscache/data.go @@ -0,0 +1,42 @@ +package pluscache + +import ( + "encoding/json" + "io" + "io/ioutil" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/module/nginx" +) + +// Map body to []MapStr +func eventMapping(m *MetricSet, body io.ReadCloser, hostname string, metricset string) ([]common.MapStr, error) { + // Nginx plus server caches: + b, err := ioutil.ReadAll(body) + if err != nil { + return nil, err + } + + var caches map[string]interface{} + if err := json.Unmarshal([]byte(b), &caches); err != nil { + return nil, err + } + caches = nginx.Ftoi(caches) + + events := []common.MapStr{} + + for name, cache := range caches { + event := common.MapStr{ + "hostname": hostname, + "name": name, + } + + for k, v := range cache.(map[string]interface{}) { + event[k] = v + } + + events = append(events, event) + } + + return events, nil +} diff --git a/metricbeat/module/nginx/pluscache/pluscache.go b/metricbeat/module/nginx/pluscache/pluscache.go new file mode 100644 index 00000000000..783d596dc4d --- /dev/null +++ b/metricbeat/module/nginx/pluscache/pluscache.go @@ -0,0 +1,117 @@ +// Package pluscache reads server cache status from Nginx, ngx_http_status_module is required. +package pluscache + +import ( + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" +) + +const ( + // defaultScheme is the default scheme to use when it is not specified in + // the host config. + defaultScheme = "http" + + // defaultPath is the default path to the ngx_http_status_module server cache endpoint on Nginx. + defaultPath = "/status/caches" +) + +var ( + debugf = logp.MakeDebug("nginx-status") +) + +func init() { + if err := mb.Registry.AddMetricSet("nginx", "pluscache", New); err != nil { + panic(err) + } +} + +// MetricSet for fetching Nginx plus status. +type MetricSet struct { + mb.BaseMetricSet + + client *http.Client // HTTP client that is reused across requests. + url string // Nginx pluscache endpoint URL. + + requests int +} + +// New creates new instance of MetricSet +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + // Additional configuration options + config := struct { + ServerStatusPath string `config:"server_status_path"` + }{ + ServerStatusPath: defaultPath, + } + + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + u, err := getURL(config.ServerStatusPath, base.Host()) + if err != nil { + return nil, err + } + + debugf("nginx-pluscache URL=%s", u) + return &MetricSet{ + BaseMetricSet: base, + url: u.String(), + client: &http.Client{Timeout: base.Module().Config().Timeout}, + requests: 0, + }, nil +} + +// Fetch makes an HTTP request to fetch status metrics from the pluscache endpoint. +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + req, err := http.NewRequest("GET", m.url, nil) + resp, err := m.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making http request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status) + } + + return eventMapping(m, resp.Body, m.Host(), m.Name()) +} + +// getURL constructs a URL from the rawHost value and path if one was not set in the rawHost value. +func getURL(statusPath, rawHost string) (*url.URL, error) { + u, err := url.Parse(rawHost) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + + if u.Scheme == "" { + // Add scheme and re-parse. + u, err = url.Parse(fmt.Sprintf("%s://%s", "http", rawHost)) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + } + + if u.Host == "" { + return nil, fmt.Errorf("error parsing nginx host: empty host") + } + + if u.Path == "" { + // The path given in the host config takes precedence over the + // server_status_path config value. + path := statusPath + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + u.Path = path + } + + return u, nil +} diff --git a/metricbeat/module/nginx/pluscache/pluscache_integration_test.go b/metricbeat/module/nginx/pluscache/pluscache_integration_test.go new file mode 100644 index 00000000000..0dd68975018 --- /dev/null +++ b/metricbeat/module/nginx/pluscache/pluscache_integration_test.go @@ -0,0 +1,42 @@ +// +build integration + +package pluscache + +import ( + "testing" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/nginx" + + "github.com/stretchr/testify/assert" +) + +func TestFetch(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + event, err := f.Fetch() + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + + // Check number of fields. + assert.Equal(t, 10, len(event)) +} + +func TestData(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + + err := mbtest.WriteEvent(f, t) + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "nginx", + "metricsets": []string{"pluscache"}, + "hosts": []string{nginx.GetNginxEnvHost()}, + } +} diff --git a/metricbeat/module/nginx/plustcpupstream/_beat/data.json b/metricbeat/module/nginx/plustcpupstream/_beat/data.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/metricbeat/module/nginx/plustcpupstream/_beat/docs.asciidoc b/metricbeat/module/nginx/plustcpupstream/_beat/docs.asciidoc new file mode 100644 index 00000000000..0c8a560d598 --- /dev/null +++ b/metricbeat/module/nginx/plustcpupstream/_beat/docs.asciidoc @@ -0,0 +1 @@ +=== Nginx PlusTCPUpstream MetricSet diff --git a/metricbeat/module/nginx/plustcpupstream/_beat/fields.yml b/metricbeat/module/nginx/plustcpupstream/_beat/fields.yml new file mode 100644 index 00000000000..9801dcd7274 --- /dev/null +++ b/metricbeat/module/nginx/plustcpupstream/_beat/fields.yml @@ -0,0 +1,9 @@ +- name: plustcpupstream + type: group + description: > + `plustcpupstream` reads server tcpupstream status from Nginx ngx_http_status_module. + fields: + - name: hostname + type: keyword + description: > + Nginx hostname diff --git a/metricbeat/module/nginx/plustcpupstream/data.go b/metricbeat/module/nginx/plustcpupstream/data.go new file mode 100644 index 00000000000..f05c7636ffb --- /dev/null +++ b/metricbeat/module/nginx/plustcpupstream/data.go @@ -0,0 +1,42 @@ +package plustcpupstream + +import ( + "encoding/json" + "io" + "io/ioutil" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/module/nginx" +) + +// Map body to []MapStr +func eventMapping(m *MetricSet, body io.ReadCloser, hostname string, metricset string) ([]common.MapStr, error) { + // Nginx plus server tcpupstreams: + b, err := ioutil.ReadAll(body) + if err != nil { + return nil, err + } + + var tcpupstreams map[string]interface{} + if err := json.Unmarshal([]byte(b), &tcpupstreams); err != nil { + return nil, err + } + tcpupstreams = nginx.Ftoi(tcpupstreams) + + events := []common.MapStr{} + + for name, tcpupstream := range tcpupstreams { + event := common.MapStr{ + "hostname": hostname, + "name": name, + } + + for k, v := range tcpupstream.(map[string]interface{}) { + event[k] = v + } + + events = append(events, event) + } + + return events, nil +} diff --git a/metricbeat/module/nginx/plustcpupstream/plustcpupstream.go b/metricbeat/module/nginx/plustcpupstream/plustcpupstream.go new file mode 100644 index 00000000000..343df78198d --- /dev/null +++ b/metricbeat/module/nginx/plustcpupstream/plustcpupstream.go @@ -0,0 +1,117 @@ +// Package plustcpupstream reads server tcpupstream status from Nginx, ngx_http_status_module is required. +package plustcpupstream + +import ( + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" +) + +const ( + // defaultScheme is the default scheme to use when it is not specified in + // the host config. + defaultScheme = "http" + + // defaultPath is the default path to the ngx_http_status_module server tcpupstream endpoint on Nginx. + defaultPath = "/status/tcpupstreams" +) + +var ( + debugf = logp.MakeDebug("nginx-status") +) + +func init() { + if err := mb.Registry.AddMetricSet("nginx", "plustcpupstream", New); err != nil { + panic(err) + } +} + +// MetricSet for fetching Nginx plus status. +type MetricSet struct { + mb.BaseMetricSet + + client *http.Client // HTTP client that is reused across requests. + url string // Nginx plustcpupstream endpoint URL. + + requests int +} + +// New creates new instance of MetricSet +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + // Additional configuration options + config := struct { + ServerStatusPath string `config:"server_status_path"` + }{ + ServerStatusPath: defaultPath, + } + + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + u, err := getURL(config.ServerStatusPath, base.Host()) + if err != nil { + return nil, err + } + + debugf("nginx-plustcpupstream URL=%s", u) + return &MetricSet{ + BaseMetricSet: base, + url: u.String(), + client: &http.Client{Timeout: base.Module().Config().Timeout}, + requests: 0, + }, nil +} + +// Fetch makes an HTTP request to fetch status metrics from the plustcpupstream endpoint. +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + req, err := http.NewRequest("GET", m.url, nil) + resp, err := m.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making http request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status) + } + + return eventMapping(m, resp.Body, m.Host(), m.Name()) +} + +// getURL constructs a URL from the rawHost value and path if one was not set in the rawHost value. +func getURL(statusPath, rawHost string) (*url.URL, error) { + u, err := url.Parse(rawHost) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + + if u.Scheme == "" { + // Add scheme and re-parse. + u, err = url.Parse(fmt.Sprintf("%s://%s", "http", rawHost)) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + } + + if u.Host == "" { + return nil, fmt.Errorf("error parsing nginx host: empty host") + } + + if u.Path == "" { + // The path given in the host config takes precedence over the + // server_status_path config value. + path := statusPath + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + u.Path = path + } + + return u, nil +} diff --git a/metricbeat/module/nginx/plustcpupstream/plustcpupstream_integration_test.go b/metricbeat/module/nginx/plustcpupstream/plustcpupstream_integration_test.go new file mode 100644 index 00000000000..055dbc9188e --- /dev/null +++ b/metricbeat/module/nginx/plustcpupstream/plustcpupstream_integration_test.go @@ -0,0 +1,42 @@ +// +build integration + +package plustcpupstream + +import ( + "testing" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/nginx" + + "github.com/stretchr/testify/assert" +) + +func TestFetch(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + event, err := f.Fetch() + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + + // Check number of fields. + assert.Equal(t, 10, len(event)) +} + +func TestData(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + + err := mbtest.WriteEvent(f, t) + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "nginx", + "metricsets": []string{"plustcpupstream"}, + "hosts": []string{nginx.GetNginxEnvHost()}, + } +} diff --git a/metricbeat/module/nginx/plustcpzone/_beat/data.json b/metricbeat/module/nginx/plustcpzone/_beat/data.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/metricbeat/module/nginx/plustcpzone/_beat/docs.asciidoc b/metricbeat/module/nginx/plustcpzone/_beat/docs.asciidoc new file mode 100644 index 00000000000..9dd36704c48 --- /dev/null +++ b/metricbeat/module/nginx/plustcpzone/_beat/docs.asciidoc @@ -0,0 +1 @@ +=== Nginx PlusTCPZone MetricSet diff --git a/metricbeat/module/nginx/plustcpzone/_beat/fields.yml b/metricbeat/module/nginx/plustcpzone/_beat/fields.yml new file mode 100644 index 00000000000..6e52c968fca --- /dev/null +++ b/metricbeat/module/nginx/plustcpzone/_beat/fields.yml @@ -0,0 +1,9 @@ +- name: plustcpzone + type: group + description: > + `plustcpzone` reads server tcpzone status from Nginx ngx_http_status_module. + fields: + - name: hostname + type: keyword + description: > + Nginx hostname diff --git a/metricbeat/module/nginx/plustcpzone/data.go b/metricbeat/module/nginx/plustcpzone/data.go new file mode 100644 index 00000000000..14a7425de5a --- /dev/null +++ b/metricbeat/module/nginx/plustcpzone/data.go @@ -0,0 +1,42 @@ +package plustcpzone + +import ( + "encoding/json" + "io" + "io/ioutil" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/module/nginx" +) + +// Map body to []MapStr +func eventMapping(m *MetricSet, body io.ReadCloser, hostname string, metricset string) ([]common.MapStr, error) { + // Nginx plus server tcpzones: + b, err := ioutil.ReadAll(body) + if err != nil { + return nil, err + } + + var tcpzones map[string]interface{} + if err := json.Unmarshal([]byte(b), &tcpzones); err != nil { + return nil, err + } + tcpzones = nginx.Ftoi(tcpzones) + + events := []common.MapStr{} + + for name, tcpzone := range tcpzones { + event := common.MapStr{ + "hostname": hostname, + "name": name, + } + + for k, v := range tcpzone.(map[string]interface{}) { + event[k] = v + } + + events = append(events, event) + } + + return events, nil +} diff --git a/metricbeat/module/nginx/plustcpzone/plustcpzone.go b/metricbeat/module/nginx/plustcpzone/plustcpzone.go new file mode 100644 index 00000000000..3ea4a7409b4 --- /dev/null +++ b/metricbeat/module/nginx/plustcpzone/plustcpzone.go @@ -0,0 +1,117 @@ +// Package plustcpzone reads server tcpzone status from Nginx, ngx_http_status_module is required. +package plustcpzone + +import ( + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" +) + +const ( + // defaultScheme is the default scheme to use when it is not specified in + // the host config. + defaultScheme = "http" + + // defaultPath is the default path to the ngx_http_status_module server tcpzone endpoint on Nginx. + defaultPath = "/status/server_tcpzones" +) + +var ( + debugf = logp.MakeDebug("nginx-status") +) + +func init() { + if err := mb.Registry.AddMetricSet("nginx", "plustcpzone", New); err != nil { + panic(err) + } +} + +// MetricSet for fetching Nginx plus status. +type MetricSet struct { + mb.BaseMetricSet + + client *http.Client // HTTP client that is reused across requests. + url string // Nginx plustcpzone endpoint URL. + + requests int +} + +// New creates new instance of MetricSet +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + // Additional configuration options + config := struct { + ServerStatusPath string `config:"server_status_path"` + }{ + ServerStatusPath: defaultPath, + } + + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + u, err := getURL(config.ServerStatusPath, base.Host()) + if err != nil { + return nil, err + } + + debugf("nginx-plustcpzone URL=%s", u) + return &MetricSet{ + BaseMetricSet: base, + url: u.String(), + client: &http.Client{Timeout: base.Module().Config().Timeout}, + requests: 0, + }, nil +} + +// Fetch makes an HTTP request to fetch status metrics from the plustcpzone endpoint. +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + req, err := http.NewRequest("GET", m.url, nil) + resp, err := m.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making http request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status) + } + + return eventMapping(m, resp.Body, m.Host(), m.Name()) +} + +// getURL constructs a URL from the rawHost value and path if one was not set in the rawHost value. +func getURL(statusPath, rawHost string) (*url.URL, error) { + u, err := url.Parse(rawHost) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + + if u.Scheme == "" { + // Add scheme and re-parse. + u, err = url.Parse(fmt.Sprintf("%s://%s", "http", rawHost)) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + } + + if u.Host == "" { + return nil, fmt.Errorf("error parsing nginx host: empty host") + } + + if u.Path == "" { + // The path given in the host config takes precedence over the + // server_status_path config value. + path := statusPath + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + u.Path = path + } + + return u, nil +} diff --git a/metricbeat/module/nginx/plustcpzone/plustcpzone_integration_test.go b/metricbeat/module/nginx/plustcpzone/plustcpzone_integration_test.go new file mode 100644 index 00000000000..bc1ffdc66f1 --- /dev/null +++ b/metricbeat/module/nginx/plustcpzone/plustcpzone_integration_test.go @@ -0,0 +1,42 @@ +// +build integration + +package plustcpzone + +import ( + "testing" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/nginx" + + "github.com/stretchr/testify/assert" +) + +func TestFetch(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + event, err := f.Fetch() + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + + // Check number of fields. + assert.Equal(t, 10, len(event)) +} + +func TestData(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + + err := mbtest.WriteEvent(f, t) + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "nginx", + "metricsets": []string{"plustcpzone"}, + "hosts": []string{nginx.GetNginxEnvHost()}, + } +} diff --git a/metricbeat/module/nginx/plusupstream/_beat/data.json b/metricbeat/module/nginx/plusupstream/_beat/data.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/metricbeat/module/nginx/plusupstream/_beat/docs.asciidoc b/metricbeat/module/nginx/plusupstream/_beat/docs.asciidoc new file mode 100644 index 00000000000..4aabcb61c03 --- /dev/null +++ b/metricbeat/module/nginx/plusupstream/_beat/docs.asciidoc @@ -0,0 +1 @@ +=== Nginx PlusUpstream MetricSet diff --git a/metricbeat/module/nginx/plusupstream/_beat/fields.yml b/metricbeat/module/nginx/plusupstream/_beat/fields.yml new file mode 100644 index 00000000000..c1f4291f456 --- /dev/null +++ b/metricbeat/module/nginx/plusupstream/_beat/fields.yml @@ -0,0 +1,9 @@ +- name: plusupstream + type: group + description: > + `plusupstream` reads server upstream status from Nginx ngx_http_status_module. + fields: + - name: hostname + type: keyword + description: > + Nginx hostname diff --git a/metricbeat/module/nginx/plusupstream/data.go b/metricbeat/module/nginx/plusupstream/data.go new file mode 100644 index 00000000000..5783eec3a25 --- /dev/null +++ b/metricbeat/module/nginx/plusupstream/data.go @@ -0,0 +1,42 @@ +package plusupstream + +import ( + "encoding/json" + "io" + "io/ioutil" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/module/nginx" +) + +// Map body to []MapStr +func eventMapping(m *MetricSet, body io.ReadCloser, hostname string, metricset string) ([]common.MapStr, error) { + // Nginx plus server upstreams: + b, err := ioutil.ReadAll(body) + if err != nil { + return nil, err + } + + var upstreams map[string]interface{} + if err := json.Unmarshal([]byte(b), &upstreams); err != nil { + return nil, err + } + upstreams = nginx.Ftoi(upstreams) + + events := []common.MapStr{} + + for name, upstream := range upstreams { + event := common.MapStr{ + "hostname": hostname, + "name": name, + } + + for k, v := range upstream.(map[string]interface{}) { + event[k] = v + } + + events = append(events, event) + } + + return events, nil +} diff --git a/metricbeat/module/nginx/plusupstream/plusupstream.go b/metricbeat/module/nginx/plusupstream/plusupstream.go new file mode 100644 index 00000000000..4fbcc9402b8 --- /dev/null +++ b/metricbeat/module/nginx/plusupstream/plusupstream.go @@ -0,0 +1,117 @@ +// Package plusupstream reads server upstream status from Nginx, ngx_http_status_module is required. +package plusupstream + +import ( + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" +) + +const ( + // defaultScheme is the default scheme to use when it is not specified in + // the host config. + defaultScheme = "http" + + // defaultPath is the default path to the ngx_http_status_module server upstream endpoint on Nginx. + defaultPath = "/status/upstreams" +) + +var ( + debugf = logp.MakeDebug("nginx-status") +) + +func init() { + if err := mb.Registry.AddMetricSet("nginx", "plusupstream", New); err != nil { + panic(err) + } +} + +// MetricSet for fetching Nginx plus status. +type MetricSet struct { + mb.BaseMetricSet + + client *http.Client // HTTP client that is reused across requests. + url string // Nginx plusupstream endpoint URL. + + requests int +} + +// New creates new instance of MetricSet +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + // Additional configuration options + config := struct { + ServerStatusPath string `config:"server_status_path"` + }{ + ServerStatusPath: defaultPath, + } + + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + u, err := getURL(config.ServerStatusPath, base.Host()) + if err != nil { + return nil, err + } + + debugf("nginx-plusupstream URL=%s", u) + return &MetricSet{ + BaseMetricSet: base, + url: u.String(), + client: &http.Client{Timeout: base.Module().Config().Timeout}, + requests: 0, + }, nil +} + +// Fetch makes an HTTP request to fetch status metrics from the plusupstream endpoint. +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + req, err := http.NewRequest("GET", m.url, nil) + resp, err := m.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making http request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status) + } + + return eventMapping(m, resp.Body, m.Host(), m.Name()) +} + +// getURL constructs a URL from the rawHost value and path if one was not set in the rawHost value. +func getURL(statusPath, rawHost string) (*url.URL, error) { + u, err := url.Parse(rawHost) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + + if u.Scheme == "" { + // Add scheme and re-parse. + u, err = url.Parse(fmt.Sprintf("%s://%s", "http", rawHost)) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + } + + if u.Host == "" { + return nil, fmt.Errorf("error parsing nginx host: empty host") + } + + if u.Path == "" { + // The path given in the host config takes precedence over the + // server_status_path config value. + path := statusPath + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + u.Path = path + } + + return u, nil +} diff --git a/metricbeat/module/nginx/plusupstream/plusupstream_integration_test.go b/metricbeat/module/nginx/plusupstream/plusupstream_integration_test.go new file mode 100644 index 00000000000..ed36db29c87 --- /dev/null +++ b/metricbeat/module/nginx/plusupstream/plusupstream_integration_test.go @@ -0,0 +1,42 @@ +// +build integration + +package plusupstream + +import ( + "testing" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/nginx" + + "github.com/stretchr/testify/assert" +) + +func TestFetch(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + event, err := f.Fetch() + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + + // Check number of fields. + assert.Equal(t, 10, len(event)) +} + +func TestData(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + + err := mbtest.WriteEvent(f, t) + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "nginx", + "metricsets": []string{"plusupstream"}, + "hosts": []string{nginx.GetNginxEnvHost()}, + } +} diff --git a/metricbeat/module/nginx/pluszone/_beat/data.json b/metricbeat/module/nginx/pluszone/_beat/data.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/metricbeat/module/nginx/pluszone/_beat/docs.asciidoc b/metricbeat/module/nginx/pluszone/_beat/docs.asciidoc new file mode 100644 index 00000000000..d20d21e06fb --- /dev/null +++ b/metricbeat/module/nginx/pluszone/_beat/docs.asciidoc @@ -0,0 +1 @@ +=== Nginx PlusZone MetricSet diff --git a/metricbeat/module/nginx/pluszone/_beat/fields.yml b/metricbeat/module/nginx/pluszone/_beat/fields.yml new file mode 100644 index 00000000000..cb21abf26bd --- /dev/null +++ b/metricbeat/module/nginx/pluszone/_beat/fields.yml @@ -0,0 +1,9 @@ +- name: pluszone + type: group + description: > + `pluszone` reads server zone status from Nginx ngx_http_status_module. + fields: + - name: hostname + type: keyword + description: > + Nginx hostname diff --git a/metricbeat/module/nginx/pluszone/data.go b/metricbeat/module/nginx/pluszone/data.go new file mode 100644 index 00000000000..9ae1852f68d --- /dev/null +++ b/metricbeat/module/nginx/pluszone/data.go @@ -0,0 +1,42 @@ +package pluszone + +import ( + "encoding/json" + "io" + "io/ioutil" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/module/nginx" +) + +// Map body to []MapStr +func eventMapping(m *MetricSet, body io.ReadCloser, hostname string, metricset string) ([]common.MapStr, error) { + // Nginx plus server zones: + b, err := ioutil.ReadAll(body) + if err != nil { + return nil, err + } + + var zones map[string]interface{} + if err := json.Unmarshal([]byte(b), &zones); err != nil { + return nil, err + } + zones = nginx.Ftoi(zones) + + events := []common.MapStr{} + + for name, zone := range zones { + event := common.MapStr{ + "hostname": hostname, + "name": name, + } + + for k, v := range zone.(map[string]interface{}) { + event[k] = v + } + + events = append(events, event) + } + + return events, nil +} diff --git a/metricbeat/module/nginx/pluszone/pluszone.go b/metricbeat/module/nginx/pluszone/pluszone.go new file mode 100644 index 00000000000..c70ebab4836 --- /dev/null +++ b/metricbeat/module/nginx/pluszone/pluszone.go @@ -0,0 +1,117 @@ +// Package pluszone reads server zone status from Nginx, ngx_http_status_module is required. +package pluszone + +import ( + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" +) + +const ( + // defaultScheme is the default scheme to use when it is not specified in + // the host config. + defaultScheme = "http" + + // defaultPath is the default path to the ngx_http_status_module server zone endpoint on Nginx. + defaultPath = "/status/server_zones" +) + +var ( + debugf = logp.MakeDebug("nginx-status") +) + +func init() { + if err := mb.Registry.AddMetricSet("nginx", "pluszone", New); err != nil { + panic(err) + } +} + +// MetricSet for fetching Nginx plus status. +type MetricSet struct { + mb.BaseMetricSet + + client *http.Client // HTTP client that is reused across requests. + url string // Nginx pluszone endpoint URL. + + requests int +} + +// New creates new instance of MetricSet +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + // Additional configuration options + config := struct { + ServerStatusPath string `config:"server_status_path"` + }{ + ServerStatusPath: defaultPath, + } + + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + u, err := getURL(config.ServerStatusPath, base.Host()) + if err != nil { + return nil, err + } + + debugf("nginx-pluszone URL=%s", u) + return &MetricSet{ + BaseMetricSet: base, + url: u.String(), + client: &http.Client{Timeout: base.Module().Config().Timeout}, + requests: 0, + }, nil +} + +// Fetch makes an HTTP request to fetch status metrics from the pluszone endpoint. +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + req, err := http.NewRequest("GET", m.url, nil) + resp, err := m.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making http request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status) + } + + return eventMapping(m, resp.Body, m.Host(), m.Name()) +} + +// getURL constructs a URL from the rawHost value and path if one was not set in the rawHost value. +func getURL(statusPath, rawHost string) (*url.URL, error) { + u, err := url.Parse(rawHost) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + + if u.Scheme == "" { + // Add scheme and re-parse. + u, err = url.Parse(fmt.Sprintf("%s://%s", "http", rawHost)) + if err != nil { + return nil, fmt.Errorf("error parsing nginx host: %v", err) + } + } + + if u.Host == "" { + return nil, fmt.Errorf("error parsing nginx host: empty host") + } + + if u.Path == "" { + // The path given in the host config takes precedence over the + // server_status_path config value. + path := statusPath + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + u.Path = path + } + + return u, nil +} diff --git a/metricbeat/module/nginx/pluszone/pluszone_integration_test.go b/metricbeat/module/nginx/pluszone/pluszone_integration_test.go new file mode 100644 index 00000000000..45059ae4f80 --- /dev/null +++ b/metricbeat/module/nginx/pluszone/pluszone_integration_test.go @@ -0,0 +1,42 @@ +// +build integration + +package pluszone + +import ( + "testing" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/nginx" + + "github.com/stretchr/testify/assert" +) + +func TestFetch(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + event, err := f.Fetch() + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + + // Check number of fields. + assert.Equal(t, 10, len(event)) +} + +func TestData(t *testing.T) { + f := mbtest.NewEventFetcher(t, getConfig()) + + err := mbtest.WriteEvent(f, t) + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "nginx", + "metricsets": []string{"pluszone"}, + "hosts": []string{nginx.GetNginxEnvHost()}, + } +} diff --git a/metricbeat/module/nginx/util.go b/metricbeat/module/nginx/util.go new file mode 100644 index 00000000000..4a477b239a5 --- /dev/null +++ b/metricbeat/module/nginx/util.go @@ -0,0 +1,40 @@ +package nginx + +// Ftoi returns a copy of input map where float values are casted to int. +// The conversion is applied to nested maps and arrays as well. +func Ftoi(in map[string]interface{}) map[string]interface{} { + out := map[string]interface{}{} + + for k, v := range in { + switch v.(type) { + case float64: + vt := v.(float64) + out[k] = int(vt) + case map[string]interface{}: + vt := v.(map[string]interface{}) + out[k] = Ftoi(vt) + case []interface{}: + vt := v.([]interface{}) + l := len(vt) + a := make([]interface{}, l) + for i := 0; i < l; i++ { + e := vt[i] + switch e.(type) { + case float64: + et := e.(float64) + a[i] = int(et) + case map[string]interface{}: + et := e.(map[string]interface{}) + a[i] = Ftoi(et) + default: + a[i] = e + } + } + out[k] = a + default: + out[k] = v + } + } + + return out +}