From 21649f34b35ea22481c8cd062294e576138f0838 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Tue, 12 Mar 2024 16:34:49 -0400 Subject: [PATCH 1/3] Add synchronization for fakeTailer responses --- pkg/ingester/tailer_test.go | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 11de0d4daf82c..8ed794f99a5fe 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -149,10 +149,13 @@ func Test_dropstream(t *testing.T) { } type fakeTailServer struct { - responses []logproto.TailResponse + responses []logproto.TailResponse + responsesMu sync.Mutex } func (f *fakeTailServer) Send(response *logproto.TailResponse) error { + f.responsesMu.Lock() + defer f.responsesMu.Unlock() f.responses = append(f.responses, *response) return nil @@ -160,11 +163,37 @@ func (f *fakeTailServer) Send(response *logproto.TailResponse) error { func (f *fakeTailServer) Context() context.Context { return context.Background() } +func (f *fakeTailServer) cloneTailResponse(response logproto.TailResponse) logproto.TailResponse { + var clone logproto.TailResponse + if response.Stream != nil { + clone.Stream = &logproto.Stream{} + clone.Stream.Labels = response.Stream.Labels + if response.Stream.Entries != nil { + clone.Stream.Entries = make([]logproto.Entry, len(response.Stream.Entries)) + copy(clone.Stream.Entries, response.Stream.Entries) + } + } + if response.DroppedStreams != nil { + clone.DroppedStreams = make([]*logproto.DroppedStream, len(response.DroppedStreams)) + copy(clone.DroppedStreams, response.DroppedStreams) + } + + return clone +} + func (f *fakeTailServer) GetResponses() []logproto.TailResponse { + f.responsesMu.Lock() + defer f.responsesMu.Unlock() + clonedResponses := make([]logproto.TailResponse, len(f.responses)) + for i, resp := range f.responses { + clonedResponses[i] = f.cloneTailResponse(resp) + } return f.responses } func (f *fakeTailServer) Reset() { + f.responsesMu.Lock() + defer f.responsesMu.Unlock() f.responses = f.responses[:0] } From 1e2d6e54f3b469338a0f46cac9765aae68d7e889 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Tue, 12 Mar 2024 16:50:08 -0400 Subject: [PATCH 2/3] Clone the hash entry too --- pkg/ingester/tailer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 8ed794f99a5fe..8b60e9585bce0 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -168,6 +168,7 @@ func (f *fakeTailServer) cloneTailResponse(response logproto.TailResponse) logpr if response.Stream != nil { clone.Stream = &logproto.Stream{} clone.Stream.Labels = response.Stream.Labels + clone.Stream.Hash = response.Stream.Hash if response.Stream.Entries != nil { clone.Stream.Entries = make([]logproto.Entry, len(response.Stream.Entries)) copy(clone.Stream.Entries, response.Stream.Entries) From e587a1a2dff8c475fbd7ae55b748447d2ae59bf2 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Tue, 12 Mar 2024 20:49:10 -0400 Subject: [PATCH 3/3] Remove clone function from fakeTailServer --- pkg/ingester/tailer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 8b60e9585bce0..fa44cc0a7dcb8 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -163,7 +163,7 @@ func (f *fakeTailServer) Send(response *logproto.TailResponse) error { func (f *fakeTailServer) Context() context.Context { return context.Background() } -func (f *fakeTailServer) cloneTailResponse(response logproto.TailResponse) logproto.TailResponse { +func cloneTailResponse(response logproto.TailResponse) logproto.TailResponse { var clone logproto.TailResponse if response.Stream != nil { clone.Stream = &logproto.Stream{} @@ -187,7 +187,7 @@ func (f *fakeTailServer) GetResponses() []logproto.TailResponse { defer f.responsesMu.Unlock() clonedResponses := make([]logproto.TailResponse, len(f.responses)) for i, resp := range f.responses { - clonedResponses[i] = f.cloneTailResponse(resp) + clonedResponses[i] = cloneTailResponse(resp) } return f.responses }