Skip to content

Commit

Permalink
feat: metrics collection for replications remote writes (#22952)
Browse files Browse the repository at this point in the history
* feat: metrics collection for replications remote writes

* fix: don't update metrics with 204 error code on successful writes
  • Loading branch information
williamhbaker authored Dec 1, 2021
1 parent 9060150 commit f05d013
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 22 deletions.
4 changes: 2 additions & 2 deletions replications/internal/queue_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (rq *replicationQueue) SendWrite() bool {

// Update metrics after the call to scan.Advance()
defer func() {
rq.metrics.Dequeue(rq.id, rq.queue.DiskUsage())
rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes())
}()

if _, err = scan.Advance(); err != nil {
Expand Down Expand Up @@ -361,7 +361,7 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byt
return err
}
// Update metrics for this replication queue when adding data to the queue.
qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.DiskUsage())
qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.TotalBytes())

qm.replicationQueues[replicationID].receive <- struct{}{}

Expand Down
19 changes: 7 additions & 12 deletions replications/internal/queue_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,14 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
reg := prom.NewRegistry(zaptest.NewLogger(t))
reg.MustRegister(qm.metrics.PrometheusCollectors()...)

data := []byte("some fake data")
data := "some fake data"
numPointsPerData := 3
numDataToAdd := 4
rq.remoteWriter = getTestRemoteWriter(t, data, nil)

for i := 1; i <= numDataToAdd; i++ {
go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock
require.NoError(t, qm.EnqueueData(id1, data, numPointsPerData))
require.NoError(t, qm.EnqueueData(id1, []byte(data), numPointsPerData))

pointCount := getPromMetric(t, "replications_queue_total_points_queued", reg)
require.Equal(t, i*numPointsPerData, int(pointCount.Counter.GetValue()))
Expand All @@ -417,20 +418,14 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
require.Equal(t, i*len(data), int(totalBytesQueued.Counter.GetValue()))

currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg)
// 8 bytes for an empty queue; 8 extra bytes for each byte slice appended to the queue
require.Equal(t, 8+i*(8+len(data)), int(currentBytesQueued.Gauge.GetValue()))
// 8 extra bytes for each byte slice appended to the queue
require.Equal(t, i*(8+len(data)), int(currentBytesQueued.Gauge.GetValue()))
}

// Reduce the max segment size so that a new segment is created & the next call to SendWrite causes the first
// segment to be dropped and the queue size on disk to be lower than before when the queue head is advanced.
require.NoError(t, rq.queue.SetMaxSegmentSize(8))

queueSizeBefore := rq.queue.DiskUsage()
// Queue size should be 0 after SendWrite completes
rq.SendWrite()

// Ensure that the smaller queue disk size was reflected in the metrics.
currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg)
require.Less(t, int64(currentBytesQueued.Gauge.GetValue()), queueSizeBefore)
require.Equal(t, float64(0), currentBytesQueued.Gauge.GetValue())
}

func getPromMetric(t *testing.T, name string, reg *prom.Registry) *dto.Metric {
Expand Down
55 changes: 48 additions & 7 deletions replications/metrics/replications_metrics.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package metrics

import (
"strconv"

"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/prometheus/client_golang/prometheus"
)

type ReplicationsMetrics struct {
TotalPointsQueued *prometheus.CounterVec
TotalBytesQueued *prometheus.CounterVec
CurrentBytesQueued *prometheus.GaugeVec
TotalPointsQueued *prometheus.CounterVec
TotalBytesQueued *prometheus.CounterVec
CurrentBytesQueued *prometheus.GaugeVec
RemoteWriteErrors *prometheus.CounterVec
RemoteWriteBytesSent *prometheus.CounterVec
RemoteWriteBytesDropped *prometheus.CounterVec
}

func NewReplicationsMetrics() *ReplicationsMetrics {
Expand All @@ -34,6 +39,24 @@ func NewReplicationsMetrics() *ReplicationsMetrics {
Name: "current_bytes_queued",
Help: "Current number of bytes in the replication stream queue",
}, []string{"replicationID"}),
RemoteWriteErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "remote_write_errors",
Help: "Error codes returned from attempted remote writes",
}, []string{"replicationID", "code"}),
RemoteWriteBytesSent: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "remote_write_bytes_sent",
Help: "Bytes of data successfully sent by the replication stream",
}, []string{"replicationID"}),
RemoteWriteBytesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "remote_write_bytes_dropped",
Help: "Bytes of data dropped by the replication stream",
}, []string{"replicationID"}),
}
}

Expand All @@ -43,17 +66,35 @@ func (rm *ReplicationsMetrics) PrometheusCollectors() []prometheus.Collector {
rm.TotalPointsQueued,
rm.TotalBytesQueued,
rm.CurrentBytesQueued,
rm.RemoteWriteErrors,
rm.RemoteWriteBytesSent,
rm.RemoteWriteBytesDropped,
}
}

// EnqueueData updates the metrics when adding new data to a replication queue.
func (rm *ReplicationsMetrics) EnqueueData(replicationID platform.ID, numBytes, numPoints int, queueSizeOnDisk int64) {
func (rm *ReplicationsMetrics) EnqueueData(replicationID platform.ID, numBytes, numPoints int, queueSize int64) {
rm.TotalPointsQueued.WithLabelValues(replicationID.String()).Add(float64(numPoints))
rm.TotalBytesQueued.WithLabelValues(replicationID.String()).Add(float64(numBytes))
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSizeOnDisk))
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
}

// Dequeue updates the metrics when data has been removed from the queue.
func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSizeOnDisk int64) {
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSizeOnDisk))
func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSize int64) {
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
}

