Skip to content

Commit

Permalink
Filebeat in memory states registry improvements (elastic#6346)
Browse files Browse the repository at this point in the history
* Filebeat in memory states registry improvements

- make States type easier to find by moving into separate states.go file
- Ensure the provided States constructor is actually used
- Add an ID -> array index map for faster lookups on update and find
  operations. When updating states in a big registry, the updates
  approached quadratic complexity. The index helps keep the
  complexity roughly linear in the number of state updates.
- Debug will print number of states subject to future cleanups (if state TTL > 0)
- Add title to states unit tests

* Fix godocs

* update state index in Cleanup

* Refine index handling + reintroduce debug message

* review
  • Loading branch information
Steffen Siering authored and kvch committed Feb 15, 2018
1 parent 4f64c49 commit 490cbcd
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 133 deletions.
116 changes: 0 additions & 116 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package file

import (
"os"
"sync"
"time"

"github.com/elastic/beats/libbeat/common/file"
"github.com/elastic/beats/libbeat/logp"
)

// State is used to communicate the reading state of a file
Expand Down Expand Up @@ -53,117 +51,3 @@ func (s *State) IsEqual(c *State) bool {
func (s *State) IsEmpty() bool {
	// A state counts as empty when it is equal to the zero value of State.
	var zero State
	return *s == zero
}

// States handles list of FileState.
// The embedded RWMutex is locked by the methods of this type before they
// read or modify the list.
type States struct {
	// states is the list of tracked file states.
	states []State
	// RWMutex guards access to states.
	sync.RWMutex
}

// NewStates creates a States registry holding an empty, non-nil list of
// states.
func NewStates() *States {
	registry := new(States)
	registry.states = []State{}
	return registry
}

// Update stores newState in the registry and stamps it with the current time.
// If a matching state is already registered it is overwritten, otherwise
// newState is appended as a new entry.
func (s *States) Update(newState State) {
	s.Lock()
	defer s.Unlock()

	pos, _ := s.findPrevious(newState)
	newState.Timestamp = time.Now()

	if pos < 0 {
		// Nothing matched, so register the state as a new entry.
		s.states = append(s.states, newState)
		logp.Debug("input", "New state added for %s", newState.Source)
		return
	}

	s.states[pos] = newState
}

// FindPrevious looks up the registered state that matches newState and
// returns whatever state findPrevious reports for it. The lookup runs under
// the read lock.
func (s *States) FindPrevious(newState State) State {
	s.RLock()
	defer s.RUnlock()

	_, previous := s.findPrevious(newState)
	return previous
}

// findPrevious scans the registry for a state that IsEqual to newState and
// returns its position together with a copy of the stored state.
// When nothing matches, the position is -1 and the zero State is returned.
// Callers are expected to hold the lock.
func (s *States) findPrevious(newState State) (int, State) {
	// TODO: This could be made potentially more performance by using an index (harvester id) and only use iteration as fall back
	for i := range s.states {
		candidate := s.states[i]
		// This is using the FileStateOS for comparison as FileInfo identifiers can only be fetched for existing files
		if candidate.IsEqual(&newState) {
			return i, candidate
		}
	}

	return -1, State{}
}

// Cleanup drops states that are due for removal and returns how many were
// dropped. A state is due when its TTL is 0, or when its TTL is positive and
// more than TTL has passed since its Timestamp. A due state is only dropped
// if it is Finished; otherwise an error is logged and the state is kept.
// States with a negative TTL are never due.
func (s *States) Cleanup() int {
	s.Lock()
	defer s.Unlock()

	before := len(s.states)
	now := time.Now()

	// Filter in place: kept shares the backing array of s.states and only
	// ever overwrites slots that have already been read.
	kept := s.states[:0]
	for i := range s.states {
		state := s.states[i]
		due := state.TTL == 0 || (state.TTL > 0 && now.Sub(state.Timestamp) > state.TTL)

		if due && state.Finished {
			logp.Debug("state", "State removed for %v because of older: %v", state.Source, state.TTL)
			continue // drop state
		}
		if due {
			logp.Err("State for %s should have been dropped, but couldn't as state is not finished.", state.Source)
		}

		kept = append(kept, state)
	}
	s.states = kept

	return before - len(kept)
}

// Count reports how many states are currently held in the registry.
func (s *States) Count() int {
	s.RLock()
	n := len(s.states)
	s.RUnlock()

	return n
}

// GetStates returns a copy of the file states. The returned slice has its own
// backing array, so changing its elements does not affect the registry.
func (s *States) GetStates() []State {
	s.RLock()
	defer s.RUnlock()

	snapshot := make([]State, 0, len(s.states))
	return append(snapshot, s.states...)
}

// SetStates replaces all internal states with the given states array.
// The slice is stored as passed in; no copy is made.
func (s *States) SetStates(states []State) {
	s.Lock()
	s.states = states
	s.Unlock()
}

// Copy creates a new States object whose list of states is the copy returned
// by GetStates.
func (s *States) Copy() *States {
	clone := NewStates()
	clone.states = s.GetStates()
	return clone
}
157 changes: 157 additions & 0 deletions filebeat/input/file/states.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package file

import (
"sync"
"time"

"github.com/elastic/beats/libbeat/logp"
)

// States handles list of FileState. One must use NewStates to instantiate a
// file states registry. Using the zero-value is not safe: the idx map would
// be nil, and registering a new state writes to it.
type States struct {
	// RWMutex is locked by the methods of this type before they access
	// states or idx.
	sync.RWMutex

	// states store
	states []State

	// idx maps state IDs to state indexes for fast lookup and modifications.
	idx map[string]int
}

// NewStates generates a new states registry. The list of states starts out
// nil and the ID index starts out as an empty, writable map.
func NewStates() *States {
	registry := &States{}
	registry.idx = make(map[string]int)
	return registry
}

// Update updates a state, using the current time as its timestamp.
// If previous state didn't exist, new one is created.
func (s *States) Update(newState State) {
	now := time.Now()
	s.UpdateWithTs(newState, now)
}

// UpdateWithTs stores newState in the registry with its Timestamp set to ts.
// If a state with the same ID is already registered it is overwritten in
// place. Otherwise newState is appended and its position is recorded in the
// ID index.
func (s *States) UpdateWithTs(newState State, ts time.Time) {
	s.Lock()
	defer s.Unlock()

	key := newState.ID()
	pos := s.findPrevious(key)
	newState.Timestamp = ts

	if pos >= 0 {
		s.states[pos] = newState
		return
	}

	// No existing state found: the new entry lands at the current end of
	// the list, so that position is what the index has to remember.
	s.idx[key] = len(s.states)
	s.states = append(s.states, newState)
	logp.Debug("input", "New state added for %s", newState.Source)
}

// FindPrevious looks up the registered state whose ID matches that of
// newState. It returns a zero-state if no match is found.
func (s *States) FindPrevious(newState State) State {
	s.RLock()
	defer s.RUnlock()

	if pos := s.findPrevious(newState.ID()); pos >= 0 {
		return s.states[pos]
	}
	return State{}
}

// findPrevious returns the index recorded in idx for the given state ID.
// In case the ID is not in the index, -1 is returned.
func (s *States) findPrevious(id string) int {
	pos, found := s.idx[id]
	if !found {
		return -1
	}
	return pos
}

// Cleanup removes states that are due and finished, and keeps the ID index
// in sync with the list. A state is due when its TTL is 0, or when its TTL is
// positive and more than TTL has passed since its Timestamp. Due states that
// are not Finished are kept and an error is logged.
//
// It returns the number of states removed, and the number of kept states
// with a positive TTL that is not yet exceeded (candidates for a future
// cleanup).
//
// Removal does not preserve order: the removed slot is filled with the last
// live element of the list.
func (s *States) Cleanup() (int, int) {
	s.Lock()
	defer s.Unlock()

	now := time.Now()
	total := len(s.states)
	pending := 0

	// end is the exclusive upper bound of the live part of the list.
	end := total
	i := 0
	for i < end {
		st := &s.states[i]
		hasTTL := st.TTL > 0
		due := st.TTL == 0 || (hasTTL && now.Sub(st.Timestamp) > st.TTL)

		switch {
		case !due:
			if hasTTL {
				pending++
			}
			i++

		case !st.Finished:
			logp.Err("State for %s should have been dropped, but couldn't as state is not finished.", st.Source)
			i++

		default:
			delete(s.idx, st.ID())
			logp.Debug("state", "State removed for %v because of older: %v", st.Source, st.TTL)

			// Move the last live element into the freed slot and re-index
			// it. i is not advanced, so the moved element is examined next.
			end--
			if end != i {
				s.states[i] = s.states[end]
				s.idx[s.states[i].ID()] = i
			}
		}
	}

	s.states = s.states[:end]
	return total - end, pending
}

// Count reports how many states are currently held in the registry.
func (s *States) Count() int {
	s.RLock()
	n := len(s.states)
	s.RUnlock()

	return n
}

// GetStates creates a copy of the file states. The returned slice has its own
// backing array, so changing its elements does not affect the registry.
func (s *States) GetStates() []State {
	s.RLock()
	defer s.RUnlock()

	snapshot := make([]State, 0, len(s.states))
	return append(snapshot, s.states...)
}

// SetStates overwrites all internal states with the given states array and
// rebuilds the ID index to match it.
//
// The slice is stored as passed in; no copy is made. If several states share
// an ID, the index points at the last of them.
func (s *States) SetStates(states []State) {
	s.Lock()
	defer s.Unlock()
	s.states = states

	// Create a new index. The final size is known, so allocate the map with
	// a matching size hint instead of letting it grow entry by entry.
	s.idx = make(map[string]int, len(states))
	for i := range states {
		s.idx[states[i].ID()] = i
	}
}

// Copy creates a new States object. It is filled through SetStates with the
// copy of the states returned by GetStates.
func (s *States) Copy() *States {
	// Named "clone" rather than "new" so the predeclared builtin new is not
	// shadowed inside this function.
	clone := NewStates()
	clone.SetStates(s.GetStates())
	return clone
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,43 @@ import (
)

var cleanupTests = []struct {
title string
state State
countBefore int
cleanupCount int
countAfter int
}{
{
// Finished and TTL set to 0
"Finished and TTL set to 0",
State{
TTL: 0,
Finished: true,
}, 1, 1, 0,
},
{
// Unfinished but TTL set to 0
"Unfinished but TTL set to 0",
State{
TTL: 0,
Finished: false,
}, 1, 0, 1,
},
{
// TTL = -1 means not expiring
"TTL = -1 means not expiring",
State{
TTL: -1,
Finished: true,
}, 1, 0, 1,
},
{
// Expired and finished
"Expired and finished",
State{
TTL: 1 * time.Second,
Timestamp: time.Now().Add(-2 * time.Second),
Finished: true,
}, 1, 1, 0,
},
{
// Expired but unfinished
"Expired but unfinished",
State{
TTL: 1 * time.Second,
Timestamp: time.Now().Add(-2 * time.Second),
Expand All @@ -56,11 +57,15 @@ var cleanupTests = []struct {

// TestCleanup runs every entry of cleanupTests as a named subtest. Each
// subtest loads a registry with the single state under test and checks the
// state count before cleanup, the number of states Cleanup reports as
// removed, and the state count afterwards.
func TestCleanup(t *testing.T) {
	for _, test := range cleanupTests {
		// Capture the loop variable for use inside the subtest closure.
		test := test
		t.Run(test.title, func(t *testing.T) {
			states := NewStates()
			states.SetStates([]State{test.state})

			assert.Equal(t, test.countBefore, states.Count())
			// Cleanup also returns the number of states pending a future
			// cleanup; it is not checked by this test.
			cleanupCount, _ := states.Cleanup()
			assert.Equal(t, test.cleanupCount, cleanupCount)
			assert.Equal(t, test.countAfter, states.Count())
		})
	}
}
7 changes: 4 additions & 3 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewInput(
harvesters: harvester.NewRegistry(),
outlet: out,
stateOutlet: stateOut,
states: &file.States{},
states: file.NewStates(),
done: context.Done,
}

Expand Down Expand Up @@ -164,8 +164,9 @@ func (p *Input) Run() {
// It is important that a first scan is run before cleanup to make sure all new states are read first
if p.config.CleanInactive > 0 || p.config.CleanRemoved {
beforeCount := p.states.Count()
cleanedStates := p.states.Cleanup()
logp.Debug("input", "input states cleaned up. Before: %d, After: %d", beforeCount, beforeCount-cleanedStates)
cleanedStates, pendingClean := p.states.Cleanup()
logp.Debug("input", "input states cleaned up. Before: %d, After: %d, Pending: %d",
beforeCount, beforeCount-cleanedStates, pendingClean)
}

// Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/input_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestInit(t *testing.T) {
config: config{
Paths: test.paths,
},
states: &file.States{},
states: file.NewStates(),
outlet: TestOutlet{},
}

Expand Down
Loading

0 comments on commit 490cbcd

Please sign in to comment.