Skip to content

Commit

Permalink
Replace common.Closer with context.Context
Browse files Browse the repository at this point in the history
The inputsource/common.Closer was managing bidirectional links between parents and children.
Anytime you closed an instance it would close all of its children and also remove itself from its
parents list of children (this is where the bug was). Every instance has its own mutex. While recursively
closing children it was easy to run into a deadlock because the parent holds a lock while closing
its children and then the child must edit the parent to remove itself so it also tries to acquire
the parent lock.

Instead of modifying the common.Closer I replaced it with a cancellable context.Context which
is designed to propagate signals from parent to children and requires less code.
  • Loading branch information
andrewkroh committed Jun 29, 2020
1 parent 53f2eab commit 4b2f104
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 211 deletions.
139 changes: 0 additions & 139 deletions filebeat/inputsource/common/closeref.go

This file was deleted.

54 changes: 0 additions & 54 deletions filebeat/inputsource/common/closeref_test.go

This file was deleted.

12 changes: 4 additions & 8 deletions filebeat/inputsource/common/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common

import (
"bufio"
"context"
"net"

"github.com/pkg/errors"
Expand All @@ -31,15 +32,15 @@ import (
type HandlerFactory func(config ListenerConfig) ConnectionHandler

// ConnectionHandler interface provides mechanisms for handling of incoming connections
type ConnectionHandler func(CloseRef, net.Conn) error
type ConnectionHandler func(context.Context, net.Conn) error

// MetadataFunc defines callback executed when a line is read from the split handler.
type MetadataFunc func(net.Conn) inputsource.NetworkMetadata

// SplitHandlerFactory allows creation of a handler that has splitting capabilities.
func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory {
return func(config ListenerConfig) ConnectionHandler {
return ConnectionHandler(func(closer CloseRef, conn net.Conn) error {
return ConnectionHandler(func(ctx context.Context, conn net.Conn) error {
metadata := metadataCallback(conn)
maxMessageSize := uint64(config.MaxMessageSize)

Expand All @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me
scanner.Buffer(buffer, int(maxMessageSize))
for {
select {
case <-closer.Done():
case <-ctx.Done():
break
default:
}

// Ensure that if the Conn is already closed then dont attempt to scan again
if closer.Err() == ErrClosed {
break
}

if !scanner.Scan() {
break
}
Expand Down
28 changes: 18 additions & 10 deletions filebeat/inputsource/common/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package common
import (
"bufio"
"bytes"
"context"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -51,9 +52,9 @@ type Listener struct {
config *ListenerConfig
family Family
wg sync.WaitGroup
done chan struct{}
log *logp.Logger
closer *Closer
ctx context.Context
cancel context.CancelFunc
clientsCount atomic.Int
handlerFactory HandlerFactory
listenerFactory ListenerFactory
Expand All @@ -63,10 +64,8 @@ type Listener struct {
func NewListener(family Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener {
return &Listener{
config: config,
done: make(chan struct{}),
family: family,
log: logp.NewLogger(string(family)).With("address", location),
closer: NewCloser(nil),
handlerFactory: handlerFactory,
listenerFactory: listenerFactory,
}
Expand All @@ -80,7 +79,12 @@ func (l *Listener) Start() error {
return err
}

l.closer.SetCallback(func() { l.Listener.Close() })
l.ctx, l.cancel = context.WithCancel(context.Background())
go func() {
<-l.ctx.Done()
l.Listener.Close()
}()

l.log.Info("Started listening for " + l.family.String() + " connection")

l.wg.Add(1)
Expand All @@ -101,7 +105,7 @@ func (l *Listener) run() {
conn, err := l.Listener.Accept()
if err != nil {
select {
case <-l.closer.Done():
case <-l.ctx.Done():
return
default:
l.log.Debugw("Can not accept the connection", "error", err)
Expand All @@ -110,13 +114,17 @@ func (l *Listener) run() {
}

handler := l.handlerFactory(*l.config)
closer := WithCloser(l.closer, func() { conn.Close() })
ctx, cancel := context.WithCancel(l.ctx)
go func() {
<-ctx.Done()
conn.Close()
}()

l.wg.Add(1)
go func() {
defer logp.Recover("recovering from a " + l.family.String() + " client crash")
defer l.wg.Done()
defer closer.Close()
defer cancel()

l.registerHandler()
defer l.unregisterHandler()
Expand All @@ -128,7 +136,7 @@ func (l *Listener) run() {
l.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load())
}

err := handler(closer, conn)
err := handler(ctx, conn)
if err != nil {
l.log.Debugw("client error", "error", err)
}
Expand All @@ -148,7 +156,7 @@ func (l *Listener) run() {
// Stop stops accepting new incoming connections and Close any active clients
func (l *Listener) Stop() {
l.log.Info("Stopping" + l.family.String() + "server")
l.closer.Close()
l.cancel()
l.wg.Wait()
l.log.Info(l.family.String() + " server stopped")
}
Expand Down

0 comments on commit 4b2f104

Please sign in to comment.