Skip to content

Commit

Permalink
Start adding names to instances
Browse files Browse the repository at this point in the history
  • Loading branch information
kuskoman committed Dec 16, 2024
1 parent a32b098 commit b46fc3c
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 56 deletions.
12 changes: 7 additions & 5 deletions internal/collectors/nodeinfo/nodeinfo_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (c *NodeinfoCollector) Collect(ctx context.Context, ch chan<- prometheus.Me

func (collector *NodeinfoCollector) collectSingleInstance(client logstash_client.Client, ctx context.Context, ch chan<- prometheus.Metric) error {
endpoint := client.GetEndpoint()
metricsHelper := prometheus_helper.SimpleMetricsHelper{Channel: ch, Labels: []string{endpoint}}
name := client.Name()
defaultLabels := []string{endpoint, name}
metricsHelper := prometheus_helper.SimpleMetricsHelper{Channel: ch, Labels: []string{}, DefaultLabels: defaultLabels}

nodeInfo, err := client.GetNodeInfo(ctx)
if err != nil {
Expand All @@ -117,16 +119,16 @@ func (collector *NodeinfoCollector) collectSingleInstance(client logstash_client
}

// ***** NODE *****
metricsHelper.Labels = []string{nodeInfo.Name, nodeInfo.Version, nodeInfo.Host, nodeInfo.HTTPAddress, nodeInfo.ID, endpoint}
metricsHelper.Labels = []string{nodeInfo.Name, nodeInfo.Version, nodeInfo.Host, nodeInfo.HTTPAddress, nodeInfo.ID}
metricsHelper.NewIntMetric(collector.NodeInfos, prometheus.CounterValue, 1)
// ****************

// ***** BUILD *****
metricsHelper.Labels = []string{nodeInfo.BuildDate, nodeInfo.BuildSHA, strconv.FormatBool(nodeInfo.BuildSnapshot), endpoint}
metricsHelper.Labels = []string{nodeInfo.BuildDate, nodeInfo.BuildSHA, strconv.FormatBool(nodeInfo.BuildSnapshot)}
metricsHelper.NewIntMetric(collector.BuildInfos, prometheus.CounterValue, 1)
// *****************

metricsHelper.Labels = []string{endpoint}
metricsHelper.Labels = []string{}

// ***** UP *****
metricsHelper.NewIntMetric(collector.Up, prometheus.GaugeValue, 1)
Expand All @@ -139,7 +141,7 @@ func (collector *NodeinfoCollector) collectSingleInstance(client logstash_client
// ********************

// ***** STATUS *****
metricsHelper.Labels = []string{nodeInfo.Status, endpoint}
metricsHelper.Labels = []string{nodeInfo.Status}
metricsHelper.NewIntMetric(collector.Status, prometheus.CounterValue, 1)
// ******************

Expand Down
8 changes: 8 additions & 0 deletions internal/collectors/nodeinfo/nodeinfo_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (m *mockClient) GetEndpoint() string {
return ""
}

func (m *mockClient) Name() string {
return ""
}

type errorMockClient struct{}

func (m *errorMockClient) GetNodeInfo(ctx context.Context) (*responses.NodeInfoResponse, error) {
Expand All @@ -55,6 +59,10 @@ func (m *errorMockClient) GetEndpoint() string {
return ""
}

func (m *errorMockClient) Name() string {
return ""
}

func TestCollectNotNil(t *testing.T) {
runTest := func(t *testing.T, clients []logstash_client.Client) {
collector := NewNodeinfoCollector(clients)
Expand Down
17 changes: 9 additions & 8 deletions internal/collectors/nodestats/nodestats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func (collector *NodestatsCollector) collectSingleInstance(client logstash_clien
}

endpoint := client.GetEndpoint()
metricsHelper := prometheus_helper.SimpleMetricsHelper{Channel: ch, Labels: []string{endpoint}}
name := client.Name()
metricsHelper := prometheus_helper.SimpleMetricsHelper{Channel: ch, Labels: []string{}, DefaultLabels: []string{endpoint, name}}

// ************ THREADS ************
threadsStats := nodeStats.Jvm.Threads
Expand All @@ -214,7 +215,7 @@ func (collector *NodestatsCollector) collectSingleInstance(client logstash_clien

// ********* POOLS *********
// *** YOUNG ***
metricsHelper.Labels = []string{"young", endpoint}
metricsHelper.Labels = []string{"young"}
metricsHelper.NewInt64Metric(collector.JvmMemPoolPeakUsedInBytes, prometheus.GaugeValue, memStats.Pools.Young.PeakUsedInBytes)
metricsHelper.NewInt64Metric(collector.JvmMemPoolUsedInBytes, prometheus.GaugeValue, memStats.Pools.Young.UsedInBytes)
metricsHelper.NewInt64Metric(collector.JvmMemPoolPeakMaxInBytes, prometheus.GaugeValue, memStats.Pools.Young.PeakMaxInBytes)
Expand All @@ -223,7 +224,7 @@ func (collector *NodestatsCollector) collectSingleInstance(client logstash_clien
// *************

// *** OLD ***
metricsHelper.Labels = []string{"old", endpoint}
metricsHelper.Labels = []string{"old"}
metricsHelper.NewInt64Metric(collector.JvmMemPoolPeakUsedInBytes, prometheus.GaugeValue, memStats.Pools.Old.PeakUsedInBytes)
metricsHelper.NewInt64Metric(collector.JvmMemPoolUsedInBytes, prometheus.GaugeValue, memStats.Pools.Old.UsedInBytes)
metricsHelper.NewInt64Metric(collector.JvmMemPoolPeakMaxInBytes, prometheus.GaugeValue, memStats.Pools.Old.PeakMaxInBytes)
Expand All @@ -232,7 +233,7 @@ func (collector *NodestatsCollector) collectSingleInstance(client logstash_clien
// ***********

// *** SURVIVOR ***
metricsHelper.Labels = []string{"survivor", endpoint}
metricsHelper.Labels = []string{"survivor"}
metricsHelper.NewInt64Metric(collector.JvmMemPoolPeakUsedInBytes, prometheus.GaugeValue, memStats.Pools.Survivor.PeakUsedInBytes)
metricsHelper.NewInt64Metric(collector.JvmMemPoolUsedInBytes, prometheus.GaugeValue, memStats.Pools.Survivor.UsedInBytes)
metricsHelper.NewInt64Metric(collector.JvmMemPoolPeakMaxInBytes, prometheus.GaugeValue, memStats.Pools.Survivor.PeakMaxInBytes)
Expand All @@ -244,19 +245,19 @@ func (collector *NodestatsCollector) collectSingleInstance(client logstash_clien

// ************ GC ************
// ********* YOUNG *********
metricsHelper.Labels = []string{"young", endpoint}
metricsHelper.Labels = []string{"young"}
metricsHelper.NewIntMetric(collector.JvmGcCollectionCount, prometheus.CounterValue, nodeStats.Jvm.Gc.Collectors.Young.CollectionCount)
metricsHelper.NewIntMetric(collector.JvmGcCollectionTimeInMillis, prometheus.CounterValue, nodeStats.Jvm.Gc.Collectors.Young.CollectionTimeInMillis)
// *************************

// ********* OLD *********
metricsHelper.Labels = []string{"old", endpoint}
metricsHelper.Labels = []string{"old"}
metricsHelper.NewIntMetric(collector.JvmGcCollectionCount, prometheus.CounterValue, nodeStats.Jvm.Gc.Collectors.Old.CollectionCount)
metricsHelper.NewIntMetric(collector.JvmGcCollectionTimeInMillis, prometheus.CounterValue, nodeStats.Jvm.Gc.Collectors.Old.CollectionTimeInMillis)
// *************************
// ********************************

metricsHelper.Labels = []string{endpoint}
metricsHelper.Labels = []string{}

// ************ UPTIME ************
metricsHelper.NewIntMetric(collector.JvmUptimeMillis, prometheus.GaugeValue, nodeStats.Jvm.UptimeInMillis)
Expand Down Expand Up @@ -307,7 +308,7 @@ func (collector *NodestatsCollector) collectSingleInstance(client logstash_clien
// ******************************

for pipelineId, pipelineStats := range nodeStats.Pipelines {
collector.pipelineSubcollector.Collect(&pipelineStats, pipelineId, ch, endpoint)
collector.pipelineSubcollector.Collect(&pipelineStats, pipelineId, ch, endpoint, name)
}

return nil
Expand Down
8 changes: 8 additions & 0 deletions internal/collectors/nodestats/nodestats_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (m *mockClient) GetEndpoint() string {
return ""
}

func (m *mockClient) Name() string {
return ""
}

type errorMockClient struct{}

func (m *errorMockClient) GetNodeInfo(ctx context.Context) (*responses.NodeInfoResponse, error) {
Expand All @@ -55,6 +59,10 @@ func (m *errorMockClient) GetEndpoint() string {
return ""
}

func (m *errorMockClient) Name() string {
return ""
}

func TestCollectNotNil(t *testing.T) {
t.Parallel()

Expand Down
18 changes: 9 additions & 9 deletions internal/collectors/nodestats/pipeline_subcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ func NewPipelineSubcollector() *PipelineSubcollector {
}
}

func (subcollector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipelineResponse, pipelineID string, ch chan<- prometheus.Metric, endpoint string) {
func (subcollector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipelineResponse, pipelineID string, ch chan<- prometheus.Metric, endpoint string, name string) {
collectingStart := time.Now()
slog.Debug("collecting pipeline stats for pipeline", "pipelineID", pipelineID)

metricsHelper := prometheus_helper.SimpleMetricsHelper{Channel: ch, Labels: []string{pipelineID, endpoint}}
metricsHelper := prometheus_helper.SimpleMetricsHelper{Channel: ch, Labels: []string{pipelineID}, DefaultLabels: []string{endpoint, name}}

// ***** EVENTS *****
metricsHelper.NewInt64Metric(subcollector.EventsOut, prometheus.CounterValue, pipeStats.Events.Out)
Expand Down Expand Up @@ -183,11 +183,11 @@ func (subcollector *PipelineSubcollector) Collect(pipeStats *responses.SinglePip

// Response codes returned by output Bulk Requests
for code, count := range plugin.BulkRequests.Responses {
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, code, pipelineID, endpoint}
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, code, pipelineID}
metricsHelper.NewIntMetric(subcollector.PipelinePluginBulkRequestResponses, prometheus.CounterValue, count)
}

metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID, endpoint}
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID}
metricsHelper.NewIntMetric(subcollector.PipelinePluginDocumentsSuccesses, prometheus.CounterValue, plugin.Documents.Successes)
metricsHelper.NewIntMetric(subcollector.PipelinePluginDocumentsNonRetryableFailures, prometheus.CounterValue, plugin.Documents.NonRetryableFailures)
metricsHelper.NewIntMetric(subcollector.PipelinePluginBulkRequestErrors, prometheus.CounterValue, plugin.BulkRequests.WithErrors)
Expand All @@ -199,7 +199,7 @@ func (subcollector *PipelineSubcollector) Collect(pipeStats *responses.SinglePip
pluginType := "input"
slog.Debug("collecting pipeline plugin stats for pipeline", "plugin type", pluginType, "name", plugin.Name, "id", plugin.ID, "pipelineID", pipelineID, "endpoint", endpoint)

metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID, endpoint}
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID}
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsOut, prometheus.CounterValue, plugin.Events.Out)
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsQueuePushDuration, prometheus.GaugeValue, plugin.Events.QueuePushDurationInMillis)
}
Expand All @@ -211,12 +211,12 @@ func (subcollector *PipelineSubcollector) Collect(pipeStats *responses.SinglePip
slog.Debug("collecting pipeline plugin stats for pipeline", "plugin type", pluginType, "name", plugin.Name, "id", plugin.ID, "pipelineID", pipelineID, "endpoint", endpoint)

pluginType = "codec:encode"
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID, endpoint}
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID}
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsIn, prometheus.CounterValue, plugin.Encode.WritesIn)
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsDuration, prometheus.CounterValue, plugin.Encode.DurationInMillis)

pluginType = "codec:decode"
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID, endpoint}
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID}
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsIn, prometheus.CounterValue, plugin.Decode.WritesIn)
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsOut, prometheus.CounterValue, plugin.Decode.Out)
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsDuration, prometheus.CounterValue, plugin.Decode.DurationInMillis)
Expand All @@ -228,7 +228,7 @@ func (subcollector *PipelineSubcollector) Collect(pipeStats *responses.SinglePip
pluginType := "filter"
slog.Debug("collecting pipeline plugin stats for pipeline", "plugin type", pluginType, "name", plugin.Name, "id", plugin.ID, "pipelineID", pipelineID, "endpoint", endpoint)

metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID, endpoint}
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID}
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsIn, prometheus.CounterValue, plugin.Events.In)
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsOut, prometheus.CounterValue, plugin.Events.Out)
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsDuration, prometheus.CounterValue, plugin.Events.DurationInMillis)
Expand All @@ -240,7 +240,7 @@ func (subcollector *PipelineSubcollector) Collect(pipeStats *responses.SinglePip
pluginType := "output"
slog.Debug("collecting pipeline plugin stats for pipeline", "plugin type", pluginType, "name", plugin.Name, "id", plugin.ID, "pipelineID", pipelineID, "endpoint", endpoint)

metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID, endpoint}
metricsHelper.Labels = []string{pluginType, plugin.Name, plugin.ID, pipelineID}
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsIn, prometheus.CounterValue, plugin.Events.In)
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsOut, prometheus.CounterValue, plugin.Events.Out)
metricsHelper.NewIntMetric(subcollector.PipelinePluginEventsDuration, prometheus.CounterValue, plugin.Events.DurationInMillis)
Expand Down
22 changes: 21 additions & 1 deletion internal/fetcher/logstash_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"crypto/tls"
"encoding/json"
"net/http"
"regexp"
"strings"

