From 21b7568e1bee3b5a85a03f384022efad22e9da83 Mon Sep 17 00:00:00 2001 From: Paul Rogers <129207811+paul1r@users.noreply.github.com> Date: Wed, 21 Feb 2024 17:31:48 -0500 Subject: [PATCH] test: Finish sync work for mockIngester.pushed var (#12028) --- pkg/distributor/distributor_test.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 71830b4be4d2e..75e3a6e786700 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -396,7 +396,8 @@ func Test_IncrementTimestamp(t *testing.T) { distributors, _ := prepare(t, 1, 3, testData.limits, func(addr string) (ring_client.PoolClient, error) { return ing, nil }) _, err := distributors[0].Push(ctx, testData.push) assert.NoError(t, err) - assert.Equal(t, testData.expectedPush, ing.pushed[0]) + topVal := ing.Peek() + assert.Equal(t, testData.expectedPush, topVal) }) } } @@ -433,6 +434,8 @@ func TestDistributorPushConcurrently(t *testing.T) { labels := make(map[string]int) for i := range ingesters { + ingesters[i].mu.Lock() + pushed := ingesters[i].pushed counter = counter + len(pushed) for _, pr := range pushed { @@ -440,6 +443,7 @@ func TestDistributorPushConcurrently(t *testing.T) { labels[st.Labels] = labels[st.Labels] + 1 } } + ingesters[i].mu.Unlock() } assert.Equal(t, numReq*3, counter) // RF=3 // each stream is present 3 times @@ -500,7 +504,8 @@ func Test_SortLabelsOnPush(t *testing.T) { request.Streams[0].Labels = `{buzz="f", a="b"}` _, err := distributors[0].Push(ctx, request) require.NoError(t, err) - require.Equal(t, `{a="b", buzz="f"}`, ingester.pushed[0].Streams[0].Labels) + topVal := ingester.Peek() + require.Equal(t, `{a="b", buzz="f"}`, topVal.Streams[0].Labels) } func Test_TruncateLogLines(t *testing.T) { @@ -519,7 +524,8 @@ func Test_TruncateLogLines(t *testing.T) { _, err := distributors[0].Push(ctx, makeWriteRequest(1, 10)) require.NoError(t, err) - require.Len(t, ingester.pushed[0].Streams[0].Entries[0].Line, 5) + topVal := ingester.Peek() + require.Len(t, topVal.Streams[0].Entries[0].Line, 5) }) } @@ -1231,6 +1237,17 @@ func (i *mockIngester) Push(_ context.Context, in *logproto.PushRequest, _ ...gr return nil, nil } +func (i *mockIngester) Peek() *logproto.PushRequest { + i.mu.Lock() + defer i.mu.Unlock() + + if len(i.pushed) == 0 { + return nil + } + + return i.pushed[0] +} + func (i *mockIngester) GetStreamRates(_ context.Context, _ *logproto.StreamRatesRequest, _ ...grpc.CallOption) (*logproto.StreamRatesResponse, error) { return &logproto.StreamRatesResponse{}, nil }