Skip to content

Commit

Permalink
Defer registry state cleanup (elastic#6347)
Browse files Browse the repository at this point in the history
- Defer registrar state gc until the registry needs to be written.
- when updating regitrar states from batch, ensure all updated states will
have same timestamp.

Depends on elastic#6346
  • Loading branch information
Steffen Siering authored and ruflin committed Feb 23, 2018
1 parent 3e408ca commit 2d2400b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
37 changes: 34 additions & 3 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Registrar struct {
wg sync.WaitGroup

states *file.States // Map with all file paths inside and the corresponding state
gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write
gcEnabled bool // gcEnabled indictes the registry contains some state that can be gc'ed in the future
flushTimeout time.Duration
bufferedStateUpdates int
}
Expand Down Expand Up @@ -183,24 +185,51 @@ func (r *Registrar) Run() {
// onEvents processes events received from the publisher pipeline
func (r *Registrar) onEvents(states []file.State) {
r.processEventStates(states)
r.bufferedStateUpdates += len(states)

// check if we need to enable state cleanup
if !r.gcEnabled {
for i := range states {
if states[i].TTL >= 0 || states[i].Finished {
r.gcEnabled = true
break
}
}
}

logp.Debug("registrar", "Registrar state updates processed. Count: %v", len(states))

// new set of events received -> mark state registry ready for next
// cleanup phase in case gc'able events are stored in the registry.
r.gcRequired = r.gcEnabled
}

// gcStates runs a registry Cleanup. The bool returned indicates wether more
// events in the registry can be gc'ed in the future.
func (r *Registrar) gcStates() {
if !r.gcRequired {
return
}

beforeCount := r.states.Count()
cleanedStates, pendingClean := r.states.Cleanup()
statesCleanup.Add(int64(cleanedStates))

r.bufferedStateUpdates += len(states)

logp.Debug("registrar",
"Registrar states cleaned up. Before: %d, After: %d, Pending: %d",
beforeCount, beforeCount-cleanedStates, pendingClean)

r.gcRequired = false
r.gcEnabled = pendingClean > 0
}

// processEventStates gets the states from the events and writes them to the registrar state
func (r *Registrar) processEventStates(states []file.State) {
logp.Debug("registrar", "Processing %d events", len(states))

ts := time.Now()
for i := range states {
r.states.Update(states[i])
r.states.UpdateWithTs(states[i], ts)
statesUpdate.Add(1)
}
}
Expand All @@ -225,6 +254,8 @@ func (r *Registrar) flushRegistry() {

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
r.gcStates()

logp.Debug("registrar", "Write registry file: %s", r.registryFile)

tempfile := r.registryFile + ".new"
Expand Down
2 changes: 1 addition & 1 deletion filebeat/tests/system/test_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def test_exceed_buffer(self):
# Wait until state is written
self.wait_until(
lambda: self.log_contains(
"Registrar states cleaned up"),
"Registrar state updates processed"),
max_timeout=15)

filebeat.check_kill_and_wait()
Expand Down

0 comments on commit 2d2400b

Please sign in to comment.