Skip to content

Commit

Permalink
Add flush timeout setting to filebeat registrar (elastic#5146)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored and ruflin committed Sep 11, 2017
1 parent 68a6f35 commit 0d89cc7
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]
- Changed the number of shards in the default configuration to 3. {issue}5095[5095]
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]
- Add `filebeat.registry_flush` setting, to delay the registry updates. {pull}5146[5146]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
finishedLogger := newFinishedLogger(wgEvents)

// Setup registrar to persist state
registrar, err := registrar.New(config.RegistryFile, finishedLogger)
registrar, err := registrar.New(config.RegistryFile, config.RegistryFlush, finishedLogger)
if err != nil {
logp.Err("Could not init registrar: %v", err)
return err
Expand Down
1 change: 1 addition & 0 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
type Config struct {
Prospectors []*common.Config `config:"prospectors"`
RegistryFile string `config:"registry_file"`
RegistryFlush time.Duration `config:"registry_flush"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
Expand Down
47 changes: 36 additions & 11 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/elastic/beats/filebeat/input/file"
helper "github.com/elastic/beats/libbeat/common/file"
Expand All @@ -18,9 +19,12 @@ type Registrar struct {
Channel chan []file.State
out successLogger
done chan struct{}
registryFile string // Path to the Registry File
states *file.States // Map with all file paths inside and the corresponding state
registryFile string // Path to the Registry File
wg sync.WaitGroup

states *file.States // Map with all file paths inside and the corresponding state
flushTimeout time.Duration
bufferedStateUpdates int
}

type successLogger interface {
Expand All @@ -34,12 +38,13 @@ var (
registryWrites = monitoring.NewInt(nil, "registrar.writes")
)

func New(registryFile string, out successLogger) (*Registrar, error) {
func New(registryFile string, flushTimeout time.Duration, out successLogger) (*Registrar, error) {
r := &Registrar{
registryFile: registryFile,
done: make(chan struct{}),
states: file.NewStates(),
Channel: make(chan []file.State, 1),
flushTimeout: flushTimeout,
out: out,
wg: sync.WaitGroup{},
}
Expand Down Expand Up @@ -149,13 +154,28 @@ func (r *Registrar) Run() {
r.wg.Done()
}()

var (
timer *time.Timer
flushC <-chan time.Time
)

for {
select {
case <-r.done:
logp.Info("Ending Registrar")
return
case <-flushC:
flushC = nil
timer.Stop()
r.flushRegistry()
case states := <-r.Channel:
r.onEvents(states)
if r.flushTimeout <= 0 {
r.flushRegistry()
} else if flushC == nil {
timer = time.NewTimer(r.flushTimeout)
flushC = timer.C
}
}
}
}
Expand All @@ -168,17 +188,11 @@ func (r *Registrar) onEvents(states []file.State) {
cleanedStates := r.states.Cleanup()
statesCleanup.Add(int64(cleanedStates))

r.bufferedStateUpdates += len(states)

logp.Debug("registrar",
"Registrar states cleaned up. Before: %d, After: %d",
beforeCount, beforeCount-cleanedStates)

if err := r.writeRegistry(); err != nil {
logp.Err("Writing of registry returned error: %v. Continuing...", err)
}

if r.out != nil {
r.out.Published(len(states))
}
}

// processEventStates gets the states from the events and writes them to the registrar state
Expand All @@ -198,6 +212,17 @@ func (r *Registrar) Stop() {
r.wg.Wait()
}

func (r *Registrar) flushRegistry() {
if err := r.writeRegistry(); err != nil {
logp.Err("Writing of registry returned error: %v. Continuing...", err)
}

if r.out != nil {
r.out.Published(r.bufferedStateUpdates)
}
r.bufferedStateUpdates = 0
}

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
logp.Debug("registrar", "Write registry file: %s", r.registryFile)
Expand Down

0 comments on commit 0d89cc7

Please sign in to comment.