Skip to content

Commit

Permalink
fix: Show Replication Queue size and Replication TCP Errors (#23960)
Browse files Browse the repository at this point in the history
* feat: Show remaining replication queue size

* fix: Show non-http related error messages

* fix: Show non-http related error messages with backoff

* fix: Updates for replication tests

* chore: formatting

* chore: formatting

* chore: formatting

* chore: formatting

* chore: lowercase json field

---------

Co-authored-by: Geoffrey <[email protected]>
Co-authored-by: Jeffrey Smith II <[email protected]>
  • Loading branch information
3 people authored Feb 2, 2023
1 parent e2f835b commit ec7fdd3
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 37 deletions.
29 changes: 15 additions & 14 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ var ErrMaxQueueSizeTooSmall = errors.Error{

// Replication contains all info about a replication that should be returned to users.
type Replication struct {
ID platform.ID `json:"id" db:"id"`
OrgID platform.ID `json:"orgID" db:"org_id"`
Name string `json:"name" db:"name"`
Description *string `json:"description,omitempty" db:"description"`
RemoteID platform.ID `json:"remoteID" db:"remote_id"`
LocalBucketID platform.ID `json:"localBucketID" db:"local_bucket_id"`
RemoteBucketID *platform.ID `json:"remoteBucketID" db:"remote_bucket_id"`
RemoteBucketName string `json:"RemoteBucketName" db:"remote_bucket_name"`
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"`
CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"`
LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"`
LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"`
DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"`
MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"`
ID platform.ID `json:"id" db:"id"`
OrgID platform.ID `json:"orgID" db:"org_id"`
Name string `json:"name" db:"name"`
Description *string `json:"description,omitempty" db:"description"`
RemoteID platform.ID `json:"remoteID" db:"remote_id"`
LocalBucketID platform.ID `json:"localBucketID" db:"local_bucket_id"`
RemoteBucketID *platform.ID `json:"remoteBucketID" db:"remote_bucket_id"`
RemoteBucketName string `json:"RemoteBucketName" db:"remote_bucket_name"`
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"`
CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"`
RemainingQueueSizeBytes int64 `json:"remainingQueueSizeBytes" db:"remaining_queue_size_bytes"`
LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"`
LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"`
DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"`
MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"`
}

// ReplicationListFilter is a selection filter for listing replications.
Expand Down
17 changes: 17 additions & 0 deletions replications/internal/queue_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,23 @@ func (qm *durableQueueManager) CurrentQueueSizes(ids []platform.ID) (map[platfor
return sizes, nil
}

// Returns the remaining number of bytes in Queue to be read:
func (qm *durableQueueManager) RemainingQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) {
qm.mutex.RLock()
defer qm.mutex.RUnlock()

sizes := make(map[platform.ID]int64, len(ids))

for _, id := range ids {
if _, exist := qm.replicationQueues[id]; !exist {
return nil, fmt.Errorf("durable queue not found for replication ID %q", id)
}
sizes[id] = qm.replicationQueues[id].queue.TotalBytes()
}

return sizes, nil
}

// StartReplicationQueues updates the durableQueueManager.replicationQueues map, fully removing any partially deleted
// queues (present on disk, but not tracked in sqlite), opening all current queues, and logging info for each.
func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error {
Expand Down
30 changes: 30 additions & 0 deletions replications/internal/queue_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ func TestEnqueueData(t *testing.T) {
require.NoError(t, err)
// Empty queues are 8 bytes for the footer.
require.Equal(t, map[platform.ID]int64{id1: 8}, sizes)
// Remaining queue should initially be empty:
rsizes, err := qm.RemainingQueueSizes([]platform.ID{id1})
require.NoError(t, err)
// Empty queue = 0 bytes:
require.Equal(t, map[platform.ID]int64{id1: 0}, rsizes)

data := "some fake data"

Expand All @@ -430,6 +435,11 @@ func TestEnqueueData(t *testing.T) {
sizes, err = qm.CurrentQueueSizes([]platform.ID{id1})
require.NoError(t, err)
require.Greater(t, sizes[id1], int64(8))
rsizes, err = qm.RemainingQueueSizes([]platform.ID{id1})
require.NoError(t, err)
require.Greater(t, rsizes[id1], int64(0))
// Difference between disk size and queue should only be footer size
require.Equal(t, sizes[id1]-rsizes[id1], int64(8))

written, err := qm.replicationQueues[id1].queue.Current()
require.NoError(t, err)
Expand Down Expand Up @@ -481,8 +491,17 @@ func TestSendWrite(t *testing.T) {
require.True(t, scan.Next())
require.Equal(t, []byte(points[pointIndex]), scan.Bytes())
require.NoError(t, scan.Err())
// Initial Queue size should be size of data + footer
rsizesI, err := qm.RemainingQueueSizes([]platform.ID{id1})
require.NoError(t, err)
require.Equal(t, rsizesI[id1], int64(8+len(points[pointIndex])))
// Send the write to the "remote" with a success
rq.SendWrite()
// Queue becomes empty after write:
rsizesJ, err := qm.RemainingQueueSizes([]platform.ID{id1})
require.NoError(t, err)
require.Equal(t, rsizesJ[id1], int64(0))

// Make sure the data is no longer in the queue
_, err = rq.queue.NewScanner()
require.Equal(t, io.EOF, err)
Expand All @@ -496,9 +515,15 @@ func TestSendWrite(t *testing.T) {
require.True(t, scan.Next())
require.Equal(t, []byte(points[pointIndex]), scan.Bytes())
require.NoError(t, scan.Err())
rsizesI, err = qm.RemainingQueueSizes([]platform.ID{id1})
require.NoError(t, err)
// Send the write to the "remote" with a FAILURE
shouldFailThisWrite = true
rq.SendWrite()
// Queue size should not have decreased if write has failed:
rsizesJ, err = qm.RemainingQueueSizes([]platform.ID{id1})
require.NoError(t, err)
require.Equal(t, rsizesJ[id1], rsizesI[id1])
// Make sure the data is still in the queue
scan, err = rq.queue.NewScanner()
require.NoError(t, err)
Expand All @@ -508,6 +533,11 @@ func TestSendWrite(t *testing.T) {
// Send the write to the "remote" again, with a SUCCESS
shouldFailThisWrite = false
rq.SendWrite()
// Queue Becomes empty after a successful write
rsizesJ, err = qm.RemainingQueueSizes([]platform.ID{id1})
require.NoError(t, err)
require.Equal(t, rsizesJ[id1], int64(0))

// Make sure the data is no longer in the queue
_, err = rq.queue.NewScanner()
require.Equal(t, io.EOF, err)
Expand Down
15 changes: 15 additions & 0 deletions replications/mock/queue_management.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions replications/remotewrite/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (w *writer) Write(data []byte, attempts int) (backoff time.Duration, err er
res, postWriteErr := PostWrite(ctx, conf, data, w.clientTimeout)
res, msg, ok := normalizeResponse(res, postWriteErr)
if !ok {
// Update Response info:
if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil {
w.logger.Debug("failed to update config store with latest remote write response info", zap.Error(err))
return w.backoff(attempts), err
}
// bail out
return w.backoff(attempts), postWriteErr
}
Expand Down
3 changes: 1 addition & 2 deletions replications/remotewrite/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ func TestWrite(t *testing.T) {
testConfig := &influxdb.ReplicationHTTPConfig{
RemoteURL: "not a good URL",
}

w, configStore, _ := testWriter(t)

configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, int(0), gomock.Any())
_, actualErr := w.Write([]byte{}, 1)
require.Error(t, actualErr)
})
Expand Down
18 changes: 18 additions & 0 deletions replications/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type DurableQueueManager interface {
DeleteQueue(replicationID platform.ID) error
UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error
CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
RemainingQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error
CloseAll() error
EnqueueData(replicationID platform.ID, data []byte, numPoints int) error
Expand Down Expand Up @@ -129,6 +130,13 @@ func (s *service) ListReplications(ctx context.Context, filter influxdb.Replicat
for i := range rs.Replications {
rs.Replications[i].CurrentQueueSizeBytes = sizes[rs.Replications[i].ID]
}
rsizes, err := s.durableQueueManager.RemainingQueueSizes(ids)
if err != nil {
return nil, err
}
for i := range rs.Replications {
rs.Replications[i].RemainingQueueSizeBytes = rsizes[rs.Replications[i].ID]
}

return rs, nil
}
Expand Down Expand Up @@ -196,6 +204,11 @@ func (s *service) GetReplication(ctx context.Context, id platform.ID) (*influxdb
return nil, err
}
r.CurrentQueueSizeBytes = sizes[r.ID]
rsizes, err := s.durableQueueManager.RemainingQueueSizes([]platform.ID{r.ID})
if err != nil {
return nil, err
}
r.RemainingQueueSizeBytes = rsizes[r.ID]

return r, nil
}
Expand All @@ -221,6 +234,11 @@ func (s *service) UpdateReplication(ctx context.Context, id platform.ID, request
return nil, err
}
r.CurrentQueueSizeBytes = sizes[r.ID]
rsizes, err := s.durableQueueManager.RemainingQueueSizes([]platform.ID{r.ID})
if err != nil {
return nil, err
}
r.RemainingQueueSizeBytes = rsizes[r.ID]

return r, nil
}
Expand Down
85 changes: 64 additions & 21 deletions replications/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ func TestListReplications(t *testing.T) {
filter := influxdb.ReplicationListFilter{}

tests := []struct {
name string
list influxdb.Replications
ids []platform.ID
sizes map[platform.ID]int64
storeErr error
queueManagerErr error
name string
list influxdb.Replications
ids []platform.ID
sizes map[platform.ID]int64
rsizes map[platform.ID]int64
storeErr error
queueManagerErr error
queueManagerRemainingSizesErr error
}{
{
name: "matches multiple",
Expand Down Expand Up @@ -149,6 +151,14 @@ func TestListReplications(t *testing.T) {
ids: []platform.ID{replication1.ID},
queueManagerErr: errors.New("error from queue manager"),
},
{
name: "queue manager error - remaining queue size",
list: influxdb.Replications{
Replications: []influxdb.Replication{replication1},
},
ids: []platform.ID{replication1.ID},
queueManagerRemainingSizesErr: errors.New("Remaining Queue Size erro"),
},
}

for _, tt := range tests {
Expand All @@ -161,13 +171,18 @@ func TestListReplications(t *testing.T) {
mocks.durableQueueManager.EXPECT().CurrentQueueSizes(tt.ids).Return(tt.sizes, tt.queueManagerErr)
}

if tt.storeErr == nil && tt.queueManagerErr == nil && len(tt.list.Replications) > 0 {
mocks.durableQueueManager.EXPECT().RemainingQueueSizes(tt.ids).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
}
got, err := svc.ListReplications(ctx, filter)

var wantErr error
if tt.storeErr != nil {
wantErr = tt.storeErr
} else if tt.queueManagerErr != nil {
wantErr = tt.queueManagerErr
} else if tt.queueManagerRemainingSizesErr != nil {
wantErr = tt.queueManagerRemainingSizesErr
}

require.Equal(t, wantErr, err)
Expand All @@ -179,6 +194,7 @@ func TestListReplications(t *testing.T) {

for _, r := range got.Replications {
require.Equal(t, tt.sizes[r.ID], r.CurrentQueueSizeBytes)
require.Equal(t, tt.rsizes[r.ID], r.RemainingQueueSizeBytes)
}
})
}
Expand Down Expand Up @@ -315,12 +331,14 @@ func TestGetReplication(t *testing.T) {
t.Parallel()

tests := []struct {
name string
sizes map[platform.ID]int64
storeErr error
queueManagerErr error
storeWant influxdb.Replication
want influxdb.Replication
name string
sizes map[platform.ID]int64
rsizes map[platform.ID]int64
storeErr error
queueManagerErr error
queueManagerRemainingSizesErr error
storeWant influxdb.Replication
want influxdb.Replication
}{
{
name: "success",
Expand All @@ -337,6 +355,11 @@ func TestGetReplication(t *testing.T) {
storeWant: replication1,
queueManagerErr: errors.New("queue manager error"),
},
{
name: "queue manager error - remaining queue size",
storeWant: replication1,
queueManagerRemainingSizesErr: errors.New("queue manager error"),
},
}

for _, tt := range tests {
Expand All @@ -348,6 +371,9 @@ func TestGetReplication(t *testing.T) {
if tt.storeErr == nil {
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerErr)
}
if tt.storeErr == nil && tt.queueManagerErr == nil {
mocks.durableQueueManager.EXPECT().RemainingQueueSizes([]platform.ID{id1}).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
}

got, err := svc.GetReplication(ctx, id1)

Expand All @@ -356,6 +382,8 @@ func TestGetReplication(t *testing.T) {
wantErr = tt.storeErr
} else if tt.queueManagerErr != nil {
wantErr = tt.queueManagerErr
} else if tt.queueManagerRemainingSizesErr != nil {
wantErr = tt.queueManagerRemainingSizesErr
}

require.Equal(t, wantErr, err)
Expand All @@ -366,6 +394,8 @@ func TestGetReplication(t *testing.T) {
}

require.Equal(t, tt.sizes[got.ID], got.CurrentQueueSizeBytes)
require.Equal(t, tt.rsizes[got.ID], got.RemainingQueueSizeBytes)

})
}
}
Expand All @@ -374,15 +404,17 @@ func TestUpdateReplication(t *testing.T) {
t.Parallel()

tests := []struct {
name string
request influxdb.UpdateReplicationRequest
sizes map[platform.ID]int64
storeErr error
queueManagerUpdateSizeErr error
queueManagerCurrentSizesErr error
storeUpdate *influxdb.Replication
want *influxdb.Replication
wantErr error
name string
request influxdb.UpdateReplicationRequest
sizes map[platform.ID]int64
rsizes map[platform.ID]int64
storeErr error
queueManagerUpdateSizeErr error
queueManagerCurrentSizesErr error
queueManagerRemainingSizesErr error
storeUpdate *influxdb.Replication
want *influxdb.Replication
wantErr error
}{
{
name: "success with new max queue size",
Expand Down Expand Up @@ -417,6 +449,13 @@ func TestUpdateReplication(t *testing.T) {
storeUpdate: &updatedReplicationWithNoNewSize,
wantErr: errors.New("current size err"),
},
{
name: "queue manager error - remaining queue size",
request: updateReqWithNoNewSize,
queueManagerRemainingSizesErr: errors.New("remaining queue size err"),
storeUpdate: &updatedReplicationWithNoNewSize,
wantErr: errors.New("remaining queue size err"),
},
}

for _, tt := range tests {
Expand All @@ -436,6 +475,10 @@ func TestUpdateReplication(t *testing.T) {
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{id1}).Return(tt.sizes, tt.queueManagerCurrentSizesErr)
}

if tt.storeErr == nil && tt.queueManagerUpdateSizeErr == nil && tt.queueManagerCurrentSizesErr == nil {
mocks.durableQueueManager.EXPECT().RemainingQueueSizes([]platform.ID{id1}).Return(tt.rsizes, tt.queueManagerRemainingSizesErr)
}

got, err := svc.UpdateReplication(ctx, id1, tt.request)
require.Equal(t, tt.want, got)
require.Equal(t, tt.wantErr, err)
Expand Down

0 comments on commit ec7fdd3

Please sign in to comment.