Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handlers: Add metrics #2407

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go/cert_srv/internal/reiss/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ type Handler struct {
IA addr.IA
}

func (h *Handler) Handle(r *infra.Request) {
func (h *Handler) Handle(r *infra.Request) *infra.HandlerResult {
addr := r.Peer.(*snet.Addr)
req := r.Message.(*cert_mgmt.ChainIssReq)
if err := h.handle(r, addr, req); err != nil {
log.Error("[ReissHandler] Dropping certificate reissue request",
"addr", addr, "req", req, "err", err)
}
// TODO(lukedirtwalker): reflect error in metrics.
return infra.MetricsResultOk
}

// handle handles certificate chain reissue requests. If the requested
Expand Down
61 changes: 56 additions & 5 deletions go/lib/infra/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ type Transport interface {
Close(context.Context) error
}

// Interface Handler is implemented by objects that can handle a request coming
// Handler is implemented by objects that can handle a request coming
// from a remote SCION network node.
type Handler interface {
Handle(*Request)
Handle(*Request) *HandlerResult
}

// Constructs a handler for request r. Handle() can be called on the
// resulting object to process the message.
type HandlerFunc func(r *Request)
type HandlerFunc func(r *Request) *HandlerResult

func (f HandlerFunc) Handle(r *Request) {
f(r)
func (f HandlerFunc) Handle(r *Request) *HandlerResult {
return f(r)
}

// Request describes an object received from the network that is not part of an
Expand Down Expand Up @@ -195,6 +195,57 @@ func (mt MessageType) String() string {
}
}

// MetricLabel returns the label for metrics for a given message type.
// The postfix for requests is always "req" and for replies and push messages it is always "push".
func (mt MessageType) MetricLabel() string {
switch mt {
case None:
return "none"
case ChainRequest:
return "chain_req"
case Chain:
return "chain_push"
case TRCRequest:
return "trc_req"
case TRC:
return "trc_push"
case IfId:
return "ifid_push"
case IfStateInfos:
return "if_info_push"
case IfStateReq:
return "if_info_req"
case Seg:
return "pathseg_push"
case SegChangesReq:
return "seg_changes_req"
case SegChangesReply:
return "seg_changes_push"
case SegChangesIdReq:
return "seg_changes_id_req"
case SegChangesIdReply:
return "seg_changes_id_push"
case SegReg:
return "seg_reg_push"
case SegRequest:
return "seg_req"
case SegReply:
return "seg_push"
case SegRev:
return "seg_rev_push"
case SegSync:
return "seg_sync_push"
case ChainIssueRequest:
return "chain_issue_req"
case ChainIssueReply:
return "chain_issue_push"
case Ack:
return "ack_push"
default:
return "unknown_mt"
}
}

type Messenger interface {
SendAck(ctx context.Context, msg *ack.Ack, a net.Addr, id uint64) error
GetTRC(ctx context.Context, msg *cert_mgmt.TRCReq, a net.Addr,
Expand Down
53 changes: 35 additions & 18 deletions go/lib/infra/messenger/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/scionproto/scion/go/lib/addr"
"github.com/scionproto/scion/go/lib/common"
"github.com/scionproto/scion/go/lib/ctrl"
Expand All @@ -87,6 +89,7 @@ import (
"github.com/scionproto/scion/go/lib/infra"
"github.com/scionproto/scion/go/lib/infra/disp"
"github.com/scionproto/scion/go/lib/log"
"github.com/scionproto/scion/go/lib/prom"
"github.com/scionproto/scion/go/lib/sciond"
"github.com/scionproto/scion/go/lib/snet"
"github.com/scionproto/scion/go/lib/spath"
Expand Down Expand Up @@ -155,6 +158,7 @@ type Messenger struct {
func New(ia addr.IA, dispatcher *disp.Dispatcher, store infra.TrustStore, logger log.Logger,
config *Config) *Messenger {

initMetrics()
if config == nil {
config = &Config{}
}
Expand All @@ -181,7 +185,7 @@ func New(ia addr.IA, dispatcher *disp.Dispatcher, store infra.TrustStore, logger
}

func (m *Messenger) SendAck(ctx context.Context, msg *ack.Ack, a net.Addr, id uint64) error {
opMetric := metricStartOp(promOpSendAck)
opMetric := metricStartOp(infra.Ack)
err := m.sendAck(ctx, msg, a, id)
opMetric.publishResult(err)
return err
Expand All @@ -202,7 +206,7 @@ func (m *Messenger) sendAck(ctx context.Context, msg *ack.Ack, a net.Addr, id ui
func (m *Messenger) GetTRC(ctx context.Context, msg *cert_mgmt.TRCReq,
a net.Addr, id uint64) (*cert_mgmt.TRC, error) {

opMetric := metricStartOp(promOpGetTRC)
opMetric := metricStartOp(infra.TRCRequest)
trc, err := m.getTRC(ctx, msg, a, id)
opMetric.publishResult(err)
return trc, err
Expand Down Expand Up @@ -237,7 +241,7 @@ func (m *Messenger) getTRC(ctx context.Context, msg *cert_mgmt.TRCReq,

// SendTRC sends a reliable cert_mgmt.TRC to address a.
func (m *Messenger) SendTRC(ctx context.Context, msg *cert_mgmt.TRC, a net.Addr, id uint64) error {
opMetrics := metricStartOp(promOpSendTRC)
opMetrics := metricStartOp(infra.TRC)
err := m.sendTRC(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand All @@ -258,7 +262,7 @@ func (m *Messenger) sendTRC(ctx context.Context, msg *cert_mgmt.TRC, a net.Addr,
func (m *Messenger) GetCertChain(ctx context.Context, msg *cert_mgmt.ChainReq,
a net.Addr, id uint64) (*cert_mgmt.Chain, error) {

opMetrics := metricStartOp(promOpGetCrtChain)
opMetrics := metricStartOp(infra.ChainRequest)
chain, err := m.getCertChain(ctx, msg, a, id)
opMetrics.publishResult(err)
return chain, err
Expand Down Expand Up @@ -295,7 +299,7 @@ func (m *Messenger) getCertChain(ctx context.Context, msg *cert_mgmt.ChainReq,
func (m *Messenger) SendCertChain(ctx context.Context, msg *cert_mgmt.Chain, a net.Addr,
id uint64) error {

opMetrics := metricStartOp(promOpSendCrtChain)
opMetrics := metricStartOp(infra.Chain)
err := m.sendCertChain(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand All @@ -315,7 +319,7 @@ func (m *Messenger) sendCertChain(ctx context.Context, msg *cert_mgmt.Chain, a n

// SendIfId sends a reliable ifid.IFID to address a.
func (m *Messenger) SendIfId(ctx context.Context, msg *ifid.IFID, a net.Addr, id uint64) error {
opMetrics := metricStartOp(promOpSendIfId)
opMetrics := metricStartOp(infra.IfId)
err := m.sendIfId(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand All @@ -329,7 +333,7 @@ func (m *Messenger) sendIfId(ctx context.Context, msg *ifid.IFID, a net.Addr, id
func (m *Messenger) SendIfStateInfos(ctx context.Context, msg *path_mgmt.IFStateInfos,
a net.Addr, id uint64) error {

opMetrics := metricStartOp(promOpSendIfStateInfo)
opMetrics := metricStartOp(infra.IfStateInfos)
err := m.sendIfStateInfos(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand All @@ -351,7 +355,7 @@ func (m *Messenger) sendIfStateInfos(ctx context.Context, msg *path_mgmt.IFState
func (m *Messenger) SendSeg(ctx context.Context, msg *seg.PathSegment,
a net.Addr, id uint64) error {

opMetrics := metricStartOp(promOpSendSeg)
opMetrics := metricStartOp(infra.Seg)
err := m.sendSeg(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand All @@ -368,7 +372,7 @@ func (m *Messenger) sendSeg(ctx context.Context, msg *seg.PathSegment,
func (m *Messenger) GetSegs(ctx context.Context, msg *path_mgmt.SegReq,
a net.Addr, id uint64) (*path_mgmt.SegReply, error) {

opMetrics := metricStartOp(promOpGetSegs)
opMetrics := metricStartOp(infra.SegRequest)
reply, err := m.getSegs(ctx, msg, a, id)
opMetrics.publishResult(err)
return reply, err
Expand Down Expand Up @@ -409,7 +413,7 @@ func (m *Messenger) getSegs(ctx context.Context, msg *path_mgmt.SegReq,
func (m *Messenger) SendSegReply(ctx context.Context, msg *path_mgmt.SegReply,
a net.Addr, id uint64) error {

opMetrics := metricStartOp(promOpSendSegReply)
opMetrics := metricStartOp(infra.SegReply)
err := m.sendSegReply(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand All @@ -431,7 +435,7 @@ func (m *Messenger) sendSegReply(ctx context.Context, msg *path_mgmt.SegReply,
func (m *Messenger) SendSegSync(ctx context.Context, msg *path_mgmt.SegSync,
a net.Addr, id uint64) error {

opMetrics := metricStartOp(promOpSendSegSync)
opMetrics := metricStartOp(infra.SegSync)
err := m.sendSegSync(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand All @@ -452,7 +456,7 @@ func (m *Messenger) sendSegSync(ctx context.Context, msg *path_mgmt.SegSync,
func (m *Messenger) GetSegChangesIds(ctx context.Context, msg *path_mgmt.SegChangesIdReq,
a net.Addr, id uint64) (*path_mgmt.SegChangesIdReply, error) {

opMetrics := metricStartOp(promOpGetSegChangesId)
opMetrics := metricStartOp(infra.SegChangesIdReq)
reply, err := m.getSegChangesIds(ctx, msg, a, id)
opMetrics.publishResult(err)
return reply, err
Expand Down Expand Up @@ -489,7 +493,7 @@ func (m *Messenger) getSegChangesIds(ctx context.Context, msg *path_mgmt.SegChan
func (m *Messenger) SendSegChangesIdReply(ctx context.Context, msg *path_mgmt.SegChangesIdReply,
a net.Addr, id uint64) error {

opMetrics := metricStartOp(promOpSendSegChangesIdReply)
opMetrics := metricStartOp(infra.SegChangesIdReply)
err := m.sendSegChangesIdReply(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand All @@ -511,7 +515,7 @@ func (m *Messenger) sendSegChangesIdReply(ctx context.Context, msg *path_mgmt.Se
func (m *Messenger) GetSegChanges(ctx context.Context, msg *path_mgmt.SegChangesReq,
a net.Addr, id uint64) (*path_mgmt.SegChangesReply, error) {

opMetrics := metricStartOp(promOpGetSegChanges)
opMetrics := metricStartOp(infra.SegChangesReq)
reply, err := m.getSegChanges(ctx, msg, a, id)
opMetrics.publishResult(err)
return reply, err
Expand Down Expand Up @@ -551,7 +555,7 @@ func (m *Messenger) getSegChanges(ctx context.Context, msg *path_mgmt.SegChanges
func (m *Messenger) SendSegChangesReply(ctx context.Context, msg *path_mgmt.SegChangesReply,
a net.Addr, id uint64) error {

opMetrics := metricStartOp(promOpSendSegChanges)
opMetrics := metricStartOp(infra.SegChangesReply)
err := m.sendSegChangesReply(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand All @@ -573,7 +577,7 @@ func (m *Messenger) sendSegChangesReply(ctx context.Context, msg *path_mgmt.SegC
func (m *Messenger) RequestChainIssue(ctx context.Context, msg *cert_mgmt.ChainIssReq,
a net.Addr, id uint64) (*cert_mgmt.ChainIssRep, error) {

opMetrics := metricStartOp(promOpRequestChainIssue)
opMetrics := metricStartOp(infra.ChainIssueRequest)
reply, err := m.requestChainIssue(ctx, msg, a, id)
opMetrics.publishResult(err)
return reply, err
Expand Down Expand Up @@ -610,7 +614,7 @@ func (m *Messenger) requestChainIssue(ctx context.Context, msg *cert_mgmt.ChainI
func (m *Messenger) SendChainIssueReply(ctx context.Context, msg *cert_mgmt.ChainIssRep,
a net.Addr, id uint64) error {

opMetrics := metricStartOp(promOpSendChainIssue)
opMetrics := metricStartOp(infra.ChainIssueReply)
err := m.sendChainIssueReply(ctx, msg, a, id)
opMetrics.publishResult(err)
return err
Expand Down Expand Up @@ -740,8 +744,21 @@ func (m *Messenger) serve(ctx context.Context, cancelF context.CancelFunc, pld *
go func() {
defer log.LogPanicAndExit()
defer cancelF()
handler.Handle(
inCallsTotal.With(prometheus.Labels{
prom.LabelOperation: msgType.MetricLabel(),
prom.LabelSrc: metricSrcValue(address, m.ia),
}).Inc()
start := time.Now()
result := handler.Handle(
infra.NewRequest(log.CtxWith(ctx, logger), msg, signedPld, address, pld.ReqId))
inResultsTotal.With(prometheus.Labels{
prom.LabelOperation: msgType.MetricLabel(),
prom.LabelResult: result.Result,
}).Inc()
inCallsLatency.With(prometheus.Labels{
prom.LabelOperation: msgType.MetricLabel(),
prom.LabelStatus: result.Status,
}).Observe(time.Since(start).Seconds())
}()
}

Expand Down
8 changes: 5 additions & 3 deletions go/lib/infra/messenger/messenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,24 @@ var (
mockTRC = &cert_mgmt.TRC{RawTRC: common.RawBytes("foobar")}
)

func MockTRCHandler(request *infra.Request) {
func MockTRCHandler(request *infra.Request) *infra.HandlerResult {
messengerI, ok := infra.MessengerFromContext(request.Context())
if !ok {
log.Warn("Unable to service request, no Messenger interface found")
return
return infra.MetricsErrInternal
}
messenger, ok := messengerI.(*Messenger)
if !ok {
log.Warn("Unable to service request, bad Messenger value found")
return
return infra.MetricsErrInternal
}
subCtx, cancelF := context.WithTimeout(request.Context(), 3*time.Second)
defer cancelF()
if err := messenger.SendTRC(subCtx, mockTRC, nil, request.ID); err != nil {
log.Error("Server error", "err", err)
return infra.MetricsErrInternal
}
return infra.MetricsResultOk
}

func TestTRCExchange(t *testing.T) {
Expand Down
Loading