Skip to content

Commit

Permalink
[6.8] Check if file has disappeared sooner in Log reader (#13907) (#1…
Browse files Browse the repository at this point in the history
…3960)

* Check if file has disappeared sooner in Log reader (#13907)

* Adding comment about EOF and removing redundant logic

* Adding debug logging when close_remove and close_renamed situations are reached

* Check file stats related errors before adding to buffer

* Reordering checks to be same as before

* Add CHANGELOG entry

* Fixing issue # in CHANGELOG entry

* Move CloseInactive and truncate checks back to being after EOF check

* Don't perform stat call for CloseRemoved/CloseRenamed unless those settings are enabled

* Better comments / function godoc

* Renaming method to be more descriptive

* Fixing up CHANGELOG

* Fix CHANGELOG

* Fixed compile error

* [WIP] Trying to check if file is removed

* Adding Removed() to Source interface

* Initialize procGetFileInformationByHandleEx

* Adding missed import
  • Loading branch information
ycombinator authored Oct 11, 2019
1 parent 945fd70 commit e28e5c2
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ https://github.com/elastic/beats/compare/v6.8.0...6.8.1[Check the HEAD diff]

*Filebeat*

- Fix delay in enforcing close_renamed and close_removed options. {issue}13488[13488] {pull}13907[13907]

*Heartbeat*

*Journalbeat*
Expand Down
1 change: 1 addition & 0 deletions filebeat/harvester/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type Source interface {
io.ReadCloser
Name() string
Removed() bool // check if source has been removed
Stat() (os.FileInfo, error)
Continuable() bool // can we continue processing after EOF?
HasState() bool // does this source have a state?
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ type File struct {

func (File) Continuable() bool { return true }
func (File) HasState() bool { return true }
func (f File) Removed() bool { return isRemoved(f.File) }
41 changes: 33 additions & 8 deletions filebeat/input/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (f *Log) Read(buf []byte) (int, error) {
default:
}

err := f.checkFileDisappearedErrors()
if err != nil {
return totalN, err
}

n, err := f.fs.Read(buf)
if n > 0 {
f.offset += int64(n)
Expand Down Expand Up @@ -104,24 +109,27 @@ func (f *Log) Read(buf []byte) (int, error) {
}
}

// errorChecks checks how the given error should be handled based on the config options
// errorChecks determines the cause for EOF errors, and how the EOF event should be handled
// based on the config options.
func (f *Log) errorChecks(err error) error {
if err != io.EOF {
logp.Err("Unexpected state reading from %s; error: %s", f.fs.Name(), err)
return err
}

// At this point we have hit EOF!

// Stdin is not continuable
if !f.fs.Continuable() {
logp.Debug("harvester", "Source is not continuable: %s", f.fs.Name())
return err
}

if err == io.EOF && f.config.CloseEOF {
if f.config.CloseEOF {
return err
}

// Refetch fileinfo to check if the file was truncated or disappeared.
// Refetch fileinfo to check if the file was truncated.
// Errors if the file was removed/rotated after reading and before
// calling the stat function
info, statErr := f.fs.Stat()
Expand All @@ -143,19 +151,36 @@ func (f *Log) errorChecks(err error) error {
return ErrInactive
}

return nil
}

// checkFileDisappearedErrors checks if the log file has been removed or renamed (rotated).
func (f *Log) checkFileDisappearedErrors() error {
// No point doing a stat call on the file if configuration options are
// not enabled
if !f.config.CloseRenamed && !f.config.CloseRemoved {
return nil
}

// Refetch fileinfo to check if the file was renamed or removed.
// Errors if the file was removed/rotated after reading and before
// calling the stat function
info, statErr := f.fs.Stat()
if statErr != nil {
logp.Err("Unexpected error reading from %s; error: %s", f.fs.Name(), statErr)
return statErr
}

if f.config.CloseRenamed {
// Check if the file can still be found under the same path
if !file.IsSameFile(f.fs.Name(), info) {
logp.Debug("harvester", "close_renamed is enabled and file %s has been renamed", f.fs.Name())
return ErrRenamed
}
}

if f.config.CloseRemoved {
// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
_, statErr := os.Stat(f.fs.Name())

// Error means file does not exist.
if statErr != nil {
if f.fs.Removed() {
return ErrRemoved
}
}
Expand Down
28 changes: 28 additions & 0 deletions filebeat/input/log/log_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !windows

package log

import "os"

// isRemoved checks wheter the file held by f is removed.
func isRemoved(f *os.File) bool {
_, err := os.Stat(f.Name())
return err != nil
}
62 changes: 62 additions & 0 deletions filebeat/input/log/log_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package log

import (
"os"
"syscall"
"unsafe"

"golang.org/x/sys/windows"
)

var (
modkernel32 = windows.NewLazySystemDLL("kernel32.dll")

procGetFileInformationByHandleEx = modkernel32.NewProc("GetFileInformationByHandleEx")
)

// isRemoved checks whether the file held by f is removed.
// On Windows isRemoved reads the DeletePending flags using the GetFileInformationByHandleEx.
// A file is not removed/unlinked as long as at least one process still own a
// file handle. A delete file is only marked as deleted, and file attributes
// can still be read. Only opening a file marked with 'DeletePending' will
// fail.
func isRemoved(f *os.File) bool {
hdl := f.Fd()
if hdl == uintptr(syscall.InvalidHandle) {
return false
}

info := struct {
AllocationSize int64
EndOfFile int64
NumberOfLinks int32
DeletePending bool
Directory bool
}{}
infoSz := unsafe.Sizeof(info)

const class = 1 // FileStandardInfo
r1, _, _ := syscall.Syscall6(
procGetFileInformationByHandleEx.Addr(), 4, uintptr(hdl), class, uintptr(unsafe.Pointer(&info)), infoSz, 0, 0)
if r1 == 0 {
return true // assume file is removed if syscall errors
}
return info.DeletePending
}
1 change: 1 addition & 0 deletions filebeat/input/log/stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ func (p Pipe) Name() string { return p.File.Name() }
func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() }
func (p Pipe) Continuable() bool { return false }
func (p Pipe) HasState() bool { return false }
func (p Pipe) Removed() bool { return false }

0 comments on commit e28e5c2

Please sign in to comment.