From 54d0a0f2d404748993f7cd1ec29d03bd8f582078 Mon Sep 17 00:00:00 2001 From: "ming.guo" Date: Mon, 5 Feb 2024 10:17:33 +0800 Subject: [PATCH 1/2] add bodySize for http, delay for kafka --- mq/gokafka.v2/consumer.go | 1 + mq/gokafka.v2/metrics.go | 9 +++++++++ mq/gokafka.v2/producer.go | 4 ++-- utils/http-request/http.go | 13 +++++++++++-- utils/http-request/log.go | 10 +++++----- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/mq/gokafka.v2/consumer.go b/mq/gokafka.v2/consumer.go index 2787edc..a8c6828 100644 --- a/mq/gokafka.v2/consumer.go +++ b/mq/gokafka.v2/consumer.go @@ -87,6 +87,7 @@ func (kr *Consumer) Handle(ctx context.Context, handle func(msg kafka.Message) e startTime := time.Now() err = handle(m) metricReqDuration.WithLabelValues(m.Topic, sub).Observe(float64(time.Since(startTime).Milliseconds())) + metricsDelay.WithLabelValues(m.Topic).Observe(float64(time.Since(m.Time).Milliseconds())) ackErr := kr.Reader.CommitMessages(ctx, m) if ackErr != nil { glog.ErrorC(ctx, "Kafka Consumer.CommitMessages error:%+v", ackErr) diff --git a/mq/gokafka.v2/metrics.go b/mq/gokafka.v2/metrics.go index 4a4c32b..6fd5eec 100644 --- a/mq/gokafka.v2/metrics.go +++ b/mq/gokafka.v2/metrics.go @@ -24,9 +24,18 @@ var ( Name: "process_result", Help: "kafka pub/sub result", }, []string{"topic", "command", "result"}) + metricsDelay = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: "delay", + Name: "duration_ms", + Help: "kafka client delay time(ms).", + Buckets: []float64{10, 50, 200, 1000, 5000, 20000, 100000}, + }, []string{"topic"}) ) func init() { prometheus.MustRegister(metricReqDuration) prometheus.MustRegister(metricsResult) + prometheus.MustRegister(metricsDelay) } diff --git a/mq/gokafka.v2/producer.go b/mq/gokafka.v2/producer.go index 25e8528..fb4dba0 100644 --- a/mq/gokafka.v2/producer.go +++ b/mq/gokafka.v2/producer.go @@ -100,7 +100,7 @@ func (w *Producer) Send(ctx context.Context, topic string, key string, value []b result := "success" if err != nil { result = "fail" - glog.ErrorC(ctx, "Kafka WriteMessages unexpected error:%v", err) + glog.ErrorC(ctx, "Kafka WriteMessages unexpected error:%v topic:%v key:%s value:%s", err, topic, key, value) } metricsResult.WithLabelValues(topic, pub, result).Inc() metricReqDuration.WithLabelValues(topic, pub).Observe(float64(time.Since(startTime).Milliseconds())) @@ -113,7 +113,7 @@ func (w *Producer) SendBatch(ctx context.Context, msgs ...kafka.Message) error { result := "success" if err != nil { result = "fail" - glog.ErrorC(ctx, "Kafka WriteMessages unexpected error:%v", err) + glog.ErrorC(ctx, "Kafka WriteMessages unexpected error:%v len(msgs)=%v", err, len(msgs)) } cost := float64(time.Since(startTime).Milliseconds()) for _, msg := range msgs { diff --git a/utils/http-request/http.go b/utils/http-request/http.go index e7326eb..a216735 100644 --- a/utils/http-request/http.go +++ b/utils/http-request/http.go @@ -10,6 +10,8 @@ import ( "github.com/sunmi-OS/gocore/v2/utils" ) +const maxShowBodySize = 1024 * 100 + type HttpClient struct { Client *resty.Client Request *resty.Request @@ -20,6 +22,7 @@ type HttpClient struct { slowThresholdMs int64 // default: 0 默认关闭慢请求打印 hideRespBodyLogsWithPath map[string]bool hideReqBodyLogsWithPath map[string]bool + maxShowBodySize int64 } func New() *HttpClient { @@ -53,6 +56,7 @@ func New() *HttpClient { disableBreaker: true, // default disable, will open soon hideReqBodyLogsWithPath: hidelBodyLogsPath, hideRespBodyLogsWithPath: hidelBodyLogsPath, + maxShowBodySize: maxShowBodySize, } } @@ -77,8 +81,8 @@ func (h *HttpClient) SetDisableBreaker(disable bool) *HttpClient { return h } -func (h *HttpClient) SetSlowThresholdMs(threshold int64) *HttpClient { - h.slowThresholdMs = threshold +func (h *HttpClient) SetMaxShowBodySize(bodySize int64) *HttpClient { + h.maxShowBodySize = bodySize return h } @@ -98,6 +102,11 @@ func (h *HttpClient) SetReqBodyLogsWithPath(paths []string) *HttpClient { return h } +func (h *HttpClient) SetSlowThresholdMs(threshold int64) *HttpClient { + h.slowThresholdMs = threshold + return h +} + // ErrIncorrectCode 非2xx 状态码 var ErrIncorrectCode = errors.New("incorrect http status") diff --git a/utils/http-request/log.go b/utils/http-request/log.go index 1998805..1649e92 100644 --- a/utils/http-request/log.go +++ b/utils/http-request/log.go @@ -40,15 +40,15 @@ func (h *HttpClient) SetLog(log Log) *HttpClient { reqBody = hideBody respBody := hideBody path := r.RawRequest.URL.Path - if !h.hideRespBodyLogsWithPath[path] { + sendBytes := r.RawRequest.ContentLength + recvBytes := resp.Size() + statusCode := resp.StatusCode() + if !h.hideRespBodyLogsWithPath[path] && recvBytes < maxShowBodySize { respBody = string(resp.Body()) } - if !h.hideReqBodyLogsWithPath[path] { + if !h.hideReqBodyLogsWithPath[path] && sendBytes < maxShowBodySize { reqBody = r.Body } - sendBytes := r.RawRequest.ContentLength - recvBytes := resp.Size() - statusCode := resp.StatusCode() if !h.disableMetrics { clientSendBytes.WithLabelValues(path).Add(mustPositive(sendBytes)) From ad3fd5853ab4fd19d2c5d21fdd37f6af72742f63 Mon Sep 17 00:00:00 2001 From: "ming.guo" Date: Wed, 28 Feb 2024 16:29:34 +0800 Subject: [PATCH 2/2] fix max body size --- utils/http-request/http.go | 12 ++++++------ utils/http-request/log.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/utils/http-request/http.go b/utils/http-request/http.go index a216735..1363f31 100644 --- a/utils/http-request/http.go +++ b/utils/http-request/http.go @@ -16,12 +16,12 @@ type HttpClient struct { Client *resty.Client Request *resty.Request - disableLog bool // default: false 默认打印日志(配置SetLog后) - disableMetrics bool // default: false 默认开启统计 - disableBreaker bool // default: true 默认关闭熔断 - slowThresholdMs int64 // default: 0 默认关闭慢请求打印 - hideRespBodyLogsWithPath map[string]bool - hideReqBodyLogsWithPath map[string]bool + disableLog bool // default: false 默认打印日志(配置SetLog后) + disableMetrics bool // default: false 默认开启统计 + disableBreaker bool // default: true 默认关闭熔断 + slowThresholdMs int64 // default: 0 默认关闭慢请求打印 + hideRespBodyLogsWithPath map[string]bool // 不打印path在map里的返回体 + hideReqBodyLogsWithPath map[string]bool // 不打印path在map里的请求体 maxShowBodySize int64 } diff --git a/utils/http-request/log.go b/utils/http-request/log.go index 1649e92..ac2d06a 100644 --- a/utils/http-request/log.go +++ b/utils/http-request/log.go @@ -43,10 +43,10 @@ func (h *HttpClient) SetLog(log Log) *HttpClient { sendBytes := r.RawRequest.ContentLength recvBytes := resp.Size() statusCode := resp.StatusCode() - if !h.hideRespBodyLogsWithPath[path] && recvBytes < maxShowBodySize { + if !h.hideRespBodyLogsWithPath[path] && recvBytes < h.maxShowBodySize { respBody = string(resp.Body()) } - if !h.hideReqBodyLogsWithPath[path] && sendBytes < maxShowBodySize { + if !h.hideReqBodyLogsWithPath[path] && sendBytes < h.maxShowBodySize { reqBody = r.Body }