Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep different registry entry per container stream #7281

Merged
merged 7 commits into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046]
- Comply with PostgreSQL database name format {pull}7198[7198]
- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202]
- Keep different registry entry per container stream to avoid wrong offsets. {issue}7281[7281]

*Heartbeat*
- Fix race due to updates of a shared map that was not supposed to be shared between multiple go-routines. {issue}6616[6616]
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func NewInput(
if err := cfg.SetBool("docker-json.partial", -1, config.Partial); err != nil {
return nil, errors.Wrap(err, "update input config")
}

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
context.Meta["stream"] = config.Containers.Stream
}

return log.NewInput(cfg, outletFactory, context)
}

Expand Down
48 changes: 37 additions & 11 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,31 @@ package file

import (
"os"
"strconv"
"strings"
"time"

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/libbeat/common/file"
)

// State is used to communicate the reading state of a file
type State struct {
Id string `json:"-"` // local unique id to make comparison more efficient
Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Id string `json:"-"` // local unique id to make comparison more efficient

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

struct field Id should be ID

Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Meta map[string]string `json:"meta"`
FileStateOS file.StateOS
}

// NewState creates a new file state
func NewState(fileInfo os.FileInfo, path string, t string) State {
func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State {
return State{
Fileinfo: fileInfo,
Source: path,
Expand All @@ -30,15 +35,33 @@ func NewState(fileInfo os.FileInfo, path string, t string) State {
Timestamp: time.Now(),
TTL: -1, // By default, state does have an infinite ttl
Type: t,
Meta: meta,
}
}

// ID returns a unique id for the state as a string.
//
// The id is generated on first request and cached in s.Id. This is needed
// because Id is excluded from JSON (`json:"-"`) and is therefore not set when
// a state is converted back from JSON.
//
// A state without meta is identified by its file identity alone. A state with
// meta gets the hex encoded hash of the meta prepended ("<hash>-<file id>"),
// so the same file can be tracked once per distinct meta.
func (s *State) ID() string {
	if s.Id != "" {
		return s.Id
	}

	fileID := s.FileStateOS.String()

	// Treat a nil and an empty meta map alike. Checking only for nil would
	// give a state carrying a non-nil empty map a different id than a state
	// for the same file whose meta is nil, although both carry no meta.
	if len(s.Meta) == 0 {
		s.Id = fileID
		return s.Id
	}

	// NOTE(review): the hash error is ignored as before; presumably hashing a
	// map[string]string cannot fail — confirm against hashstructure.
	hashValue, _ := hashstructure.Hash(s.Meta, nil)

	// 16 hex digits are enough for a uint64, plus one byte for the separator.
	var hashBuf [17]byte
	hash := strconv.AppendUint(hashBuf[:0], hashValue, 16)
	hash = append(hash, '-')

	var b strings.Builder
	b.Grow(len(hash) + len(fileID))
	b.Write(hash)
	b.WriteString(fileID)

	s.Id = b.String()
	return s.Id
}

Expand All @@ -49,5 +72,8 @@ func (s *State) IsEqual(c *State) bool {

// IsEmpty returns true if the state is empty
func (s *State) IsEmpty() bool {
return *s == State{}
return s.FileStateOS == file.StateOS{} &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you share some details here on why you not only added s.Meta but also made the other changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I added a map to the State struct, a simple comparison is no longer possible (Go doesn't implement map comparison), so I had to update the code to do an individual comparison on the struct fields.

s.Source == "" &&
s.Meta == nil &&
s.Timestamp.IsZero()
}
1 change: 1 addition & 0 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func New(
Done: input.done,
BeatDone: input.beatDone,
DynamicFields: dynFields,
Meta: map[string]string{},
}
var ipt Input
ipt, err = f(conf, outlet, context)
Expand Down
23 changes: 20 additions & 3 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Input struct {
stateOutlet channel.Outleter
done chan struct{}
numHarvesters atomic.Uint32
meta map[string]string
}

// NewInput instantiates a new Log
Expand Down Expand Up @@ -83,6 +84,7 @@ func NewInput(
stateOutlet: stateOut,
states: file.NewStates(),
done: context.Done,
meta: context.Meta,
}

if err := cfg.Unpack(&p.config); err != nil {
Expand Down Expand Up @@ -124,7 +126,7 @@ func (p *Input) loadStates(states []file.State) error {

for _, state := range states {
// Check if state source belongs to this input. If yes, update the state.
if p.matchesFile(state.Source) {
if p.matchesFile(state.Source) && p.matchesMeta(state.Meta) {
state.TTL = -1

// In case an input is tried to be started with an unfinished state matching the glob pattern
Expand Down Expand Up @@ -186,7 +188,7 @@ func (p *Input) Run() {
}
} else {
// Check if existing source on disk and state are the same. Remove if not the case.
newState := file.NewState(stat, state.Source, p.config.Type)
newState := file.NewState(stat, state.Source, p.config.Type, p.meta)
if !newState.FileStateOS.IsSame(state.FileStateOS) {
p.removeState(state)
logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source)
Expand Down Expand Up @@ -300,6 +302,21 @@ func (p *Input) matchesFile(filePath string) bool {
return false
}

// matchesMeta reports whether the given meta is equal to the meta of this
// input.
//
// Two metas are equal when they hold exactly the same keys with the same
// values. A nil map and an empty map are considered equal, as only the number
// of entries and the entries themselves are compared.
func (p *Input) matchesMeta(meta map[string]string) bool {
	if len(meta) != len(p.meta) {
		return false
	}

	for k, v := range p.meta {
		// Check presence explicitly: a plain meta[k] lookup yields "" for a
		// missing key and would wrongly match an entry whose value is "".
		if got, ok := meta[k]; !ok || got != v {
			return false
		}
	}

	return true
}

type FileSortInfo struct {
info os.FileInfo
path string
Expand Down Expand Up @@ -364,7 +381,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) {
}
logp.Debug("input", "Check file for harvesting: %s", absolutePath)
// Create new state for comparison
newState := file.NewState(info, absolutePath, p.config.Type)
newState := file.NewState(info, absolutePath, p.config.Type, p.meta)
return newState, nil
}

Expand Down
55 changes: 55 additions & 0 deletions filebeat/input/log/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,61 @@ func TestIsCleanInactive(t *testing.T) {
}
}

// TestMatchesMeta verifies that matchesMeta only accepts a meta map holding
// exactly the same key/value pairs as the meta of the input.
func TestMatchesMeta(t *testing.T) {
	type testCase struct {
		inputMeta map[string]string // meta configured on the input
		stateMeta map[string]string // meta handed to matchesMeta
		expected  bool
	}

	cases := []testCase{
		// Identical single entry.
		{
			inputMeta: map[string]string{"it": "matches"},
			stateMeta: map[string]string{"it": "matches"},
			expected:  true,
		},
		// The input has an entry the given meta lacks.
		{
			inputMeta: map[string]string{"it": "doesnt", "doesnt": "match"},
			stateMeta: map[string]string{"it": "doesnt"},
			expected:  false,
		},
		// The given meta has an entry the input lacks.
		{
			inputMeta: map[string]string{"it": "doesnt"},
			stateMeta: map[string]string{"it": "doesnt", "doesnt": "match"},
			expected:  false,
		},
		// Both empty.
		{
			inputMeta: map[string]string{},
			stateMeta: map[string]string{},
			expected:  true,
		},
	}

	for _, tc := range cases {
		in := &Input{meta: tc.inputMeta}
		got := in.matchesMeta(tc.stateMeta)
		assert.Equal(t, tc.expected, got)
	}
}

type TestFileInfo struct {
time time.Time
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Context struct {
Done chan struct{}
BeatDone chan struct{}
DynamicFields *common.MapStrPointer
Meta map[string]string
}

// Factory is used to register functions creating new Input instances.
Expand Down
55 changes: 55 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1506,3 +1506,58 @@ def test_registrar_files_with_input_level_processors(self):
"inode": stat.st_ino,
"device": stat.st_dev,
}, file_state_os)

def test_registrar_meta(self):
"""
Check that multiple entries for the same file are on the registry when they have
different meta
"""

self.render_config_template(
type='docker',
input_raw='''
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw. check out the textwrap package. This allows you to indent multiline strings within the code more properly.

containers:
path: {path}
stream: stdout
ids:
- container_id
- type: docker
containers:
path: {path}
stream: stderr
ids:
- container_id
'''.format(path=os.path.abspath(self.working_dir) + "/log/")
)
os.mkdir(self.working_dir + "/log/")
os.mkdir(self.working_dir + "/log/container_id")
testfile_path1 = self.working_dir + "/log/container_id/test.log"

with open(testfile_path1, 'w') as f:
for i in range(0, 10):
f.write('{"log":"hello\\n","stream":"stdout","time":"2018-04-13T13:39:57.924216596Z"}\n')
f.write('{"log":"hello\\n","stream":"stderr","time":"2018-04-13T13:39:57.924216596Z"}\n')

filebeat = self.start_beat()

self.wait_until(
lambda: self.output_has(lines=20),
max_timeout=15)

# wait until the registry file exists. Needed to avoid a race between
# the logging and actual writing the file. Seems to happen on Windows.

self.wait_until(
lambda: os.path.isfile(os.path.join(self.working_dir,
"registry")),
max_timeout=1)

filebeat.check_kill_and_wait()

# Check registry contains 2 entries with meta
data = self.get_registry()
assert len(data) == 2
assert data[0]["source"] == data[1]["source"]
assert data[0]["meta"]["stream"] in ("stdout", "stderr")
assert data[1]["meta"]["stream"] in ("stdout", "stderr")
assert data[0]["meta"]["stream"] != data[1]["meta"]["stream"]
2 changes: 1 addition & 1 deletion filebeat/util/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (d *Data) GetState() file.State {

// HasState returns true if the data object contains state data
func (d *Data) HasState() bool {
return d.state != file.State{}
return !d.state.IsEmpty()
}

// GetEvent returns the event in the data object
Expand Down