diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index d816089f228e..74ccb5839e80 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -1318,3 +1318,95 @@ func TestWindowsFilesClosedImmediately(t *testing.T) { // On Windows, poll should close the file after reading it. We can test this by trying to move it. require.NoError(t, os.Rename(temp.Name(), temp.Name()+"_renamed")) } + +func TestDelayedDisambiguation(t *testing.T) { + t.Parallel() + tempDir := t.TempDir() + cfg := NewConfig().includeDir(tempDir) + cfg.FingerprintSize = 18 + cfg.StartAt = "beginning" + operator, sink := testManager(t, cfg) + operator.persister = testutil.NewMockPersister("test") + + // Two identical files, smaller than fingerprint size + file1 := filetest.OpenTempWithPattern(t, tempDir, "*.log1") + file2 := filetest.OpenTempWithPattern(t, tempDir, "*.log2") + + sameContent := "aaaaaaaaaaa" + filetest.WriteString(t, file1, sameContent+"\n") + filetest.WriteString(t, file2, sameContent+"\n") + operator.poll(context.Background()) + + token, attributes := sink.NextCall(t) + require.Equal(t, []byte(sameContent), token) + sink.ExpectNoCallsUntil(t, 100*time.Millisecond) + operator.wg.Wait() + + // Append different data + newContent1 := "more content in file 1 only" + newContent2 := "different content in file 2" + filetest.WriteString(t, file1, newContent1+"\n") + filetest.WriteString(t, file2, newContent2+"\n") + operator.poll(context.Background()) + + var sameTokenOtherFile emittest.Call + if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) { + sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}} + } else { + sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}} + } + newFromFile1 := emittest.Call{Token: []byte(newContent1), Attrs: 
map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}} + newFromFile2 := emittest.Call{Token: []byte(newContent2), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}} + sink.ExpectCalls(t, &sameTokenOtherFile, &newFromFile1, &newFromFile2) +} + +func TestNoLostPartial(t *testing.T) { + t.Parallel() + tempDir := t.TempDir() + cfg := NewConfig().includeDir(tempDir) + cfg.FingerprintSize = 18 + cfg.StartAt = "beginning" + operator, sink := testManager(t, cfg) + operator.persister = testutil.NewMockPersister("test") + + // Two files with the same fingerprint, both smaller than the configured fingerprint size + file1 := filetest.OpenTempWithPattern(t, tempDir, "*.log1") + file2 := filetest.OpenTempWithPattern(t, tempDir, "*.log2") + + sameContent := "aaaaaaaaaaa" + filetest.WriteString(t, file1, sameContent+"\n") + filetest.WriteString(t, file2, sameContent+"\n") + operator.poll(context.Background()) + + token, attributes := sink.NextCall(t) + require.Equal(t, []byte(sameContent), token) + sink.ExpectNoCallsUntil(t, 100*time.Millisecond) + operator.wg.Wait() + + newContent1 := "additional content in file 1 only" + filetest.WriteString(t, file1, newContent1+"\n") + + var otherFileName string + if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) { + otherFileName = filepath.Base(file2.Name()) + } else { + otherFileName = filepath.Base(file1.Name()) + } + + var foundSameFromOtherFile, foundNewFromFileOne bool + require.Eventually(t, func() bool { + operator.poll(context.Background()) + defer operator.wg.Wait() + + token, attributes = sink.NextCall(t) + switch { + case string(token) == sameContent && attributes[attrs.LogFileName].(string) == otherFileName: + foundSameFromOtherFile = true + case string(token) == newContent1 && attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()): + foundNewFromFileOne = true + default: + t.Errorf("unexpected token from file %q: %s", filepath.Base(attributes[attrs.LogFileName].(string)), token) + } + return 
foundSameFromOtherFile && foundNewFromFileOne + }, time.Second, 100*time.Millisecond) +} diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink.go b/pkg/stanza/fileconsumer/internal/emittest/sink.go index e775ec24d470..1785bfb8bfb1 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink.go @@ -22,13 +22,13 @@ type sinkCfg struct { type SinkOpt func(*sinkCfg) -type call struct { - token []byte - attrs map[string]any +type Call struct { + Token []byte + Attrs map[string]any } type Sink struct { - emitChan chan *call + emitChan chan *Call timeout time.Duration emit.Callback } @@ -53,14 +53,14 @@ func NewSink(opts ...SinkOpt) *Sink { for _, opt := range opts { opt(cfg) } - emitChan := make(chan *call, cfg.emitChanLen) + emitChan := make(chan *Call, cfg.emitChanLen) return &Sink{ emitChan: emitChan, timeout: cfg.timeout, Callback: func(_ context.Context, token []byte, attrs map[string]any) error { copied := make([]byte, len(token)) copy(copied, token) - emitChan <- &call{copied, attrs} + emitChan <- &Call{copied, attrs} return nil }, } @@ -76,7 +76,7 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte { for i := 0; i < n; i++ { select { case call := <-s.emitChan: - emitChan = append(emitChan, call.token) + emitChan = append(emitChan, call.Token) case <-time.After(s.timeout): assert.Fail(t, "Timed out waiting for message") return nil @@ -88,7 +88,7 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte { func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) { select { case c := <-s.emitChan: - return c.token, c.attrs + return c.Token, c.Attrs case <-time.After(s.timeout): assert.Fail(t, "Timed out waiting for message") return nil, nil @@ -98,7 +98,7 @@ func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) { func (s *Sink) ExpectToken(t *testing.T, expected []byte) { select { case call := <-s.emitChan: - assert.Equal(t, expected, call.token) + assert.Equal(t, expected, 
call.Token) case <-time.After(s.timeout): assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected)) } @@ -109,7 +109,7 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) { for i := 0; i < len(expected); i++ { select { case call := <-s.emitChan: - actual = append(actual, call.token) + actual = append(actual, call.Token) case <-time.After(s.timeout): assert.Fail(t, "Timed out waiting for message") return @@ -121,13 +121,27 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) { func (s *Sink) ExpectCall(t *testing.T, expected []byte, attrs map[string]any) { select { case c := <-s.emitChan: - assert.Equal(t, expected, c.token) - assert.Equal(t, attrs, c.attrs) + assert.Equal(t, expected, c.Token) + assert.Equal(t, attrs, c.Attrs) case <-time.After(s.timeout): assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected)) } } +func (s *Sink) ExpectCalls(t *testing.T, expected ...*Call) { + actual := make([]*Call, 0, len(expected)) + for i := 0; i < len(expected); i++ { + select { + case call := <-s.emitChan: + actual = append(actual, call) + case <-time.After(s.timeout): + assert.Fail(t, "Timed out waiting for message") + return + } + } + require.ElementsMatch(t, expected, actual) +} + func (s *Sink) ExpectNoCalls(t *testing.T) { s.ExpectNoCallsUntil(t, 200*time.Millisecond) } @@ -135,7 +149,7 @@ func (s *Sink) ExpectNoCalls(t *testing.T) { func (s *Sink) ExpectNoCallsUntil(t *testing.T, d time.Duration) { select { case c := <-s.emitChan: - assert.Fail(t, "Received unexpected message", "Message: %s", c.token) + assert.Fail(t, "Received unexpected message", "Message: %s", c.Token) case <-time.After(d): } } diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go index 18deb527fceb..326400b7e3e8 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go @@ -17,7 +17,7 @@ func 
TestNextToken(t *testing.T) { s, testCalls := sinkTest(t) for _, c := range testCalls { token := s.NextToken(t) - assert.Equal(t, c.token, token) + assert.Equal(t, c.Token, token) } } @@ -25,7 +25,7 @@ func TestNextTokenTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for _, c := range testCalls { token := s.NextToken(t) - assert.Equal(t, c.token, token) + assert.Equal(t, c.Token, token) } // Create a new T so we can expect it to fail without failing the overall test. @@ -38,8 +38,8 @@ func TestNextTokens(t *testing.T) { s, testCalls := sinkTest(t) for i := 0; i < 5; i++ { tokens := s.NextTokens(t, 2) - assert.Equal(t, testCalls[2*i].token, tokens[0]) - assert.Equal(t, testCalls[2*i+1].token, tokens[1]) + assert.Equal(t, testCalls[2*i].Token, tokens[0]) + assert.Equal(t, testCalls[2*i+1].Token, tokens[1]) } } @@ -47,8 +47,8 @@ func TestNextTokensTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for i := 0; i < 5; i++ { tokens := s.NextTokens(t, 2) - assert.Equal(t, testCalls[2*i].token, tokens[0]) - assert.Equal(t, testCalls[2*i+1].token, tokens[1]) + assert.Equal(t, testCalls[2*i].Token, tokens[0]) + assert.Equal(t, testCalls[2*i+1].Token, tokens[1]) } // Create a new T so we can expect it to fail without failing the overall test. 
@@ -61,8 +61,8 @@ func TestNextCall(t *testing.T) { s, testCalls := sinkTest(t) for _, c := range testCalls { token, attributes := s.NextCall(t) - require.Equal(t, c.token, token) - require.Equal(t, c.attrs, attributes) + require.Equal(t, c.Token, token) + require.Equal(t, c.Attrs, attributes) } } @@ -70,8 +70,8 @@ func TestNextCallTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for _, c := range testCalls { token, attributes := s.NextCall(t) - require.Equal(t, c.token, token) - require.Equal(t, c.attrs, attributes) + require.Equal(t, c.Token, token) + require.Equal(t, c.Attrs, attributes) } // Create a new T so we can expect it to fail without failing the overall test. @@ -83,14 +83,14 @@ func TestNextCallTimeout(t *testing.T) { func TestExpectToken(t *testing.T) { s, testCalls := sinkTest(t) for _, c := range testCalls { - s.ExpectToken(t, c.token) + s.ExpectToken(t, c.Token) } } func TestExpectTokenTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for _, c := range testCalls { - s.ExpectToken(t, c.token) + s.ExpectToken(t, c.Token) } // Create a new T so we can expect it to fail without failing the overall test. @@ -102,14 +102,14 @@ func TestExpectTokenTimeout(t *testing.T) { func TestExpectTokens(t *testing.T) { s, testCalls := sinkTest(t) for i := 0; i < 5; i++ { - s.ExpectTokens(t, testCalls[2*i].token, testCalls[2*i+1].token) + s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token) } } func TestExpectTokensTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for i := 0; i < 5; i++ { - s.ExpectTokens(t, testCalls[2*i].token, testCalls[2*i+1].token) + s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token) } // Create a new T so we can expect it to fail without failing the overall test. 
@@ -121,14 +121,14 @@ func TestExpectTokensTimeout(t *testing.T) { func TestExpectCall(t *testing.T) { s, testCalls := sinkTest(t) for _, c := range testCalls { - s.ExpectCall(t, c.token, c.attrs) + s.ExpectCall(t, c.Token, c.Attrs) } } func TestExpectCallTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for _, c := range testCalls { - s.ExpectCall(t, c.token, c.attrs) + s.ExpectCall(t, c.Token, c.Attrs) } // Create a new T so we can expect it to fail without failing the overall test. @@ -137,6 +137,35 @@ func TestExpectCallTimeout(t *testing.T) { assert.True(t, tt.Failed()) } +func TestExpectCalls(t *testing.T) { + s, testCalls := sinkTest(t) + testCallsOutOfOrder := make([]*Call, 0, 10) + for i := 0; i < len(testCalls); i += 2 { + testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i]) + } + for i := 1; i < len(testCalls); i += 2 { + testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i]) + } + s.ExpectCalls(t, testCallsOutOfOrder...) +} + +func TestExpectCallsTimeout(t *testing.T) { + s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) + testCallsOutOfOrder := make([]*Call, 0, 10) + for i := 0; i < len(testCalls); i += 2 { + testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i]) + } + for i := 1; i < len(testCalls); i += 2 { + testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i]) + } + s.ExpectCalls(t, testCallsOutOfOrder...) + + // Create a new T so we can expect it to fail without failing the overall test. 
+ tt := new(testing.T) + s.ExpectCalls(tt, new(Call)) + assert.True(t, tt.Failed()) +} + func TestExpectNoCalls(t *testing.T) { s, _ := sinkTest(t) s.NextTokens(t, 10) // drain the channel @@ -156,24 +185,24 @@ func TestExpectNoCallsFailure(t *testing.T) { func TestWithCallBuffer(t *testing.T) { s, testCalls := sinkTest(t, WithCallBuffer(5)) for i := 0; i < 10; i++ { - s.ExpectCall(t, testCalls[i].token, testCalls[i].attrs) + s.ExpectCall(t, testCalls[i].Token, testCalls[i].Attrs) } } -func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*call) { +func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) { s := NewSink(opts...) - testCalls := make([]*call, 0, 10) + testCalls := make([]*Call, 0, 10) for i := 0; i < 10; i++ { - testCalls = append(testCalls, &call{ - token: []byte(fmt.Sprintf("token-%d", i)), - attrs: map[string]any{ + testCalls = append(testCalls, &Call{ + Token: []byte(fmt.Sprintf("token-%d", i)), + Attrs: map[string]any{ "key": fmt.Sprintf("value-%d", i), }, }) } go func() { for _, c := range testCalls { - require.NoError(t, s.Callback(context.Background(), c.token, c.attrs)) + require.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs)) } }() return s, testCalls