Skip to content

Commit

Permalink
test: Finish sync work for mockIngester.pushed var (grafana#12028)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul1r authored and rhnasc committed Apr 12, 2024
1 parent 3d58e57 commit 21b7568
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ func Test_IncrementTimestamp(t *testing.T) {
distributors, _ := prepare(t, 1, 3, testData.limits, func(addr string) (ring_client.PoolClient, error) { return ing, nil })
_, err := distributors[0].Push(ctx, testData.push)
assert.NoError(t, err)
assert.Equal(t, testData.expectedPush, ing.pushed[0])
topVal := ing.Peek()
assert.Equal(t, testData.expectedPush, topVal)
})
}
}
Expand Down Expand Up @@ -433,13 +434,16 @@ func TestDistributorPushConcurrently(t *testing.T) {
labels := make(map[string]int)

for i := range ingesters {
ingesters[i].mu.Lock()

pushed := ingesters[i].pushed
counter = counter + len(pushed)
for _, pr := range pushed {
for _, st := range pr.Streams {
labels[st.Labels] = labels[st.Labels] + 1
}
}
ingesters[i].mu.Unlock()
}
assert.Equal(t, numReq*3, counter) // RF=3
// each stream is present 3 times
Expand Down Expand Up @@ -500,7 +504,8 @@ func Test_SortLabelsOnPush(t *testing.T) {
request.Streams[0].Labels = `{buzz="f", a="b"}`
_, err := distributors[0].Push(ctx, request)
require.NoError(t, err)
require.Equal(t, `{a="b", buzz="f"}`, ingester.pushed[0].Streams[0].Labels)
topVal := ingester.Peek()
require.Equal(t, `{a="b", buzz="f"}`, topVal.Streams[0].Labels)
}

func Test_TruncateLogLines(t *testing.T) {
Expand All @@ -519,7 +524,8 @@ func Test_TruncateLogLines(t *testing.T) {

_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
require.NoError(t, err)
require.Len(t, ingester.pushed[0].Streams[0].Entries[0].Line, 5)
topVal := ingester.Peek()
require.Len(t, topVal.Streams[0].Entries[0].Line, 5)
})
}

Expand Down Expand Up @@ -1231,6 +1237,17 @@ func (i *mockIngester) Push(_ context.Context, in *logproto.PushRequest, _ ...gr
return nil, nil
}

func (i *mockIngester) Peek() *logproto.PushRequest {
i.mu.Lock()
defer i.mu.Unlock()

if len(i.pushed) == 0 {
return nil
}

return i.pushed[0]
}

func (i *mockIngester) GetStreamRates(_ context.Context, _ *logproto.StreamRatesRequest, _ ...grpc.CallOption) (*logproto.StreamRatesResponse, error) {
return &logproto.StreamRatesResponse{}, nil
}
Expand Down

0 comments on commit 21b7568

Please sign in to comment.