Skip to content

Commit

Permalink
[8.1](backport #30155) [winlogbeat] Retry EvtSubscribe from start if …
Browse files Browse the repository at this point in the history
…fails with strict mode (#31044)

* [winlogbeat] Retry EvtSubscribe from start if fails with strict mode (#30155)

* Retry EvtSubscribe from start if fails with strict mode

* Add metrics and tests

* Shorten test name

* Fix debug message

* Update winlogbeat/beater/winlogbeat.go

Co-authored-by: Andrew Kroh <[email protected]>

* Shorten test names

* Add changelog

* Shorten bad bookmark test

* Close file on test

* restructure test

* Fix fake bookmark generation in test

One of the format strings was ignored, resulting in invalid YaML

* Additional logging

* Fix linting issues

* Fix linting issue

* Remove test output

* Fix usage of fmt.Errorf

Co-authored-by: Andrew Kroh <[email protected]>
Co-authored-by: Adrian Serrano <[email protected]>
(cherry picked from commit e8a4675)

* Update CHANGELOG.next.asciidoc

Co-authored-by: Marc Guasch <[email protected]>
  • Loading branch information
mergify[bot] and marc-gr authored Mar 31, 2022
1 parent 6f7594f commit 45f3866
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 40 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif

*Winlogbeat*

- Retry EvtSubscribe from start if fails with strict mode. {issue}29793[29793] {pull}30155[30155]


*Elastic Log Driver*

Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (eb *Winlogbeat) init(b *beat.Beat) error {
if err != nil {
return fmt.Errorf("failed to create new event log: %w", err)
}
eb.log.Debugf("Initialized EventLog]", eventLog.Name())
eb.log.Debugw("Initialized EventLog", "id", eventLog.Name())

logger, err := newEventLogger(b.Info, eventLog, config, eb.log)
if err != nil {
Expand Down
59 changes: 37 additions & 22 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package eventlog

import (
"encoding/xml"
"errors"
"fmt"
"io"
"path/filepath"
Expand All @@ -30,7 +31,6 @@ import (
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"
"golang.org/x/sys/windows"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -90,7 +90,7 @@ func (a *NoMoreEventsAction) Unpack(v string) error {
return nil
}
}
return errors.Errorf("invalid no_more_events action: %v", v)
return fmt.Errorf("invalid no_more_events action: %v", v)
}

// String returns the name of the action.
Expand Down Expand Up @@ -196,13 +196,15 @@ func (l *winEventLog) openChannel(bookmark win.EvtHandle) error {
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa385771(v=vs.85).aspx#pull
signalEvent, err := windows.CreateEvent(nil, 0, 0, nil)
if err != nil {
return nil
return err
}
defer windows.CloseHandle(signalEvent)
defer func() { _ = windows.CloseHandle(signalEvent) }()

var flags win.EvtSubscribeFlag
if bookmark > 0 {
flags = win.EvtSubscribeStartAfterBookmark
// Use EvtSubscribeStrict to detect when the bookmark is missing and be able to
// subscribe again from the beginning.
flags = win.EvtSubscribeStartAfterBookmark | win.EvtSubscribeStrict
} else {
flags = win.EvtSubscribeStartAtOldestRecord
}
Expand All @@ -215,7 +217,18 @@ func (l *winEventLog) openChannel(bookmark win.EvtHandle) error {
l.query, // Query - nil means all events
bookmark, // Bookmark - for resuming from a specific event
flags)

switch {
case errors.Is(err, win.ERROR_NOT_FOUND), errors.Is(err, win.ERROR_EVT_QUERY_RESULT_STALE),
errors.Is(err, win.ERROR_EVT_QUERY_RESULT_INVALID_POSITION):
debugf("%s error subscribing (first chance): %v", l.logPrefix, err)
// The bookmarked event was not found, we retry the subscription from the start.
incrementMetric(readErrors, err)
subscriptionHandle, err = win.Subscribe(0, signalEvent, "", l.query, 0, win.EvtSubscribeStartAtOldestRecord)
}

if err != nil {
debugf("%s error subscribing (final): %v", l.logPrefix, err)
return err
}

Expand All @@ -228,7 +241,7 @@ func (l *winEventLog) openFile(state checkpoint.EventLogState, bookmark win.EvtH

h, err := win.EvtQuery(0, path, "", win.EvtQueryFilePath|win.EvtQueryForwardDirection)
if err != nil {
return errors.Wrapf(err, "failed to get handle to event log file %v", path)
return fmt.Errorf("failed to get handle to event log file %v: %w", path, err)
}

if bookmark > 0 {
Expand All @@ -240,16 +253,16 @@ func (l *winEventLog) openFile(state checkpoint.EventLogState, bookmark win.EvtH
if err = win.EvtSeek(h, 0, bookmark, win.EvtSeekRelativeToBookmark|win.EvtSeekStrict); err == nil {
// Then we advance past the last read event to avoid sending that
// event again. This won't fail if we're at the end of the file.
err = errors.Wrap(
win.EvtSeek(h, 1, bookmark, win.EvtSeekRelativeToBookmark),
"failed to seek past bookmarked position")
if seekErr := win.EvtSeek(h, 1, bookmark, win.EvtSeekRelativeToBookmark); seekErr != nil {
err = fmt.Errorf("failed to seek past bookmarked position: %w", seekErr)
}
} else {
logp.Warn("%s Failed to seek to bookmarked location in %v (error: %v). "+
"Recovering by reading the log from the beginning. (Did the file "+
"change since it was last read?)", l.logPrefix, path, err)
err = errors.Wrap(
win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst),
"failed to seek to beginning of log")
if seekErr := win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst); seekErr != nil {
err = fmt.Errorf("failed to seek to beginning of log: %w", seekErr)
}
}

if err != nil {
Expand All @@ -273,11 +286,13 @@ func (l *winEventLog) Read() ([]Record, error) {
}()
detailf("%s EventHandles returned %d handles", l.logPrefix, len(handles))

//nolint: prealloc // some handles can be skipped, the final size is unknown
var records []Record
for _, h := range handles {
l.outputBuf.Reset()
err := l.render(h, l.outputBuf)
if bufErr, ok := err.(sys.InsufficientBufferError); ok {
var bufErr sys.InsufficientBufferError
if ok := errors.As(err, &bufErr); ok {
detailf("%s Increasing render buffer size to %d", l.logPrefix,
bufErr.RequiredSize)
l.renderBuf = make([]byte, bufErr.RequiredSize)
Expand All @@ -290,7 +305,7 @@ func (l *winEventLog) Read() ([]Record, error) {
continue
}

r, _ := l.buildRecordFromXML(l.outputBuf.Bytes(), err)
r := l.buildRecordFromXML(l.outputBuf.Bytes(), err)
r.Offset = checkpoint.EventLogState{
Name: l.id,
RecordNumber: r.RecordID,
Expand All @@ -314,26 +329,26 @@ func (l *winEventLog) Close() error {

func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) {
handles, err := win.EventHandles(l.subscription, maxRead)
switch err {
case nil:
switch {
case err == nil:
if l.maxRead > maxRead {
debugf("%s Recovered from RPC_S_INVALID_BOUND error (errno 1734) "+
"by decreasing batch_read_size to %v", l.logPrefix, maxRead)
}
return handles, maxRead, nil
case win.ERROR_NO_MORE_ITEMS:
case errors.Is(err, win.ERROR_NO_MORE_ITEMS):
detailf("%s No more events", l.logPrefix)
if l.config.NoMoreEvents == Stop {
return nil, maxRead, io.EOF
}
return nil, maxRead, nil
case win.RPC_S_INVALID_BOUND:
case errors.Is(err, win.RPC_S_INVALID_BOUND):
incrementMetric(readErrors, err)
if err := l.Close(); err != nil {
return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND")
return nil, 0, fmt.Errorf("failed to recover from RPC_S_INVALID_BOUND: %w", err)
}
if err := l.Open(l.lastRead); err != nil {
return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND")
return nil, 0, fmt.Errorf("failed to recover from RPC_S_INVALID_BOUND: %w", err)
}
return l.eventHandles(maxRead / 2)
default:
Expand All @@ -343,7 +358,7 @@ func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) {
}
}

func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, error) {
func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) Record {
includeXML := l.config.IncludeXML
e, err := winevent.UnmarshalXML(x)
if err != nil {
Expand Down Expand Up @@ -388,7 +403,7 @@ func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record,
r.XML = string(x)
}

return r, nil
return r
}

func newEventLogging(options *common.Config) (EventLog, error) {
Expand Down
40 changes: 28 additions & 12 deletions winlogbeat/eventlog/wineventlog_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
package eventlog

import (
"errors"
"fmt"
"io"
"os"
"path/filepath"

"github.com/pkg/errors"
"go.uber.org/multierr"
"golang.org/x/sys/windows"

Expand Down Expand Up @@ -99,31 +100,45 @@ func (l *winEventLogExp) openChannel(bookmark win.Bookmark) (win.EvtHandle, erro
if err != nil {
return win.NilHandle, err
}
defer windows.CloseHandle(signalEvent)
defer func() { _ = windows.CloseHandle(signalEvent) }()

var flags win.EvtSubscribeFlag
if bookmark > 0 {
flags = win.EvtSubscribeStartAfterBookmark
// Use EvtSubscribeStrict to detect when the bookmark is missing and be able to
// subscribe again from the beginning.
flags = win.EvtSubscribeStartAfterBookmark | win.EvtSubscribeStrict
} else {
flags = win.EvtSubscribeStartAtOldestRecord
}

l.log.Debugw("Using subscription query.", "winlog.query", l.query)
return win.Subscribe(
h, err := win.Subscribe(
0, // Session - nil for localhost
signalEvent,
"", // Channel - empty b/c channel is in the query
l.query, // Query - nil means all events
win.EvtHandle(bookmark), // Bookmark - for resuming from a specific event
flags)

switch {
case err == nil:
return h, nil
case errors.Is(err, win.ERROR_NOT_FOUND), errors.Is(err, win.ERROR_EVT_QUERY_RESULT_STALE),
errors.Is(err, win.ERROR_EVT_QUERY_RESULT_INVALID_POSITION):
// The bookmarked event was not found, we retry the subscription from the start.
incrementMetric(readErrors, err)
return win.Subscribe(0, signalEvent, "", l.query, 0, win.EvtSubscribeStartAtOldestRecord)
default:
return 0, err
}
}

func (l *winEventLogExp) openFile(state checkpoint.EventLogState, bookmark win.Bookmark) (win.EvtHandle, error) {
path := l.channelName

h, err := win.EvtQuery(0, path, "", win.EvtQueryFilePath|win.EvtQueryForwardDirection)
if err != nil {
return win.NilHandle, errors.Wrapf(err, "failed to get handle to event log file %v", path)
return win.NilHandle, fmt.Errorf("failed to get handle to event log file %v: %w", path, err)
}

if bookmark > 0 {
Expand All @@ -135,16 +150,16 @@ func (l *winEventLogExp) openFile(state checkpoint.EventLogState, bookmark win.B
if err = win.EvtSeek(h, 0, win.EvtHandle(bookmark), win.EvtSeekRelativeToBookmark|win.EvtSeekStrict); err == nil {
// Then we advance past the last read event to avoid sending that
// event again. This won't fail if we're at the end of the file.
err = errors.Wrap(
win.EvtSeek(h, 1, win.EvtHandle(bookmark), win.EvtSeekRelativeToBookmark),
"failed to seek past bookmarked position")
if seekErr := win.EvtSeek(h, 1, win.EvtHandle(bookmark), win.EvtSeekRelativeToBookmark); seekErr != nil {
err = fmt.Errorf("failed to seek past bookmarked position: %w", seekErr)
}
} else {
l.log.Warnf("s Failed to seek to bookmarked location in %v (error: %v). "+
"Recovering by reading the log from the beginning. (Did the file "+
"change since it was last read?)", path, err)
err = errors.Wrap(
win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst),
"failed to seek to beginning of log")
if seekErr := win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst); seekErr != nil {
err = fmt.Errorf("failed to seek to beginning of log: %w", seekErr)
}
}

if err != nil {
Expand Down Expand Up @@ -198,6 +213,7 @@ func (l *winEventLogExp) processHandle(h win.EvtHandle) (*Record, error) {
evt.RenderErr = append(evt.RenderErr, err.Error())
}

//nolint: godox // keep to have a record of feature disparity between non-experimental vs experimental
// TODO: Need to add XML when configured.

r := &Record{
Expand All @@ -224,7 +240,7 @@ func (l *winEventLogExp) processHandle(h win.EvtHandle) (*Record, error) {
func (l *winEventLogExp) createBookmarkFromEvent(evtHandle win.EvtHandle) (string, error) {
bookmark, err := win.NewBookmarkFromEvent(evtHandle)
if err != nil {
return "", errors.Wrap(err, "failed to create new bookmark from event handle")
return "", fmt.Errorf("failed to create new bookmark from event handle: %w", err)
}
defer bookmark.Close()

Expand Down
22 changes: 17 additions & 5 deletions winlogbeat/sys/wineventlog/zsyscall_windows.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 45f3866

Please sign in to comment.