Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Fix windowseventlogreceiver test flakiness #34793

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions receiver/windowseventlogreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ require (
golang.org/x/sys v0.24.0
)

require go.opentelemetry.io/collector/consumer/consumertest v0.107.1-0.20240821120936-6764622672bc
require (
go.opentelemetry.io/collector/consumer/consumertest v0.107.1-0.20240821120936-6764622672bc
go.opentelemetry.io/collector/pdata v1.13.1-0.20240821120936-6764622672bc
)

require (
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -55,7 +58,6 @@ require (
go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/collector/extension v0.107.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/collector/featuregate v1.13.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/collector/pdata v1.13.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.107.1-0.20240821120936-6764622672bc // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
Expand Down
266 changes: 146 additions & 120 deletions receiver/windowseventlogreceiver/receiver_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"encoding/xml"
"path/filepath"
"reflect"
"testing"
"time"

Expand All @@ -18,7 +19,10 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver/receivertest"
"golang.org/x/sys/windows/registry"
"golang.org/x/sys/windows/svc/eventlog"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
Expand Down Expand Up @@ -70,6 +74,10 @@ func TestCreateWithInvalidInputConfig(t *testing.T) {

func TestReadWindowsEventLogger(t *testing.T) {
logMessage := "Test log"
src := "otel-windowseventlogreceiver-test"
uninstallEventSource, err := assertEventSourceInstallation(t, src)
defer uninstallEventSource()
require.NoError(t, err)

ctx := context.Background()
factory := newFactoryAdapter()
Expand All @@ -85,13 +93,8 @@ func TestReadWindowsEventLogger(t *testing.T) {
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()

src := "otel"
err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
require.NoError(t, err)
defer func() {
require.NoError(t, eventlog.Remove(src))
}()
// Start launches nested goroutines, give them a chance to run before logging the test event(s).
time.Sleep(3 * time.Second)

logger, err := eventlog.Open(src)
require.NoError(t, err)
Expand All @@ -100,19 +103,8 @@ func TestReadWindowsEventLogger(t *testing.T) {
err = logger.Info(10, logMessage)
require.NoError(t, err)

logsReceived := func() bool {
return sink.LogRecordCount() == 1
}

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.Eventually(t, logsReceived, 20*time.Second, 200*time.Millisecond)
results := sink.AllLogs()
require.Len(t, results, 1)

records := results[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
require.Equal(t, 1, records.Len())

record := records.At(0)
records := requireExpectedLogRecords(t, sink, src, 1)
record := records[0]
body := record.Body().Map().AsRaw()

require.Equal(t, logMessage, body["message"])
Expand All @@ -134,6 +126,10 @@ func TestReadWindowsEventLogger(t *testing.T) {

func TestReadWindowsEventLoggerRaw(t *testing.T) {
logMessage := "Test log"
src := "otel-windowseventlogreceiver-test"
uninstallEventSource, err := assertEventSourceInstallation(t, src)
defer uninstallEventSource()
require.NoError(t, err)

ctx := context.Background()
factory := newFactoryAdapter()
Expand All @@ -150,13 +146,8 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) {
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()

src := "otel"
err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
defer func() {
require.NoError(t, eventlog.Remove(src))
}()
require.NoError(t, err)
// Start launches nested goroutines, give them a chance to run before logging the test event(s).
time.Sleep(3 * time.Second)

logger, err := eventlog.Open(src)
require.NoError(t, err)
Expand All @@ -165,19 +156,8 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) {
err = logger.Info(10, logMessage)
require.NoError(t, err)

logsReceived := func() bool {
return sink.LogRecordCount() == 1
}

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.Eventually(t, logsReceived, 20*time.Second, 200*time.Millisecond)
results := sink.AllLogs()
require.Len(t, results, 1)

records := results[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
require.Equal(t, 1, records.Len())

record := records.At(0)
records := requireExpectedLogRecords(t, sink, src, 1)
record := records[0]
body := record.Body().AsString()
bodyStruct := struct {
Data string `xml:"EventData>Data"`
Expand All @@ -188,91 +168,68 @@ func TestReadWindowsEventLoggerRaw(t *testing.T) {
require.Equal(t, logMessage, bodyStruct.Data)
}

func TestReadWindowsEventLoggerWithExcludeProvider(t *testing.T) {
func TestExcludeProvider(t *testing.T) {
logMessage := "Test log"
src := "otel"

ctx := context.Background()
factory := newFactoryAdapter()
createSettings := receivertest.NewNopSettings()
cfg := createTestConfig()
cfg.InputConfig.ExcludeProviders = []string{src}
sink := new(consumertest.LogsSink)

receiver, err := factory.CreateLogsReceiver(ctx, createSettings, cfg, sink)
require.NoError(t, err)

err = receiver.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()

err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
require.NoError(t, err)
defer func() {
require.NoError(t, eventlog.Remove(src))
}()

logger, err := eventlog.Open(src)
require.NoError(t, err)
defer logger.Close()

err = logger.Info(10, logMessage)
require.NoError(t, err)

logsReceived := func() bool {
return sink.LogRecordCount() == 0
excludedSrc := "otel-excluded-windowseventlogreceiver-test"
notExcludedSrc := "otel-windowseventlogreceiver-test"
testSources := []string{excludedSrc, notExcludedSrc}

for _, src := range testSources {
uninstallEventSource, err := assertEventSourceInstallation(t, src)
defer uninstallEventSource()
require.NoError(t, err)
}

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.Eventually(t, logsReceived, 10*time.Second, 200*time.Millisecond)
results := sink.AllLogs()
require.Len(t, results, 0)
}

func TestReadWindowsEventLoggerRawWithExcludeProvider(t *testing.T) {
logMessage := "Test log"
src := "otel"

ctx := context.Background()
factory := newFactoryAdapter()
createSettings := receivertest.NewNopSettings()
cfg := createTestConfig()
cfg.InputConfig.Raw = true
cfg.InputConfig.ExcludeProviders = []string{src}
sink := new(consumertest.LogsSink)

receiver, err := factory.CreateLogsReceiver(ctx, createSettings, cfg, sink)
require.NoError(t, err)

err = receiver.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()

err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
defer func() {
require.NoError(t, eventlog.Remove(src))
}()
require.NoError(t, err)

logger, err := eventlog.Open(src)
require.NoError(t, err)
defer logger.Close()

err = logger.Info(10, logMessage)
require.NoError(t, err)

logsReceived := func() bool {
return sink.LogRecordCount() == 0
tests := []struct {
name string
raw bool
}{
{
name: "with_EventXML",
},
{
name: "with_Raw",
raw: true,
},
}

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.Eventually(t, logsReceived, 10*time.Second, 200*time.Millisecond)
results := sink.AllLogs()
require.Len(t, results, 0)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
factory := newFactoryAdapter()
createSettings := receivertest.NewNopSettings()
cfg := createTestConfig()
cfg.InputConfig.Raw = tt.raw
cfg.InputConfig.ExcludeProviders = []string{excludedSrc}
sink := new(consumertest.LogsSink)

receiver, err := factory.CreateLogsReceiver(ctx, createSettings, cfg, sink)
require.NoError(t, err)

err = receiver.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, receiver.Shutdown(ctx))
}()
// Start launches nested goroutines, give them a chance to run before logging the test event(s).
time.Sleep(3 * time.Second)
atoulme marked this conversation as resolved.
Show resolved Hide resolved

for _, src := range testSources {
logger, err := eventlog.Open(src)
require.NoError(t, err)
defer logger.Close()

err = logger.Info(10, logMessage)
require.NoError(t, err)
}

records := requireExpectedLogRecords(t, sink, notExcludedSrc, 1)
assert.NotEmpty(t, records)

records = filterAllLogRecordsBySource(t, sink, excludedSrc)
assert.Empty(t, records)
})
}
}

func createTestConfig() *WindowsLogConfig {
Expand All @@ -289,3 +246,72 @@ func createTestConfig() *WindowsLogConfig {
}(),
}
}

// assertEventSourceInstallation installs an event source and verifies that the registry key was created.
// It returns a function that can be used to uninstall the event source, that function is never nil
func assertEventSourceInstallation(t *testing.T, src string) (uninstallEventSource func(), err error) {
err = eventlog.InstallAsEventCreate(src, eventlog.Info|eventlog.Warning|eventlog.Error)
uninstallEventSource = func() {
assert.NoError(t, eventlog.Remove(src))
}
assert.NoError(t, err)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
rk, err := registry.OpenKey(registry.LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services\\EventLog\\Application\\"+src, registry.QUERY_VALUE)
assert.NoError(c, err)
defer rk.Close()
_, _, err = rk.GetIntegerValue("TypesSupported")
assert.NoError(c, err)
}, 10*time.Second, 250*time.Millisecond)

return
}

func requireExpectedLogRecords(t *testing.T, sink *consumertest.LogsSink, expectedEventSrc string, expectedEventCount int) []plog.LogRecord {
var actualLogRecords []plog.LogRecord

// logs sometimes take a while to be written, so a substantial wait buffer is needed
require.EventuallyWithT(t, func(c *assert.CollectT) {
actualLogRecords = filterAllLogRecordsBySource(t, sink, expectedEventSrc)
assert.Equal(c, expectedEventCount, len(actualLogRecords))
}, 10*time.Second, 250*time.Millisecond)

return actualLogRecords
}

func filterAllLogRecordsBySource(t *testing.T, sink *consumertest.LogsSink, src string) (filteredLogRecords []plog.LogRecord) {
for _, logs := range sink.AllLogs() {
resourceLogsSlice := logs.ResourceLogs()
for i := 0; i < resourceLogsSlice.Len(); i++ {
resourceLogs := resourceLogsSlice.At(i)
scopeLogsSlice := resourceLogs.ScopeLogs()
for j := 0; j < scopeLogsSlice.Len(); j++ {
logRecords := scopeLogsSlice.At(j).LogRecords()
for k := 0; k < logRecords.Len(); k++ {
logRecord := logRecords.At(k)
if extractEventSourceFromLogRecord(t, logRecord) == src {
filteredLogRecords = append(filteredLogRecords, logRecord)
}
}
}
}
}

return
}

func extractEventSourceFromLogRecord(t *testing.T, logRecord plog.LogRecord) string {
eventMap := logRecord.Body().Map()
if !reflect.DeepEqual(eventMap, pcommon.Map{}) {
eventProviderMap := eventMap.AsRaw()["provider"]
if providerMap, ok := eventProviderMap.(map[string]any); ok {
return providerMap["name"].(string)
}
require.Fail(t, "Failed to extract event source from log record")
}

// This is a raw event log record. Extract the event source from the XML body string.
bodyString := logRecord.Body().AsString()
var eventXML windows.EventXML
require.NoError(t, xml.Unmarshal([]byte(bodyString), &eventXML))
return eventXML.Provider.Name
}
Loading