From 7114a9d25ffdd4f22617ee692f2c8df391d5ffef Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Mon, 15 Jul 2024 23:04:28 +0200 Subject: [PATCH] fix forwarded metrics --- signal/server/signal.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/signal/server/signal.go b/signal/server/signal.go index fc9c19efdbb..02c49c31d77 100644 --- a/signal/server/signal.go +++ b/signal/server/signal.go @@ -23,6 +23,8 @@ const ( labelTypeError = "error" labelTypeNotConnected = "not_connected" labelTypeNotRegistered = "not_registered" + labelTypeStream = "stream" + labelTypeMessage = "message" labelError = "error" labelErrorMissingId = "missing_id" @@ -62,6 +64,7 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto. } if dstPeer, found := s.registry.Get(msg.RemoteKey); found { + start := time.Now() //forward the message to the target peer if err := dstPeer.Stream.Send(msg); err != nil { log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err) @@ -69,6 +72,7 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto. s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) } else { + s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeMessage))) s.metrics.MessagesForwarded.Add(context.Background(), 1) } } else { @@ -118,22 +122,21 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) } else if err != nil { return err } - start := time.Now() log.Debugf("received a new message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey) // lookup the target peer where the message is going to if dstPeer, found := s.registry.Get(msg.RemoteKey); found { + start := time.Now() //forward the message to the target peer if err := dstPeer.Stream.Send(msg); err != nil { log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err) //todo respond to the sender? - + s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) + } else { // in milliseconds - s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6) + s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream))) s.metrics.MessagesForwarded.Add(stream.Context(), 1) - } else { - s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) } } else { log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)