From 4b2f1045812a4b6623506d8b6f50ceae511f596b Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 26 Jun 2020 16:12:25 -0400 Subject: [PATCH] Replace common.Closer with context.Context The inputsource/common.Closer was managing bidirectional links between parents and children. Anytime you closed an instance it would close all of its children and also remove itself from its parents list of children (this is where the bug was). Every instance has its own mutex. While recursively closing children it was easy to run into a deadlock because the parent holds a lock while closing its children and then the child must edit the parent to remove itself so it also tries to acquire the parent lock. Instead of modifying the common.Closer I replaced it with a cancellable context.Context which is designed to propagate signals from parent to children and requires less code. --- filebeat/inputsource/common/closeref.go | 139 ------------------- filebeat/inputsource/common/closeref_test.go | 54 ------- filebeat/inputsource/common/handler.go | 12 +- filebeat/inputsource/common/listener.go | 28 ++-- 4 files changed, 22 insertions(+), 211 deletions(-) delete mode 100644 filebeat/inputsource/common/closeref.go delete mode 100644 filebeat/inputsource/common/closeref_test.go diff --git a/filebeat/inputsource/common/closeref.go b/filebeat/inputsource/common/closeref.go deleted file mode 100644 index b7ea97cd6a2..00000000000 --- a/filebeat/inputsource/common/closeref.go +++ /dev/null @@ -1,139 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package common - -import ( - "sync" - - "github.com/pkg/errors" -) - -// CloserFunc is the function called by the Closer on `Close()`. -type CloserFunc func() - -// ErrClosed is returned when the Closer is closed. -var ErrClosed = errors.New("closer is closed") - -// CloseRef implements a subset of the context.Context interface and it's use to synchronize -// the shutdown of multiple go-routines. -type CloseRef interface { - Done() <-chan struct{} - Err() error -} - -// Closer implements a shutdown strategy when dealing with multiples go-routines, it creates a tree -// of Closer, when you call `Close()` on a parent the `Close()` method will be called on the current -// closer and any of the childs it may have and will remove the current node from the parent. -// -// NOTE: The `Close()` is reentrant but will propage the close only once. -type Closer struct { - mu sync.Mutex - done chan struct{} - err error - parent *Closer - children map[*Closer]struct{} - callback CloserFunc -} - -// Close closes the closes and propagates the close to any child, on close the close callback will -// be called, this can be used for custom cleanup like closing a socket. -func (c *Closer) Close() { - c.mu.Lock() - if c.err != nil { - c.mu.Unlock() - return - } - - // Close the channel first so that all processing in Handle() ends first - close(c.done) - - if c.callback != nil { - c.callback() - } - - // propagate close to children. - if c.children != nil { - for child := range c.children { - // Remove parent to prevent circular references (and deadlock). - child.parent = nil - child.Close() - } - c.children = nil - } - - c.err = ErrClosed - c.mu.Unlock() - - if c.parent != nil { - c.parent.removeChild(c) - } -} - -// Done returns the synchronization channel, the channel will be closed if `Close()` was called on -// the current node or any parent it may have. -func (c *Closer) Done() <-chan struct{} { - return c.done -} - -// SetCallback sets the underlying callback function invoked -// when the Closer is Closed. -func (c *Closer) SetCallback(callback CloserFunc) { - c.callback = callback -} - -// Err returns an error if the Closer was already closed. -func (c *Closer) Err() error { - c.mu.Lock() - err := c.err - c.mu.Unlock() - return err -} - -func (c *Closer) removeChild(child *Closer) { - c.mu.Lock() - delete(c.children, child) - c.mu.Unlock() -} - -func (c *Closer) addChild(child *Closer) { - c.mu.Lock() - if c.children == nil { - c.children = make(map[*Closer]struct{}) - } - c.children[child] = struct{}{} - c.mu.Unlock() -} - -// WithCloser wraps a new closer into a child of an existing closer. -func WithCloser(parent *Closer, fn CloserFunc) *Closer { - child := &Closer{ - done: make(chan struct{}), - parent: parent, - callback: fn, - } - parent.addChild(child) - return child -} - -// NewCloser creates a new Closer. -func NewCloser(fn CloserFunc) *Closer { - return &Closer{ - done: make(chan struct{}), - callback: fn, - } -} diff --git a/filebeat/inputsource/common/closeref_test.go b/filebeat/inputsource/common/closeref_test.go deleted file mode 100644 index 83bb941a011..00000000000 --- a/filebeat/inputsource/common/closeref_test.go +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package common - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestCloser(t *testing.T) { - var closeCount int - closerFunc := func() { - closeCount++ - } - - t.Run("parent closes children", func(t *testing.T) { - closeCount = 0 - parent := NewCloser(closerFunc) - WithCloser(parent, closerFunc) - - parent.Close() - assert.Equal(t, 2, closeCount) - }) - - t.Run("children are released from parent", func(t *testing.T) { - closeCount = 0 - parent := NewCloser(closerFunc) - child := WithCloser(parent, closerFunc) - - child.Close() - assert.Equal(t, 1, closeCount) - - // Inspect internal state to verify all children were removed. - parent.mu.Lock() - assert.Len(t, parent.children, 0) - parent.mu.Unlock() - }) -} diff --git a/filebeat/inputsource/common/handler.go b/filebeat/inputsource/common/handler.go index 84786086f4e..a55ee1755d5 100644 --- a/filebeat/inputsource/common/handler.go +++ b/filebeat/inputsource/common/handler.go @@ -19,6 +19,7 @@ package common import ( "bufio" + "context" "net" "github.com/pkg/errors" @@ -31,7 +32,7 @@ import ( type HandlerFactory func(config ListenerConfig) ConnectionHandler // ConnectionHandler interface provides mechanisms for handling of incoming connections -type ConnectionHandler func(CloseRef, net.Conn) error +type ConnectionHandler func(context.Context, net.Conn) error // MetadataFunc defines callback executed when a line is read from the split handler. type MetadataFunc func(net.Conn) inputsource.NetworkMetadata @@ -39,7 +40,7 @@ type MetadataFunc func(net.Conn) inputsource.NetworkMetadata // SplitHandlerFactory allows creation of a handler that has splitting capabilities. func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory { return func(config ListenerConfig) ConnectionHandler { - return ConnectionHandler(func(closer CloseRef, conn net.Conn) error { + return ConnectionHandler(func(ctx context.Context, conn net.Conn) error { metadata := metadataCallback(conn) maxMessageSize := uint64(config.MaxMessageSize) @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me scanner.Buffer(buffer, int(maxMessageSize)) for { select { - case <-closer.Done(): + case <-ctx.Done(): break default: } - // Ensure that if the Conn is already closed then dont attempt to scan again - if closer.Err() == ErrClosed { - break - } - if !scanner.Scan() { break } diff --git a/filebeat/inputsource/common/listener.go b/filebeat/inputsource/common/listener.go index 9d686f922a6..4692e474930 100644 --- a/filebeat/inputsource/common/listener.go +++ b/filebeat/inputsource/common/listener.go @@ -20,6 +20,7 @@ package common import ( "bufio" "bytes" + "context" "net" "strings" "sync" @@ -51,9 +52,9 @@ type Listener struct { config *ListenerConfig family Family wg sync.WaitGroup - done chan struct{} log *logp.Logger - closer *Closer + ctx context.Context + cancel context.CancelFunc clientsCount atomic.Int handlerFactory HandlerFactory listenerFactory ListenerFactory @@ -63,10 +64,8 @@ type Listener struct { func NewListener(family Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener { return &Listener{ config: config, - done: make(chan struct{}), family: family, log: logp.NewLogger(string(family)).With("address", location), - closer: NewCloser(nil), handlerFactory: handlerFactory, listenerFactory: listenerFactory, } @@ -80,7 +79,12 @@ func (l *Listener) Start() error { return err } - l.closer.SetCallback(func() { l.Listener.Close() }) + l.ctx, l.cancel = context.WithCancel(context.Background()) + go func() { + <-l.ctx.Done() + l.Listener.Close() + }() + l.log.Info("Started listening for " + l.family.String() + " connection") l.wg.Add(1) @@ -101,7 +105,7 @@ func (l *Listener) run() { conn, err := l.Listener.Accept() if err != nil { select { - case <-l.closer.Done(): + case <-l.ctx.Done(): return default: l.log.Debugw("Can not accept the connection", "error", err) @@ -110,13 +114,17 @@ func (l *Listener) run() { } handler := l.handlerFactory(*l.config) - closer := WithCloser(l.closer, func() { conn.Close() }) + ctx, cancel := context.WithCancel(l.ctx) + go func() { + <-ctx.Done() + conn.Close() + }() l.wg.Add(1) go func() { defer logp.Recover("recovering from a " + l.family.String() + " client crash") defer l.wg.Done() - defer closer.Close() + defer cancel() l.registerHandler() defer l.unregisterHandler() @@ -128,7 +136,7 @@ func (l *Listener) run() { l.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load()) } - err := handler(closer, conn) + err := handler(ctx, conn) if err != nil { l.log.Debugw("client error", "error", err) } @@ -148,7 +156,7 @@ func (l *Listener) run() { // Stop stops accepting new incoming connections and Close any active clients func (l *Listener) Stop() { l.log.Info("Stopping" + l.family.String() + "server") - l.closer.Close() + l.cancel() l.wg.Wait() l.log.Info(l.family.String() + " server stopped") }