diff --git a/internal/stanza/integration_test.go b/internal/stanza/integration_test.go index 56e5fb93ba74..69b3924effa0 100644 --- a/internal/stanza/integration_test.go +++ b/internal/stanza/integration_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/obsreport" "go.uber.org/zap" ) @@ -87,8 +88,8 @@ func BenchmarkEmitterToConsumer(b *testing.B) { for _, wc := range workerCounts { b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) { - consumer := &mockLogsConsumer{} - logsReceiver, err := createNoopReceiver(wc, consumer) + cl := &consumertest.LogsSink{} + logsReceiver, err := createNoopReceiver(wc, cl) require.NoError(b, err) err = logsReceiver.Start(context.Background(), componenttest.NewNopHost()) @@ -97,7 +98,7 @@ func BenchmarkEmitterToConsumer(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - consumer.ResetReceivedCount() + cl.Reset() go func() { ctx := context.Background() @@ -108,9 +109,9 @@ func BenchmarkEmitterToConsumer(b *testing.B) { require.Eventually(b, func() bool { - return consumer.Received() == entryCount + return cl.LogRecordCount() == entryCount }, - 30*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", consumer.Received(), + 30*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", cl.LogRecordCount(), ) } }) @@ -126,8 +127,8 @@ func TestEmitterToConsumer(t *testing.T) { entries := complexEntriesForNDifferentHosts(entryCount, hostsCount) - consumer := &mockLogsConsumer{} - logsReceiver, err := createNoopReceiver(workerCount, consumer) + cl := &consumertest.LogsSink{} + logsReceiver, err := createNoopReceiver(workerCount, cl) require.NoError(t, err) err = logsReceiver.Start(context.Background(), componenttest.NewNopHost()) @@ -142,13 +143,13 @@ func TestEmitterToConsumer(t *testing.T) { require.Eventually(t, func() bool { - return consumer.Received() == entryCount + return cl.LogRecordCount() == entryCount }, - 5*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", consumer.Received(), + 5*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", cl.LogRecordCount(), ) // Wait for a small bit of time in order to let any potential extra entries drain out of the pipeline <-time.After(500 * time.Millisecond) - require.Equal(t, entryCount, consumer.Received()) + require.Equal(t, entryCount, cl.LogRecordCount()) } diff --git a/internal/stanza/mocks_test.go b/internal/stanza/mocks_test.go index 12f905d9865c..c7de6d0f8b50 100644 --- a/internal/stanza/mocks_test.go +++ b/internal/stanza/mocks_test.go @@ -19,7 +19,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" "github.com/open-telemetry/opentelemetry-log-collection/entry" @@ -27,7 +26,7 @@ import ( "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" "github.com/open-telemetry/opentelemetry-log-collection/operator/transformer/noop" "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" @@ -77,46 +76,15 @@ func (o *UnstartableOperator) Process(ctx context.Context, entry *entry.Entry) e return nil } -type mockLogsConsumer struct { - received int32 -} - -func (m *mockLogsConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (m *mockLogsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - atomic.AddInt32(&m.received, int32(ld.LogRecordCount())) - return nil -} - -func (m *mockLogsConsumer) Received() int { - ret := atomic.LoadInt32(&m.received) - return int(ret) -} - -func (m *mockLogsConsumer) ResetReceivedCount() { - atomic.StoreInt32(&m.received, 0) -} - type mockLogsRejecter struct { - rejected int32 -} - -func (m *mockLogsRejecter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} + consumertest.LogsSink } func (m *mockLogsRejecter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - atomic.AddInt32(&m.rejected, 1) + _ = m.LogsSink.ConsumeLogs(ctx, ld) return fmt.Errorf("no") } -func (m *mockLogsRejecter) Rejected() int { - ret := atomic.LoadInt32(&m.rejected) - return int(ret) -} - const testType = "test" type TestConfig struct { diff --git a/internal/stanza/receiver_test.go b/internal/stanza/receiver_test.go index 155f548dcb2a..82dece1a5110 100644 --- a/internal/stanza/receiver_test.go +++ b/internal/stanza/receiver_test.go @@ -29,12 +29,13 @@ import ( "github.com/open-telemetry/opentelemetry-log-collection/pipeline" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" "go.uber.org/zap" "gopkg.in/yaml.v2" ) func TestStart(t *testing.T) { - mockConsumer := mockLogsConsumer{} + mockConsumer := &consumertest.LogsSink{} factory := NewFactory(TestReceiverType{}) @@ -42,7 +43,7 @@ func TestStart(t *testing.T) { context.Background(), componenttest.NewNopReceiverCreateSettings(), factory.CreateDefaultConfig(), - &mockConsumer, + mockConsumer, ) require.NoError(t, err, "receiver should successfully build") @@ -55,7 +56,7 @@ func TestStart(t *testing.T) { // Eventually because of asynchronuous nature of the receiver. require.Eventually(t, func() bool { - return mockConsumer.Received() == 1 + return mockConsumer.LogRecordCount() == 1 }, 10*time.Second, 5*time.Millisecond, "one log entry expected", ) @@ -63,14 +64,14 @@ func TestStart(t *testing.T) { } func TestHandleStartError(t *testing.T) { - mockConsumer := mockLogsConsumer{} + mockConsumer := &consumertest.LogsSink{} factory := NewFactory(TestReceiverType{}) cfg := factory.CreateDefaultConfig().(*TestConfig) cfg.Input = newUnstartableParams() - receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, &mockConsumer) + receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, mockConsumer) require.NoError(t, err, "receiver should successfully build") err = receiver.Start(context.Background(), componenttest.NewNopHost()) @@ -78,10 +79,10 @@ func TestHandleStartError(t *testing.T) { } func TestHandleConsumeError(t *testing.T) { - mockConsumer := mockLogsRejecter{} + mockConsumer := &mockLogsRejecter{} factory := NewFactory(TestReceiverType{}) - logsReceiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), factory.CreateDefaultConfig(), &mockConsumer) + logsReceiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), factory.CreateDefaultConfig(), mockConsumer) require.NoError(t, err, "receiver should successfully build") err = logsReceiver.Start(context.Background(), componenttest.NewNopHost()) @@ -93,11 +94,11 @@ func TestHandleConsumeError(t *testing.T) { // Eventually because of asynchronuous nature of the receiver. require.Eventually(t, func() bool { - return mockConsumer.Rejected() == 1 + return mockConsumer.LogRecordCount() == 1 }, 10*time.Second, 5*time.Millisecond, "one log entry expected", ) - logsReceiver.Shutdown(context.Background()) + require.NoError(t, logsReceiver.Shutdown(context.Background())) } func BenchmarkReadLine(b *testing.B) { diff --git a/internal/stanza/storage_test.go b/internal/stanza/storage_test.go index ad5b1b4bccee..c5729ff4852d 100644 --- a/internal/stanza/storage_test.go +++ b/internal/stanza/storage_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/otel/metric/nonrecording" "go.opentelemetry.io/otel/trace" "go.uber.org/zap/zaptest" @@ -98,7 +99,6 @@ func createReceiver(t *testing.T) *receiver { MetricsLevel: configtelemetry.LevelNone, }, } - mockConsumer := mockLogsConsumer{} factory := NewFactory(TestReceiverType{}) @@ -106,7 +106,7 @@ func createReceiver(t *testing.T) *receiver { context.Background(), params, factory.CreateDefaultConfig(), - &mockConsumer, + consumertest.NewNop(), ) require.NoError(t, err, "receiver should successfully build")