Skip to content

Commit

Permalink
[winlogbeat] Add missing functionality to experimental API (#41525)
Browse files Browse the repository at this point in the history
* Put data under UserData also in experimental api

* Change docs and changelog

* check evt meta

* Propagate locale config appropiately

* Extract metadata cache

* Add render config

* Simplify render functions

* Add xml rendering to experimental api

* Add benchmarks

* Update docs

* Fix multi os build

* Format embedded messages in the experimental api

* Safer assert

* Test exp api include xml with same test suite

* Check for nil metadata

* Revert "Safer assert"

This reverts commit db5a57d.

* Use single buffer to render xml
  • Loading branch information
marc-gr authored Nov 25, 2024
1 parent 6d1c81e commit 4278366
Show file tree
Hide file tree
Showing 22 changed files with 483 additions and 154 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,12 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
*Winlogbeat*

- Add handling for missing `EvtVarType`s in experimental api. {issue}19337[19337] {pull}41418[41418]
- Properly set events `UserData` when experimental api is used. {pull}41525[41525]
- Include XML is respected for experimental api {pull}41525[41525]
- Forwarded events use renderedtext info for experimental api {pull}41525[41525]
- Language setting is respected for experimental api {pull}41525[41525]
- Language setting also added to decode xml wineventlog processor {pull}41525[41525]
- Format embedded messages in the experimental api {pull}41525[41525]
- Implement exclusion range support for event_id. {issue}38623[38623] {pull}41639[41639]


Expand Down
1 change: 1 addition & 0 deletions libbeat/processors/decode_xml_wineventlog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type config struct {
MapECSFields bool `config:"map_ecs_fields"`
IgnoreMissing bool `config:"ignore_missing"`
IgnoreFailure bool `config:"ignore_failure"`
Language uint32 `config:"language"`
}

func defaultConfig() config {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/decode_xml_wineventlog/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

type nonWinDecoder struct{}

func newDecoder() decoder {
func newDecoder(uint32) decoder {
return nonWinDecoder{}
}

Expand Down
12 changes: 7 additions & 5 deletions libbeat/processors/decode_xml_wineventlog/decoder_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (
)

type winDecoder struct {
cache *metadataCache
locale uint32
cache *metadataCache
}

func newDecoder() decoder {
func newDecoder(locale uint32) decoder {
return &winDecoder{
locale: locale,
cache: &metadataCache{
store: map[string]*winevent.WinMeta{},
log: logp.NewLogger(logName),
Expand All @@ -46,7 +48,7 @@ func (dec *winDecoder) decode(data []byte) (mapstr.M, mapstr.M, error) {
if err != nil {
return nil, nil, err
}
md := dec.cache.getPublisherMetadata(evt.Provider.Name)
md := dec.cache.getPublisherMetadata(evt.Provider.Name, dec.locale)
winevent.EnrichRawValuesWithNames(md, &evt)
win, ecs := fields(evt)
return win, ecs, nil
Expand All @@ -59,7 +61,7 @@ type metadataCache struct {
log *logp.Logger
}

func (c *metadataCache) getPublisherMetadata(publisher string) *winevent.WinMeta {
func (c *metadataCache) getPublisherMetadata(publisher string, locale uint32) *winevent.WinMeta {
// NOTE: This code uses double-check locking to elevate to a write-lock
// when a cache value needs initialized.
c.mutex.RLock()
Expand All @@ -79,7 +81,7 @@ func (c *metadataCache) getPublisherMetadata(publisher string) *winevent.WinMeta
}

// Load metadata from the publisher.
md, err := wineventlog.NewPublisherMetadataStore(wineventlog.NilHandle, publisher, c.log)
md, err := wineventlog.NewPublisherMetadataStore(wineventlog.NilHandle, publisher, locale, c.log)
if err != nil {
// Return an empty store on error (can happen in cases where the
// log was forwarded and the provider doesn't exist on collector).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ when a specified field does not exist. Defaults to `false`.
`ignore_failure`:: (Optional) Ignore all errors produced by the processor.
Defaults to `false`.

`language`:: (Optional) The language ID the events will be rendered in. The language will be forced regardless
of the system language. Forwarded events will ignore this setting. A complete list of language IDs can be found
https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-lcid/a9eac961-e77d-41a6-90a5-ce1a8b0cdb9c[here].
It defaults to `0`, which indicates to use the system language.

Example:

[source,yaml]
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/decode_xml_wineventlog/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func newProcessor(config config) (beat.Processor, error) {

return &processor{
config: config,
decoder: newDecoder(),
decoder: newDecoder(config.Language),
log: logp.NewLogger(logName),
}, nil
}
Expand Down
5 changes: 2 additions & 3 deletions winlogbeat/docs/winlogbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,8 @@ winlogbeat.event_logs:

There are a few notable differences in the events:

* Events that contained data under `winlog.user_data` will now have it under
`winlog.event_data`.
* Setting `include_xml: true` has no effect.
* If `include_xml` is `true` the performance will be the same as the default API,
as performance improvements are lost when parsing the XML.


[float]
Expand Down
15 changes: 9 additions & 6 deletions winlogbeat/eventlog/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,25 @@ func TestBenchmarkRead(t *testing.T) {

for _, api := range []string{winEventLogAPIName, winEventLogExpAPIName} {
t.Run("api="+api, func(t *testing.T) {
for _, batchSize := range []int{10, 100, 500, 1000} {
t.Run(fmt.Sprintf("batch_size=%d", batchSize), func(t *testing.T) {
result := testing.Benchmark(benchmarkEventLog(api, batchSize))
outputBenchmarkResults(t, result)
})
for _, includexml := range []bool{true, false} {
for _, batchSize := range []int{10, 100, 500, 1000} {
t.Run(fmt.Sprintf("include_xml=%v/batch_size=%d", includexml, batchSize), func(t *testing.T) {
result := testing.Benchmark(benchmarkEventLog(api, includexml, batchSize))
outputBenchmarkResults(t, result)
})
}
}
})
}
}

func benchmarkEventLog(api string, batchSize int) func(b *testing.B) {
func benchmarkEventLog(api string, includexml bool, batchSize int) func(b *testing.B) {
return func(b *testing.B) {
conf := mapstr.M{
"name": providerName,
"batch_read_size": batchSize,
"no_more_events": "stop",
"include_xml": includexml,
}

log := openLog(b, api, nil, conf)
Expand Down
19 changes: 16 additions & 3 deletions winlogbeat/eventlog/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//go:build windows

package eventlog

// This component of the eventlog package provides a cache for storing Handles
Expand All @@ -26,6 +28,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/winlogbeat/sys"
win "github.com/elastic/beats/v7/winlogbeat/sys/wineventlog"
"github.com/elastic/elastic-agent-libs/logp"
)

Expand Down Expand Up @@ -92,7 +95,7 @@ func newMessageFilesCache(eventLogName string, loader messageFileLoaderFunc,
// If no item is cached, then one is loaded, stored, and returned.
// Callers should check the MessageFiles.Err value to see if an error occurred
// while loading the message files.
func (hc *messageFilesCache) get(sourceName string) sys.MessageFiles {
func (hc *messageFilesCache) get(sourceName string) win.EvtHandle {
v := hc.cache.Get(sourceName)
if v == nil {
hc.miss()
Expand All @@ -111,15 +114,25 @@ func (hc *messageFilesCache) get(sourceName string) sys.MessageFiles {

// Return the existing cached value.
messageFiles, _ = existing.(sys.MessageFiles)
return messageFiles

if messageFiles.Err == nil {
// There is only ever a single handle when using the Windows Event
// Log API.
return win.EvtHandle(messageFiles.Handles[0].Handle)
}
}
hc.size()
} else {
hc.hit()
}

messageFiles, _ := v.(sys.MessageFiles)
return messageFiles
if messageFiles.Err == nil {
// There is only ever a single handle when using the Windows Event
// Log API.
return win.EvtHandle(messageFiles.Handles[0].Handle)
}
return win.NilHandle
}

// evictionHandler is the callback handler that receives notifications when
Expand Down
6 changes: 3 additions & 3 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) Record {
}

// Get basic string values for raw fields.
winevent.EnrichRawValuesWithNames(l.winMeta(e.Provider.Name), &e)
winevent.EnrichRawValuesWithNames(l.winMeta(e.Provider.Name, l.config.EventLanguage), &e)
if e.Level == "" {
// Fallback on LevelRaw if the Level is not set in the RenderingInfo.
e.Level = win.EventLevel(e.LevelRaw).String()
Expand Down Expand Up @@ -605,7 +605,7 @@ func newWinMetaCache(ttl time.Duration) winMetaCache {
return winMetaCache{cache: make(map[string]winMetaCacheEntry), ttl: ttl, logger: logp.L()}
}

func (c *winMetaCache) winMeta(provider string) *winevent.WinMeta {
func (c *winMetaCache) winMeta(provider string, locale uint32) *winevent.WinMeta {
c.mu.RLock()
e, ok := c.cache[provider]
c.mu.RUnlock()
Expand All @@ -624,7 +624,7 @@ func (c *winMetaCache) winMeta(provider string) *winevent.WinMeta {
return e.WinMeta
}

s, err := win.NewPublisherMetadataStore(win.NilHandle, provider, c.logger)
s, err := win.NewPublisherMetadataStore(win.NilHandle, provider, locale, c.logger)
if err != nil {
// Return an empty store on error (can happen in cases where the
// log was forwarded and the provider doesn't exist on collector).
Expand Down
37 changes: 26 additions & 11 deletions winlogbeat/eventlog/wineventlog_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type winEventLogExp struct {
log *logp.Logger

iterator *win.EventIterator
renderer *win.Renderer
renderer win.EventRenderer

metrics *inputMetrics
}
Expand Down Expand Up @@ -115,22 +115,36 @@ func newWinEventLogExp(options *conf.C) (EventLog, error) {
log = logp.NewLogger("wineventlog").With("id", id).With("channel", c.Name)
}

renderer, err := win.NewRenderer(win.NilHandle, log)
if err != nil {
return nil, err
}

l := &winEventLogExp{
config: c,
query: xmlQuery,
id: id,
channelName: c.Name,
file: isFile,
maxRead: c.BatchReadSize,
renderer: renderer,
log: log,
}

switch c.IncludeXML {
case true:
l.renderer = win.NewXMLRenderer(
win.RenderConfig{
IsForwarded: l.isForwarded(),
Locale: c.EventLanguage,
},
win.NilHandle, log)
case false:
l.renderer, err = win.NewRenderer(
win.RenderConfig{
IsForwarded: l.isForwarded(),
Locale: c.EventLanguage,
},
win.NilHandle, log)
if err != nil {
return nil, err
}
}

return l, nil
}

Expand Down Expand Up @@ -309,22 +323,23 @@ func (l *winEventLogExp) processHandle(h win.EvtHandle) (*Record, error) {
defer h.Close()

// NOTE: Render can return an error and a partial event.
evt, err := l.renderer.Render(h)
evt, xml, err := l.renderer.Render(h)
if evt == nil {
return nil, err
}
if err != nil {
evt.RenderErr = append(evt.RenderErr, err.Error())
}

//nolint:godox // Bad linter! Keep to have a record of feature disparity between non-experimental vs experimental.
// TODO: Need to add XML when configured.

r := &Record{
API: winEventLogExpAPIName,
Event: *evt,
}

if l.config.IncludeXML {
r.XML = xml
}

if l.file {
r.File = l.id
}
Expand Down
22 changes: 14 additions & 8 deletions winlogbeat/eventlog/wineventlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,17 @@ func TestWinEventLogConfig_Validate(t *testing.T) {
}

func TestWindowsEventLogAPI(t *testing.T) {
testWindowsEventLog(t, winEventLogAPIName)
testWindowsEventLog(t, winEventLogAPIName, false)
}

func TestWindowsEventLogAPIExperimental(t *testing.T) {
testWindowsEventLog(t, winEventLogExpAPIName)
// for the experimental api using include xml behave differently than not
// so we must test both settings
testWindowsEventLog(t, winEventLogExpAPIName, true)
testWindowsEventLog(t, winEventLogExpAPIName, false)
}

func testWindowsEventLog(t *testing.T, api string) {
func testWindowsEventLog(t *testing.T, api string, includeXML bool) {
writer, teardown := createLog(t)
defer teardown()

Expand All @@ -192,7 +195,7 @@ func testWindowsEventLog(t *testing.T, api string) {
}

t.Run("has_message", func(t *testing.T) {
log := openLog(t, map[string]interface{}{"name": providerName, "batch_read_size": 1})
log := openLog(t, map[string]interface{}{"name": providerName, "batch_read_size": 1, "include_xml": includeXML})
defer log.Close()

for i := 0; i < 10; i++ {
Expand All @@ -208,8 +211,9 @@ func testWindowsEventLog(t *testing.T, api string) {
// Test reading from an event log using a custom XML query.
t.Run("custom_xml_query", func(t *testing.T) {
cfg := map[string]interface{}{
"id": "custom-xml-query",
"xml_query": customXMLQuery,
"id": "custom-xml-query",
"xml_query": customXMLQuery,
"include_xml": includeXML,
}

log := openLog(t, cfg)
Expand All @@ -236,7 +240,7 @@ func testWindowsEventLog(t *testing.T, api string) {
t.Run("batch_read_size_config", func(t *testing.T) {
const batchReadSize = 2

log := openLog(t, map[string]interface{}{"name": providerName, "batch_read_size": batchReadSize})
log := openLog(t, map[string]interface{}{"name": providerName, "batch_read_size": batchReadSize, "include_xml": includeXML})
defer log.Close()

records, err := log.Read()
Expand All @@ -251,7 +255,7 @@ func testWindowsEventLog(t *testing.T, api string) {
// When combined with large messages this causes EvtNext to fail with
// RPC_S_INVALID_BOUND error. The reader should recover from the error.
t.Run("large_batch_read", func(t *testing.T) {
log := openLog(t, map[string]interface{}{"name": providerName, "batch_read_size": 1024})
log := openLog(t, map[string]interface{}{"name": providerName, "batch_read_size": 1024, "include_xml": includeXML})
defer log.Close()

var eventCount int
Expand Down Expand Up @@ -282,6 +286,7 @@ func testWindowsEventLog(t *testing.T, api string) {
log := openLog(t, map[string]interface{}{
"name": path,
"no_more_events": "stop",
"include_xml": includeXML,
})
defer log.Close()

Expand Down Expand Up @@ -310,6 +315,7 @@ func testWindowsEventLog(t *testing.T, api string) {
"name": path,
"no_more_events": "stop",
"event_id": "3, 5",
"include_xml": includeXML,
})
defer log.Close()

Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/sys/wineventlog/format_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestFormatMessage(t *testing.T) {
evtHandle := mustNextHandle(t, log)
defer evtHandle.Close()

publisherMetadata, err := NewPublisherMetadata(NilHandle, "Microsoft-Windows-Security-Auditing")
publisherMetadata, err := NewPublisherMetadata(NilHandle, "Microsoft-Windows-Security-Auditing", 0)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 4278366

Please sign in to comment.