Skip to content

Commit

Permalink
[Winlogbeat] Tolerate faults when Windows Event Log session is interr…
Browse files Browse the repository at this point in the history
…upted (#28191)

- Added a retry mechanism to winlog/input and winlogbeat to reopen a
session to Windows Event Log when certain error conditions are encountered.
- This applies when opening a session to Windows Event Log and
also when reading from Windows Event Log.
  • Loading branch information
taylor-swanson authored Oct 6, 2021
1 parent 8ce047e commit 3c1731d
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 67 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update indentation for azure filebeat configuration. {pull}26604[26604]
- Update Sophos xg module pipeline to deal with missing `date` and `time` fields. {pull}27834[27834]
- sophos/xg fileset: Add missing pipeline for System Health logs. {pull}27827[27827] {issue}27826[27826]
- Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191]

*Heartbeat*

Expand Down Expand Up @@ -422,6 +423,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add source.ip validation for event ID 4778 in the Security module. {issue}19627[19627]
- Protect against accessing undefined variables in Sysmon module. {issue}22219[22219] {pull}22236[22236]
- Protect against accessing an undefined variable in Security module. {pull}22937[22937]
- Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191]

*Functionbeat*

Expand Down
82 changes: 48 additions & 34 deletions filebeat/input/winlog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,8 @@ func (eventlogRunner) Run(
publisher cursor.Publisher,
) error {
log := ctx.Logger.With("eventlog", source.Name())
checkpoint := initCheckpoint(log, cursor)

api := source.(eventlog.EventLog)

err := api.Open(checkpoint)
if err != nil {
return fmt.Errorf("failed to open windows event log: %v", err)
}

log.Debugf("Windows Event Log '%s' opened successfully", source.Name())

// setup closing the API if either the run function is signaled asynchronously
// to shut down or when returning after io.EOF
cancelCtx, cancelFn := ctxtool.WithFunc(ctx.Cancelation, func() {
Expand All @@ -105,41 +96,64 @@ func (eventlogRunner) Run(
})
defer cancelFn()

// read loop
for cancelCtx.Err() == nil {
records, err := api.Read()
switch err {
case nil:
break
case io.EOF:
log.Debugf("End of Winlog event stream reached: %v", err)
runLoop:
for {
if cancelCtx.Err() != nil {
return nil
default:
// only log error if we are not shutting down
if cancelCtx.Err() != nil {
return nil
}

log.Errorf("Error occured while reading from Windows Event Log '%v': %v", source.Name(), err)
return err
}

if len(records) == 0 {
timed.Wait(cancelCtx, time.Second)
evtCheckpoint := initCheckpoint(log, cursor)
openErr := api.Open(evtCheckpoint)
if eventlog.IsRecoverable(openErr) {
log.Errorf("Encountered recoverable error when opening Windows Event Log: %v", openErr)
timed.Wait(cancelCtx, 5*time.Second)
continue
} else if openErr != nil {
return fmt.Errorf("failed to open windows event log: %v", openErr)
}
log.Debugf("Windows Event Log '%s' opened successfully", source.Name())

// read loop
for cancelCtx.Err() == nil {
records, err := api.Read()
if eventlog.IsRecoverable(err) {
log.Errorf("Encountered recoverable error when reading from Windows Event Log: %v", err)
if closeErr := api.Close(); closeErr != nil {
log.Errorf("Error closing Windows Event Log handle: %v", closeErr)
}
continue runLoop
}
switch err {
case nil:
break
case io.EOF:
log.Debugf("End of Winlog event stream reached: %v", err)
return nil
default:
// only log error if we are not shutting down
if cancelCtx.Err() != nil {
return nil
}

for _, record := range records {
event := record.ToEvent()
if err := publisher.Publish(event, record.Offset); err != nil {
// Publisher indicates disconnect when returning an error.
// stop trying to publish records and quit
log.Errorf("Error occured while reading from Windows Event Log '%v': %v", source.Name(), err)
return err
}

if len(records) == 0 {
timed.Wait(cancelCtx, time.Second)
continue
}

for _, record := range records {
event := record.ToEvent()
if err := publisher.Publish(event, record.Offset); err != nil {
// Publisher indicates disconnect when returning an error.
// stop trying to publish records and quit
return err
}
}
}
}

return nil
}

func initCheckpoint(log *logp.Logger, cursor cursor.Cursor) checkpoint.EventLogState {
Expand Down
80 changes: 47 additions & 33 deletions winlogbeat/beater/eventlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@ func (e *eventLogger) run(
client.Close()
}()

err = api.Open(state)
if err != nil {
e.log.Warnw("Open() error. No events will be read from this source.", "error", err)
return
}
defer func() {
e.log.Info("Stop processing.")

Expand All @@ -134,36 +129,55 @@ func (e *eventLogger) run(
}
}()

e.log.Debug("Opened successfully.")

for stop := false; !stop; {
select {
case <-done:
return
default:
}

// Read from the event.
records, err := api.Read()
switch err {
case nil:
case io.EOF:
// Graceful stop.
stop = true
default:
e.log.Warnw("Read() error.", "error", err)
return
}

e.log.Debugf("Read() returned %d records.", len(records))
if len(records) == 0 {
time.Sleep(time.Second)
runLoop:
for {
err = api.Open(state)
if eventlog.IsRecoverable(err) {
e.log.Warnw("Open() encountered recoverable error. Trying again...", "error", err)
time.Sleep(time.Second * 5)
continue
} else if err != nil {
e.log.Warnw("Open() error. No events will be read from this source.", "error", err)
return
}

eventACKer.Add(len(records))
for _, lr := range records {
client.Publish(lr.ToEvent())
e.log.Debug("Opened successfully.")

for stop := false; !stop; {
select {
case <-done:
return
default:
}

// Read from the event.
records, err := api.Read()
if eventlog.IsRecoverable(err) {
e.log.Warnw("Read() encountered recoverable error. Reopening handle...", "error", err)
if closeErr := api.Close(); closeErr != nil {
e.log.Warnw("Close() error.", "error", err)
}
continue runLoop
}
switch err {
case nil:
case io.EOF:
// Graceful stop.
stop = true
default:
e.log.Warnw("Read() error.", "error", err)
return
}

e.log.Debugf("Read() returned %d records.", len(records))
if len(records) == 0 {
time.Sleep(time.Second)
continue
}

eventACKer.Add(len(records))
for _, lr := range records {
client.Publish(lr.ToEvent())
}
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions winlogbeat/eventlog/errors_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !windows

package eventlog

// IsRecoverable returns a boolean indicating whether the error represents
// a condition where the Windows Event Log session can be recovered through a
// reopening of the handle (Close, Open).
func IsRecoverable(err error) bool {
return false
}
29 changes: 29 additions & 0 deletions winlogbeat/eventlog/errors_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package eventlog

import (
win "github.com/elastic/beats/v7/winlogbeat/sys/wineventlog"
)

// IsRecoverable returns a boolean indicating whether the error represents
// a condition where the Windows Event Log session can be recovered through a
// reopening of the handle (Close, Open).
func IsRecoverable(err error) bool {
return err == win.ERROR_INVALID_HANDLE || err == win.RPC_S_SERVER_UNAVAILABLE || err == win.RPC_S_CALL_CANCELLED
}
3 changes: 3 additions & 0 deletions winlogbeat/sys/wineventlog/syscall_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ const NilHandle EvtHandle = 0
// Event log error codes.
// https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382(v=vs.85).aspx
const (
ERROR_INVALID_HANDLE syscall.Errno = 6
ERROR_INSUFFICIENT_BUFFER syscall.Errno = 122
ERROR_NO_MORE_ITEMS syscall.Errno = 259
RPC_S_SERVER_UNAVAILABLE syscall.Errno = 1722
RPC_S_INVALID_BOUND syscall.Errno = 1734
RPC_S_CALL_CANCELLED syscall.Errno = 1818
ERROR_INVALID_OPERATION syscall.Errno = 4317
)

Expand Down

0 comments on commit 3c1731d

Please sign in to comment.