Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add total bytes for m3msg client #4302

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 32 additions & 22 deletions src/aggregator/client/m3msg_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func (c *M3MsgClient) WriteUntimedCounter(
metadatas: metadatas,
},
}
err := c.write(counter.ID, payload)
size, err := c.write(counter.ID, payload)
c.metrics.totalBytesSent.Inc(int64(size))
c.metrics.writeUntimedCounter.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
return err
}
Expand All @@ -129,7 +130,8 @@ func (c *M3MsgClient) WriteUntimedBatchTimer(
metadatas: metadatas,
},
}
err := c.write(batchTimer.ID, payload)
size, err := c.write(batchTimer.ID, payload)
c.metrics.totalBytesSent.Inc(int64(size))
c.metrics.writeUntimedBatchTimer.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
return err
}
Expand All @@ -147,7 +149,8 @@ func (c *M3MsgClient) WriteUntimedGauge(
metadatas: metadatas,
},
}
err := c.write(gauge.ID, payload)
size, err := c.write(gauge.ID, payload)
c.metrics.totalBytesSent.Inc(int64(size))
c.metrics.writeUntimedGauge.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
return err
}
Expand All @@ -165,7 +168,8 @@ func (c *M3MsgClient) WriteTimed(
metadata: metadata,
},
}
err := c.write(metric.ID, payload)
size, err := c.write(metric.ID, payload)
c.metrics.totalBytesSent.Inc(int64(size))
c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
return err
}
Expand All @@ -183,7 +187,8 @@ func (c *M3MsgClient) WritePassthrough(
storagePolicy: storagePolicy,
},
}
err := c.write(metric.ID, payload)
size, err := c.write(metric.ID, payload)
c.metrics.totalBytesSent.Inc(int64(size))
c.metrics.writePassthrough.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
return err
}
Expand All @@ -201,7 +206,8 @@ func (c *M3MsgClient) WriteTimedWithStagedMetadatas(
metadatas: metadatas,
},
}
err := c.write(metric.ID, payload)
size, err := c.write(metric.ID, payload)
c.metrics.totalBytesSent.Inc(int64(size))
c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
return err
}
Expand All @@ -219,27 +225,29 @@ func (c *M3MsgClient) WriteForwarded(
metadata: metadata,
},
}
err := c.write(metric.ID, payload)
size, err := c.write(metric.ID, payload)
c.metrics.totalBytesSent.Inc(int64(size))
c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
return err
}