"github.com/kuskoman/logstash-exporter/internal/fetcher/responses"
)

// Client is an interface for the Logstash client able to fetch data from the Logstash API
type Client interface {
Name() string
GetNodeInfo(ctx context.Context) (*responses.NodeInfoResponse, error)
GetNodeStats(ctx context.Context) (*responses.NodeStatsResponse, error)

Expand All @@ -21,16 +24,32 @@ type Client interface {
type DefaultClient struct {
httpClient *http.Client
endpoint string
name string
}

func (client *DefaultClient) GetEndpoint() string {
return client.endpoint
}

func (client *DefaultClient) Name() string {
if client.name == "" {
return client.convertHostnameToName()
}

return client.name
}

// convertHostnameToName converts a hostname to a name, that contains only alphanumeric characters and underscores
// Example: http://localhost:9600 -> localhost_9600
func (client *DefaultClient) convertHostnameToName() string {
re := regexp.MustCompile(`[^a-zA-Z0-9]+`)
return strings.Trim(re.ReplaceAllString(client.endpoint, "_"), "_")
}

const defaultLogstashEndpoint = "http://localhost:9600"

// NewClient returns a new instance of the DefaultClient configured with the given endpoint
func NewClient(endpoint string, httpInsecure bool) Client {
func NewClient(endpoint string, httpInsecure bool, name string) Client {
if endpoint == "" {
endpoint = defaultLogstashEndpoint
}
Expand All @@ -42,6 +61,7 @@ func NewClient(endpoint string, httpInsecure bool) Client {
},
},
endpoint: endpoint,
name: name,
}
}

Expand Down
Loading

0 comments on commit b46fc3c

Please sign in to comment.