Skip to content

Commit

Permalink
Re-implement connected_client to have deterministic cardinality
Browse files Browse the repository at this point in the history
  • Loading branch information
mindw committed Sep 11, 2024
1 parent 2e45f7e commit 11daff8
Show file tree
Hide file tree
Showing 4 changed files with 368 additions and 61 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ test:
TEST_REDIS_URI="redis://localhost:16384" \
TEST_REDIS5_URI="redis://localhost:16383" \
TEST_REDIS6_URI="redis://localhost:16379" \
TEST_REDIS74_URI="redis://localhost:16385" \
TEST_VALKEY7_URI="redis://localhost:16384" \
TEST_REDIS_2_8_URI="redis://localhost:16381" \
TEST_KEYDB01_URI="redis://localhost:16401" \
Expand Down
7 changes: 6 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3'
services:

valkey7:
Expand All @@ -20,6 +19,12 @@ services:
ports:
- "16379:6379"

redis74:
image: redis:7.4
command: "redis-server --protected-mode no --dbfilename dump74.rdb"
ports:
- "16385:6379"

pwd-redis5:
image: redis:5
command: "redis-server --requirepass redis-password --dbfilename dump5-pwd.rdb"
Expand Down
285 changes: 259 additions & 26 deletions exporter/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@ import (
)

type ClientInfo struct {
Id,
Name,
User,
CreatedAt,
IdleSince,
Flags,
Db,
OMem,
Cmd,
Host,
Port,
Resp string
CreatedAt,
IdleSince,
Sub,
Psub,
Ssub,
Watch,
Qbuf,
QbufFree,
Obl,
Oll,
OMem,
TotMem int64
}

