diff --git a/collector/cluster_info.go b/collector/cluster_info.go index d42ca57c..1714986a 100644 --- a/collector/cluster_info.go +++ b/collector/cluster_info.go @@ -77,7 +77,7 @@ type VersionInfo struct { LuceneVersion semver.Version `json:"lucene_version"` } -func (c *ClusterInfoCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (c *ClusterInfoCollector) Update(_ context.Context, ch chan<- prometheus.Metric) error { resp, err := c.hc.Get(c.u.String()) if err != nil { return err diff --git a/collector/cluster_settings_test.go b/collector/cluster_settings_test.go index dac21857..b4b98a09 100644 --- a/collector/cluster_settings_test.go +++ b/collector/cluster_settings_test.go @@ -14,7 +14,6 @@ package collector import ( - "context" "io" "net/http" "net/http/httptest" @@ -24,21 +23,9 @@ import ( "testing" "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" ) -type wrapCollector struct { - c Collector -} - -func (w wrapCollector) Describe(ch chan<- *prometheus.Desc) { -} - -func (w wrapCollector) Collect(ch chan<- prometheus.Metric) { - w.c.Update(context.Background(), ch) -} - func TestClusterSettingsStats(t *testing.T) { // Testcases created using: // docker run -d -p 9200:9200 elasticsearch:VERSION-alpine diff --git a/collector/collector_test.go b/collector/collector_test.go new file mode 100644 index 00000000..80c7fa5d --- /dev/null +++ b/collector/collector_test.go @@ -0,0 +1,36 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" +) + +// wrapCollector is a util to let you test your Collector implementation. +// +// Use this with prometheus/client_golang/prometheus/testutil to test metric output, for example: +// +// testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(want)) +type wrapCollector struct { + c Collector +} + +func (w wrapCollector) Describe(_ chan<- *prometheus.Desc) { +} + +func (w wrapCollector) Collect(ch chan<- prometheus.Metric) { + w.c.Update(context.Background(), ch) +} diff --git a/collector/nodes_test.go b/collector/nodes_test.go index 3b4e4a5a..8f9468db 100644 --- a/collector/nodes_test.go +++ b/collector/nodes_test.go @@ -138,7 +138,7 @@ type basicAuth struct { Next http.Handler } -func (h *basicAuth) checkAuth(w http.ResponseWriter, r *http.Request) bool { +func (h *basicAuth) checkAuth(_ http.ResponseWriter, r *http.Request) bool { s := strings.SplitN(r.Header.Get("Authorization"), " ", 2) if len(s) != 2 { return false diff --git a/collector/tasks.go b/collector/tasks.go new file mode 100644 index 00000000..e171b67c --- /dev/null +++ b/collector/tasks.go @@ -0,0 +1,143 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/alecthomas/kingpin/v2" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +// filterByTask global required because collector interface doesn't expose any way to take +// constructor args. +var actionFilter string + +var taskActionDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "task_stats", "action"), + "Number of tasks of a certain action", + []string{"action"}, nil) + +func init() { + kingpin.Flag("tasks.actions", + "Filter on task actions. Used in same way as Task API actions param"). + Default("indices:*").StringVar(&actionFilter) + registerCollector("tasks", defaultDisabled, NewTaskCollector) +} + +// Task Information Struct +type TaskCollector struct { + logger log.Logger + hc *http.Client + u *url.URL +} + +// NewTaskCollector defines Task Prometheus metrics +func NewTaskCollector(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) { + level.Info(logger).Log("msg", "task collector created", + "actionFilter", actionFilter, + ) + + return &TaskCollector{ + logger: logger, + hc: hc, + u: u, + }, nil +} + +func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { + tasks, err := t.fetchTasks(ctx) + if err != nil { + return fmt.Errorf("failed to fetch and decode task stats: %w", err) + } + + stats := AggregateTasks(tasks) + for action, count := range stats.CountByAction { + ch <- prometheus.MustNewConstMetric( + taskActionDesc, + prometheus.GaugeValue, + float64(count), + action, + ) + } + return nil +} + +func (t *TaskCollector) fetchTasks(_ context.Context) (tasksResponse, error) { + u := t.u.ResolveReference(&url.URL{Path: "_tasks"}) + q := u.Query() + q.Set("group_by", "none") + q.Set("actions", actionFilter) + u.RawQuery = q.Encode() + + var tr tasksResponse + res, err := t.hc.Get(u.String()) + if err != nil { + return tr, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s", + u.Scheme, u.Hostname(), u.Port(), u.Path, err) + } + + defer func() { + err = res.Body.Close() + if err != nil { + level.Warn(t.logger).Log( + "msg", "failed to close http.Client", + "err", err, + ) + } + }() + + if res.StatusCode != http.StatusOK { + return tr, fmt.Errorf("HTTP Request to %v failed with code %d", u.String(), res.StatusCode) + } + + bts, err := io.ReadAll(res.Body) + if err != nil { + return tr, err + } + + err = json.Unmarshal(bts, &tr) + return tr, err +} + +// tasksResponse is a representation of the Task management API. +type tasksResponse struct { + Tasks []taskResponse `json:"tasks"` +} + +// taskResponse is a representation of the individual task item returned by task API endpoint. +// +// We only parse a very limited amount of this API for use in aggregation. +type taskResponse struct { + Action string `json:"action"` +} + +type aggregatedTaskStats struct { + CountByAction map[string]int64 +} + +func AggregateTasks(t tasksResponse) aggregatedTaskStats { + actions := map[string]int64{} + for _, task := range t.Tasks { + actions[task.Action]++ + } + return aggregatedTaskStats{CountByAction: actions} +} diff --git a/collector/tasks_test.go b/collector/tasks_test.go new file mode 100644 index 00000000..d5da114f --- /dev/null +++ b/collector/tasks_test.go @@ -0,0 +1,78 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus/testutil" +) + +func TestTasks(t *testing.T) { + // Test data was collected by running the following: + // # create container + // docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.17.11 + // sleep 15 + // # start some busy work in background + // for i in $(seq 1 500) + // do + // curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a1": "'"$i"'"}' + // sleep .01 + // curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a2": "'"$i"'"}' + // sleep .01 + // curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a3": "'"$i"'"}' + // sleep .01 + // done & + // # try and collect a good sample + // curl -X GET 'localhost:9200/_tasks?group_by=none&actions=indices:*' + // # cleanup + // docker rm --force elasticsearch + tcs := map[string]string{ + "7.17": `{"tasks":[{"node":"9lWCm1y_QkujaAg75bVx7A","id":70,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464655,"running_time_in_nanos":308640039,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":73,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464683,"running_time_in_nanos":280672000,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":76,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464711,"running_time_in_nanos":253247906,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":93,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464904,"running_time_in_nanos":60230460,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":50,"type":"transport","action":"indices:data/write/index","start_time_in_millis":1695900464229,"running_time_in_nanos":734480468,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":51,"type":"transport","action":"indices:admin/auto_create","start_time_in_millis":1695900464235,"running_time_in_nanos":729223933,"cancellable":false,"headers":{}}]}`, + } + want := `# HELP elasticsearch_task_stats_action Number of tasks of a certain action +# TYPE elasticsearch_task_stats_action gauge +elasticsearch_task_stats_action{action="indices:admin/auto_create"} 1 +elasticsearch_task_stats_action{action="indices:admin/index_template/put"} 4 +elasticsearch_task_stats_action{action="indices:data/write/index"} 1 +` + for ver, out := range tcs { + t.Run(ver, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprintln(w, out) + })) + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("Failed to parse URL: %s", err) + } + + c, err := NewTaskCollector(log.NewNopLogger(), u, ts.Client()) + if err != nil { + t.Fatalf("Failed to create collector: %v", err) + } + + if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(want)); err != nil { + t.Fatalf("Metrics did not match: %v", err) + } + }) + } +} diff --git a/pkg/clusterinfo/clusterinfo_test.go b/pkg/clusterinfo/clusterinfo_test.go index beb3ff6b..91841682 100644 --- a/pkg/clusterinfo/clusterinfo_test.go +++ b/pkg/clusterinfo/clusterinfo_test.go @@ -44,7 +44,7 @@ const ( type mockES struct{} -func (mockES) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (mockES) ServeHTTP(w http.ResponseWriter, _ *http.Request) { fmt.Fprintf(w, `{ "name" : "%s",