// RemoteWriteError increments the error code counter for the replication.
func (rm *ReplicationsMetrics) RemoteWriteError(replicationID platform.ID, errorCode int) {
rm.RemoteWriteErrors.WithLabelValues(replicationID.String(), strconv.Itoa(errorCode)).Inc()
}

// RemoteWriteSent increases the total count of bytes sent following a successful remote write
func (rm *ReplicationsMetrics) RemoteWriteSent(replicationID platform.ID, bytes int) {
rm.RemoteWriteBytesSent.WithLabelValues(replicationID.String()).Add(float64(bytes))
}

// RemoteWriteDropped increases the total count of bytes dropped when data is dropped
func (rm *ReplicationsMetrics) RemoteWriteDropped(replicationID platform.ID, bytes int) {
rm.RemoteWriteBytesDropped.WithLabelValues(replicationID.String()).Add(float64(bytes))
}
10 changes: 9 additions & 1 deletion replications/remotewrite/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,30 @@ func (w *writer) Write(ctx context.Context, data []byte) error {
return err
}

// Update metrics and most recent error diagnostic information.
if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil {
return err
}

if err == nil {
// Successful write
w.metrics.RemoteWriteSent(w.replicationID, len(data))
w.logger.Debug("remote write successful", zap.Int("attempt", attempts), zap.Int("bytes", len(data)))
return nil
}

w.metrics.RemoteWriteError(w.replicationID, res.StatusCode)
w.logger.Debug("remote write error", zap.Int("attempt", attempts), zap.String("error message", "msg"), zap.Int("status code", res.StatusCode))

attempts++
var waitTime time.Duration
hasSetWaitTime := false

switch res.StatusCode {
case http.StatusBadRequest:
if conf.DropNonRetryableData {
w.logger.Debug(fmt.Sprintf("dropped %d bytes of data due to %d response from server", len(data), http.StatusBadRequest))
w.logger.Debug("dropped data", zap.Int("bytes", len(data)))
w.metrics.RemoteWriteDropped(w.replicationID, len(data))
return nil
}
case http.StatusTooManyRequests:
Expand All @@ -136,6 +143,7 @@ func (w *writer) Write(ctx context.Context, data []byte) error {
if !hasSetWaitTime {
waitTime = w.backoff(attempts)
}
w.logger.Debug("waiting to retry", zap.Duration("wait time", waitTime))

select {
case <-w.waitFunc(waitTime):
Expand Down
98 changes: 98 additions & 0 deletions replications/remotewrite/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
"github.com/influxdata/influxdb/v2/replications/metrics"
replicationsMock "github.com/influxdata/influxdb/v2/replications/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -244,6 +246,102 @@ func TestWrite(t *testing.T) {
})
}

func TestWrite_Metrics(t *testing.T) {
maximumAttemptsBeforeErr := 5
testData := []byte("this is some data")

tests := []struct {
name string
status int
data []byte
wantWriteErr error
registerExpectations func(*testing.T, *replicationsMock.MockHttpConfigStore, *influxdb.ReplicationHTTPConfig)
checkMetrics func(*testing.T, *prom.Registry)
}{
{
name: "server errors",
status: http.StatusTeapot,
data: []byte{},
wantWriteErr: errors.New("maximum number of attempts exceeded"),
registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) {
store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil).Times(5)
store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTeapot, invalidResponseCode(http.StatusTeapot).Error()).Return(nil).Times(5)
},
checkMetrics: func(t *testing.T, reg *prom.Registry) {
mfs := promtest.MustGather(t, reg)

errorCodes := promtest.FindMetric(mfs, "replications_queue_remote_write_errors", map[string]string{
"replicationID": testID.String(),
"code": strconv.Itoa(http.StatusTeapot),
})
require.NotNil(t, errorCodes)
require.Equal(t, float64(maximumAttemptsBeforeErr), errorCodes.Counter.GetValue())
},
},
{
name: "successful write",
status: http.StatusNoContent,
data: testData,
wantWriteErr: nil,
registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) {
store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil)
store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(nil)
},
checkMetrics: func(t *testing.T, reg *prom.Registry) {
mfs := promtest.MustGather(t, reg)

bytesSent := promtest.FindMetric(mfs, "replications_queue_remote_write_bytes_sent", map[string]string{
"replicationID": testID.String(),
})
require.NotNil(t, bytesSent)
require.Equal(t, float64(len(testData)), bytesSent.Counter.GetValue())
},
},
{
name: "dropped data",
status: http.StatusBadRequest,
data: testData,
wantWriteErr: nil,
registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) {
store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil)
store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusBadRequest, invalidResponseCode(http.StatusBadRequest).Error()).Return(nil)
},
checkMetrics: func(t *testing.T, reg *prom.Registry) {
mfs := promtest.MustGather(t, reg)

bytesDropped := promtest.FindMetric(mfs, "replications_queue_remote_write_bytes_dropped", map[string]string{
"replicationID": testID.String(),
})
require.NotNil(t, bytesDropped)
require.Equal(t, float64(len(testData)), bytesDropped.Counter.GetValue())
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svr := testServer(t, tt.status, tt.data)
defer svr.Close()

testConfig := &influxdb.ReplicationHTTPConfig{
RemoteURL: svr.URL,
DropNonRetryableData: true,
}

w, configStore := testWriter(t)
reg := prom.NewRegistry(zaptest.NewLogger(t))
reg.MustRegister(w.metrics.PrometheusCollectors()...)

w.waitFunc = instaWait()
w.maximumAttemptsBeforeErr = maximumAttemptsBeforeErr

tt.registerExpectations(t, configStore, testConfig)
require.Equal(t, tt.wantWriteErr, w.Write(context.Background(), tt.data))
tt.checkMetrics(t, reg)
})
}
}

func TestPostWrite(t *testing.T) {
testData := []byte("some data")

Expand Down

0 comments on commit f05d013

Please sign in to comment.