Skip to content

Commit

Permalink
Emit metric for redis and bigtable client (#210)
Browse files Browse the repository at this point in the history
* Add metrics for redis and bigtable direct serving

* Check whether the redis pool stats are nil

* Make status strings const instead of var

* Remove bigtable grpc metric

Co-authored-by: Tio Pramayudi <[email protected]>
  • Loading branch information
tiopramayudi and tiopramayudi authored Feb 3, 2022
1 parent 70ba658 commit e90be53
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 1 deletion.
5 changes: 4 additions & 1 deletion api/pkg/transformer/feast/bigtablestore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ func newBigtableClient(storage *spec.BigTableStorage) (*bigtable.Client, error)
if opt.KeepAliveTimeout != nil {
keepAliveParams.Timeout = opt.KeepAliveTimeout.AsDuration()
}
clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepAliveParams)))

clientOpts = append(clientOpts,
option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepAliveParams)),
)

if opt.CredentialJson != "" {
credsByte, err := base64.StdEncoding.DecodeString(opt.CredentialJson)
Expand Down
43 changes: 43 additions & 0 deletions api/pkg/transformer/feast/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ func NewRedisClient(redisStorage *spec.RedisStorage, featureTablesMetadata []*sp
IdleTimeout: getNullableDuration(option.IdleTimeout),
IdleCheckFrequency: getNullableDuration(option.IdleCheckFrequency),
MinIdleConns: int(option.MinIdleConnections),
OnConnect: func(_ context.Context, _ *redis.Conn) error {
redisNewConn.WithLabelValues().Inc()
return nil
},
})

redisClient.AddHook(&redisHook{})
go recordRedisConnMetric(redisClient, nil)

return newClient(redisClient.Pipeline(), featureTablesMetadata)
}

Expand All @@ -56,11 +63,47 @@ func NewRedisClusterClient(redisClusterStorage *spec.RedisClusterStorage, featur
IdleTimeout: getNullableDuration(option.IdleTimeout),
IdleCheckFrequency: getNullableDuration(option.IdleCheckFrequency),
MinIdleConns: int(option.MinIdleConnections),
OnConnect: func(_ context.Context, _ *redis.Conn) error {
redisNewConn.WithLabelValues().Inc()
return nil
},
})

redisClient.AddHook(&redisHook{})
go recordRedisConnMetric(nil, redisClient)

return newClient(redisClient.Pipeline(), featureTablesMetadata)
}

// recordRedisConnMetric samples the connection-pool statistics of a redis
// client every 5 seconds and publishes them through the redisConnPoolStats
// gauge, one label value per statistic (hits, misses, timeouts, idle, stale
// and total connections).
//
// Exactly one of client or clusterClient is expected to be non-nil; client
// takes precedence when both are set. When both are nil there is nothing to
// sample and the function returns immediately instead of dereferencing a nil
// client on the first tick.
//
// The function blocks, so callers run it in its own goroutine. It uses a
// background context, which is never cancelled, so the loop keeps running
// once started.
func recordRedisConnMetric(client *redis.Client, clusterClient *redis.ClusterClient) {
	if client == nil && clusterClient == nil {
		return
	}

	ctx := context.Background()
	ticker := time.NewTicker(5 * time.Second)
	// Release the ticker's resources on any exit path from the loop.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			var poolStats *redis.PoolStats
			if client != nil {
				poolStats = client.PoolStats()
			} else {
				poolStats = clusterClient.PoolStats()
			}

			// Skip this tick if the client has no stats to report.
			if poolStats == nil {
				continue
			}

			redisConnPoolStats.WithLabelValues(hitConnStats).Set(float64(poolStats.Hits))
			redisConnPoolStats.WithLabelValues(missConnStats).Set(float64(poolStats.Misses))
			redisConnPoolStats.WithLabelValues(timeoutConnStats).Set(float64(poolStats.Timeouts))
			redisConnPoolStats.WithLabelValues(idleConnStats).Set(float64(poolStats.IdleConns))
			redisConnPoolStats.WithLabelValues(staleConnStats).Set(float64(poolStats.StaleConns))
			redisConnPoolStats.WithLabelValues(totalConnStats).Set(float64(poolStats.TotalConns))
		}
	}
}

