Skip to content

Commit

Permalink
[m3msg] Remove unnecessary ConsumeHandler interface (#3918)
Browse files Browse the repository at this point in the history
The ConsumerHandler is redundant and unnecessary. Use the
MessageProcessor interface for the aggregator so it's consistent with
the coordinator.

This will make it easier to add some base metrics to both consumers in
the future.
  • Loading branch information
ryanhall07 authored Nov 10, 2021
1 parent e37f346 commit 13e4c45
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 132 deletions.
73 changes: 31 additions & 42 deletions src/aggregator/server/m3msg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
package m3msg

import (
"errors"
"fmt"
"io"

"github.com/m3db/m3/src/aggregator/aggregator"
"github.com/m3db/m3/src/metrics/encoding"
Expand All @@ -35,11 +33,6 @@ import (
"go.uber.org/zap"
)

type server struct {
aggregator aggregator.Aggregator
logger *zap.Logger
}

// NewServer creates a new M3Msg server.
func NewServer(
address string,
Expand All @@ -49,49 +42,43 @@ func NewServer(
if err := opts.Validate(); err != nil {
return nil, err
}

s := &server{
aggregator: aggregator,
logger: opts.InstrumentOptions().Logger(),
newMessageProcessor := func() consumer.MessageProcessor {
// construct a new messageProcessor per consumer so the internal protos can be reused across messages on the
// same connection.
return &messageProcessor{
aggregator: aggregator,
logger: opts.InstrumentOptions().Logger(),
}
}

handler := consumer.NewConsumerHandler(s.Consume, opts.ConsumerOptions())
handler := consumer.NewMessageHandler(newMessageProcessor, opts.ConsumerOptions())
return xserver.NewServer(address, handler, opts.ServerOptions()), nil
}

func (s *server) Consume(c consumer.Consumer) {
var (
pb = &metricpb.MetricWithMetadatas{}
union = &encoding.UnaggregatedMessageUnion{}
)
for {
msg, err := c.Message()
if err != nil {
if !errors.Is(err, io.EOF) {
s.logger.Error("could not read message", zap.Error(err))
}
break
}
type messageProcessor struct {
pb metricpb.MetricWithMetadatas
union encoding.UnaggregatedMessageUnion
aggregator aggregator.Aggregator
logger *zap.Logger
}

// Reset and reuse the protobuf message for unpacking.
protobuf.ReuseMetricWithMetadatasProto(pb)
if err = s.handleMessage(pb, union, msg); err != nil {
s.logger.Error("could not process message",
zap.Error(err),
zap.Uint64("shard", msg.ShardID()),
zap.String("proto", pb.String()))
}
func (m *messageProcessor) Process(msg consumer.Message) {
if err := m.handleMessage(&m.pb, &m.union, msg); err != nil {
m.logger.Error("could not process message",
zap.Error(err),
zap.Uint64("shard", msg.ShardID()),
zap.String("proto", m.pb.String()))
}
c.Close()
}

func (s *server) handleMessage(
func (m *messageProcessor) handleMessage(
pb *metricpb.MetricWithMetadatas,
union *encoding.UnaggregatedMessageUnion,
msg consumer.Message,
) error {
defer msg.Ack()

// Reset and reuse the protobuf message for unpacking.
protobuf.ReuseMetricWithMetadatasProto(&m.pb)
// Unmarshal the message.
if err := pb.Unmarshal(msg.Bytes()); err != nil {
return err
Expand All @@ -104,46 +91,48 @@ func (s *server) handleMessage(
return err
}
u := union.CounterWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS:
err := union.BatchTimerWithMetadatas.FromProto(pb.BatchTimerWithMetadatas)
if err != nil {
return err
}
u := union.BatchTimerWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS:
err := union.GaugeWithMetadatas.FromProto(pb.GaugeWithMetadatas)
if err != nil {
return err
}
u := union.GaugeWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA:
err := union.ForwardedMetricWithMetadata.FromProto(pb.ForwardedMetricWithMetadata)
if err != nil {
return err
}
return s.aggregator.AddForwarded(
return m.aggregator.AddForwarded(
union.ForwardedMetricWithMetadata.ForwardedMetric,
union.ForwardedMetricWithMetadata.ForwardMetadata)
case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA:
err := union.TimedMetricWithMetadata.FromProto(pb.TimedMetricWithMetadata)
if err != nil {
return err
}
return s.aggregator.AddTimed(
return m.aggregator.AddTimed(
union.TimedMetricWithMetadata.Metric,
union.TimedMetricWithMetadata.TimedMetadata)
case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS:
err := union.TimedMetricWithMetadatas.FromProto(pb.TimedMetricWithMetadatas)
if err != nil {
return err
}
return s.aggregator.AddTimedWithStagedMetadatas(
return m.aggregator.AddTimedWithStagedMetadatas(
union.TimedMetricWithMetadatas.Metric,
union.TimedMetricWithMetadatas.StagedMetadatas)
default:
return fmt.Errorf("unrecognized message type: %v", pb.Type)
}
}

func (m *messageProcessor) Close() {}
2 changes: 1 addition & 1 deletion src/cmd/services/m3coordinator/server/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c handlerConfiguration) newHandler(
ProtobufDecoderPoolOptions: c.ProtobufDecoderPool.NewObjectPoolOptions(iOpts),
BlockholePolicies: c.BlackholePolicies,
})
return consumer.NewMessageHandler(p, cOpts), nil
return consumer.NewMessageHandler(consumer.SingletonMessageProcessor(p), cOpts), nil
}

// NewOptions creates handler options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {

s := server.NewServer(
"a",
consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts),
consumer.NewMessageHandler(consumer.SingletonMessageProcessor(newProtobufProcessor(hOpts)), opts),
server.NewOptions(),
)
s.Serve(l)
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) {

s := server.NewServer(
"a",
consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts),
consumer.NewMessageHandler(consumer.SingletonMessageProcessor(newProtobufProcessor(hOpts)), opts),
server.NewOptions(),
)
s.Serve(l)
Expand Down
29 changes: 18 additions & 11 deletions src/msg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (l *listener) Accept() (Consumer, error) {
return nil, err
}

return newConsumer(conn, l.msgPool, l.opts, l.m), nil
return newConsumer(conn, l.msgPool, l.opts, l.m, NewNoOpMessageProcessor), nil
}

type metrics struct {
Expand Down Expand Up @@ -97,18 +97,20 @@ type consumer struct {
w xio.ResettableWriter
conn net.Conn

ackPb msgpb.Ack
closed bool
doneCh chan struct{}
wg sync.WaitGroup
m metrics
ackPb msgpb.Ack
closed bool
doneCh chan struct{}
wg sync.WaitGroup
m metrics
messageProcessor MessageProcessor
}

func newConsumer(
conn net.Conn,
mPool *messagePool,
opts Options,
m metrics,
newMessageProcessorFn NewMessageProcessorFn,
) *consumer {
var (
wOpts = xio.ResettableWriterOptions{
Expand All @@ -126,11 +128,12 @@ func newConsumer(
decoder: proto.NewDecoder(
conn, opts.DecoderOptions(), opts.ConnectionReadBufferSize(),
),
w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts),
conn: conn,
closed: false,
doneCh: make(chan struct{}),
m: m,
w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts),
conn: conn,
closed: false,
doneCh: make(chan struct{}),
m: m,
messageProcessor: newMessageProcessorFn(),
}
}

Expand All @@ -141,6 +144,9 @@ func (c *consumer) Init() {
c.wg.Done()
}()
}
func (c *consumer) process(m Message) {
c.messageProcessor.Process(m)
}

func (c *consumer) Message() (Message, error) {
m := c.mPool.Get()
Expand Down Expand Up @@ -230,6 +236,7 @@ func (c *consumer) Close() {
close(c.doneCh)
c.wg.Wait()
c.conn.Close()
c.messageProcessor.Close()
}

type message struct {
Expand Down
51 changes: 12 additions & 39 deletions src/msg/consumer/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,54 +29,27 @@ import (
"go.uber.org/zap"
)

type consumerHandler struct {
opts Options
mPool *messagePool
consumeFn ConsumeFn
m metrics
}

// NewConsumerHandler creates a new server handler with consumerFn.
func NewConsumerHandler(consumeFn ConsumeFn, opts Options) server.Handler {
mPool := newMessagePool(opts.MessagePoolOptions())
mPool.Init()
return &consumerHandler{
consumeFn: consumeFn,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
}
}

func (h *consumerHandler) Handle(conn net.Conn) {
c := newConsumer(conn, h.mPool, h.opts, h.m)
c.Init()
h.consumeFn(c)
}

func (h *consumerHandler) Close() {}

type messageHandler struct {
opts Options
mPool *messagePool
mp MessageProcessor
m metrics
opts Options
mPool *messagePool
newMessageProcessorFn NewMessageProcessorFn
m metrics
}

// NewMessageHandler creates a new server handler with messageFn.
func NewMessageHandler(mp MessageProcessor, opts Options) server.Handler {
func NewMessageHandler(newMessageProcessorFn NewMessageProcessorFn, opts Options) server.Handler {
mPool := newMessagePool(opts.MessagePoolOptions())
mPool.Init()
return &messageHandler{
mp: mp,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
newMessageProcessorFn: newMessageProcessorFn,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
}
}

func (h *messageHandler) Handle(conn net.Conn) {
c := newConsumer(conn, h.mPool, h.opts, h.m)
c := newConsumer(conn, h.mPool, h.opts, h.m, h.newMessageProcessorFn)
c.Init()
var (
msgErr error
Expand All @@ -87,12 +60,12 @@ func (h *messageHandler) Handle(conn net.Conn) {
if msgErr != nil {
break
}
h.mp.Process(msg)
c.process(msg)
}
if msgErr != nil && msgErr != io.EOF {
h.opts.InstrumentOptions().Logger().With(zap.Error(msgErr)).Error("could not read message from consumer")
}
c.Close()
}

func (h *messageHandler) Close() { h.mp.Close() }
func (h *messageHandler) Close() {}
Loading

0 comments on commit 13e4c45

Please sign in to comment.