From a763dcdedfb098f3f28938fe40b1b6906bdfaf4a Mon Sep 17 00:00:00 2001 From: Lukas Vogel Date: Mon, 28 Jan 2019 09:52:39 +0100 Subject: [PATCH 1/4] trust handler: Add metrics --- go/lib/infra/messenger/metrics.go | 2 +- go/lib/infra/metrics.go | 77 ++++++++++++++ go/lib/infra/modules/trust/handlers.go | 139 ++++++++++++++++++++----- go/lib/infra/modules/trust/metrics.go | 72 +++++++++++++ go/lib/infra/modules/trust/trust.go | 1 + go/lib/prom/prom.go | 14 +++ 6 files changed, 278 insertions(+), 27 deletions(-) create mode 100644 go/lib/infra/metrics.go create mode 100644 go/lib/infra/modules/trust/metrics.go diff --git a/go/lib/infra/messenger/metrics.go b/go/lib/infra/messenger/metrics.go index 8de4a283a0..898ee4d531 100644 --- a/go/lib/infra/messenger/metrics.go +++ b/go/lib/infra/messenger/metrics.go @@ -64,7 +64,7 @@ func init() { "The results of messenger calls", []string{prom.LabelResult, prom.LabelOperation}) latency = prom.NewHistogramVec(promNamespace, "", "calls_latency", "Histogram of call latency in seconds.", []string{prom.LabelResult, prom.LabelOperation}, - []float64{0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56, 5.12, 10.24}) + prom.DefaultLatencyBuckets) } func metricStartOp(op promOp) opMetrics { diff --git a/go/lib/infra/metrics.go b/go/lib/infra/metrics.go new file mode 100644 index 0000000000..5fae5daf3c --- /dev/null +++ b/go/lib/infra/metrics.go @@ -0,0 +1,77 @@ +// Copyright 2019 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package infra + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/scionproto/scion/go/lib/common" + "github.com/scionproto/scion/go/lib/prom" +) + +const ( + PromSrcASLocal = "as_local" + PromSrcISDLocal = "isd_local" + PromSrcISDRemote = "isd_remote" + PromSrcUnknown = "unknown" +) + +// HandlerMetrics contains the standard metrics for a handler. +type HandlerMetrics struct { + RequestsTotal *prometheus.CounterVec + RequestLatency *prometheus.HistogramVec + ResultsTotal *prometheus.CounterVec +} + +// HandlerResult contains a result label and a status label. +type HandlerResult struct { + Result string + Status string +} + +var ( + MetricsErrInternal = &HandlerResult{Result: "err_internal", Status: prom.StatusErr} + MetricsErrInvalid = &HandlerResult{Result: "err_invalid_req", Status: prom.StatusErr} + + metricsErrMsger = &HandlerResult{Result: "err_msger", Status: prom.StatusErr} + metricsErrMsgerTimeout = &HandlerResult{Result: "err_msger_to", Status: prom.StatusTimeout} + + metricsErrTrustDB = &HandlerResult{Result: "err_trustdb", Status: prom.StatusErr} + metricsErrTrustDBTimeout = &HandlerResult{Result: "err_trustdb_to", Status: prom.StatusTimeout} + + metricsErrTS = &HandlerResult{Result: "err_truststore", Status: prom.StatusErr} + metricsErrTSTimeout = &HandlerResult{Result: "err_truststore_to", Status: prom.StatusTimeout} + + MetricsResultOk = &HandlerResult{Result: prom.ResultOk, Status: prom.StatusOk} +) + +func MetricsErrTrustDB(err error) *HandlerResult { + return metricsErrWithTimeout(err, metricsErrTrustDBTimeout, metricsErrTrustDB) +} + +func MetricsErrMsger(err error) *HandlerResult { + return metricsErrWithTimeout(err, metricsErrMsgerTimeout, metricsErrMsger) +} + +func MetricsErrTrustStore(err error) *HandlerResult { + return metricsErrWithTimeout(err, metricsErrTSTimeout, metricsErrTS) +} + +func 
metricsErrWithTimeout(err error, timeoutResult, result *HandlerResult) *HandlerResult { + if common.IsTimeoutErr(err) { + return timeoutResult + } + return result +} diff --git a/go/lib/infra/modules/trust/handlers.go b/go/lib/infra/modules/trust/handlers.go index 9ff9d7bf26..cf75201d22 100644 --- a/go/lib/infra/modules/trust/handlers.go +++ b/go/lib/infra/modules/trust/handlers.go @@ -16,17 +16,40 @@ package trust import ( "context" + "time" + "github.com/prometheus/client_golang/prometheus" + + "github.com/scionproto/scion/go/lib/addr" "github.com/scionproto/scion/go/lib/common" "github.com/scionproto/scion/go/lib/ctrl/cert_mgmt" "github.com/scionproto/scion/go/lib/infra" "github.com/scionproto/scion/go/lib/infra/messenger" "github.com/scionproto/scion/go/lib/log" + "github.com/scionproto/scion/go/lib/prom" "github.com/scionproto/scion/go/lib/scrypto/cert" "github.com/scionproto/scion/go/lib/scrypto/trc" + "github.com/scionproto/scion/go/lib/snet" "github.com/scionproto/scion/go/proto" ) +func promSrcValue(r *infra.Request, localIA addr.IA) string { + if r == nil { + return infra.PromSrcUnknown + } + sAddr, ok := r.Peer.(*snet.Addr) + if !ok { + return infra.PromSrcUnknown + } + if localIA.Equal(sAddr.IA) { + return infra.PromSrcASLocal + } + if localIA.I == sAddr.IA.I { + return infra.PromSrcISDLocal + } + return infra.PromSrcISDRemote +} + // trcReqHandler contains the state of a handler for a specific TRC Request // message, received via the Messenger's ListenAndServe method. 
type trcReqHandler struct { @@ -38,19 +61,33 @@ type trcReqHandler struct { } func (h *trcReqHandler) Handle() { + trcReqMetrics.RequestsTotal.With(prometheus.Labels{ + prom.LabelSrc: promSrcValue(h.request, h.store.ia), + }).Inc() + start := time.Now() + result := h.handleInt() + trcReqMetrics.ResultsTotal.With(prometheus.Labels{ + prom.LabelResult: result.Result, + }) + trcReqMetrics.RequestLatency.With(prometheus.Labels{ + prom.LabelStatus: result.Status, + }).Observe(time.Since(start).Seconds()) +} + +func (h *trcReqHandler) handleInt() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) trcReq, ok := h.request.Message.(*cert_mgmt.TRCReq) if !ok { logger.Error("[TrustStore:trcReqHandler] wrong message type, expected cert_mgmt.TRCReq", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } logger.Debug("[TrustStore:trcReqHandler] Received request", "trcReq", trcReq, "peer", h.request.Peer) messenger, ok := infra.MessengerFromContext(h.request.Context()) if !ok { logger.Warn("[TrustStore:trcReqHandler] Unable to service request, no Messenger found") - return + return infra.MetricsErrInternal } subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() @@ -64,13 +101,17 @@ func (h *trcReqHandler) Handle() { // call getTRC instead of getValidTRC. 
if trcReq.CacheOnly { trcObj, err = h.store.trustdb.GetTRCVersion(h.request.Context(), trcReq.ISD, trcReq.Version) + if err != nil { + logger.Error("[TrustStore:trcReqHandler] Unable to retrieve TRC", "err", err) + return infra.MetricsErrTrustDB(err) + } } else { trcObj, err = h.store.getTRC(h.request.Context(), trcReq.ISD, trcReq.Version, h.recurse, h.request.Peer, nil) - } - if err != nil { - logger.Error("[TrustStore:trcReqHandler] Unable to retrieve TRC", "err", err) - return + if err != nil { + logger.Error("[TrustStore:trcReqHandler] Unable to retrieve TRC", "err", err) + return infra.MetricsErrTrustStore(err) + } } var rawTRC common.RawBytes if trcObj != nil { @@ -78,7 +119,7 @@ func (h *trcReqHandler) Handle() { rawTRC, err = trcObj.Compress() if err != nil { logger.Warn("[TrustStore:trcReqHandler] Unable to compress TRC", "err", err) - return + return infra.MetricsErrInternal } } trcMessage := &cert_mgmt.TRC{ @@ -86,10 +127,11 @@ func (h *trcReqHandler) Handle() { } if err := messenger.SendTRC(subCtx, trcMessage, h.request.Peer, h.request.ID); err != nil { logger.Error("[TrustStore:trcReqHandler] Messenger error", "err", err) - return + return infra.MetricsErrMsger(err) } logger.Debug("[TrustStore:trcReqHandler] Replied with TRC", "trc", trcObj, "peer", h.request.Peer) + return infra.MetricsResultOk } // chainReqHandler contains the state of a handler for a specific Certificate @@ -103,19 +145,33 @@ type chainReqHandler struct { } func (h *chainReqHandler) Handle() { + chainReqMetrics.RequestsTotal.With(prometheus.Labels{ + prom.LabelSrc: promSrcValue(h.request, h.store.ia), + }).Inc() + start := time.Now() + result := h.handleInt() + chainReqMetrics.ResultsTotal.With(prometheus.Labels{ + prom.LabelResult: result.Result, + }) + chainReqMetrics.RequestLatency.With(prometheus.Labels{ + prom.LabelStatus: result.Status, + }).Observe(time.Since(start).Seconds()) +} + +func (h *chainReqHandler) handleInt() *infra.HandlerResult { logger := 
log.FromCtx(h.request.Context()) chainReq, ok := h.request.Message.(*cert_mgmt.ChainReq) if !ok { logger.Error("[TrustStore:chainReqHandler] wrong message type, expected cert_mgmt.ChainReq", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } logger.Debug("[TrustStore:chainReqHandler] Received request", "chainReq", chainReq, "peer", h.request.Peer) messenger, ok := infra.MessengerFromContext(h.request.Context()) if !ok { logger.Warn("[TrustStore:chainReqHandler] Unable to service request, no Messenger found") - return + return infra.MetricsErrInternal } subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() @@ -130,20 +186,24 @@ func (h *chainReqHandler) Handle() { if chainReq.CacheOnly { chain, err = h.store.trustdb.GetChainVersion(h.request.Context(), chainReq.IA(), chainReq.Version) + if err != nil { + logger.Error("[TrustStore:chainReqHandler] Unable to retrieve Chain", "err", err) + return infra.MetricsErrTrustDB(err) + } } else { chain, err = h.store.getChain(h.request.Context(), chainReq.IA(), chainReq.Version, h.recurse, h.request.Peer) - } - if err != nil { - logger.Error("[TrustStore:chainReqHandler] Unable to retrieve Chain", "err", err) - return + if err != nil { + logger.Error("[TrustStore:chainReqHandler] Unable to retrieve Chain", "err", err) + return infra.MetricsErrTrustStore(err) + } } var rawChain common.RawBytes if chain != nil { rawChain, err = chain.Compress() if err != nil { logger.Error("[TrustStore:chainReqHandler] Unable to compress Chain", "err", err) - return + return infra.MetricsErrInternal } } chainMessage := &cert_mgmt.Chain{ @@ -152,10 +212,11 @@ func (h *chainReqHandler) Handle() { err = messenger.SendCertChain(subCtx, chainMessage, h.request.Peer, h.request.ID) if err != nil { logger.Error("[TrustStore:chainReqHandler] Messenger API error", "err", err) - return + return infra.MetricsErrMsger(err) } 
logger.Debug("[TrustStore:chainReqHandler] Replied with chain", "chain", chain, "peer", h.request.Peer) + return infra.MetricsResultOk } type trcPushHandler struct { @@ -164,6 +225,18 @@ type trcPushHandler struct { } func (h *trcPushHandler) Handle() { + trcPushMetrics.RequestsTotal.With(prometheus.Labels{}).Inc() + start := time.Now() + result := h.handleInt() + trcPushMetrics.ResultsTotal.With(prometheus.Labels{ + prom.LabelResult: result.Result, + }) + trcPushMetrics.RequestLatency.With(prometheus.Labels{ + prom.LabelStatus: result.Status, + }).Observe(time.Since(start).Seconds()) +} + +func (h *trcPushHandler) handleInt() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) // FIXME(scrye): In case a TRC update will invalidate the local certificate // chain after the gracePeriod, CSes must use this gracePeriod to fetch a @@ -173,14 +246,14 @@ func (h *trcPushHandler) Handle() { if !ok { logger.Error("[TrustStore:trcPushHandler] Wrong message type, expected cert_mgmt.TRC", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } logger.Debug("[TrustStore:trcPushHandler] Received push", "trcPush", trcPush, "peer", h.request.Peer) msger, ok := infra.MessengerFromContext(h.request.Context()) if !ok { logger.Warn("[TrustStore:trcPushHandler] Unable to service request, no Messenger found") - return + return infra.MetricsErrInternal } subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() @@ -191,23 +264,24 @@ func (h *trcPushHandler) Handle() { if err != nil { logger.Error("[TrustStore:trcPushHandler] Unable to extract TRC from TRC push", "err", err) sendAck(proto.Ack_ErrCode_reject, messenger.AckRejectFailedToParse) - return + return infra.MetricsErrInvalid } if trcObj == nil { logger.Warn("[TrustStore:trcPushHandler] Empty chain received") sendAck(proto.Ack_ErrCode_reject, messenger.AckRejectFailedToParse) - return + return infra.MetricsErrInvalid } n, err := 
h.store.trustdb.InsertTRC(subCtx, trcObj) if err != nil { logger.Error("[TrustStore:trcPushHandler] Unable to insert TRC into DB", "err", err) sendAck(proto.Ack_ErrCode_retry, messenger.AckRetryDBError) - return + return infra.MetricsErrTrustDB(err) } if n != 0 { logger.Debug("[TrustStore:trcPushHandler] Inserted TRC into DB", "trc", trcObj) } sendAck(proto.Ack_ErrCode_ok, "") + return infra.MetricsResultOk } type chainPushHandler struct { @@ -216,19 +290,31 @@ type chainPushHandler struct { } func (h *chainPushHandler) Handle() { + chainPushMetrics.RequestsTotal.With(prometheus.Labels{}).Inc() + start := time.Now() + result := h.handleInt() + chainPushMetrics.ResultsTotal.With(prometheus.Labels{ + prom.LabelResult: result.Result, + }) + chainPushMetrics.RequestLatency.With(prometheus.Labels{ + prom.LabelStatus: result.Status, + }).Observe(time.Since(start).Seconds()) +} + +func (h *chainPushHandler) handleInt() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) chainPush, ok := h.request.Message.(*cert_mgmt.Chain) if !ok { logger.Error("[TrustStore:chainPushHandler] Wrong message type, expected cert_mgmt.Chain", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } logger.Debug("[TrustStore:chainPushHandler] Received push", "chainPush", chainPush, "peer", h.request.Peer) msger, ok := infra.MessengerFromContext(h.request.Context()) if !ok { logger.Warn("[TrustStore:chainPushHandler] Unable to service request, no Messenger found") - return + return infra.MetricsErrInternal } subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() @@ -239,21 +325,22 @@ func (h *chainPushHandler) Handle() { logger.Error("[TrustStore:chainPushHandler] Unable to extract chain from chain push", "err", err) sendAck(proto.Ack_ErrCode_reject, messenger.AckRejectFailedToParse) - return + return infra.MetricsErrInvalid } if chain == nil { logger.Warn("[TrustStore:chainPushHandler] 
Empty chain received") sendAck(proto.Ack_ErrCode_reject, messenger.AckRejectFailedToParse) - return + return infra.MetricsErrInvalid } n, err := h.store.trustdb.InsertChain(subCtx, chain) if err != nil { logger.Error("[TrustStore:chainPushHandler] Unable to insert chain into DB", "err", err) sendAck(proto.Ack_ErrCode_retry, messenger.AckRetryDBError) - return + return infra.MetricsErrTrustDB(err) } if n != 0 { logger.Debug("[TrustStore:chainPushHandler] Inserted chain into DB", "chain", chain) } sendAck(proto.Ack_ErrCode_ok, "") + return infra.MetricsResultOk } diff --git a/go/lib/infra/modules/trust/metrics.go b/go/lib/infra/modules/trust/metrics.go new file mode 100644 index 0000000000..b9026e47a1 --- /dev/null +++ b/go/lib/infra/modules/trust/metrics.go @@ -0,0 +1,72 @@ +// Copyright 2019 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package trust + +import ( + "sync" + + "github.com/scionproto/scion/go/lib/infra" + "github.com/scionproto/scion/go/lib/prom" +) + +const ( + promNamespace = "trust" +) + +var ( + chainPushMetrics *infra.HandlerMetrics + chainReqMetrics *infra.HandlerMetrics + trcPushMetrics *infra.HandlerMetrics + trcReqMetrics *infra.HandlerMetrics + + initOnce sync.Once +) + +func initMetrics() { + initOnce.Do(func() { + chainPushMetrics = &infra.HandlerMetrics{ + RequestsTotal: prom.NewCounterVec(promNamespace, "", "chain_push_total", + "Chain pushes received total.", []string{}), + RequestLatency: prom.NewHistogramVec(promNamespace, "", "chain_push_latency", + "Chain push latency.", []string{prom.LabelStatus}, prom.DefaultLatencyBuckets), + ResultsTotal: prom.NewCounterVec(promNamespace, "", "chain_push_results_total", + "Chain push results total.", []string{prom.LabelResult}), + } + chainReqMetrics = &infra.HandlerMetrics{ + RequestsTotal: prom.NewCounterVec(promNamespace, "", "chain_req_total", + "Chain requests received total.", []string{prom.LabelSrc}), + RequestLatency: prom.NewHistogramVec(promNamespace, "", "chain_req_latency", + "Chain requests latency.", []string{prom.LabelStatus}, prom.DefaultLatencyBuckets), + ResultsTotal: prom.NewCounterVec(promNamespace, "", "chain_req_results_total", + "Chain requests results total.", []string{prom.LabelResult}), + } + trcPushMetrics = &infra.HandlerMetrics{ + RequestsTotal: prom.NewCounterVec(promNamespace, "", "trc_push_total", + "TRC pushes received total.", []string{}), + RequestLatency: prom.NewHistogramVec(promNamespace, "", "trc_push_latency", + "TRC push latency.", []string{prom.LabelStatus}, prom.DefaultLatencyBuckets), + ResultsTotal: prom.NewCounterVec(promNamespace, "", "trc_push_results_total", + "TRC push results total.", []string{prom.LabelResult}), + } + trcReqMetrics = &infra.HandlerMetrics{ + RequestsTotal: prom.NewCounterVec(promNamespace, "", "trc_req_total", + "TRC requests received total.", 
[]string{prom.LabelSrc}), + RequestLatency: prom.NewHistogramVec(promNamespace, "", "trc_req_latency", + "TRC requests latency.", []string{prom.LabelStatus}, prom.DefaultLatencyBuckets), + ResultsTotal: prom.NewCounterVec(promNamespace, "", "trc_req_results_total", + "TRC requests results total.", []string{prom.LabelResult}), + } + }) +} diff --git a/go/lib/infra/modules/trust/trust.go b/go/lib/infra/modules/trust/trust.go index df09e818db..008f27db64 100644 --- a/go/lib/infra/modules/trust/trust.go +++ b/go/lib/infra/modules/trust/trust.go @@ -87,6 +87,7 @@ type Store struct { func NewStore(db trustdb.TrustDB, local addr.IA, options *Config, logger log.Logger) (*Store, error) { + initMetrics() if options == nil { options = &Config{} } diff --git a/go/lib/prom/prom.go b/go/lib/prom/prom.go index 11c458d2a0..fca0cd6253 100644 --- a/go/lib/prom/prom.go +++ b/go/lib/prom/prom.go @@ -25,10 +25,14 @@ import ( const ( // LabelResult is the label for result classifications. LabelResult = "result" + // LabelStatus for latency status classifications, possible values are prefixed with Status*. + LabelStatus = "status" // LabelElem is the label for the element id that is added to all metrics. LabelElem = "elem" // LabelOperation is the label for the name of an executed operation. LabelOperation = "op" + // LabelSrc is the label for the src of a request. + LabelSrc = "src" // ResultOk is no error. ResultOk = "ok" @@ -36,6 +40,16 @@ const ( ErrNotClassified = "err_not_classified" // ErrTimeout is a timeout error. ErrTimeout = "err_timeout" + + StatusOk = "ok" + StatusErr = "err" + StatusTimeout = "err_timeout" +) + +var ( + // DefaultLatencyBuckets 10ms, 20ms, 40ms, ... 5.12s, 10.24s. 
+ DefaultLatencyBuckets = []float64{0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64, + 1.28, 2.56, 5.12, 10.24} ) func CopyLabels(labels prometheus.Labels) prometheus.Labels { From b964a87fe288294d8a371d65be7a85540ed085b7 Mon Sep 17 00:00:00 2001 From: Lukas Vogel Date: Mon, 18 Feb 2019 12:38:17 +0100 Subject: [PATCH 2/4] Metrics for all handlers --- go/cert_srv/internal/reiss/handler.go | 4 +- go/lib/infra/common.go | 61 +++++++++++- go/lib/infra/messenger/messenger.go | 53 ++++++---- go/lib/infra/messenger/messenger_test.go | 8 +- go/lib/infra/messenger/metrics.go | 99 +++++++++++-------- go/lib/infra/metrics.go | 7 ++ go/lib/infra/modules/trust/handlers.go | 64 +----------- go/lib/infra/modules/trust/trust.go | 16 +-- go/path_srv/internal/handlers/ifstateinfo.go | 10 +- go/path_srv/internal/handlers/segreg.go | 13 +-- go/path_srv/internal/handlers/segreqcore.go | 14 +-- .../internal/handlers/segreqnoncore.go | 19 ++-- go/path_srv/internal/handlers/segrevoc.go | 17 ++-- go/path_srv/internal/handlers/segsync.go | 13 +-- 14 files changed, 223 insertions(+), 175 deletions(-) diff --git a/go/cert_srv/internal/reiss/handler.go b/go/cert_srv/internal/reiss/handler.go index b7268d4137..c6dfbadc42 100644 --- a/go/cert_srv/internal/reiss/handler.go +++ b/go/cert_srv/internal/reiss/handler.go @@ -52,13 +52,15 @@ type Handler struct { IA addr.IA } -func (h *Handler) Handle(r *infra.Request) { +func (h *Handler) Handle(r *infra.Request) *infra.HandlerResult { addr := r.Peer.(*snet.Addr) req := r.Message.(*cert_mgmt.ChainIssReq) if err := h.handle(r, addr, req); err != nil { log.Error("[ReissHandler] Dropping certificate reissue request", "addr", addr, "req", req, "err", err) } + // TODO(lukedirtwalker): reflect error in metrics. + return infra.MetricsResultOk } // handle handles certificate chain reissue requests. 
If the requested diff --git a/go/lib/infra/common.go b/go/lib/infra/common.go index 6f174c4cc9..a632a1f0e0 100644 --- a/go/lib/infra/common.go +++ b/go/lib/infra/common.go @@ -52,18 +52,18 @@ type Transport interface { Close(context.Context) error } -// Interface Handler is implemented by objects that can handle a request coming +// Handler is implemented by objects that can handle a request coming // from a remote SCION network node. type Handler interface { - Handle(*Request) + Handle(*Request) *HandlerResult } // Constructs a handler for request r. Handle() can be called on the // resulting object to process the message. -type HandlerFunc func(r *Request) +type HandlerFunc func(r *Request) *HandlerResult -func (f HandlerFunc) Handle(r *Request) { - f(r) +func (f HandlerFunc) Handle(r *Request) *HandlerResult { + return f(r) } // Request describes an object received from the network that is not part of an @@ -195,6 +195,57 @@ func (mt MessageType) String() string { } } +// MetricLabel returns the label for metrics for a given message type. +// The postfix for requests is always "req" and for replies and push messages it is always "push". 
+func (mt MessageType) MetricLabel() string { + switch mt { + case None: + return "none" + case ChainRequest: + return "chain_req" + case Chain: + return "chain_push" + case TRCRequest: + return "trc_req" + case TRC: + return "trc_push" + case IfId: + return "ifid_push" + case IfStateInfos: + return "if_info_push" + case IfStateReq: + return "if_info_req" + case Seg: + return "pathseg_push" + case SegChangesReq: + return "seg_changes_req" + case SegChangesReply: + return "seg_changes_push" + case SegChangesIdReq: + return "seg_changes_id_req" + case SegChangesIdReply: + return "seg_changes_id_push" + case SegReg: + return "seg_reg_push" + case SegRequest: + return "seg_req" + case SegReply: + return "seg_push" + case SegRev: + return "seg_rev_push" + case SegSync: + return "seg_sync_push" + case ChainIssueRequest: + return "chain_issue_req" + case ChainIssueReply: + return "chain_issue_push" + case Ack: + return "ack_push" + default: + return "unknown_mt" + } +} + type Messenger interface { SendAck(ctx context.Context, msg *ack.Ack, a net.Addr, id uint64) error GetTRC(ctx context.Context, msg *cert_mgmt.TRCReq, a net.Addr, diff --git a/go/lib/infra/messenger/messenger.go b/go/lib/infra/messenger/messenger.go index c67dc6d0f1..321ca723b5 100644 --- a/go/lib/infra/messenger/messenger.go +++ b/go/lib/infra/messenger/messenger.go @@ -75,6 +75,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/scionproto/scion/go/lib/addr" "github.com/scionproto/scion/go/lib/common" "github.com/scionproto/scion/go/lib/ctrl" @@ -87,6 +89,7 @@ import ( "github.com/scionproto/scion/go/lib/infra" "github.com/scionproto/scion/go/lib/infra/disp" "github.com/scionproto/scion/go/lib/log" + "github.com/scionproto/scion/go/lib/prom" "github.com/scionproto/scion/go/lib/sciond" "github.com/scionproto/scion/go/lib/snet" "github.com/scionproto/scion/go/lib/spath" @@ -155,6 +158,7 @@ type Messenger struct { func New(ia addr.IA, dispatcher *disp.Dispatcher, 
store infra.TrustStore, logger log.Logger, config *Config) *Messenger { + initMetrics() if config == nil { config = &Config{} } @@ -181,7 +185,7 @@ func New(ia addr.IA, dispatcher *disp.Dispatcher, store infra.TrustStore, logger } func (m *Messenger) SendAck(ctx context.Context, msg *ack.Ack, a net.Addr, id uint64) error { - opMetric := metricStartOp(promOpSendAck) + opMetric := metricStartOp(infra.Ack) err := m.sendAck(ctx, msg, a, id) opMetric.publishResult(err) return err @@ -202,7 +206,7 @@ func (m *Messenger) sendAck(ctx context.Context, msg *ack.Ack, a net.Addr, id ui func (m *Messenger) GetTRC(ctx context.Context, msg *cert_mgmt.TRCReq, a net.Addr, id uint64) (*cert_mgmt.TRC, error) { - opMetric := metricStartOp(promOpGetTRC) + opMetric := metricStartOp(infra.TRCRequest) trc, err := m.getTRC(ctx, msg, a, id) opMetric.publishResult(err) return trc, err @@ -237,7 +241,7 @@ func (m *Messenger) getTRC(ctx context.Context, msg *cert_mgmt.TRCReq, // SendTRC sends a reliable cert_mgmt.TRC to address a. 
func (m *Messenger) SendTRC(ctx context.Context, msg *cert_mgmt.TRC, a net.Addr, id uint64) error { - opMetrics := metricStartOp(promOpSendTRC) + opMetrics := metricStartOp(infra.TRC) err := m.sendTRC(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -258,7 +262,7 @@ func (m *Messenger) sendTRC(ctx context.Context, msg *cert_mgmt.TRC, a net.Addr, func (m *Messenger) GetCertChain(ctx context.Context, msg *cert_mgmt.ChainReq, a net.Addr, id uint64) (*cert_mgmt.Chain, error) { - opMetrics := metricStartOp(promOpGetCrtChain) + opMetrics := metricStartOp(infra.ChainRequest) chain, err := m.getCertChain(ctx, msg, a, id) opMetrics.publishResult(err) return chain, err @@ -295,7 +299,7 @@ func (m *Messenger) getCertChain(ctx context.Context, msg *cert_mgmt.ChainReq, func (m *Messenger) SendCertChain(ctx context.Context, msg *cert_mgmt.Chain, a net.Addr, id uint64) error { - opMetrics := metricStartOp(promOpSendCrtChain) + opMetrics := metricStartOp(infra.Chain) err := m.sendCertChain(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -315,7 +319,7 @@ func (m *Messenger) sendCertChain(ctx context.Context, msg *cert_mgmt.Chain, a n // SendIfId sends a reliable ifid.IFID to address a. 
func (m *Messenger) SendIfId(ctx context.Context, msg *ifid.IFID, a net.Addr, id uint64) error { - opMetrics := metricStartOp(promOpSendIfId) + opMetrics := metricStartOp(infra.IfId) err := m.sendIfId(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -329,7 +333,7 @@ func (m *Messenger) sendIfId(ctx context.Context, msg *ifid.IFID, a net.Addr, id func (m *Messenger) SendIfStateInfos(ctx context.Context, msg *path_mgmt.IFStateInfos, a net.Addr, id uint64) error { - opMetrics := metricStartOp(promOpSendIfStateInfo) + opMetrics := metricStartOp(infra.IfStateInfos) err := m.sendIfStateInfos(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -351,7 +355,7 @@ func (m *Messenger) sendIfStateInfos(ctx context.Context, msg *path_mgmt.IFState func (m *Messenger) SendSeg(ctx context.Context, msg *seg.PathSegment, a net.Addr, id uint64) error { - opMetrics := metricStartOp(promOpSendSeg) + opMetrics := metricStartOp(infra.Seg) err := m.sendSeg(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -368,7 +372,7 @@ func (m *Messenger) sendSeg(ctx context.Context, msg *seg.PathSegment, func (m *Messenger) GetSegs(ctx context.Context, msg *path_mgmt.SegReq, a net.Addr, id uint64) (*path_mgmt.SegReply, error) { - opMetrics := metricStartOp(promOpGetSegs) + opMetrics := metricStartOp(infra.SegRequest) reply, err := m.getSegs(ctx, msg, a, id) opMetrics.publishResult(err) return reply, err @@ -409,7 +413,7 @@ func (m *Messenger) getSegs(ctx context.Context, msg *path_mgmt.SegReq, func (m *Messenger) SendSegReply(ctx context.Context, msg *path_mgmt.SegReply, a net.Addr, id uint64) error { - opMetrics := metricStartOp(promOpSendSegReply) + opMetrics := metricStartOp(infra.SegReply) err := m.sendSegReply(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -431,7 +435,7 @@ func (m *Messenger) sendSegReply(ctx context.Context, msg *path_mgmt.SegReply, func (m *Messenger) SendSegSync(ctx context.Context, msg *path_mgmt.SegSync, a net.Addr, id uint64) error { 
- opMetrics := metricStartOp(promOpSendSegSync) + opMetrics := metricStartOp(infra.SegSync) err := m.sendSegSync(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -452,7 +456,7 @@ func (m *Messenger) sendSegSync(ctx context.Context, msg *path_mgmt.SegSync, func (m *Messenger) GetSegChangesIds(ctx context.Context, msg *path_mgmt.SegChangesIdReq, a net.Addr, id uint64) (*path_mgmt.SegChangesIdReply, error) { - opMetrics := metricStartOp(promOpGetSegChangesId) + opMetrics := metricStartOp(infra.SegChangesIdReq) reply, err := m.getSegChangesIds(ctx, msg, a, id) opMetrics.publishResult(err) return reply, err @@ -489,7 +493,7 @@ func (m *Messenger) getSegChangesIds(ctx context.Context, msg *path_mgmt.SegChan func (m *Messenger) SendSegChangesIdReply(ctx context.Context, msg *path_mgmt.SegChangesIdReply, a net.Addr, id uint64) error { - opMetrics := metricStartOp(promOpSendSegChangesIdReply) + opMetrics := metricStartOp(infra.SegChangesIdReply) err := m.sendSegChangesIdReply(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -511,7 +515,7 @@ func (m *Messenger) sendSegChangesIdReply(ctx context.Context, msg *path_mgmt.Se func (m *Messenger) GetSegChanges(ctx context.Context, msg *path_mgmt.SegChangesReq, a net.Addr, id uint64) (*path_mgmt.SegChangesReply, error) { - opMetrics := metricStartOp(promOpGetSegChanges) + opMetrics := metricStartOp(infra.SegChangesReq) reply, err := m.getSegChanges(ctx, msg, a, id) opMetrics.publishResult(err) return reply, err @@ -551,7 +555,7 @@ func (m *Messenger) getSegChanges(ctx context.Context, msg *path_mgmt.SegChanges func (m *Messenger) SendSegChangesReply(ctx context.Context, msg *path_mgmt.SegChangesReply, a net.Addr, id uint64) error { - opMetrics := metricStartOp(promOpSendSegChanges) + opMetrics := metricStartOp(infra.SegChangesReply) err := m.sendSegChangesReply(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -573,7 +577,7 @@ func (m *Messenger) sendSegChangesReply(ctx context.Context, msg 
*path_mgmt.SegC func (m *Messenger) RequestChainIssue(ctx context.Context, msg *cert_mgmt.ChainIssReq, a net.Addr, id uint64) (*cert_mgmt.ChainIssRep, error) { - opMetrics := metricStartOp(promOpRequestChainIssue) + opMetrics := metricStartOp(infra.ChainIssueRequest) reply, err := m.requestChainIssue(ctx, msg, a, id) opMetrics.publishResult(err) return reply, err @@ -610,7 +614,7 @@ func (m *Messenger) requestChainIssue(ctx context.Context, msg *cert_mgmt.ChainI func (m *Messenger) SendChainIssueReply(ctx context.Context, msg *cert_mgmt.ChainIssRep, a net.Addr, id uint64) error { - opMetrics := metricStartOp(promOpSendChainIssue) + opMetrics := metricStartOp(infra.ChainIssueReply) err := m.sendChainIssueReply(ctx, msg, a, id) opMetrics.publishResult(err) return err @@ -740,8 +744,21 @@ func (m *Messenger) serve(ctx context.Context, cancelF context.CancelFunc, pld * go func() { defer log.LogPanicAndExit() defer cancelF() - handler.Handle( + inCallsTotal.With(prometheus.Labels{ + prom.LabelOperation: msgType.MetricLabel(), + prom.LabelSrc: metricSrcValue(address, m.ia), + }).Inc() + start := time.Now() + result := handler.Handle( infra.NewRequest(log.CtxWith(ctx, logger), msg, signedPld, address, pld.ReqId)) + inResultsTotal.With(prometheus.Labels{ + prom.LabelOperation: msgType.MetricLabel(), + prom.LabelResult: result.Result, + }).Inc() + inCallsLatency.With(prometheus.Labels{ + prom.LabelOperation: msgType.MetricLabel(), + prom.LabelStatus: result.Status, + }).Observe(time.Since(start).Seconds()) }() } diff --git a/go/lib/infra/messenger/messenger_test.go b/go/lib/infra/messenger/messenger_test.go index 5fea3ddb31..e3998ac288 100644 --- a/go/lib/infra/messenger/messenger_test.go +++ b/go/lib/infra/messenger/messenger_test.go @@ -39,22 +39,24 @@ var ( mockTRC = &cert_mgmt.TRC{RawTRC: common.RawBytes("foobar")} ) -func MockTRCHandler(request *infra.Request) { +func MockTRCHandler(request *infra.Request) *infra.HandlerResult { messengerI, ok := 
infra.MessengerFromContext(request.Context()) if !ok { log.Warn("Unable to service request, no Messenger interface found") - return + return infra.MetricsErrInternal } messenger, ok := messengerI.(*Messenger) if !ok { log.Warn("Unable to service request, bad Messenger value found") - return + return infra.MetricsErrInternal } subCtx, cancelF := context.WithTimeout(request.Context(), 3*time.Second) defer cancelF() if err := messenger.SendTRC(subCtx, mockTRC, nil, request.ID); err != nil { log.Error("Server error", "err", err) + return infra.MetricsErrInternal } + return infra.MetricsResultOk } func TestTRCExchange(t *testing.T) { diff --git a/go/lib/infra/messenger/metrics.go b/go/lib/infra/messenger/metrics.go index 898ee4d531..00d1257cf5 100644 --- a/go/lib/infra/messenger/metrics.go +++ b/go/lib/infra/messenger/metrics.go @@ -15,81 +15,96 @@ package messenger import ( + "net" + "sync" "time" "github.com/prometheus/client_golang/prometheus" + "github.com/scionproto/scion/go/lib/addr" "github.com/scionproto/scion/go/lib/common" + "github.com/scionproto/scion/go/lib/infra" "github.com/scionproto/scion/go/lib/prom" + "github.com/scionproto/scion/go/lib/snet" ) const ( promNamespace = "messenger" ) -type promOp string +var ( + outCallsTotal *prometheus.CounterVec + outResultsTotal *prometheus.CounterVec + outCallsLatency *prometheus.HistogramVec -const ( - promOpSendAck promOp = "send_ack" - promOpGetTRC promOp = "get_trc" - promOpSendTRC promOp = "send_trc" - promOpGetCrtChain promOp = "get_chain" - promOpSendCrtChain promOp = "send_crt_chain" - promOpSendIfId promOp = "send_ifid" - promOpSendIfStateInfo promOp = "send_if_info" - promOpSendSeg promOp = "send_seg" - promOpGetSegs promOp = "get_segs" - promOpSendSegReply promOp = "send_seg_reply" - promOpSendSegSync promOp = "send_seg_sync" - promOpGetSegChangesId promOp = "get_seg_changes_id" - promOpSendSegChangesIdReply promOp = "send_seg_change_reply" - promOpGetSegChanges promOp = "get_seg_changes" - 
promOpSendSegChanges promOp = "send_seg_changes" - promOpRequestChainIssue promOp = "request_chain_issue" - promOpSendChainIssue promOp = "send_chain_issue_reply" -) + inCallsTotal *prometheus.CounterVec + inResultsTotal *prometheus.CounterVec + inCallsLatency *prometheus.HistogramVec -var ( - callsTotal *prometheus.CounterVec - resultsTotal *prometheus.CounterVec - latency *prometheus.HistogramVec + initOnce sync.Once ) -func init() { - // Cardinality: 17 (len(allOps)) - callsTotal = prom.NewCounterVec(promNamespace, "", "calls_total", - "Total calls on the messenger.", []string{prom.LabelOperation}) - // Cardinality: X (len(allResults) * 17 (len(allOps)) - resultsTotal = prom.NewCounterVec(promNamespace, "", "results_total", - "The results of messenger calls", []string{prom.LabelResult, prom.LabelOperation}) - latency = prom.NewHistogramVec(promNamespace, "", "calls_latency", - "Histogram of call latency in seconds.", []string{prom.LabelResult, prom.LabelOperation}, - prom.DefaultLatencyBuckets) +func initMetrics() { + initOnce.Do(func() { + // Cardinality: 17 (len(allOps)) + outCallsTotal = prom.NewCounterVec(promNamespace, "", "out_calls_total", + "Total out calls on the messenger.", []string{prom.LabelOperation}) + // Cardinality: X (len(allResults) * 17 (len(allOps)) + outResultsTotal = prom.NewCounterVec(promNamespace, "", "out_results_total", + "The out results of messenger calls", []string{prom.LabelResult, prom.LabelOperation}) + outCallsLatency = prom.NewHistogramVec(promNamespace, "", "out_calls_latency", + "Histogram of out call latency in seconds.", + []string{prom.LabelResult, prom.LabelOperation}, + prom.DefaultLatencyBuckets) + + inCallsTotal = prom.NewCounterVec(promNamespace, "", "in_calls_total", + "Total in calls on the messenger.", []string{prom.LabelOperation, prom.LabelSrc}) + inResultsTotal = prom.NewCounterVec(promNamespace, "", "in_results_total", + "The in results of messenger calls", []string{prom.LabelResult, prom.LabelOperation}) + 
inCallsLatency = prom.NewHistogramVec(promNamespace, "", "in_calls_latency", + "Histogram of out call latency in seconds.", + []string{prom.LabelStatus, prom.LabelOperation}, + prom.DefaultLatencyBuckets) + }) +} + +func metricSrcValue(peer net.Addr, localIA addr.IA) string { + sAddr, ok := peer.(*snet.Addr) + if !ok { + return infra.PromSrcUnknown + } + if localIA.Equal(sAddr.IA) { + return infra.PromSrcASLocal + } + if localIA.I == sAddr.IA.I { + return infra.PromSrcISDLocal + } + return infra.PromSrcISDRemote } -func metricStartOp(op promOp) opMetrics { - callsTotal.With(prometheus.Labels{ - prom.LabelOperation: string(op), +func metricStartOp(msgType infra.MessageType) opMetrics { + outCallsTotal.With(prometheus.Labels{ + prom.LabelOperation: msgType.MetricLabel(), }).Inc() return opMetrics{ - op: op, + mt: msgType, begin: time.Now(), } } type opMetrics struct { - op promOp + mt infra.MessageType begin time.Time } func (m *opMetrics) publishResult(err error) { resLabel := errorToResultLabel(err) resLabels := prometheus.Labels{ - prom.LabelOperation: string(m.op), + prom.LabelOperation: m.mt.MetricLabel(), prom.LabelResult: resLabel, } - latency.With(resLabels).Observe(time.Since(m.begin).Seconds()) - resultsTotal.With(resLabels).Inc() + outCallsLatency.With(resLabels).Observe(time.Since(m.begin).Seconds()) + outResultsTotal.With(resLabels).Inc() } func errorToResultLabel(err error) string { diff --git a/go/lib/infra/metrics.go b/go/lib/infra/metrics.go index 5fae5daf3c..53fe2406ed 100644 --- a/go/lib/infra/metrics.go +++ b/go/lib/infra/metrics.go @@ -54,6 +54,9 @@ var ( metricsErrTS = &HandlerResult{Result: "err_truststore", Status: prom.StatusErr} metricsErrTSTimeout = &HandlerResult{Result: "err_truststore_to", Status: prom.StatusTimeout} + metricsErrRevCache = &HandlerResult{Result: "err_revcache", Status: prom.StatusErr} + metricsErrRevCacheTo = &HandlerResult{Result: "err_revcache_to", Status: prom.StatusTimeout} + MetricsResultOk = &HandlerResult{Result: 
prom.ResultOk, Status: prom.StatusOk} ) @@ -61,6 +64,10 @@ func MetricsErrTrustDB(err error) *HandlerResult { return metricsErrWithTimeout(err, metricsErrTrustDBTimeout, metricsErrTrustDB) } +func MetricsErrRevCache(err error) *HandlerResult { + return metricsErrWithTimeout(err, metricsErrRevCacheTo, metricsErrRevCache) +} + func MetricsErrMsger(err error) *HandlerResult { return metricsErrWithTimeout(err, metricsErrMsgerTimeout, metricsErrMsger) } diff --git a/go/lib/infra/modules/trust/handlers.go b/go/lib/infra/modules/trust/handlers.go index cf75201d22..6c875244c3 100644 --- a/go/lib/infra/modules/trust/handlers.go +++ b/go/lib/infra/modules/trust/handlers.go @@ -16,9 +16,6 @@ package trust import ( "context" - "time" - - "github.com/prometheus/client_golang/prometheus" "github.com/scionproto/scion/go/lib/addr" "github.com/scionproto/scion/go/lib/common" @@ -26,7 +23,6 @@ import ( "github.com/scionproto/scion/go/lib/infra" "github.com/scionproto/scion/go/lib/infra/messenger" "github.com/scionproto/scion/go/lib/log" - "github.com/scionproto/scion/go/lib/prom" "github.com/scionproto/scion/go/lib/scrypto/cert" "github.com/scionproto/scion/go/lib/scrypto/trc" "github.com/scionproto/scion/go/lib/snet" @@ -60,21 +56,7 @@ type trcReqHandler struct { recurse bool } -func (h *trcReqHandler) Handle() { - trcReqMetrics.RequestsTotal.With(prometheus.Labels{ - prom.LabelSrc: promSrcValue(h.request, h.store.ia), - }).Inc() - start := time.Now() - result := h.handleInt() - trcReqMetrics.ResultsTotal.With(prometheus.Labels{ - prom.LabelResult: result.Result, - }) - trcReqMetrics.RequestLatency.With(prometheus.Labels{ - prom.LabelStatus: result.Status, - }).Observe(time.Since(start).Seconds()) -} - -func (h *trcReqHandler) handleInt() *infra.HandlerResult { +func (h *trcReqHandler) Handle() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) trcReq, ok := h.request.Message.(*cert_mgmt.TRCReq) if !ok { @@ -144,21 +126,7 @@ type chainReqHandler struct { recurse bool 
} -func (h *chainReqHandler) Handle() { - chainReqMetrics.RequestsTotal.With(prometheus.Labels{ - prom.LabelSrc: promSrcValue(h.request, h.store.ia), - }).Inc() - start := time.Now() - result := h.handleInt() - chainReqMetrics.ResultsTotal.With(prometheus.Labels{ - prom.LabelResult: result.Result, - }) - chainReqMetrics.RequestLatency.With(prometheus.Labels{ - prom.LabelStatus: result.Status, - }).Observe(time.Since(start).Seconds()) -} - -func (h *chainReqHandler) handleInt() *infra.HandlerResult { +func (h *chainReqHandler) Handle() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) chainReq, ok := h.request.Message.(*cert_mgmt.ChainReq) if !ok { @@ -224,19 +192,7 @@ type trcPushHandler struct { store *Store } -func (h *trcPushHandler) Handle() { - trcPushMetrics.RequestsTotal.With(prometheus.Labels{}).Inc() - start := time.Now() - result := h.handleInt() - trcPushMetrics.ResultsTotal.With(prometheus.Labels{ - prom.LabelResult: result.Result, - }) - trcPushMetrics.RequestLatency.With(prometheus.Labels{ - prom.LabelStatus: result.Status, - }).Observe(time.Since(start).Seconds()) -} - -func (h *trcPushHandler) handleInt() *infra.HandlerResult { +func (h *trcPushHandler) Handle() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) // FIXME(scrye): In case a TRC update will invalidate the local certificate // chain after the gracePeriod, CSes must use this gracePeriod to fetch a @@ -289,19 +245,7 @@ type chainPushHandler struct { store *Store } -func (h *chainPushHandler) Handle() { - chainPushMetrics.RequestsTotal.With(prometheus.Labels{}).Inc() - start := time.Now() - result := h.handleInt() - chainPushMetrics.ResultsTotal.With(prometheus.Labels{ - prom.LabelResult: result.Result, - }) - chainPushMetrics.RequestLatency.With(prometheus.Labels{ - prom.LabelStatus: result.Status, - }).Observe(time.Since(start).Seconds()) -} - -func (h *chainPushHandler) handleInt() *infra.HandlerResult { +func (h *chainPushHandler) Handle() 
*infra.HandlerResult { logger := log.FromCtx(h.request.Context()) chainPush, ok := h.request.Message.(*cert_mgmt.Chain) if !ok { diff --git a/go/lib/infra/modules/trust/trust.go b/go/lib/infra/modules/trust/trust.go index 008f27db64..213ab12b21 100644 --- a/go/lib/infra/modules/trust/trust.go +++ b/go/lib/infra/modules/trust/trust.go @@ -574,13 +574,13 @@ func (store *Store) LoadAuthoritativeChain(dir string) error { // allowed to issue new TRC requests over the network. This method should only // be used when servicing requests coming from remote nodes. func (store *Store) NewTRCReqHandler(recurse bool) infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := &trcReqHandler{ request: r, store: store, recurse: recurse, } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } @@ -591,13 +591,13 @@ func (store *Store) NewTRCReqHandler(recurse bool) infra.Handler { // over the network. This method should only be used when servicing requests // coming from remote nodes. func (store *Store) NewChainReqHandler(recurse bool) infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := chainReqHandler{ request: r, store: store, recurse: recurse, } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } @@ -606,12 +606,12 @@ func (store *Store) NewChainReqHandler(recurse bool) infra.Handler { // peer, backed by the trust store. TRCs are pushed by local BSes during // beaconing. Pushes are allowed from all local AS sources. func (store *Store) NewTRCPushHandler() infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := trcPushHandler{ request: r, store: store, } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } @@ -621,12 +621,12 @@ func (store *Store) NewTRCPushHandler() infra.Handler { // by other ASes during core registration. 
Pushes are allowed from all // local ISD sources. func (store *Store) NewChainPushHandler() infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := chainPushHandler{ request: r, store: store, } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } diff --git a/go/path_srv/internal/handlers/ifstateinfo.go b/go/path_srv/internal/handlers/ifstateinfo.go index 40ecd6ed2e..d4e1b0edf1 100644 --- a/go/path_srv/internal/handlers/ifstateinfo.go +++ b/go/path_srv/internal/handlers/ifstateinfo.go @@ -29,32 +29,34 @@ type ifStateInfoHandler struct { } func NewIfStatInfoHandler(args HandlerArgs) infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := &ifStateInfoHandler{ baseHandler: newBaseHandler(r, args), } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } -func (h *ifStateInfoHandler) Handle() { +func (h *ifStateInfoHandler) Handle() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) ifStateInfo, ok := h.request.Message.(*path_mgmt.IFStateInfos) if !ok { logger.Error("[ifStateHandler] wrong message type, expected path_mgmt.IFStateInfos", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } logger.Debug("[ifStateHandler] Received IfStateInfo", "ifStateInfo", ifStateInfo) subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() + // TODO(lukedirtwalker): if all verifications fail we should reflect that in metrics. 
for _, info := range ifStateInfo.Infos { if !info.Active && info.SRevInfo != nil { h.verifyAndStore(subCtx, info.SRevInfo) } } logger.Debug("[ifStateHandler] done processing ifStateInfo") + return infra.MetricsResultOk } func (h *ifStateInfoHandler) verifyAndStore(ctx context.Context, rev *path_mgmt.SignedRevInfo) { diff --git a/go/path_srv/internal/handlers/segreg.go b/go/path_srv/internal/handlers/segreg.go index 377cb16055..bb26b5cf54 100644 --- a/go/path_srv/internal/handlers/segreg.go +++ b/go/path_srv/internal/handlers/segreg.go @@ -32,28 +32,28 @@ type segRegHandler struct { } func NewSegRegHandler(args HandlerArgs) infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := &segRegHandler{ baseHandler: newBaseHandler(r, args), localIA: args.IA, } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } -func (h *segRegHandler) Handle() { +func (h *segRegHandler) Handle() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) segReg, ok := h.request.Message.(*path_mgmt.SegReg) if !ok { logger.Error("[segRegHandler] wrong message type, expected path_mgmt.SegReg", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } msger, ok := infra.MessengerFromContext(h.request.Context()) if !ok { logger.Error("[segRegHandler] Unable to service request, no Messenger found") - return + return infra.MetricsErrInternal } subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() @@ -61,10 +61,11 @@ func (h *segRegHandler) Handle() { if err := segReg.ParseRaw(); err != nil { logger.Error("[segRegHandler] Failed to parse message", "err", err) sendAck(proto.Ack_ErrCode_reject, messenger.AckRejectFailedToParse) - return + return infra.MetricsErrInvalid } logSegRecs(logger, "[segRegHandler]", h.request.Peer, segReg.SegRecs) h.verifyAndStore(subCtx, h.request.Peer, segReg.Recs, segReg.SRevInfos) // 
TODO(lukedirtwalker): If all segments failed to verify the ack should also be negative here. sendAck(proto.Ack_ErrCode_ok, "") + return infra.MetricsResultOk } diff --git a/go/path_srv/internal/handlers/segreqcore.go b/go/path_srv/internal/handlers/segreqcore.go index 7a941e5b09..30b13cb431 100644 --- a/go/path_srv/internal/handlers/segreqcore.go +++ b/go/path_srv/internal/handlers/segreqcore.go @@ -36,7 +36,7 @@ type segReqCoreHandler struct { } func NewSegReqCoreHandler(args HandlerArgs, segsDeduper dedupe.Deduper) infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := &segReqCoreHandler{ segReqHandler: segReqHandler{ baseHandler: newBaseHandler(r, args), @@ -44,32 +44,34 @@ func NewSegReqCoreHandler(args HandlerArgs, segsDeduper dedupe.Deduper) infra.Ha segsDeduper: segsDeduper, }, } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } -func (h *segReqCoreHandler) Handle() { +func (h *segReqCoreHandler) Handle() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) segReq, ok := h.request.Message.(*path_mgmt.SegReq) if !ok { logger.Error("[segReqCoreHandler] wrong message type, expected path_mgmt.SegReq", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } logger.Debug("[segReqCoreHandler] Received", "segReq", segReq) msger, ok := infra.MessengerFromContext(h.request.Context()) if !ok { logger.Warn("[segReqCoreHandler] Unable to service request, no Messenger found") - return + return infra.MetricsErrInternal } subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() if !h.isValidDst(segReq) { h.sendEmptySegReply(subCtx, segReq, msger) - return + return infra.MetricsErrInvalid } h.handleReq(subCtx, msger, segReq) + // TODO(lukedirtwalker): Handle errors + return infra.MetricsResultOk } func (h *segReqCoreHandler) handleReq(ctx context.Context, diff --git 
a/go/path_srv/internal/handlers/segreqnoncore.go b/go/path_srv/internal/handlers/segreqnoncore.go index e0519fe6f0..fdb0f4cc82 100644 --- a/go/path_srv/internal/handlers/segreqnoncore.go +++ b/go/path_srv/internal/handlers/segreqnoncore.go @@ -37,7 +37,7 @@ type segReqNonCoreHandler struct { } func NewSegReqNonCoreHandler(args HandlerArgs, segsDeduper dedupe.Deduper) infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := &segReqNonCoreHandler{ segReqHandler: segReqHandler{ baseHandler: newBaseHandler(r, args), @@ -45,27 +45,27 @@ func NewSegReqNonCoreHandler(args HandlerArgs, segsDeduper dedupe.Deduper) infra segsDeduper: segsDeduper, }, } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } -func (h *segReqNonCoreHandler) Handle() { +func (h *segReqNonCoreHandler) Handle() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) segReq, ok := h.request.Message.(*path_mgmt.SegReq) if !ok { logger.Error("[segReqHandler] wrong message type, expected path_mgmt.SegReq", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } logger.Debug("[segReqHandler] Received", "segReq", segReq) msger, ok := infra.MessengerFromContext(h.request.Context()) if !ok { logger.Warn("[segReqHandler] Unable to service request, no Messenger found") - return + return infra.MetricsErrInternal } if !h.validSrcDst(segReq) { - return + return infra.MetricsErrInvalid } subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() @@ -74,19 +74,22 @@ func (h *segReqNonCoreHandler) Handle() { if err != nil { logger.Error("[segReqHandler] Failed to determine dest type", "err", err) h.sendEmptySegReply(subCtx, segReq, msger) - return + return infra.MetricsErrInvalid } coreASes, err := h.coreASes(subCtx) if err != nil { logger.Error("[segReqHandler] Failed to find local core ASes", "err", err) h.sendEmptySegReply(subCtx, 
segReq, msger) - return + // TODO(lukedirtwalker): Classify error better. + return infra.MetricsErrInternal } if dstCore { h.handleCoreDst(subCtx, segReq, msger, segReq.DstIA(), coreASes.ASList()) } else { h.handleNonCoreDst(subCtx, segReq, msger, segReq.DstIA(), coreASes.ASList()) } + // TODO(lukedirtwalker): the return value should come from the handle functions. + return infra.MetricsResultOk } func (h *segReqNonCoreHandler) validSrcDst(segReq *path_mgmt.SegReq) bool { diff --git a/go/path_srv/internal/handlers/segrevoc.go b/go/path_srv/internal/handlers/segrevoc.go index b53a3f2e55..3b66fe9ca3 100644 --- a/go/path_srv/internal/handlers/segrevoc.go +++ b/go/path_srv/internal/handlers/segrevoc.go @@ -31,28 +31,28 @@ type revocHandler struct { } func NewRevocHandler(args HandlerArgs) infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := &revocHandler{ baseHandler: newBaseHandler(r, args), } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } -func (h *revocHandler) Handle() { +func (h *revocHandler) Handle() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) logger = logger.New("from", h.request.Peer) revocation, ok := h.request.Message.(*path_mgmt.SignedRevInfo) if !ok { logger.Error("[revocHandler] wrong message type, expected path_mgmt.SignedRevInfo", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } msger, ok := infra.MessengerFromContext(h.request.Context()) if !ok { logger.Error("[revocHandler] Unable to service request, no Messenger found") - return + return infra.MetricsErrInternal } subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() @@ -63,7 +63,7 @@ func (h *revocHandler) Handle() { if err != nil { logger.Warn("[revocHandler] Couldn't parse revocation", "err", err) sendAck(proto.Ack_ErrCode_reject, messenger.AckRejectFailedToParse) - return + return 
infra.MetricsErrInvalid } logger = logger.New("revInfo", revInfo) logger.Debug("[revocHandler] Received revocation") @@ -72,14 +72,15 @@ func (h *revocHandler) Handle() { if err != nil { logger.Warn("Couldn't verify revocation", "err", err) sendAck(proto.Ack_ErrCode_reject, messenger.AckRejectFailedToVerify) - return + return infra.MetricsErrInvalid } _, err = h.revCache.Insert(subCtx, revocation) if err != nil { logger.Error("Failed to insert revInfo", "err", err) sendAck(proto.Ack_ErrCode_retry, messenger.AckRetryDBError) - return + return infra.MetricsErrRevCache(err) } sendAck(proto.Ack_ErrCode_ok, "") + return infra.MetricsResultOk } diff --git a/go/path_srv/internal/handlers/segsync.go b/go/path_srv/internal/handlers/segsync.go index ddc0a31e5d..18e30ca4aa 100644 --- a/go/path_srv/internal/handlers/segsync.go +++ b/go/path_srv/internal/handlers/segsync.go @@ -32,28 +32,28 @@ type syncHandler struct { } func NewSyncHandler(args HandlerArgs) infra.Handler { - f := func(r *infra.Request) { + f := func(r *infra.Request) *infra.HandlerResult { handler := &syncHandler{ baseHandler: newBaseHandler(r, args), localIA: args.IA, } - handler.Handle() + return handler.Handle() } return infra.HandlerFunc(f) } -func (h *syncHandler) Handle() { +func (h *syncHandler) Handle() *infra.HandlerResult { logger := log.FromCtx(h.request.Context()) segSync, ok := h.request.Message.(*path_mgmt.SegSync) if !ok { logger.Error("[syncHandler] wrong message type, expected path_mgmt.SegSync", "msg", h.request.Message, "type", common.TypeOf(h.request.Message)) - return + return infra.MetricsErrInternal } msger, ok := infra.MessengerFromContext(h.request.Context()) if !ok { logger.Error("[syncHandler] Unable to service request, no Messenger found") - return + return infra.MetricsErrInternal } subCtx, cancelF := context.WithTimeout(h.request.Context(), HandlerTimeout) defer cancelF() @@ -61,10 +61,11 @@ func (h *syncHandler) Handle() { if err := segSync.ParseRaw(); err != nil { 
logger.Error("[syncHandler] Failed to parse message", "err", err) sendAck(proto.Ack_ErrCode_reject, messenger.AckRejectFailedToParse) - return + return infra.MetricsErrInvalid } logSegRecs(logger, "[syncHandler]", h.request.Peer, segSync.SegRecs) h.verifyAndStore(subCtx, h.request.Peer, segSync.Recs, segSync.SRevInfos) // TODO(lukedirtwalker): If all segments failed to verify the ack should also be negative here. sendAck(proto.Ack_ErrCode_ok, "") + return infra.MetricsResultOk } From 95c300ee7acd40121d6d24890ccfc17e2e6ec1c7 Mon Sep 17 00:00:00 2001 From: Lukas Vogel Date: Mon, 18 Feb 2019 15:40:07 +0100 Subject: [PATCH 3/4] Remove unused stuff --- go/lib/infra/metrics.go | 9 ---- go/lib/infra/modules/trust/metrics.go | 72 --------------------------- go/lib/infra/modules/trust/trust.go | 1 - 3 files changed, 82 deletions(-) delete mode 100644 go/lib/infra/modules/trust/metrics.go diff --git a/go/lib/infra/metrics.go b/go/lib/infra/metrics.go index 53fe2406ed..a6f1824e2d 100644 --- a/go/lib/infra/metrics.go +++ b/go/lib/infra/metrics.go @@ -15,8 +15,6 @@ package infra import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/scionproto/scion/go/lib/common" "github.com/scionproto/scion/go/lib/prom" ) @@ -28,13 +26,6 @@ const ( PromSrcUnknown = "unknown" ) -// HandlerMetrics contains the standard metrics for a handler. -type HandlerMetrics struct { - RequestsTotal *prometheus.CounterVec - RequestLatency *prometheus.HistogramVec - ResultsTotal *prometheus.CounterVec -} - // HandlerResult contains a result label and a status label. 
type HandlerResult struct { Result string diff --git a/go/lib/infra/modules/trust/metrics.go b/go/lib/infra/modules/trust/metrics.go deleted file mode 100644 index b9026e47a1..0000000000 --- a/go/lib/infra/modules/trust/metrics.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2019 Anapaya Systems -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package trust - -import ( - "sync" - - "github.com/scionproto/scion/go/lib/infra" - "github.com/scionproto/scion/go/lib/prom" -) - -const ( - promNamespace = "trust" -) - -var ( - chainPushMetrics *infra.HandlerMetrics - chainReqMetrics *infra.HandlerMetrics - trcPushMetrics *infra.HandlerMetrics - trcReqMetrics *infra.HandlerMetrics - - initOnce sync.Once -) - -func initMetrics() { - initOnce.Do(func() { - chainPushMetrics = &infra.HandlerMetrics{ - RequestsTotal: prom.NewCounterVec(promNamespace, "", "chain_push_total", - "Chain pushes received total.", []string{}), - RequestLatency: prom.NewHistogramVec(promNamespace, "", "chain_push_latency", - "Chain push latency.", []string{prom.LabelStatus}, prom.DefaultLatencyBuckets), - ResultsTotal: prom.NewCounterVec(promNamespace, "", "chain_push_results_total", - "Chain push results total.", []string{prom.LabelResult}), - } - chainReqMetrics = &infra.HandlerMetrics{ - RequestsTotal: prom.NewCounterVec(promNamespace, "", "chain_req_total", - "Chain requests received total.", []string{prom.LabelSrc}), - RequestLatency: prom.NewHistogramVec(promNamespace, "", 
"chain_req_latency", - "Chain requests latency.", []string{prom.LabelStatus}, prom.DefaultLatencyBuckets), - ResultsTotal: prom.NewCounterVec(promNamespace, "", "chain_req_results_total", - "Chain requests results total.", []string{prom.LabelResult}), - } - trcPushMetrics = &infra.HandlerMetrics{ - RequestsTotal: prom.NewCounterVec(promNamespace, "", "trc_push_total", - "TRC pushes received total.", []string{}), - RequestLatency: prom.NewHistogramVec(promNamespace, "", "trc_push_latency", - "TRC push latency.", []string{prom.LabelStatus}, prom.DefaultLatencyBuckets), - ResultsTotal: prom.NewCounterVec(promNamespace, "", "trc_push_results_total", - "TRC push results total.", []string{prom.LabelResult}), - } - trcReqMetrics = &infra.HandlerMetrics{ - RequestsTotal: prom.NewCounterVec(promNamespace, "", "trc_req_total", - "TRC requests received total.", []string{prom.LabelSrc}), - RequestLatency: prom.NewHistogramVec(promNamespace, "", "trc_req_latency", - "TRC requests latency.", []string{prom.LabelStatus}, prom.DefaultLatencyBuckets), - ResultsTotal: prom.NewCounterVec(promNamespace, "", "trc_req_results_total", - "TRC requests results total.", []string{prom.LabelResult}), - } - }) -} diff --git a/go/lib/infra/modules/trust/trust.go b/go/lib/infra/modules/trust/trust.go index 213ab12b21..6456c7efd5 100644 --- a/go/lib/infra/modules/trust/trust.go +++ b/go/lib/infra/modules/trust/trust.go @@ -87,7 +87,6 @@ type Store struct { func NewStore(db trustdb.TrustDB, local addr.IA, options *Config, logger log.Logger) (*Store, error) { - initMetrics() if options == nil { options = &Config{} } From 007a5232663a851b62e97aa6c153ff76b3c942a7 Mon Sep 17 00:00:00 2001 From: Lukas Vogel Date: Thu, 21 Feb 2019 12:35:22 +0100 Subject: [PATCH 4/4] Add doc --- go/lib/infra/metrics.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/lib/infra/metrics.go b/go/lib/infra/metrics.go index a6f1824e2d..5ce4e4fa4b 100644 --- a/go/lib/infra/metrics.go +++ b/go/lib/infra/metrics.go @@ 
-28,7 +28,11 @@ const ( // HandlerResult contains a result label and a status label. type HandlerResult struct { + // Result is the label used for the result metric. Result string + // Status is one of prom.StatusOk, prom.StatusErr, or prom.StatusTimeout. It is used for the + // latency histogram. This is a reduced view of the result, so that we don't get too many + // time series on the histogram. Status string }