func newClient(pipeliner redis.Pipeliner, featureTablesMetadata []*spec.FeatureTableMetadata) (RedisClient, error) {
return RedisClient{
encoder: newRedisEncoder(featureTablesMetadata),
Expand Down
92 changes: 92 additions & 0 deletions api/pkg/transformer/feast/redis/instrumentation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package redis

import (
"context"
"time"

"github.com/go-redis/redis/v8"
"github.com/gojek/merlin/pkg/transformer"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
// Label values for the "stat" label of the redisConnPoolStats gauge; each
// one names the pool statistic being reported.
hitConnStats = "hits"
missConnStats = "misses"
timeoutConnStats = "timeouts"
idleConnStats = "idle_conns"
staleConnStats = "stale_conns"
totalConnStats = "total_conns"

// Label values for the "status" label of the redisPipeline counter.
success = "success"
fail = "fail"
)

// Prometheus collectors for the redis client. All of them are created with
// promauto under the transformer.PromNamespace namespace.
var (
// redisNewConn counts invocations of the clients' OnConnect callback.
// It carries no labels.
redisNewConn = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: transformer.PromNamespace,
Name: "redis_new_connection_count",
Help: "New connection established to redis server",
}, []string{})

// redisPipeline counts the individual commands of processed pipelines,
// labelled by outcome ("status") and command name ("command").
redisPipeline = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: transformer.PromNamespace,
Name: "redis_pipelined_count",
Help: "Number of pipelined redis command",
}, []string{"status", "command"})

// redisCommandLatency records command latency in milliseconds, labelled
// by command name; a whole pipeline is recorded under the name "pipeline".
redisCommandLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: transformer.PromNamespace,
Name: "redis_command_latency_ms",
Help: "Latency of redis command",
Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1,2,4,8,16,32,64,128,256,512,+Inf
}, []string{"command"})

// redisConnPoolStats exposes the most recently sampled connection-pool
// statistics, one series per value of the "stat" label.
redisConnPoolStats = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: transformer.PromNamespace,
Name: "redis_conn_stats",
Help: "Redis connection stats",
}, []string{"stat"})
)

// redisHook is a stateless hook, installed on the redis clients via AddHook,
// that records latency and pipeline outcome metrics around command processing.
type redisHook struct{}

// requestStartKey is the context key under which the hook stores the time at
// which processing of a command or pipeline started.
type requestStartKey struct{}

// BeforeProcess stores the current time in the returned context under
// requestStartKey so that AfterProcess can compute the command's latency.
// It never returns an error.
func (h *redisHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
	startedAt := time.Now()
	stamped := context.WithValue(ctx, requestStartKey{}, startedAt)
	return stamped, nil
}

// AfterProcess observes the time elapsed since the start time stored in ctx
// under requestStartKey, reporting it in milliseconds to redisCommandLatency
// with the command's name as the label.
//
// If ctx carries no start time, nothing is recorded. The method always
// returns nil.
func (h *redisHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
	startTime, ok := ctx.Value(requestStartKey{}).(time.Time)
	if !ok {
		return nil
	}

	// Keep the fractional part: Duration.Milliseconds() truncates to a whole
	// number, which would report every sub-millisecond command as 0 ms and
	// bias the histogram's sum downwards.
	elapsedMs := float64(time.Since(startTime)) / float64(time.Millisecond)
	redisCommandLatency.WithLabelValues(cmd.Name()).Observe(elapsedMs)
	return nil
}

// BeforeProcessPipeline stores the current time in the returned context under
// requestStartKey so that AfterProcessPipeline can measure the pipeline's
// latency. It never returns an error.
func (h *redisHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
	startedAt := time.Now()
	stamped := context.WithValue(ctx, requestStartKey{}, startedAt)
	return stamped, nil
}

// AfterProcessPipeline records the latency of the whole pipeline under the
// command name "pipeline" and then increments redisPipeline once per command,
// labelled "fail" when the command ended with an actual error and "success"
// otherwise.
func (h *redisHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
	// Reuse the single-command latency path with a synthetic command whose
	// name serves as the latency label.
	pipelineCmd := redis.NewCmd(ctx, "pipeline")
	if err := h.AfterProcess(ctx, pipelineCmd); err != nil {
		return err
	}

	for i := range cmds {
		status := success
		if isActualError(cmds[i].Err()) {
			status = fail
		}
		redisPipeline.WithLabelValues(status, cmds[i].Name()).Inc()
	}
	return nil
}

// isActualError reports whether err is a genuine failure: it is false for a
// nil error and for redis.Nil, and true for any other error.
func isActualError(err error) bool {
	if err == nil {
		return false
	}
	return err != redis.Nil
}

0 comments on commit e90be53

Please sign in to comment.