Skip to content

Commit

Permalink
libbeat/beat: introduce ClientConfig.WaitCloseChan
Browse files Browse the repository at this point in the history
Introduce WaitCloseChan as an alternative to the
WaitClose timeout for more flexible control over
blocking during Client.Close.
  • Loading branch information
axw committed Jul 13, 2020
1 parent 8aa304e commit a22ae69
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 21 deletions.
14 changes: 10 additions & 4 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ type ClientConfig struct {

CloseRef CloseRef

// WaitClose sets the maximum duration to wait on ACK, if client still has events
// active non-acknowledged events in the publisher pipeline.
// WaitClose is only effective if one of ACKCount, ACKEvents and ACKLastEvents
// is configured
// WaitClose holds a channel which controls the blocking behaviour of Close.
// If WaitClose is greater than zero, then Close will block until all events
// in the publisher pipeline have been acknowledged, WaitClose elapses, or
// (if non-nil) WaitCloseChan is signalled.
WaitClose time.Duration

// WaitCloseChan holds a channel which controls the blocking behaviour of Close.
// If WaitCloseChan is non-nil, then Close will block until all events in the
// publisher pipeline have been acknowledged, the channel is signalled, or
// (if specified) WaitClose elapses.
WaitCloseChan <-chan struct{}

// Configure ACK callback.
ACKHandler ACKer

Expand Down
35 changes: 23 additions & 12 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type client struct {
reportEvents bool

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once
closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed.
done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once
closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed.
done chan struct{} // the done channel will be closed if closeRef is signalled, or Close is run.
waitClose <-chan struct{} // waitClose controls waiting for event acknowledgement during Close

eventer beat.ClientEventer
}
Expand All @@ -58,9 +59,10 @@ type clientCloseWaiter struct {
events atomic.Uint32
closing atomic.Bool

signalAll chan struct{} // ack loop notifies `close` that all events have been acked
signalDone chan struct{} // shutdown handler telling `wait` that shutdown has been completed
waitClose time.Duration
signalAll chan struct{} // ack loop notifies `close` that all events have been acked
signalDone chan struct{} // shutdown handler telling `wait` that shutdown has been completed
waitCloseTimeout time.Duration
waitCloseAbort <-chan struct{}
}

func (c *client) PublishAll(events []beat.Event) {
Expand Down Expand Up @@ -235,11 +237,12 @@ func (c *client) onDroppedOnPublish(e beat.Event) {
}
}

func newClientCloseWaiter(timeout time.Duration) *clientCloseWaiter {
func newClientCloseWaiter(timeout time.Duration, abort <-chan struct{}) *clientCloseWaiter {
return &clientCloseWaiter{
signalAll: make(chan struct{}, 1),
signalDone: make(chan struct{}),
waitClose: timeout,
signalAll: make(chan struct{}, 1),
signalDone: make(chan struct{}),
waitCloseTimeout: timeout,
waitCloseAbort: abort,
}
}

Expand Down Expand Up @@ -282,9 +285,17 @@ func (w *clientCloseWaiter) signalClose() {
go func() {
defer w.finishClose()

var timeoutChan <-chan time.Time
if w.waitCloseTimeout > 0 {
timer := time.NewTimer(w.waitCloseTimeout)
defer timer.Stop()
timeoutChan = timer.C
}

select {
case <-w.signalAll:
case <-time.After(w.waitClose):
case <-timeoutChan:
case <-w.waitCloseAbort:
}
}()
}
Expand Down
142 changes: 142 additions & 0 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"context"
"sync"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/beats/v7/libbeat/tests/resources"
)

Expand Down Expand Up @@ -113,3 +116,142 @@ func TestClient(t *testing.T) {
}
})
}

func TestClientWaitClose(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
p, err := New(beat.Info{},
Monitors{},
func(queue.ACKListener) (queue.Queue, error) { return qu, nil },
outputs.Group{},
settings,
)
if err != nil {
panic(err)
}

return p
}
if testing.Verbose() {
logp.TestingSetup()
}

q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
pipeline := makePipeline(Settings{}, q)
defer pipeline.Close()

t.Run("WaitCloseChan blocks", func(t *testing.T) {
for name, waitCloseTimeout := range map[string]time.Duration{
"without timeout": 0,
"with timeout": time.Minute,
} {
t.Run(name, func(t *testing.T) {
waitCloseChan := make(chan struct{})
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitCloseChan: waitCloseChan,
WaitClose: waitCloseTimeout,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Send an event which never gets acknowledged.
client.Publish(beat.Event{})

closed := make(chan struct{})
go func() {
defer close(closed)
client.Close()
}()

select {
case <-closed:
t.Fatal("expected Close to wait for event acknowledgement")
case <-time.After(100 * time.Millisecond):
}

close(waitCloseChan)
select {
case <-closed:
case <-time.After(10 * time.Second):
t.Fatal("expected Close to stop waiting after WaitCloseChan signalled")
}
})
}
})

t.Run("WaitClose blocks", func(t *testing.T) {
for name, waitCloseChan := range map[string]<-chan struct{}{
"without channel": nil,
"with channel": make(chan struct{}),
} {
t.Run(name, func(t *testing.T) {
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitCloseChan: waitCloseChan,
WaitClose: 500 * time.Millisecond,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Send an event which never gets acknowledged.
client.Publish(beat.Event{})

closed := make(chan struct{})
go func() {
defer close(closed)
client.Close()
}()

select {
case <-closed:
t.Fatal("expected Close to wait for event acknowledgement")
case <-time.After(100 * time.Millisecond):
}

select {
case <-closed:
case <-time.After(10 * time.Second):
t.Fatal("expected Close to stop waiting after WaitClose elapses")
}
})
}
})

t.Run("ACKing events unblocks", func(t *testing.T) {
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitCloseChan: make(chan struct{}),
WaitClose: time.Minute,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Send an event which gets acknowledged immediately.
client.Publish(beat.Event{})
output := newMockClient(func(batch publisher.Batch) error {
batch.ACK()
return nil
})
defer output.Close()
pipeline.output.Set(outputs.Group{Clients: []outputs.Client{output}})
defer pipeline.output.Set(outputs.Group{})

closed := make(chan struct{})
go func() {
defer close(closed)
client.Close()
}()

select {
case <-closed:
case <-time.After(10 * time.Second):
t.Fatal("expected Close to stop waiting after event acknowledgement")
}
})
}
10 changes: 5 additions & 5 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,15 +266,15 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
canDrop = true
}

waitClose := cfg.WaitClose
waitCloseTimeout := cfg.WaitClose
reportEvents := p.waitCloser != nil

switch p.waitCloseMode {
case NoWaitOnClose:

case WaitOnClientClose:
if waitClose <= 0 {
waitClose = p.waitCloseTimeout
if waitCloseTimeout <= 0 {
waitCloseTimeout = p.waitCloseTimeout
}
}

Expand Down Expand Up @@ -311,8 +311,8 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}

var waiter *clientCloseWaiter
if waitClose > 0 {
waiter = newClientCloseWaiter(waitClose)
if waitCloseTimeout > 0 || cfg.WaitCloseChan != nil {
waiter = newClientCloseWaiter(waitCloseTimeout, cfg.WaitCloseChan)
}

if waiter != nil {
Expand Down

0 comments on commit a22ae69

Please sign in to comment.