diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 85dbbe04827..2997981db48 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -306,6 +306,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update indentation for azure filebeat configuration. {pull}26604[26604] - Update Sophos xg module pipeline to deal with missing `date` and `time` fields. {pull}27834[27834] - sophos/xg fileset: Add missing pipeline for System Health logs. {pull}27827[27827] {issue}27826[27826] +- Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191] *Heartbeat* @@ -422,6 +423,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add source.ip validation for event ID 4778 in the Security module. {issue}19627[19627] - Protect against accessing undefined variables in Sysmon module. {issue}22219[22219] {pull}22236[22236] - Protect against accessing an undefined variable in Security module. {pull}22937[22937] +- Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191] *Functionbeat* diff --git a/filebeat/input/winlog/input.go b/filebeat/input/winlog/input.go index 08b15a84b4b..fee8cecffcb 100644 --- a/filebeat/input/winlog/input.go +++ b/filebeat/input/winlog/input.go @@ -85,17 +85,8 @@ func (eventlogRunner) Run( publisher cursor.Publisher, ) error { log := ctx.Logger.With("eventlog", source.Name()) - checkpoint := initCheckpoint(log, cursor) - api := source.(eventlog.EventLog) - err := api.Open(checkpoint) - if err != nil { - return fmt.Errorf("failed to open windows event log: %v", err) - } - - log.Debugf("Windows Event Log '%s' opened successfully", source.Name()) - // setup closing the API if either the run function is signaled asynchronously // to shut down or when returning after io.EOF cancelCtx, cancelFn := ctxtool.WithFunc(ctx.Cancelation, func() { @@ -105,41 +96,64 @@ func (eventlogRunner) Run( }) defer cancelFn() - // read loop - for cancelCtx.Err() == nil { - records, err := api.Read() - switch err { - case nil: - break - case io.EOF: - log.Debugf("End of Winlog event stream reached: %v", err) +runLoop: + for { + if cancelCtx.Err() != nil { return nil - default: - // only log error if we are not shutting down - if cancelCtx.Err() != nil { - return nil - } - - log.Errorf("Error occured while reading from Windows Event Log '%v': %v", source.Name(), err) - return err } - if len(records) == 0 { - timed.Wait(cancelCtx, time.Second) + evtCheckpoint := initCheckpoint(log, cursor) + openErr := api.Open(evtCheckpoint) + if eventlog.IsRecoverable(openErr) { + log.Errorf("Encountered recoverable error when opening Windows Event Log: %v", openErr) + timed.Wait(cancelCtx, 5*time.Second) continue + } else if openErr != nil { + return fmt.Errorf("failed to open windows event log: %v", openErr) } + log.Debugf("Windows Event Log '%s' opened successfully", source.Name()) + + // read loop + for cancelCtx.Err() == nil { + records, err := api.Read() + if eventlog.IsRecoverable(err) { + log.Errorf("Encountered recoverable error when reading from Windows Event Log: %v", err) + if closeErr := api.Close(); closeErr != nil { + log.Errorf("Error closing Windows Event Log handle: %v", closeErr) + } + continue runLoop + } + switch err { + case nil: + break + case io.EOF: + log.Debugf("End of Winlog event stream reached: %v", err) + return nil + default: + // only log error if we are not shutting down + if cancelCtx.Err() != nil { + return nil + } - for _, record := range records { - event := record.ToEvent() - if err := publisher.Publish(event, record.Offset); err != nil { - // Publisher indicates disconnect when returning an error. - // stop trying to publish records and quit + log.Errorf("Error occured while reading from Windows Event Log '%v': %v", source.Name(), err) return err } + + if len(records) == 0 { + timed.Wait(cancelCtx, time.Second) + continue + } + + for _, record := range records { + event := record.ToEvent() + if err := publisher.Publish(event, record.Offset); err != nil { + // Publisher indicates disconnect when returning an error. + // stop trying to publish records and quit + return err + } + } } } - - return nil } func initCheckpoint(log *logp.Logger, cursor cursor.Cursor) checkpoint.EventLogState { diff --git a/winlogbeat/beater/eventlogger.go b/winlogbeat/beater/eventlogger.go index a47dcfadcc2..b7507cfe8c0 100644 --- a/winlogbeat/beater/eventlogger.go +++ b/winlogbeat/beater/eventlogger.go @@ -120,11 +120,6 @@ func (e *eventLogger) run( client.Close() }() - err = api.Open(state) - if err != nil { - e.log.Warnw("Open() error. No events will be read from this source.", "error", err) - return - } defer func() { e.log.Info("Stop processing.") @@ -134,36 +129,55 @@ func (e *eventLogger) run( } }() - e.log.Debug("Opened successfully.") - - for stop := false; !stop; { - select { - case <-done: - return - default: - } - - // Read from the event. - records, err := api.Read() - switch err { - case nil: - case io.EOF: - // Graceful stop. - stop = true - default: - e.log.Warnw("Read() error.", "error", err) - return - } - - e.log.Debugf("Read() returned %d records.", len(records)) - if len(records) == 0 { - time.Sleep(time.Second) +runLoop: + for { + err = api.Open(state) + if eventlog.IsRecoverable(err) { + e.log.Warnw("Open() encountered recoverable error. Trying again...", "error", err) + time.Sleep(time.Second * 5) continue + } else if err != nil { + e.log.Warnw("Open() error. No events will be read from this source.", "error", err) + return } - - eventACKer.Add(len(records)) - for _, lr := range records { - client.Publish(lr.ToEvent()) + e.log.Debug("Opened successfully.") + + for stop := false; !stop; { + select { + case <-done: + return + default: + } + + // Read from the event. + records, err := api.Read() + if eventlog.IsRecoverable(err) { + e.log.Warnw("Read() encountered recoverable error. Reopening handle...", "error", err) + if closeErr := api.Close(); closeErr != nil { + e.log.Warnw("Close() error.", "error", err) + } + continue runLoop + } + switch err { + case nil: + case io.EOF: + // Graceful stop. + stop = true + default: + e.log.Warnw("Read() error.", "error", err) + return + } + + e.log.Debugf("Read() returned %d records.", len(records)) + if len(records) == 0 { + time.Sleep(time.Second) + continue + } + + eventACKer.Add(len(records)) + for _, lr := range records { + client.Publish(lr.ToEvent()) + } } } } diff --git a/winlogbeat/eventlog/errors_unix.go b/winlogbeat/eventlog/errors_unix.go new file mode 100644 index 00000000000..e28786b2d3b --- /dev/null +++ b/winlogbeat/eventlog/errors_unix.go @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !windows + +package eventlog + +// IsRecoverable returns a boolean indicating whether the error represents +// a condition where the Windows Event Log session can be recovered through a +// reopening of the handle (Close, Open). +func IsRecoverable(err error) bool { + return false +} diff --git a/winlogbeat/eventlog/errors_windows.go b/winlogbeat/eventlog/errors_windows.go new file mode 100644 index 00000000000..d60f80d791d --- /dev/null +++ b/winlogbeat/eventlog/errors_windows.go @@ -0,0 +1,29 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package eventlog + +import ( + win "github.com/elastic/beats/v7/winlogbeat/sys/wineventlog" +) + +// IsRecoverable returns a boolean indicating whether the error represents +// a condition where the Windows Event Log session can be recovered through a +// reopening of the handle (Close, Open). +func IsRecoverable(err error) bool { + return err == win.ERROR_INVALID_HANDLE || err == win.RPC_S_SERVER_UNAVAILABLE || err == win.RPC_S_CALL_CANCELLED +} diff --git a/winlogbeat/sys/wineventlog/syscall_windows.go b/winlogbeat/sys/wineventlog/syscall_windows.go index 836741f53f0..bc895ed9c25 100644 --- a/winlogbeat/sys/wineventlog/syscall_windows.go +++ b/winlogbeat/sys/wineventlog/syscall_windows.go @@ -41,9 +41,12 @@ const NilHandle EvtHandle = 0 // Event log error codes. // https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382(v=vs.85).aspx const ( + ERROR_INVALID_HANDLE syscall.Errno = 6 ERROR_INSUFFICIENT_BUFFER syscall.Errno = 122 ERROR_NO_MORE_ITEMS syscall.Errno = 259 + RPC_S_SERVER_UNAVAILABLE syscall.Errno = 1722 RPC_S_INVALID_BOUND syscall.Errno = 1734 + RPC_S_CALL_CANCELLED syscall.Errno = 1818 ERROR_INVALID_OPERATION syscall.Errno = 4317 )