Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial support for configurable file identity tracking #18748

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add experimental dataset sonicwall/firewall for Sonicwall Firewalls logs {pull}19713[19713]
- Add experimental dataset squid/log for Squid Proxy Server logs {pull}19713[19713]
- Add experimental dataset zscaler/zia for Zscaler Internet Access logs {pull}19713[19713]
- Add initial support for configurable file identity tracking. {pull}18748[18748]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ filebeat.inputs:
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: ['.gz$']

# Method to determine if two files are the same or not. By default
# the Beat considers two files the same if their inode and device id are the same.
#file_identity.native: ~

# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
Expand Down
45 changes: 45 additions & 0 deletions filebeat/docs/inputs/input-common-file-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ file is renamed or moved in such a way that it's no longer matched by the file
patterns specified for the , the file will not be picked up again.
{beatname_uc} will not finish reading the file.

Do not use this option when `path` based `file_identity` is configured. It does
not make sense to enable the option, as Filebeat cannot detect renames using
path names as unique identifiers.

WINDOWS: If your Windows log rotation system shows errors because it can't
rotate the files, you should enable this option.

Expand Down Expand Up @@ -397,3 +401,44 @@ file that hasn't been harvested for a longer period of time.
This configuration option applies per input. You can use this option to
indirectly set higher priorities on certain inputs by assigning a higher
limit of harvesters.

[float]
===== `file_identity`

Different `file_identity` methods can be configured to suit the
environment where you are collecting log messages.


*`native`*:: The default behaviour of {beatname_uc} is to differentiate
between files using their inodes and device ids.

[source,yaml]
----
file_identity.native: ~
----

*`path`*:: To identify files based on their paths use this strategy.
kvch marked this conversation as resolved.
Show resolved Hide resolved

WARNING: Only use this strategy if your log files are rotated to a folder
outside of the scope of your input or not at all. Otherwise you end up
with duplicated events.

WARNING: This strategy does not support renaming files.
If an input file is renamed, {beatname_uc} will read it again if the new path
matches the settings of the input.

[source,yaml]
----
file_identity.path: ~
----

*`inode_marker`*:: If the device id changes from time to time, you must use
this method to distinguish files. This option is not supported on Windows.

Set the location of the marker file the following way:

[source,yaml]
----
file_identity.inode_marker.path: /logs/.filebeat-marker
----

53 changes: 53 additions & 0 deletions filebeat/docs/inputs/input-log.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,55 @@ multiple input sections:
IMPORTANT: Make sure a file is not defined more than once across all inputs
because this can lead to unexpected behaviour.

[[file-identity]]
==== Reading files on network shares and cloud providers
kvch marked this conversation as resolved.
Show resolved Hide resolved

:WARNING: Filebeat does not support reading from network shares and cloud providers.

However, one of the limitations of these data sources can be mitigated
if you configure Filebeat adequately.

By default, {beatname_uc} identifies files based on their inodes and
device IDs. However, on network shares and cloud providers these
values might change during the lifetime of the file. If this happens
{beatname_uc} thinks that file is new and resends the whole content
of the file. To solve this problem you can configure `file_identity` option. Possible
values besides the default `inode_deviceid` are `path` and `inode_marker`.

Selecting `path` instructs {beatname_uc} to identify files based on their
paths. This is a quick way to aviod rereading files if inode and device ids
might change. However, keep in mind if the files are rotated (renamed), they
will be reread and resubmitted.

The option `inode_marker` can be used if the inodes stay the same even if
the device id is changed. You should choose this method if your files are
rotated instead of `path` if possible. You have to configure a marker file
readable by {beatname_uc} and set the path in the option `path` of `inode_marker`.

The content of this file must be unique to the device. You can put the
UUID of the device or mountpoint where the input is stored. The following
example oneliner generates a hidden marker file for the selected mountpoint `/logs`:
Please note that you should not use this option on Windows as file identifiers might be
more volatile.

["source","sh",subs="attributes"]
----
$ lsblk -o MOUNTPOINT,UUD | grep /logs | awk '{print $2}' >> /logs/.filebeat-marker
----

