Skip to content

Commit

Permalink
feat(telem)_: track raw message by type on dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Dec 13, 2024
1 parent 74db631 commit ebced90
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 27 deletions.
36 changes: 36 additions & 0 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
v1protocol "github.com/status-im/status-go/protocol/v1"
)

type TelemetryService interface {
PushRawMessageByType(ctx context.Context, msg struct {
MessageType string
Size uint32
})
}

// Whisper message properties.
const (
whisperTTL = 15
Expand Down Expand Up @@ -88,6 +95,8 @@ type MessageSender struct {

// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
handleSharedSecrets func([]*sharedsecret.Secret) error

telemetryClient TelemetryService
}

func NewMessageSender(
Expand All @@ -113,6 +122,10 @@ func NewMessageSender(
return p, nil
}

func (s *MessageSender) WithTelemetryClient(client TelemetryService) {
s.telemetryClient = client
}

func (s *MessageSender) Stop() {
s.messageEventsSubscriptionsMutex.Lock()
defer s.messageEventsSubscriptionsMutex.Unlock()
Expand Down Expand Up @@ -432,6 +445,9 @@ func (s *MessageSender) sendCommunity(
zap.String("messageType", "community"),
zap.Any("contentType", rawMessage.MessageType),
zap.Strings("hashes", types.EncodeHexes(hashes)))
if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, rawMessage)
}
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -550,6 +566,10 @@ func (s *MessageSender) sendPrivate(
s.transport.Track(messageID, hashes, newMessages)
}

if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, rawMessage)
}

return messageID, nil
}

Expand Down Expand Up @@ -578,6 +598,9 @@ func (s *MessageSender) SendPairInstallation(
return nil, errors.Wrap(err, "failed to send a message spec")
}

if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, &rawMessage)
}
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -808,6 +831,9 @@ func (s *MessageSender) SendPublic(
zap.Any("contentType", rawMessage.MessageType),
zap.String("messageType", "public"),
zap.Strings("hashes", types.EncodeHexes(hashes)))
if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, &rawMessage)
}
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -1381,3 +1407,13 @@ func (s *MessageSender) CleanupHashRatchetEncryptedMessages() error {

return nil
}

func (s *MessageSender) sendBandwidthMetric(ctx context.Context, rawMessage *RawMessage) {
s.telemetryClient.PushRawMessageByType(ctx, struct {
MessageType string
Size uint32
}{
MessageType: rawMessage.MessageType.String(),
Size: uint32(len(rawMessage.Payload)),
})
}
12 changes: 12 additions & 0 deletions protocol/common/message_sender_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"math"
"testing"

Expand Down Expand Up @@ -40,6 +41,14 @@ type MessageSenderSuite struct {
logger *zap.Logger
}

type mockTelemetryService struct{}

func (m *mockTelemetryService) PushRawMessageByType(ctx context.Context, msg struct {
MessageType string
Size uint32
}) {
}

func (s *MessageSenderSuite) SetupTest() {
s.testMessage = protobuf.ChatMessage{
Text: "abc123",
Expand Down Expand Up @@ -95,6 +104,9 @@ func (s *MessageSenderSuite) SetupTest() {
Datasync: true,
},
)

mockTelemetry := &mockTelemetryService{}
s.sender.WithTelemetryClient(mockTelemetry)
s.Require().NoError(err)
}

Expand Down
2 changes: 2 additions & 0 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,8 @@ func NewMessenger(
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}
telemetryClient.Start(ctx)

sender.WithTelemetryClient(telemetryClient)
}

messenger = &Messenger{
Expand Down
12 changes: 12 additions & 0 deletions protocol/messenger_builder_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package protocol

import (
"context"
"crypto/ecdsa"

"github.com/google/uuid"
Expand Down Expand Up @@ -52,6 +53,14 @@ func (tmc *testMessengerConfig) complete() error {
return nil
}

type mockTelemetryService struct{}

func (m *mockTelemetryService) PushRawMessageByType(ctx context.Context, msg struct {
MessageType string
Size uint32
}) {
}

func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger, error) {
err := config.complete()
if err != nil {
Expand Down Expand Up @@ -96,6 +105,9 @@ func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger,
"testVersion",
options...,
)

mockTelemetry := &mockTelemetryService{}
m.sender.WithTelemetryClient(mockTelemetry)
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,15 @@ func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.P
}

m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes)))
if m.telemetryClient != nil {
m.telemetryClient.PushRawMessageByType(ctx, struct {
MessageType string
Size uint32
}{
MessageType: "DATASYNC",
Size: uint32(len(marshalledPayload)),
})
}
m.transport.TrackMany(messageIDs, hashes, newMessages)

return nil
Expand Down
29 changes: 28 additions & 1 deletion telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
// Total number and size of Waku messages sent by this node
SentMessageTotalMetric TelemetryType = "SentMessageTotal"
// Size and type of raw message successfully returned by dispatchMessage
RawMessageByTypeMetric TelemetryType = "RawMessageByType"
)

const MaxRetryCache = 5000
Expand Down Expand Up @@ -151,6 +153,13 @@ func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) {
c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize})
}

func (c *Client) PushRawMessageByType(ctx context.Context, msg struct {
MessageType string
Size uint32
}) {
c.processAndPushTelemetry(ctx, RawMessageByType{MessageType: msg.MessageType, Size: msg.Size})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand Down Expand Up @@ -206,6 +215,11 @@ type SentMessageTotal struct {
Size uint32
}

type RawMessageByType struct {
MessageType string
Size uint32
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -287,6 +301,7 @@ func (c *Client) Start(ctx context.Context) {
}
}
}()

go func() {
defer common.LogOnPanic()
sendPeriod := c.sendPeriod
Expand Down Expand Up @@ -317,7 +332,6 @@ func (c *Client) Start(ctx context.Context) {
return
}
}

}()
}

Expand Down Expand Up @@ -408,6 +422,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: SentMessageTotalMetric,
TelemetryData: c.ProcessSentMessageTotal(v),
}
case RawMessageByType:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: RawMessageByTypeMetric,
TelemetryData: c.ProcessRawMessageByType(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -589,6 +609,13 @@ func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *jso
return c.marshalPostBody(postBody)
}

func (c *Client) ProcessRawMessageByType(rawMessageByType RawMessageByType) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageType"] = rawMessageByType.MessageType
postBody["size"] = rawMessageByType.Size
return c.marshalPostBody(postBody)
}

// Helper function to marshal post body and handle errors
func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage {
body, err := json.Marshal(postBody)
Expand Down
Loading

0 comments on commit ebced90

Please sign in to comment.