From bc26acbbe960404e7aef9fcc1724a43185a21013 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Wed, 21 Aug 2024 08:28:27 -0700 Subject: [PATCH 1/2] Reduce windowseventlogreceiver tests flakiness --- .../receiver_windows_test.go | 266 ++++++++++-------- 1 file changed, 146 insertions(+), 120 deletions(-) diff --git a/receiver/windowseventlogreceiver/receiver_windows_test.go b/receiver/windowseventlogreceiver/receiver_windows_test.go index 2aa6bd008e2f..ae134f20c28c 100644 --- a/receiver/windowseventlogreceiver/receiver_windows_test.go +++ b/receiver/windowseventlogreceiver/receiver_windows_test.go @@ -9,6 +9,7 @@ import ( "context" "encoding/xml" "path/filepath" + "reflect" "testing" "time" @@ -18,7 +19,10 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receivertest" + "golang.org/x/sys/windows/registry" "golang.org/x/sys/windows/svc/eventlog" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry" @@ -70,6 +74,10 @@ func TestCreateWithInvalidInputConfig(t *testing.T) { func TestReadWindowsEventLogger(t *testing.T) { logMessage := "Test log" + src := "otel-windowseventlogreceiver-test" + uninstallEventSource, err := assertEventSourceInstallation(t, src) + defer uninstallEventSource() + require.NoError(t, err) ctx := context.Background() factory := newFactoryAdapter() @@ -85,13 +93,8 @@ func TestReadWindowsEventLogger(t *testing.T) { defer func() { require.NoError(t, receiver.Shutdown(ctx)) }() - - src := "otel" - err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error) - require.NoError(t, err) - defer func() { - require.NoError(t, eventlog.Remove(src)) - }() + // Start launches nested goroutines, give them a chance to run before logging the test event(s). + time.Sleep(3 * time.Second) logger, err := eventlog.Open(src) require.NoError(t, err) @@ -100,19 +103,8 @@ func TestReadWindowsEventLogger(t *testing.T) { err = logger.Info(10, logMessage) require.NoError(t, err) - logsReceived := func() bool { - return sink.LogRecordCount() == 1 - } - - // logs sometimes take a while to be written, so a substantial wait buffer is needed - require.Eventually(t, logsReceived, 20*time.Second, 200*time.Millisecond) - results := sink.AllLogs() - require.Len(t, results, 1) - - records := results[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() - require.Equal(t, 1, records.Len()) - - record := records.At(0) + records := requireExpectedLogRecords(t, sink, src, 1) + record := records[0] body := record.Body().Map().AsRaw() require.Equal(t, logMessage, body["message"]) @@ -134,6 +126,10 @@ func TestReadWindowsEventLogger(t *testing.T) { func TestReadWindowsEventLoggerRaw(t *testing.T) { logMessage := "Test log" + src := "otel-windowseventlogreceiver-test" + uninstallEventSource, err := assertEventSourceInstallation(t, src) + defer uninstallEventSource() + require.NoError(t, err) ctx := context.Background() factory := newFactoryAdapter() @@ -150,13 +146,8 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) { defer func() { require.NoError(t, receiver.Shutdown(ctx)) }() - - src := "otel" - err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error) - defer func() { - require.NoError(t, eventlog.Remove(src)) - }() - require.NoError(t, err) + // Start launches nested goroutines, give them a chance to run before logging the test event(s). + time.Sleep(3 * time.Second) logger, err := eventlog.Open(src) require.NoError(t, err) @@ -165,19 +156,8 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) { err = logger.Info(10, logMessage) require.NoError(t, err) - logsReceived := func() bool { - return sink.LogRecordCount() == 1 - } - - // logs sometimes take a while to be written, so a substantial wait buffer is needed - require.Eventually(t, logsReceived, 20*time.Second, 200*time.Millisecond) - results := sink.AllLogs() - require.Len(t, results, 1) - - records := results[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() - require.Equal(t, 1, records.Len()) - - record := records.At(0) + records := requireExpectedLogRecords(t, sink, src, 1) + record := records[0] body := record.Body().AsString() bodyStruct := struct { Data string `xml:"EventData>Data"` @@ -188,91 +168,68 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) { require.Equal(t, logMessage, bodyStruct.Data) } -func TestReadWindowsEventLoggerWithExcludeProvider(t *testing.T) { +func TestExcludeProvider(t *testing.T) { logMessage := "Test log" - src := "otel" - - ctx := context.Background() - factory := newFactoryAdapter() - createSettings := receivertest.NewNopSettings() - cfg := createTestConfig() - cfg.InputConfig.ExcludeProviders = []string{src} - sink := new(consumertest.LogsSink) - - receiver, err := factory.CreateLogsReceiver(ctx, createSettings, cfg, sink) - require.NoError(t, err) - - err = receiver.Start(ctx, componenttest.NewNopHost()) - require.NoError(t, err) - defer func() { - require.NoError(t, receiver.Shutdown(ctx)) - }() - - err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error) - require.NoError(t, err) - defer func() { - require.NoError(t, eventlog.Remove(src)) - }() - - logger, err := eventlog.Open(src) - require.NoError(t, err) - defer logger.Close() - - err = logger.Info(10, logMessage) - require.NoError(t, err) - - logsReceived := func() bool { - return sink.LogRecordCount() == 0 + excludedSrc := "otel-excluded-windowseventlogreceiver-test" + notExcludedSrc := "otel-windowseventlogreceiver-test" + testSources := []string{excludedSrc, notExcludedSrc} + + for _, src := range testSources { + uninstallEventSource, err := assertEventSourceInstallation(t, src) + defer uninstallEventSource() + require.NoError(t, err) } - // logs sometimes take a while to be written, so a substantial wait buffer is needed - require.Eventually(t, logsReceived, 10*time.Second, 200*time.Millisecond) - results := sink.AllLogs() - require.Len(t, results, 0) -} - -func TestReadWindowsEventLoggerRawWithExcludeProvider(t *testing.T) { - logMessage := "Test log" - src := "otel" - - ctx := context.Background() - factory := newFactoryAdapter() - createSettings := receivertest.NewNopSettings() - cfg := createTestConfig() - cfg.InputConfig.Raw = true - cfg.InputConfig.ExcludeProviders = []string{src} - sink := new(consumertest.LogsSink) - - receiver, err := factory.CreateLogsReceiver(ctx, createSettings, cfg, sink) - require.NoError(t, err) - - err = receiver.Start(ctx, componenttest.NewNopHost()) - require.NoError(t, err) - defer func() { - require.NoError(t, receiver.Shutdown(ctx)) - }() - - err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error) - defer func() { - require.NoError(t, eventlog.Remove(src)) - }() - require.NoError(t, err) - - logger, err := eventlog.Open(src) - require.NoError(t, err) - defer logger.Close() - - err = logger.Info(10, logMessage) - require.NoError(t, err) - - logsReceived := func() bool { - return sink.LogRecordCount() == 0 + tests := []struct { + name string + raw bool + }{ + { + name: "with_EventXML", + }, + { + name: "with_Raw", + raw: true, + }, } - // logs sometimes take a while to be written, so a substantial wait buffer is needed - require.Eventually(t, logsReceived, 10*time.Second, 200*time.Millisecond) - results := sink.AllLogs() - require.Len(t, results, 0) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + factory := newFactoryAdapter() + createSettings := receivertest.NewNopSettings() + cfg := createTestConfig() + cfg.InputConfig.Raw = tt.raw + cfg.InputConfig.ExcludeProviders = []string{excludedSrc} + sink := new(consumertest.LogsSink) + + receiver, err := factory.CreateLogsReceiver(ctx, createSettings, cfg, sink) + require.NoError(t, err) + + err = receiver.Start(ctx, componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, receiver.Shutdown(ctx)) + }() + // Start launches nested goroutines, give them a chance to run before logging the test event(s). + time.Sleep(3 * time.Second) + + for _, src := range testSources { + logger, err := eventlog.Open(src) + require.NoError(t, err) + defer logger.Close() + + err = logger.Info(10, logMessage) + require.NoError(t, err) + } + + records := requireExpectedLogRecords(t, sink, notExcludedSrc, 1) + assert.NotEmpty(t, records) + + records = filterAllLogRecordsBySource(t, sink, excludedSrc) + assert.Empty(t, records) + }) + } } func createTestConfig() *WindowsLogConfig { @@ -289,3 +246,72 @@ func createTestConfig() *WindowsLogConfig { }(), } } + +// assertEventSourceInstallation installs an event source and verifies that the registry key was created. +// It returns a function that can be used to uninstall the event source, that function is never nil +func assertEventSourceInstallation(t *testing.T, src string) (uninstallEventSource func(), err error) { + err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error) + uninstallEventSource = func() { + assert.NoError(t, eventlog.Remove(src)) + } + assert.NoError(t, err) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + rk, err := registry.OpenKey(registry.LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services\\EventLog\\Application\\"+src, registry.QUERY_VALUE) + assert.NoError(c, err) + defer rk.Close() + _, _, err = rk.GetIntegerValue("TypesSupported") + assert.NoError(c, err) + }, 10*time.Second, 250*time.Millisecond) + + return +} + +func requireExpectedLogRecords(t *testing.T, sink *consumertest.LogsSink, expectedEventSrc string, expectedEventCount int) []plog.LogRecord { + var actualLogRecords []plog.LogRecord + + // logs sometimes take a while to be written, so a substantial wait buffer is needed + require.EventuallyWithT(t, func(c *assert.CollectT) { + actualLogRecords = filterAllLogRecordsBySource(t, sink, expectedEventSrc) + assert.Equal(c, expectedEventCount, len(actualLogRecords)) + }, 10*time.Second, 250*time.Millisecond) + + return actualLogRecords +} + +func filterAllLogRecordsBySource(t *testing.T, sink *consumertest.LogsSink, src string) (filteredLogRecords []plog.LogRecord) { + for _, logs := range sink.AllLogs() { + resourceLogsSlice := logs.ResourceLogs() + for i := 0; i < resourceLogsSlice.Len(); i++ { + resourceLogs := resourceLogsSlice.At(i) + scopeLogsSlice := resourceLogs.ScopeLogs() + for j := 0; j < scopeLogsSlice.Len(); j++ { + logRecords := scopeLogsSlice.At(j).LogRecords() + for k := 0; k < logRecords.Len(); k++ { + logRecord := logRecords.At(k) + if extractEventSourceFromLogRecord(t, logRecord) == src { + filteredLogRecords = append(filteredLogRecords, logRecord) + } + } + } + } + } + + return +} + +func extractEventSourceFromLogRecord(t *testing.T, logRecord plog.LogRecord) string { + eventMap := logRecord.Body().Map() + if !reflect.DeepEqual(eventMap, pcommon.Map{}) { + eventProviderMap := eventMap.AsRaw()["provider"] + if providerMap, ok := eventProviderMap.(map[string]any); ok { + return providerMap["name"].(string) + } + require.Fail(t, "Failed to extract event source from log record") + } + + // This is a raw event log record. Extract the event source from the XML body string. + bodyString := logRecord.Body().AsString() + var eventXML windows.EventXML + require.NoError(t, xml.Unmarshal([]byte(bodyString), &eventXML)) + return eventXML.Provider.Name +} From 9ce149188d0c736a872e320c3060bf0e3d0bdc9e Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Wed, 21 Aug 2024 10:53:30 -0700 Subject: [PATCH 2/2] make tidy --- receiver/windowseventlogreceiver/go.mod | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/receiver/windowseventlogreceiver/go.mod b/receiver/windowseventlogreceiver/go.mod index db75b85823d9..2b405001d040 100644 --- a/receiver/windowseventlogreceiver/go.mod +++ b/receiver/windowseventlogreceiver/go.mod @@ -17,7 +17,10 @@ require ( golang.org/x/sys v0.24.0 ) -require go.opentelemetry.io/collector/consumer/consumertest v0.107.1-0.20240821120936-6764622672bc +require ( + go.opentelemetry.io/collector/consumer/consumertest v0.107.1-0.20240821120936-6764622672bc + go.opentelemetry.io/collector/pdata v1.13.1-0.20240821120936-6764622672bc +) require ( github.com/beorn7/perks v1.0.1 // indirect @@ -55,7 +58,6 @@ require ( go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.1-0.20240821120936-6764622672bc // indirect go.opentelemetry.io/collector/extension v0.107.1-0.20240821120936-6764622672bc // indirect go.opentelemetry.io/collector/featuregate v1.13.1-0.20240821120936-6764622672bc // indirect - go.opentelemetry.io/collector/pdata v1.13.1-0.20240821120936-6764622672bc // indirect go.opentelemetry.io/collector/pdata/pprofile v0.107.1-0.20240821120936-6764622672bc // indirect go.opentelemetry.io/otel v1.28.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect