Skip to content

Commit

Permalink
Merge pull request #157 from guoming0000/main
Browse files Browse the repository at this point in the history
add delay metrics for kafka, add max body size for http log
  • Loading branch information
luduoxin authored Feb 28, 2024
2 parents 17764de + ad3fd58 commit abff1ee
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 15 deletions.
1 change: 1 addition & 0 deletions mq/gokafka.v2/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (kr *Consumer) Handle(ctx context.Context, handle func(msg kafka.Message) e
startTime := time.Now()
err = handle(m)
metricReqDuration.WithLabelValues(m.Topic, sub).Observe(float64(time.Since(startTime).Milliseconds()))
metricsDelay.WithLabelValues(m.Topic).Observe(float64(time.Since(m.Time).Milliseconds()))
ackErr := kr.Reader.CommitMessages(ctx, m)
if ackErr != nil {
glog.ErrorC(ctx, "Kafka Consumer.CommitMessages error:%+v", ackErr)
Expand Down
9 changes: 9 additions & 0 deletions mq/gokafka.v2/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,18 @@ var (
Name: "process_result",
Help: "kafka pub/sub result",
}, []string{"topic", "command", "result"})
metricsDelay = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: "delay",
Name: "duration_ms",
Help: "kafka client delay time(ms).",
Buckets: []float64{10, 50, 200, 1000, 5000, 20000, 100000},
}, []string{"topic"})
)

func init() {
prometheus.MustRegister(metricReqDuration)
prometheus.MustRegister(metricsResult)
prometheus.MustRegister(metricsDelay)
}
4 changes: 2 additions & 2 deletions mq/gokafka.v2/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (w *Producer) Send(ctx context.Context, topic string, key string, value []b
result := "success"
if err != nil {
result = "fail"
glog.ErrorC(ctx, "Kafka WriteMessages unexpected error:%v", err)
glog.ErrorC(ctx, "Kafka WriteMessages unexpected error:%v topic:%v key:%s value:%s", err, topic, key, value)
}
metricsResult.WithLabelValues(topic, pub, result).Inc()
metricReqDuration.WithLabelValues(topic, pub).Observe(float64(time.Since(startTime).Milliseconds()))
Expand All @@ -113,7 +113,7 @@ func (w *Producer) SendBatch(ctx context.Context, msgs ...kafka.Message) error {
result := "success"
if err != nil {
result = "fail"
glog.ErrorC(ctx, "Kafka WriteMessages unexpected error:%v", err)
glog.ErrorC(ctx, "Kafka WriteMessages unexpected error:%v len(msgs)=%v", err, len(msgs))
}
cost := float64(time.Since(startTime).Milliseconds())
for _, msg := range msgs {
Expand Down
25 changes: 17 additions & 8 deletions utils/http-request/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ import (
"github.com/sunmi-OS/gocore/v2/utils"
)

const maxShowBodySize = 1024 * 100

type HttpClient struct {
Client *resty.Client
Request *resty.Request

disableLog bool // default: false 默认打印日志(配置SetLog后)
disableMetrics bool // default: false 默认开启统计
disableBreaker bool // default: true 默认关闭熔断
slowThresholdMs int64 // default: 0 默认关闭慢请求打印
hideRespBodyLogsWithPath map[string]bool
hideReqBodyLogsWithPath map[string]bool
disableLog bool // default: false 默认打印日志(配置SetLog后)
disableMetrics bool // default: false 默认开启统计
disableBreaker bool // default: true 默认关闭熔断
slowThresholdMs int64 // default: 0 默认关闭慢请求打印
hideRespBodyLogsWithPath map[string]bool // 不打印path在map里的返回体
hideReqBodyLogsWithPath map[string]bool // 不打印path在map里的请求体
maxShowBodySize int64
}

func New() *HttpClient {
Expand Down Expand Up @@ -53,6 +56,7 @@ func New() *HttpClient {
disableBreaker: true, // default disable, will open soon
hideReqBodyLogsWithPath: hidelBodyLogsPath,
hideRespBodyLogsWithPath: hidelBodyLogsPath,
maxShowBodySize: maxShowBodySize,
}
}

Expand All @@ -77,8 +81,8 @@ func (h *HttpClient) SetDisableBreaker(disable bool) *HttpClient {
return h
}

func (h *HttpClient) SetSlowThresholdMs(threshold int64) *HttpClient {
h.slowThresholdMs = threshold
func (h *HttpClient) SetMaxShowBodySize(bodySize int64) *HttpClient {
h.maxShowBodySize = bodySize
return h
}

Expand All @@ -98,6 +102,11 @@ func (h *HttpClient) SetReqBodyLogsWithPath(paths []string) *HttpClient {
return h
}

func (h *HttpClient) SetSlowThresholdMs(threshold int64) *HttpClient {
h.slowThresholdMs = threshold
return h
}

// ErrIncorrectCode 非2xx 状态码
var ErrIncorrectCode = errors.New("incorrect http status")

Expand Down
10 changes: 5 additions & 5 deletions utils/http-request/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ func (h *HttpClient) SetLog(log Log) *HttpClient {
reqBody = hideBody
respBody := hideBody
path := r.RawRequest.URL.Path
if !h.hideRespBodyLogsWithPath[path] {
sendBytes := r.RawRequest.ContentLength
recvBytes := resp.Size()
statusCode := resp.StatusCode()
if !h.hideRespBodyLogsWithPath[path] && recvBytes < h.maxShowBodySize {
respBody = string(resp.Body())
}
if !h.hideReqBodyLogsWithPath[path] {
if !h.hideReqBodyLogsWithPath[path] && sendBytes < h.maxShowBodySize {
reqBody = r.Body
}
sendBytes := r.RawRequest.ContentLength
recvBytes := resp.Size()
statusCode := resp.StatusCode()

if !h.disableMetrics {
clientSendBytes.WithLabelValues(path).Add(mustPositive(sendBytes))
Expand Down

0 comments on commit abff1ee

Please sign in to comment.