Skip to content

Commit

Permalink
De-duplicate inodes when converting from the old state (#2792) (#2797)
Browse files Browse the repository at this point in the history
When upgrading the registry file from 1.x versions, make sure
the new state doesn't contain any duplicate inodes, as these will cause
an invalid state. Fixes #2784.
  • Loading branch information
tsg authored and monicasarbu committed Oct 18, 2016
1 parent 898c7ec commit 6647ed1
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v5.0.0-rc1...5.0[Check the HEAD diff]

*Filebeat*
- Fix issue when clean_removed and clean_inactive were used together that states were not directly removed from the registry.
- Fix issue where upgrading a 1.x registry file resulted in duplicate state entries. {pull}2792[2792]

*Winlogbeat*

Expand Down
36 changes: 27 additions & 9 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,8 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool {
}

// Convert old states to new states
states := make([]file.State, len(oldStates))
logp.Info("Old registry states found: %v", len(oldStates))
counter := 0
for _, state := range oldStates {
// Makes timestamp time of migration, as this is the best guess
state.Timestamp = time.Now()
states[counter] = state
counter++
}

states := convertOldStates(oldStates)
r.states.SetStates(states)

// Rewrite registry in new format
Expand All @@ -159,6 +151,32 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool {
return true
}

func convertOldStates(oldStates map[string]file.State) []file.State {
// Convert old states to new states
states := []file.State{}
for _, state := range oldStates {
// Makes timestamp time of migration, as this is the best guess
state.Timestamp = time.Now()

// Check for duplicates
dupe := false
for i, other := range states {
if state.FileStateOS.IsSame(other.FileStateOS) {
dupe = true
if state.Offset > other.Offset {
// replace other
states[i] = state
break
}
}
}
if !dupe {
states = append(states, state)
}
}
return states
}

func (r *Registrar) Start() error {

// Load the previous log file locations now, for use in prospector
Expand Down
59 changes: 59 additions & 0 deletions filebeat/registrar/registrar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// +build !windows,!integration

package registrar

import (
"sort"
"testing"

"github.com/elastic/beats/filebeat/input/file"
"github.com/stretchr/testify/assert"
)

func TestConvertOldStates(t *testing.T) {
type io struct {
Name string
Input map[string]file.State
Output []string
}
tests := []io{
{
Name: "Simple test with three files",
Input: map[string]file.State{
"test": file.State{Source: "test", FileStateOS: file.StateOS{Inode: 5}},
"test1": file.State{Source: "test1", FileStateOS: file.StateOS{Inode: 3}},
"test2": file.State{Source: "test2", FileStateOS: file.StateOS{Inode: 2}},
},
Output: []string{"test", "test1", "test2"},
},
{
Name: "De-duplicate inodes. Bigger offset wins (1)",
Input: map[string]file.State{
"test": file.State{Source: "test", FileStateOS: file.StateOS{Inode: 2}},
"test1": file.State{Source: "test1", FileStateOS: file.StateOS{Inode: 3}},
"test2": file.State{Source: "test2", FileStateOS: file.StateOS{Inode: 2}, Offset: 2},
},
Output: []string{"test1", "test2"},
},
{
Name: "De-duplicate inodes. Bigger offset wins (2)",
Input: map[string]file.State{
"test": file.State{Source: "test", FileStateOS: file.StateOS{Inode: 2}, Offset: 2},
"test1": file.State{Source: "test1", FileStateOS: file.StateOS{Inode: 3}},
"test2": file.State{Source: "test2", FileStateOS: file.StateOS{Inode: 2}, Offset: 0},
},
Output: []string{"test", "test1"},
},
}

for _, test := range tests {
result := convertOldStates(test.Input)
resultSources := []string{}
for _, state := range result {
resultSources = append(resultSources, state.Source)
}
sort.Strings(resultSources)
assert.Equal(t, test.Output, resultSources, test.Name)

}
}

0 comments on commit 6647ed1

Please sign in to comment.