//nolint:gocritic
func (c *M3MsgClient) write(metricID id.RawID, payload payloadUnion) error {
func (c *M3MsgClient) write(metricID id.RawID, payload payloadUnion) (int, error) {
shard := c.shardFn(metricID, c.m3msg.numShards)

msg := c.m3msg.messagePool.Get()
if err := msg.Encode(shard, payload); err != nil {
size, err := msg.Encode(shard, payload)
if err != nil {
msg.Finalize(producer.Dropped)
return err
return 0, err
}

if err := c.m3msg.producer.Produce(msg); err != nil {
msg.Finalize(producer.Dropped)
return err
return 0, err
}

return nil
return size, nil
}

// Flush satisfies Client interface, as M3Msg client does not need explicit flushing.
Expand All @@ -259,6 +267,7 @@ type m3msgClientMetrics struct {
writeUntimedGauge instrument.MethodMetrics
writePassthrough instrument.MethodMetrics
writeForwarded instrument.MethodMetrics
totalBytesSent tally.Counter
}

func newM3msgClientMetrics(
Expand All @@ -271,6 +280,7 @@ func newM3msgClientMetrics(
writeUntimedGauge: instrument.NewMethodMetrics(scope, "writeUntimedGauge", opts),
writePassthrough: instrument.NewMethodMetrics(scope, "writePassthrough", opts),
writeForwarded: instrument.NewMethodMetrics(scope, "writeForwarded", opts),
totalBytesSent: scope.Counter("total-bytes-sent"),
}
}

Expand Down Expand Up @@ -324,7 +334,7 @@ func newMessage(pool *messagePool) *message {
func (m *message) Encode(
shard uint32,
payload payloadUnion,
) error {
) (int, error) {
m.shard = shard

switch payload.payloadType {
Expand All @@ -336,7 +346,7 @@ func (m *message) Encode(
StagedMetadatas: payload.untimed.metadatas,
}
if err := value.ToProto(&m.cm); err != nil {
return err
return 0, err
}

m.metric = metricpb.MetricWithMetadatas{
Expand All @@ -349,7 +359,7 @@ func (m *message) Encode(
StagedMetadatas: payload.untimed.metadatas,
}
if err := value.ToProto(&m.bm); err != nil {
return err
return 0, err
}

m.metric = metricpb.MetricWithMetadatas{
Expand All @@ -362,15 +372,15 @@ func (m *message) Encode(
StagedMetadatas: payload.untimed.metadatas,
}
if err := value.ToProto(&m.gm); err != nil {
return err
return 0, err
}

m.metric = metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS,
GaugeWithMetadatas: &m.gm,
}
default:
return fmt.Errorf("unrecognized metric type: %v",
return 0, fmt.Errorf("unrecognized metric type: %v",
payload.untimed.metric.Type)
}
case forwardedType:
Expand All @@ -379,7 +389,7 @@ func (m *message) Encode(
ForwardMetadata: payload.forwarded.metadata,
}
if err := value.ToProto(&m.fm); err != nil {
return err
return 0, err
}

m.metric = metricpb.MetricWithMetadatas{
Expand All @@ -392,7 +402,7 @@ func (m *message) Encode(
TimedMetadata: payload.timed.metadata,
}
if err := value.ToProto(&m.tm); err != nil {
return err
return 0, err
}

m.metric = metricpb.MetricWithMetadatas{
Expand All @@ -405,15 +415,15 @@ func (m *message) Encode(
StagedMetadatas: payload.timedWithStagedMetadatas.metadatas,
}
if err := value.ToProto(&m.tms); err != nil {
return err
return 0, err
}

m.metric = metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS,
TimedMetricWithMetadatas: &m.tms,
}
default:
return fmt.Errorf("unrecognized payload type: %v",
return 0, fmt.Errorf("unrecognized payload type: %v",
payload.payloadType)
}

Expand All @@ -427,7 +437,7 @@ func (m *message) Encode(
m.buf = m.buf[:size]

_, err := m.metric.MarshalTo(m.buf)
return err
return size, err
}

func (m *message) Shard() uint32 {
Expand Down
46 changes: 46 additions & 0 deletions src/aggregator/client/m3msg_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,17 @@ package client

import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/uber-go/tally"

"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/msg/producer"
"github.com/m3db/m3/src/x/instrument"
)

func TestNewM3MsgClient(t *testing.T) {
Expand All @@ -42,3 +48,43 @@ func TestNewM3MsgClient(t *testing.T) {
assert.NotNil(t, c)
assert.NoError(t, err)
}

func TestTotalBytesAdded(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

// Mock dependencies
p := producer.NewMockProducer(ctrl)
p.EXPECT().Init()
p.EXPECT().NumShards().Return(uint32(1))
p.EXPECT().Produce(gomock.Any()).Return(nil).AnyTimes()

opts := NewM3MsgOptions().SetProducer(p)
client, err := NewM3MsgClient(NewOptions().SetM3MsgOptions(opts))
assert.NoError(t, err)

// Mock metric and metadata
counter := unaggregated.Counter{
ID: id.RawID("testCounter"),
Value: 123,
}
metadatas := metadata.StagedMetadatas{}

// Mock time function
now := time.Now()
client.(*M3MsgClient).nowFn = func() time.Time { return now }

testScope := tally.NewTestScope("", make(map[string]string))
// Mock metrics
client.(*M3MsgClient).metrics = m3msgClientMetrics{
writeUntimedCounter: instrument.NewMethodMetrics(testScope, "writeUntimedCounter", instrument.TimerOptions{}),
totalBytesSent: testScope.Counter("total-bytes-sent"),
}

// Call the method
err = client.WriteUntimedCounter(counter, metadatas)
assert.NoError(t, err)

// Verify the total bytes added
assert.Equal(t, int64(23), testScope.Snapshot().Counters()["total-bytes-sent+"].Value())
}