/*
Expand All @@ -36,6 +45,8 @@ func parseClientListString(clientInfo string) (*ClientInfo, bool) {
return nil, false
}
connectedClient := ClientInfo{}
connectedClient.Ssub = -1 // mark it as missing - introduced in Redis 7.0.3
connectedClient.Watch = -1 // mark it as missing - introduced in Redis 7.4
for _, kvPart := range strings.Split(clientInfo, " ") {
vPart := strings.Split(kvPart, "=")
if len(vPart) != 2 {
Expand All @@ -44,32 +55,100 @@ func parseClientListString(clientInfo string) (*ClientInfo, bool) {
}

switch vPart[0] {
case "id":
connectedClient.Id = vPart[1]
case "name":
connectedClient.Name = vPart[1]
case "user":
connectedClient.User = vPart[1]
case "age":
createdAt, err := durationFieldToTimestamp(vPart[1])
if err != nil {
log.Debugf("cloud not parse age field(%s): %s", vPart[1], err.Error())
log.Debugf("could not parse 'age' field(%s): %s", vPart[1], err.Error())
return nil, false
}
connectedClient.CreatedAt = createdAt
case "idle":
idleSinceTs, err := durationFieldToTimestamp(vPart[1])
if err != nil {
log.Debugf("cloud not parse idle field(%s): %s", vPart[1], err.Error())
log.Debugf("could not parse 'idle' field(%s): %s", vPart[1], err.Error())
return nil, false
}
connectedClient.IdleSince = idleSinceTs
case "flags":
connectedClient.Flags = vPart[1]
case "db":
connectedClient.Db = vPart[1]
case "sub":
Sub, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'sub' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 86 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}
connectedClient.Sub = Sub
case "psub":
Psub, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'psub' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 93 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}
connectedClient.Psub = Psub
case "ssub":
Ssub, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'ssub' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 100 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}
connectedClient.Ssub = Ssub
case "watch":
Watch, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'watch' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 107 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L106-L107

Added lines #L106 - L107 were not covered by tests
}
connectedClient.Watch = Watch
case "qbuf":
Qbuf, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'qbuf' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 114 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L113-L114

Added lines #L113 - L114 were not covered by tests
}
connectedClient.Qbuf = Qbuf
case "qbuf-free":
QbufFree, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'qbuf-free' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 121 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L120-L121

Added lines #L120 - L121 were not covered by tests
}
connectedClient.QbufFree = QbufFree
case "obl":
Obl, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'obl' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 128 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L127-L128

Added lines #L127 - L128 were not covered by tests
}
connectedClient.Obl = Obl
case "oll":
Oll, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'oll' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 135 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L134-L135

Added lines #L134 - L135 were not covered by tests
}
connectedClient.Oll = Oll
case "omem":
connectedClient.OMem = vPart[1]
case "cmd":
connectedClient.Cmd = vPart[1]
OMem, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'omem' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 142 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L141-L142

Added lines #L141 - L142 were not covered by tests
}
connectedClient.OMem = OMem
case "tot-mem":
TotMem, err := strconv.ParseInt(vPart[1], 10, 64)
if err != nil {
log.Debugf("could not parse 'tot-mem' field(%s): %s", vPart[1], err.Error())
return nil, false

Check warning on line 149 in exporter/clients.go

View check run for this annotation

Codecov / codecov/patch

exporter/clients.go#L148-L149

Added lines #L148 - L149 were not covered by tests
}
connectedClient.TotMem = TotMem
case "addr":
hostPortString := strings.Split(vPart[1], ":")
if len(hostPortString) < 2 {
Expand All @@ -86,13 +165,12 @@ func parseClientListString(clientInfo string) (*ClientInfo, bool) {
return &connectedClient, true
}

func durationFieldToTimestamp(field string) (string, error) {
func durationFieldToTimestamp(field string) (int64, error) {
parsed, err := strconv.ParseInt(field, 10, 64)
if err != nil {
return "", err
return 0, err
}

return strconv.FormatInt(time.Now().Unix()-parsed, 10), nil
return time.Now().Unix() - parsed, nil
}

func (e *Exporter) extractConnectedClientMetrics(ch chan<- prometheus.Metric, c redis.Conn) {
Expand All @@ -103,30 +181,185 @@ func (e *Exporter) extractConnectedClientMetrics(ch chan<- prometheus.Metric, c
}

for _, c := range strings.Split(reply, "\n") {
if lbls, ok := parseClientListString(c); ok {
connectedClientsLabels := []string{"name", "created_at", "idle_since", "flags", "db", "omem", "cmd", "host"}
connectedClientsLabelsValues := []string{lbls.Name, lbls.CreatedAt, lbls.IdleSince, lbls.Flags, lbls.Db, lbls.OMem, lbls.Cmd, lbls.Host}
if info, ok := parseClientListString(c); ok {
clientInfoLabels := []string{"id", "name", "flags", "db", "host"}
clientInfoLabelsValues := []string{info.Id, info.Name, info.Flags, info.Db, info.Host}

if e.options.ExportClientsInclPort {
connectedClientsLabels = append(connectedClientsLabels, "port")
connectedClientsLabelsValues = append(connectedClientsLabelsValues, lbls.Port)
clientInfoLabels = append(clientInfoLabels, "port")
clientInfoLabelsValues = append(clientInfoLabelsValues, info.Port)
}

if user := lbls.User; user != "" {
connectedClientsLabels = append(connectedClientsLabels, "user")
connectedClientsLabelsValues = append(connectedClientsLabelsValues, user)
if user := info.User; user != "" {
clientInfoLabels = append(clientInfoLabels, "user")
clientInfoLabelsValues = append(clientInfoLabelsValues, user)
}

if resp := lbls.Resp; resp != "" {
connectedClientsLabels = append(connectedClientsLabels, "resp")
connectedClientsLabelsValues = append(connectedClientsLabelsValues, resp)
// introduced in Redis 7.0
if resp := info.Resp; resp != "" {
clientInfoLabels = append(clientInfoLabels, "resp")
clientInfoLabelsValues = append(clientInfoLabelsValues, resp)
}

e.metricDescriptions["connected_clients_details"] = newMetricDescr(e.options.Namespace, "connected_clients_details", "Details about connected clients", connectedClientsLabels)
e.metricDescriptions["connected_client_info"] = newMetricDescr(
e.options.Namespace,
"connected_client_info",
"Details about a connected client",
clientInfoLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_info", 1.0,
clientInfoLabelsValues...,
)

// keep the old name for backwards compatability
e.metricDescriptions["connected_clients_details"] = newMetricDescr(
e.options.Namespace,
"connected_clients_details",
"Details about a connected client",
clientInfoLabels,
)
e.registerConstMetricGauge(
ch, "connected_clients_details", 1.0,
connectedClientsLabelsValues...,
clientInfoLabelsValues...,
)

clientBaseLabels := []string{"id", "name"}
clientBaseLabelsValues := []string{info.Id, info.Name}

e.metricDescriptions["connected_client_output_buffer_memory_usage_bytes"] = newMetricDescr(
e.options.Namespace,
"connected_client_output_buffer_memory_usage_bytes",
"A connected client's output buffer memory usage in bytes",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_output_buffer_memory_usage_bytes", float64(info.OMem),
clientBaseLabelsValues...,
)

e.metricDescriptions["connected_client_total_memory_consumed_bytes"] = newMetricDescr(
e.options.Namespace,
"connected_client_total_memory_consumed_bytes",
"Total memory consumed by a client in its various buffers",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_total_memory_consumed_bytes", float64(info.TotMem),
clientBaseLabelsValues...,
)

e.metricDescriptions["connected_client_created_at_timestamp"] = newMetricDescr(
e.options.Namespace,
"connected_client_created_at_timestamp",
"A connected client's creation timestamp",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_created_at_timestamp", float64(info.CreatedAt),
clientBaseLabelsValues...,
)

e.metricDescriptions["connected_client_idle_since_timestamp"] = newMetricDescr(
e.options.Namespace,
"connected_client_idle_since_timestamp",
"A connected client's idle since timestamp",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_idle_since_timestamp", float64(info.IdleSince),
clientBaseLabelsValues...,
)

e.metricDescriptions["connected_client_channel_subscriptions_count"] = newMetricDescr(
e.options.Namespace,
"connected_client_channel_subscriptions_count",
"A connected client's number of channel subscriptions",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_channel_subscriptions_count", float64(info.Sub),
clientBaseLabelsValues...,
)

e.metricDescriptions["connected_client_pattern_matching_subscriptions_count"] = newMetricDescr(
e.options.Namespace,
"connected_client_pattern_matching_subscriptions_count",
"A connected client's number of pattern matching subscriptions",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_pattern_matching_subscriptions_count", float64(info.Psub),
clientBaseLabelsValues...,
)

if info.Ssub != -1 {
e.metricDescriptions["connected_client_shard_channel_subscriptions_count"] = newMetricDescr(
e.options.Namespace,
"connected_client_shard_channel_subscriptions_count",
"a connected client's number of shard channel subscriptions",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_shard_channel_subscriptions_count", float64(info.Ssub),
clientBaseLabelsValues...,
)
}
if info.Watch != -1 {
e.metricDescriptions["connected_client_shard_channel_watched_keys"] = newMetricDescr(
e.options.Namespace,
"connected_client_shard_channel_watched_keys",
"a connected client's number of keys it's currently watching",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_shard_channel_watched_keys", float64(info.Watch),
clientBaseLabelsValues...,
)
}

e.metricDescriptions["connected_client_query_buffer_length_bytes"] = newMetricDescr(
e.options.Namespace,
"connected_client_query_buffer_length_bytes",
"A connected client's query buffer length in bytes (0 means no query pending)",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_query_buffer_length_bytes", float64(info.Qbuf),
clientBaseLabelsValues...,
)

e.metricDescriptions["connected_client_query_buffer_free_space_bytes"] = newMetricDescr(
e.options.Namespace,
"connected_client_query_buffer_free_space_bytes",
"A connected client's free space of the query buffer in bytes (0 means the buffer is full)",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_query_buffer_free_space_bytes", float64(info.QbufFree),
clientBaseLabelsValues...,
)

e.metricDescriptions["connected_client_output_buffer_length_bytes"] = newMetricDescr(
e.options.Namespace,
"connected_client_output_buffer_length_bytes",
"A connected client's output buffer length in bytes",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_output_buffer_length_bytes", float64(info.Obl),
clientBaseLabelsValues...,
)

e.metricDescriptions["connected_client_output_list_length"] = newMetricDescr(
e.options.Namespace,
"connected_client_output_list_length",
"A connected client's output list length (replies are queued in this list when the buffer is full)",
clientBaseLabels,
)
e.registerConstMetricGauge(
ch, "connected_client_output_list_length", float64(info.Oll),
clientBaseLabelsValues...,
)
}
}
Expand Down
Loading

0 comments on commit 11daff8

Please sign in to comment.