To set the generated file as a marker for `file_identity` you should configure
the input the following way:

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: log
paths:
- /logs/*.log
file_identity.inode_marker.path: /logs/.filebeat-marker
----


[[rotating-logs]]
==== Reading from rotating logs

Expand All @@ -66,6 +115,10 @@ a pattern that matches the file you want to harvest and all of its rotated
files. Also make sure your log rotation strategy prevents lost or duplicate
messages. For more information, see <<file-log-rotation>>.

Furthermore, to avoid duplicate of rotated log messages, do not use the
`path` method for `file_identity`. Or exclude the rotated files with `exclude_files`
option.

[id="{beatname_lc}-input-{type}-options"]
==== Configuration options

Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ filebeat.inputs:
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: ['.gz$']

# Method to determine if two files are the same or not. By default
# the Beat considers two files the same if their inode and device id are the same.
#file_identity.native: ~

# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ type File struct {
State *State
}

// Checks if the two files are the same.
func (f *File) IsSameFile(f2 *File) bool {
return os.SameFile(f.FileInfo, f2.FileInfo)
}

// IsSameFile checks if the given File path corresponds with the FileInfo given
// It is used to check if the file has been renamed.
func IsSameFile(path string, info os.FileInfo) bool {
fileInfo, err := os.Stat(path)

Expand Down
121 changes: 121 additions & 0 deletions filebeat/input/file/identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package file

import (
"fmt"
"strconv"
"strings"

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/libbeat/common"
)

const (
nativeName = "native"
pathName = "path"
inodeMarkerName = "inode_marker"

DefaultIdentifierName = nativeName
identitySep = "::"
)

var (
identifierFactories = map[string]IdentifierFactory{
nativeName: newINodeDeviceIdentifier,
pathName: newPathIdentifier,
inodeMarkerName: newINodeMarkerIdentifier,
}
)

type IdentifierFactory func(*common.Config) (StateIdentifier, error)

// StateIdentifier generates an ID for a State.
type StateIdentifier interface {
// GenerateID generates and returns the ID of the state and its type
GenerateID(State) (id, identifierType string)
}

// NewStateIdentifier creates a new state identifier for a log input.
func NewStateIdentifier(ns *common.ConfigNamespace) (StateIdentifier, error) {
if ns == nil {
return newINodeDeviceIdentifier(nil)
}

identifierType := ns.Name()
f, ok := identifierFactories[identifierType]
if !ok {
return nil, fmt.Errorf("no such file_identity generator: %s", identifierType)
}

return f(ns.Config())
}

type inodeDeviceIdentifier struct {
name string
}

func newINodeDeviceIdentifier(_ *common.Config) (StateIdentifier, error) {
return &inodeDeviceIdentifier{
name: nativeName,
}, nil
}

func (i *inodeDeviceIdentifier) GenerateID(s State) (id, identifierType string) {
stateID := i.name + identitySep + s.FileStateOS.String()
return genIDWithHash(s.Meta, stateID), i.name
}

type pathIdentifier struct {
name string
}

func newPathIdentifier(_ *common.Config) (StateIdentifier, error) {
return &pathIdentifier{
name: pathName,
}, nil
}

func (p *pathIdentifier) GenerateID(s State) (id, identifierType string) {
stateID := p.name + identitySep + s.Source
return genIDWithHash(s.Meta, stateID), p.name
}

func genIDWithHash(meta map[string]string, fileID string) string {
if len(meta) == 0 {
return fileID
}
kvch marked this conversation as resolved.
Show resolved Hide resolved

hashValue, _ := hashstructure.Hash(meta, nil)
var hashBuf [17]byte
hash := strconv.AppendUint(hashBuf[:0], hashValue, 16)
hash = append(hash, '-')

var b strings.Builder
b.Grow(len(hash) + len(fileID))
b.Write(hash)
b.WriteString(fileID)

return b.String()
}

// mockIdentifier is used for testing
type MockIdentifier struct{}

func (m *MockIdentifier) GenerateID(s State) (string, string) { return s.Id, "mock" }
98 changes: 98 additions & 0 deletions filebeat/input/file/identifier_inode_deviceid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !windows

package file

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

type inodeMarkerIdentifier struct {
log *logp.Logger
name string
markerPath string

markerFileLastModifitaion time.Time
markerTxt string
}

func newINodeMarkerIdentifier(cfg *common.Config) (StateIdentifier, error) {
var config struct {
MarkerPath string `config:"path" validate:"required"`
}
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("error while reading configuration of INode + marker file configuration: %v", err)
}

fi, err := os.Stat(config.MarkerPath)
if err != nil {
return nil, fmt.Errorf("error while opening marker file at %s: %v", config.MarkerPath, err)
}
markerContent, err := ioutil.ReadFile(config.MarkerPath)
if err != nil {
return nil, fmt.Errorf("error while reading marker file at %s: %v", config.MarkerPath, err)
}
return &inodeMarkerIdentifier{
log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)),
name: inodeMarkerName,
markerPath: config.MarkerPath,
markerFileLastModifitaion: fi.ModTime(),
markerTxt: string(markerContent),
}, nil
}

func (i *inodeMarkerIdentifier) markerContents() string {
f, err := os.Open(i.markerPath)
if err != nil {
i.log.Errorf("Failed to open marker file %s: %v", i.markerPath, err)
return ""
}
defer f.Close()

fi, err := f.Stat()
if err != nil {
i.log.Errorf("Failed to fetch file information for %s: %v", i.markerPath, err)
return ""
}
if i.markerFileLastModifitaion.Before(fi.ModTime()) {
contents, err := ioutil.ReadFile(i.markerPath)
if err != nil {
i.log.Errorf("Error while reading contents of marker file: %v", err)
return ""
}
i.markerTxt = string(contents)
}

return i.markerTxt
}

func (i *inodeMarkerIdentifier) GenerateID(s State) (id, identifierType string) {
m := i.markerContents()

stateID := fmt.Sprintf("%s%s%s-%s", i.name, identitySep, s.FileStateOS.InodeString(), m)
return genIDWithHash(s.Meta, stateID), i.name
}
Loading