Merge branch 'master' into linasn/bootstrap-readonly-ns
* master:
  [m3msg] Remove unnecessary ConsumeHandler interface (#3918)
  [query] Prom converter supporting value decrease tolerance (#3914)
Linas Naginionis committed Nov 11, 2021
2 parents d8f27d0 + 13e4c45 commit 4b27532
Showing 14 changed files with 312 additions and 145 deletions.
73 changes: 31 additions & 42 deletions src/aggregator/server/m3msg/server.go
@@ -21,9 +21,7 @@
package m3msg

import (
"errors"
"fmt"
"io"

"github.com/m3db/m3/src/aggregator/aggregator"
"github.com/m3db/m3/src/metrics/encoding"
@@ -35,11 +33,6 @@ import (
"go.uber.org/zap"
)

type server struct {
aggregator aggregator.Aggregator
logger *zap.Logger
}

// NewServer creates a new M3Msg server.
func NewServer(
address string,
@@ -49,49 +42,43 @@ func NewServer(
if err := opts.Validate(); err != nil {
return nil, err
}

s := &server{
aggregator: aggregator,
logger: opts.InstrumentOptions().Logger(),
newMessageProcessor := func() consumer.MessageProcessor {
// construct a new messageProcessor per consumer so the internal protos can be reused across messages on the
// same connection.
return &messageProcessor{
aggregator: aggregator,
logger: opts.InstrumentOptions().Logger(),
}
}

handler := consumer.NewConsumerHandler(s.Consume, opts.ConsumerOptions())
handler := consumer.NewMessageHandler(newMessageProcessor, opts.ConsumerOptions())
return xserver.NewServer(address, handler, opts.ServerOptions()), nil
}

func (s *server) Consume(c consumer.Consumer) {
var (
pb = &metricpb.MetricWithMetadatas{}
union = &encoding.UnaggregatedMessageUnion{}
)
for {
msg, err := c.Message()
if err != nil {
if !errors.Is(err, io.EOF) {
s.logger.Error("could not read message", zap.Error(err))
}
break
}
type messageProcessor struct {
pb metricpb.MetricWithMetadatas
union encoding.UnaggregatedMessageUnion
aggregator aggregator.Aggregator
logger *zap.Logger
}

// Reset and reuse the protobuf message for unpacking.
protobuf.ReuseMetricWithMetadatasProto(pb)
if err = s.handleMessage(pb, union, msg); err != nil {
s.logger.Error("could not process message",
zap.Error(err),
zap.Uint64("shard", msg.ShardID()),
zap.String("proto", pb.String()))
}
func (m *messageProcessor) Process(msg consumer.Message) {
if err := m.handleMessage(&m.pb, &m.union, msg); err != nil {
m.logger.Error("could not process message",
zap.Error(err),
zap.Uint64("shard", msg.ShardID()),
zap.String("proto", m.pb.String()))
}
c.Close()
}

func (s *server) handleMessage(
func (m *messageProcessor) handleMessage(
pb *metricpb.MetricWithMetadatas,
union *encoding.UnaggregatedMessageUnion,
msg consumer.Message,
) error {
defer msg.Ack()

// Reset and reuse the protobuf message for unpacking.
protobuf.ReuseMetricWithMetadatasProto(&m.pb)
// Unmarshal the message.
if err := pb.Unmarshal(msg.Bytes()); err != nil {
return err
@@ -104,46 +91,48 @@ func (s *server) handleMessage(
return err
}
u := union.CounterWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS:
err := union.BatchTimerWithMetadatas.FromProto(pb.BatchTimerWithMetadatas)
if err != nil {
return err
}
u := union.BatchTimerWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS:
err := union.GaugeWithMetadatas.FromProto(pb.GaugeWithMetadatas)
if err != nil {
return err
}
u := union.GaugeWithMetadatas.ToUnion()
return s.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas)
return m.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas)
case metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA:
err := union.ForwardedMetricWithMetadata.FromProto(pb.ForwardedMetricWithMetadata)
if err != nil {
return err
}
return s.aggregator.AddForwarded(
return m.aggregator.AddForwarded(
union.ForwardedMetricWithMetadata.ForwardedMetric,
union.ForwardedMetricWithMetadata.ForwardMetadata)
case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA:
err := union.TimedMetricWithMetadata.FromProto(pb.TimedMetricWithMetadata)
if err != nil {
return err
}
return s.aggregator.AddTimed(
return m.aggregator.AddTimed(
union.TimedMetricWithMetadata.Metric,
union.TimedMetricWithMetadata.TimedMetadata)
case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS:
err := union.TimedMetricWithMetadatas.FromProto(pb.TimedMetricWithMetadatas)
if err != nil {
return err
}
return s.aggregator.AddTimedWithStagedMetadatas(
return m.aggregator.AddTimedWithStagedMetadatas(
union.TimedMetricWithMetadatas.Metric,
union.TimedMetricWithMetadatas.StagedMetadatas)
default:
return fmt.Errorf("unrecognized message type: %v", pb.Type)
}
}

func (m *messageProcessor) Close() {}
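For context, a minimal sketch (not part of this commit) of how a caller wires up the factory-based handler: each accepted connection gets its own MessageProcessor, so per-processor scratch state such as the reused protobuf needs no locking. The countingProcessor type and the listener address are hypothetical; consumer.NewMessageHandler, consumer.Message, and xserver.NewServer follow the signatures visible in this diff, while consumer.NewOptions and xserver.NewOptions are assumed to be the packages' standard option constructors.

package main

import (
    "log"
    "net"

    "github.com/m3db/m3/src/msg/consumer"
    xserver "github.com/m3db/m3/src/x/server"
)

// countingProcessor is a hypothetical MessageProcessor. A fresh instance is
// built for every connection, so the count field needs no synchronization.
type countingProcessor struct {
    count int
}

func (p *countingProcessor) Process(msg consumer.Message) {
    p.count++
    msg.Ack()
}

func (p *countingProcessor) Close() {}

func main() {
    opts := consumer.NewOptions()

    // The handler now takes a factory instead of a single shared processor.
    handler := consumer.NewMessageHandler(func() consumer.MessageProcessor {
        return &countingProcessor{}
    }, opts)

    l, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        log.Fatal(err)
    }
    s := xserver.NewServer(l.Addr().String(), handler, xserver.NewOptions())
    if err := s.Serve(l); err != nil {
        log.Fatal(err)
    }
}

The aggregator server above follows the same shape, capturing its aggregator and logger in the factory closure so each connection reuses one protobuf across its own messages.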
2 changes: 1 addition & 1 deletion src/cmd/services/m3comparator/main/querier.go
@@ -135,7 +135,7 @@ type seriesGen struct {
res time.Duration
}

// FetchCompressed fetches timeseries data based on a query.
// FetchCompressedResult fetches timeseries data based on a query.
func (q *querier) FetchCompressedResult(
ctx context.Context,
query *storage.FetchQuery,
2 changes: 1 addition & 1 deletion src/cmd/services/m3coordinator/server/m3msg/config.go
@@ -86,7 +86,7 @@ func (c handlerConfiguration) newHandler(
ProtobufDecoderPoolOptions: c.ProtobufDecoderPool.NewObjectPoolOptions(iOpts),
BlockholePolicies: c.BlackholePolicies,
})
return consumer.NewMessageHandler(p, cOpts), nil
return consumer.NewMessageHandler(consumer.SingletonMessageProcessor(p), cOpts), nil
}

// NewOptions creates handler options.
@@ -67,7 +67,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {

s := server.NewServer(
"a",
consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts),
consumer.NewMessageHandler(consumer.SingletonMessageProcessor(newProtobufProcessor(hOpts)), opts),
server.NewOptions(),
)
s.Serve(l)
@@ -150,7 +150,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) {

s := server.NewServer(
"a",
consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts),
consumer.NewMessageHandler(consumer.SingletonMessageProcessor(newProtobufProcessor(hOpts)), opts),
server.NewOptions(),
)
s.Serve(l)
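Where one processor should still be shared across every connection, as in the coordinator handler and the tests above, consumer.SingletonMessageProcessor adapts an existing MessageProcessor to the factory-based NewMessageHandler signature. A hedged sketch of the two construction styles, assuming the signatures shown in this diff; noopProcessor is hypothetical and consumer.NewOptions is assumed to be the package's option constructor.

package main

import (
    "github.com/m3db/m3/src/msg/consumer"
)

// noopProcessor is a hypothetical, trivially concurrency-safe processor.
type noopProcessor struct{}

func (noopProcessor) Process(msg consumer.Message) { msg.Ack() }
func (noopProcessor) Close()                       {}

func main() {
    opts := consumer.NewOptions()

    // Per-connection: each accepted connection gets a fresh processor, so
    // mutable per-processor state needs no locking.
    perConn := consumer.NewMessageHandler(func() consumer.MessageProcessor {
        return noopProcessor{}
    }, opts)

    // Shared: a single instance serves every connection and must be safe for
    // concurrent use; SingletonMessageProcessor adapts it to the factory-based
    // signature used throughout this commit.
    singleton := consumer.NewMessageHandler(consumer.SingletonMessageProcessor(noopProcessor{}), opts)

    _, _ = perConn, singleton
}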
37 changes: 37 additions & 0 deletions src/cmd/services/m3query/config/config.go
@@ -22,6 +22,7 @@ package config

import (
"errors"
"math"
"time"

etcdclient "github.com/m3db/m3/src/cluster/client/etcd"
@@ -43,6 +44,7 @@
"github.com/m3db/m3/src/x/instrument"
xlog "github.com/m3db/m3/src/x/log"
"github.com/m3db/m3/src/x/opentracing"
xtime "github.com/m3db/m3/src/x/time"
)

// BackendStorageType is an enum for different backends.
@@ -355,6 +357,41 @@ type ConsolidationConfiguration struct {
type PrometheusQueryConfiguration struct {
// MaxSamplesPerQuery is the limit on fetched samples per query.
MaxSamplesPerQuery *int `yaml:"maxSamplesPerQuery"`

// Convert configures Prometheus time series conversions.
Convert *PrometheusConvertConfiguration `yaml:"convert"`
}

// ConvertOptionsOrDefault creates storage.PromConvertOptions based on the given configuration.
func (c PrometheusQueryConfiguration) ConvertOptionsOrDefault() storage.PromConvertOptions {
opts := storage.NewPromConvertOptions()
if v := c.Convert; v != nil {
opts = opts.SetValueDecreaseTolerance(v.ValueDecreaseTolerance)

// Default to max time so that it's always applicable if value
// decrease tolerance is non-zero.
toleranceUntil := xtime.UnixNano(math.MaxInt64)
if value := v.ValueDecreaseToleranceUntil; value != nil {
toleranceUntil = xtime.ToUnixNano(*value)
}
opts = opts.SetValueDecreaseToleranceUntil(toleranceUntil)
}

return opts
}

// PrometheusConvertConfiguration configures Prometheus time series conversions.
type PrometheusConvertConfiguration struct {
// ValueDecreaseTolerance allows for setting a specific amount of tolerance
// to avoid returning a decrease if it's below a certain tolerance.
// This is useful for applications that have precision issues emitting
// monotonic increasing data and will accidentally make it seem like the
// counter value decreases when it hasn't changed.
ValueDecreaseTolerance float64 `yaml:"valueDecreaseTolerance"`

// ValueDecreaseToleranceUntil allows for setting a time threshold on
// which to apply the conditional value decrease threshold.
ValueDecreaseToleranceUntil *time.Time `yaml:"valueDecreaseToleranceUntil"`
}

// MaxSamplesPerQueryOrDefault returns the max samples per query or default.
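For illustration (not part of this commit), a sketch of exercising the new conversion options programmatically. The tolerance and cutoff values are hypothetical and the import path is inferred from the file path above; the config fields and ConvertOptionsOrDefault come from the diff itself.

package main

import (
    "fmt"
    "time"

    "github.com/m3db/m3/src/cmd/services/m3query/config"
)

func main() {
    // Hypothetical settings: tolerate tiny counter decreases (e.g. from
    // float precision issues) but only for samples before 2022-01-01.
    until := time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC)
    cfg := config.PrometheusQueryConfiguration{
        Convert: &config.PrometheusConvertConfiguration{
            ValueDecreaseTolerance:      1e-9,
            ValueDecreaseToleranceUntil: &until,
        },
    }

    // ConvertOptionsOrDefault maps the YAML-backed configuration onto
    // storage.PromConvertOptions; when ValueDecreaseToleranceUntil is nil the
    // cutoff defaults to the maximum representable time, so a non-zero
    // tolerance always applies.
    opts := cfg.ConvertOptionsOrDefault()
    fmt.Printf("prom convert options built: %T\n", opts)
}

In YAML these fields map to valueDecreaseTolerance and valueDecreaseToleranceUntil under the prometheus convert block, per the struct tags above.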
29 changes: 18 additions & 11 deletions src/msg/consumer/consumer.go
@@ -66,7 +66,7 @@ func (l *listener) Accept() (Consumer, error) {
return nil, err
}

return newConsumer(conn, l.msgPool, l.opts, l.m), nil
return newConsumer(conn, l.msgPool, l.opts, l.m, NewNoOpMessageProcessor), nil
}

type metrics struct {
@@ -97,18 +97,20 @@ type consumer struct {
w xio.ResettableWriter
conn net.Conn

ackPb msgpb.Ack
closed bool
doneCh chan struct{}
wg sync.WaitGroup
m metrics
ackPb msgpb.Ack
closed bool
doneCh chan struct{}
wg sync.WaitGroup
m metrics
messageProcessor MessageProcessor
}

func newConsumer(
conn net.Conn,
mPool *messagePool,
opts Options,
m metrics,
newMessageProcessorFn NewMessageProcessorFn,
) *consumer {
var (
wOpts = xio.ResettableWriterOptions{
@@ -126,11 +128,12 @@ func newConsumer(
decoder: proto.NewDecoder(
conn, opts.DecoderOptions(), opts.ConnectionReadBufferSize(),
),
w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts),
conn: conn,
closed: false,
doneCh: make(chan struct{}),
m: m,
w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts),
conn: conn,
closed: false,
doneCh: make(chan struct{}),
m: m,
messageProcessor: newMessageProcessorFn(),
}
}

@@ -141,6 +144,9 @@ func (c *consumer) Init() {
c.wg.Done()
}()
}
func (c *consumer) process(m Message) {
c.messageProcessor.Process(m)
}

func (c *consumer) Message() (Message, error) {
m := c.mPool.Get()
@@ -230,6 +236,7 @@ func (c *consumer) Close() {
close(c.doneCh)
c.wg.Wait()
c.conn.Close()
c.messageProcessor.Close()
}

type message struct {
51 changes: 12 additions & 39 deletions src/msg/consumer/handlers.go
@@ -29,54 +29,27 @@ import (
"go.uber.org/zap"
)

type consumerHandler struct {
opts Options
mPool *messagePool
consumeFn ConsumeFn
m metrics
}

// NewConsumerHandler creates a new server handler with consumerFn.
func NewConsumerHandler(consumeFn ConsumeFn, opts Options) server.Handler {
mPool := newMessagePool(opts.MessagePoolOptions())
mPool.Init()
return &consumerHandler{
consumeFn: consumeFn,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
}
}

func (h *consumerHandler) Handle(conn net.Conn) {
c := newConsumer(conn, h.mPool, h.opts, h.m)
c.Init()
h.consumeFn(c)
}

func (h *consumerHandler) Close() {}

type messageHandler struct {
opts Options
mPool *messagePool
mp MessageProcessor
m metrics
opts Options
mPool *messagePool
newMessageProcessorFn NewMessageProcessorFn
m metrics
}

// NewMessageHandler creates a new server handler with messageFn.
func NewMessageHandler(mp MessageProcessor, opts Options) server.Handler {
func NewMessageHandler(newMessageProcessorFn NewMessageProcessorFn, opts Options) server.Handler {
mPool := newMessagePool(opts.MessagePoolOptions())
mPool.Init()
return &messageHandler{
mp: mp,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
newMessageProcessorFn: newMessageProcessorFn,
opts: opts,
mPool: mPool,
m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()),
}
}

func (h *messageHandler) Handle(conn net.Conn) {
c := newConsumer(conn, h.mPool, h.opts, h.m)
c := newConsumer(conn, h.mPool, h.opts, h.m, h.newMessageProcessorFn)
c.Init()
var (
msgErr error
@@ -87,12 +60,12 @@ func (h *messageHandler) Handle(conn net.Conn) {
if msgErr != nil {
break
}
h.mp.Process(msg)
c.process(msg)
}
if msgErr != nil && msgErr != io.EOF {
h.opts.InstrumentOptions().Logger().With(zap.Error(msgErr)).Error("could not read message from consumer")
}
c.Close()
}

func (h *messageHandler) Close() { h.mp.Close() }
func (h *messageHandler) Close() {}