Skip to content

Commit

Permalink
[Filebeat] Fix reference leak in TCP and Unix socket inputs (elastic#…
Browse files Browse the repository at this point in the history
…19459)

The tcp and unix input sources were leaking references causing a memory leak.
When an accepted connection ended inputsource/common.Closer was
supposed to delete the pointer that it held to the connection, but due to a code
error `delete` was being called on the wrong map.

Instead of modifying the common.Closer I replaced it with a cancellable context.Context which
is designed to propagate signals from parent to children and requires less code.

(cherry picked from commit 61f4846)
  • Loading branch information
andrewkroh committed Jul 13, 2020
1 parent e0235ae commit bcf4477
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Fix date and timestamp formats for fortigate module {pull}19316[19316]
- Add missing `default_field: false` to aws filesets fields.yml. {pull}19568[19568]
- Fix tls mapping in suricata module {issue}19492[19492] {pull}19494[19494]
- Fix memory leak in tcp and unix input sources. {pull}19459[19459]

*Heartbeat*

Expand Down
12 changes: 4 additions & 8 deletions filebeat/inputsource/common/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common

import (
"bufio"
"context"
"net"

"github.com/pkg/errors"
Expand All @@ -31,15 +32,15 @@ import (
type HandlerFactory func(config ListenerConfig) ConnectionHandler

// ConnectionHandler interface provides mechanisms for handling of incoming connections
type ConnectionHandler func(CloseRef, net.Conn) error
type ConnectionHandler func(context.Context, net.Conn) error

// MetadataFunc defines callback executed when a line is read from the split handler.
type MetadataFunc func(net.Conn) inputsource.NetworkMetadata

// SplitHandlerFactory allows creation of a handler that has splitting capabilities.
func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory {
return func(config ListenerConfig) ConnectionHandler {
return ConnectionHandler(func(closer CloseRef, conn net.Conn) error {
return ConnectionHandler(func(ctx context.Context, conn net.Conn) error {
metadata := metadataCallback(conn)
maxMessageSize := uint64(config.MaxMessageSize)

Expand All @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me
scanner.Buffer(buffer, int(maxMessageSize))
for {
select {
case <-closer.Done():
case <-ctx.Done():
break
default:
}

// Ensure that if the Conn is already closed then dont attempt to scan again
if closer.Err() == ErrClosed {
break
}

if !scanner.Scan() {
break
}
Expand Down
8 changes: 8 additions & 0 deletions filebeat/inputsource/common/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ func (l *Listener) Start() error {
return err
}

l.ctx, l.cancel = context.WithCancel(context.Background())
go func() {
<-l.ctx.Done()
l.Listener.Close()
}()

l.log.Info("Started listening for " + l.family.String() + " connection")

l.wg.Add(1)
go func() {
defer l.wg.Done()
Expand Down

0 comments on commit bcf4477

Please sign in to comment.