Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client #8543

Merged
merged 3 commits into from
Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Use the go-lookslike library for testing in heartbeat. Eventually the mapval package will be replaced with it. {pull}12540[12540]
- New ReporterV2 interfaces that can receive a context on `Fetch(ctx, reporter)`, or `Run(ctx, reporter)`. {pull}11981[11981]
- Generate configuration from `mage` for all Beats. {pull}12618[12618]
- Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client. {pull}8543[8543]
8 changes: 5 additions & 3 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var defaultUDP = udp.Config{
}

func factory(
cb inputsource.NetworkFunc,
nf inputsource.NetworkFunc,
config common.ConfigNamespace,
) (inputsource.Network, error) {
n, cfg := config.Name(), config.Config()
Expand All @@ -77,13 +77,15 @@ func factory(
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
}

return tcp.New(&config.Config, splitFunc, cb)
factory := tcp.SplitHandlerFactory(nf, splitFunc)

return tcp.New(&config.Config, factory)
case udp.Name:
config := defaultUDP
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
return udp.New(&config, cb), nil
return udp.New(&config, nf), nil
default:
return nil, fmt.Errorf("you must choose between TCP or UDP")
}
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func NewInput(
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
}

server, err := tcp.New(&config.Config, splitFunc, cb)
factory := tcp.SplitHandlerFactory(cb, splitFunc)

server, err := tcp.New(&config.Config, factory)
if err != nil {
return nil, err
}
Expand Down
61 changes: 39 additions & 22 deletions filebeat/inputsource/tcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

// Client is a remote client.
type client struct {
// splitHandler is a TCP client that has splitting capabilities.
type splitHandler struct {
conn net.Conn
log *logp.Logger
callback inputsource.NetworkFunc
done chan struct{}
metadata inputsource.NetworkMetadata
Expand All @@ -43,33 +42,50 @@ type client struct {
timeout time.Duration
}

func newClient(
conn net.Conn,
log *logp.Logger,
// ClientFactory returns a ConnectionHandler func
type ClientFactory func(config Config) ConnectionHandler

// ConnectionHandler interface provides mechanisms for handling of incoming TCP connections
type ConnectionHandler interface {
Handle(conn net.Conn) error
Close()
}

// SplitHandlerFactory allows creation of a ConnectionHandler that can do splitting of messages received on a TCP connection.
func SplitHandlerFactory(callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) ClientFactory {
return func(config Config) ConnectionHandler {
return newSplitHandler(callback, splitFunc, uint64(config.MaxMessageSize), config.Timeout)
}
}

// newSplitHandler allows creation of a TCP client that has splitting capabilities.
func newSplitHandler(
callback inputsource.NetworkFunc,
splitFunc bufio.SplitFunc,
maxReadMessage uint64,
timeout time.Duration,
) *client {
client := &client{
conn: conn,
log: log.With("remote_address", conn.RemoteAddr()),
) ConnectionHandler {
client := &splitHandler{
callback: callback,
done: make(chan struct{}),
splitFunc: splitFunc,
maxMessageSize: maxReadMessage,
timeout: timeout,
metadata: inputsource.NetworkMetadata{
RemoteAddr: conn.RemoteAddr(),
TLS: extractSSLInformation(conn),
},
}
extractSSLInformation(conn)
return client
}

func (c *client) handle() error {
r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), c.maxMessageSize)
// Handle takes a connection as input and processes data received on it.
func (c *splitHandler) Handle(conn net.Conn) error {
c.conn = conn
c.metadata = inputsource.NetworkMetadata{
RemoteAddr: conn.RemoteAddr(),
TLS: extractSSLInformation(conn),
}

log := logp.NewLogger("split_client").With("remote_addr", conn.RemoteAddr().String())

r := NewResetableLimitedReader(NewDeadlineReader(conn, c.timeout), c.maxMessageSize)
buf := bufio.NewReader(r)
scanner := bufio.NewScanner(buf)
scanner.Split(c.splitFunc)
Expand All @@ -79,24 +95,24 @@ func (c *client) handle() error {
for scanner.Scan() {
err := scanner.Err()
if err != nil {
// we are forcing a close on the socket, lets ignore any error that could happen.
// we are forcing a Close on the socket, lets ignore any error that could happen.
select {
case <-c.done:
break
default:
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel could be closed anytime, not only if the reader fails.
The way this function is written the callback if scan could produce an 'event', but channel is closed in the meantime. Is this expected behavior or do we rather not want to publish the event on close and guarantee the handler can return immediately.

// This is a user defined limit and we should notify the user.
if IsMaxReadBufferErr(err) {
c.log.Errorw("client error", "error", err)
log.Errorw("split_client error", "error", err)
}
return errors.Wrap(err, "tcp client error")
return errors.Wrap(err, "tcp split_client error")
}
r.Reset()
c.callback(scanner.Bytes(), c.metadata)
}

// We are out of the scanner, either we reached EOF or another fatal error occurred.
// like we failed to complete the TLS handshake or we are missing the client certificate when
// like we failed to complete the TLS handshake or we are missing the splitHandler certificate when
// mutual auth is on, which is the default.
if err := scanner.Err(); err != nil {
return err
Expand All @@ -105,7 +121,8 @@ func (c *client) handle() error {
return nil
}

func (c *client) close() {
// Close is used to perform clean up before the client is released.
func (c *splitHandler) Close() {
close(c.done)
c.conn.Close()
}
Expand Down
51 changes: 22 additions & 29 deletions filebeat/inputsource/tcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"golang.org/x/net/netutil"

"github.com/elastic/beats/filebeat/inputsource"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/transport"
Expand All @@ -36,38 +35,35 @@ import (
// Server represent a TCP server
type Server struct {
sync.RWMutex
callback inputsource.NetworkFunc
config *Config
Listener net.Listener
clients map[*client]struct{}
clients map[ConnectionHandler]struct{}
wg sync.WaitGroup
done chan struct{}
splitFunc bufio.SplitFunc
factory ClientFactory
log *logp.Logger
tlsConfig *transport.TLSConfig
}

// New creates a new tcp server
func New(
config *Config,
splitFunc bufio.SplitFunc,
callback inputsource.NetworkFunc,
factory ClientFactory,
) (*Server, error) {
tlsConfig, err := tlscommon.LoadTLSServerConfig(config.TLS)
if err != nil {
return nil, err
}

if splitFunc == nil {
return nil, fmt.Errorf("SplitFunc can't be empty")
if factory == nil {
return nil, fmt.Errorf("ClientFactory can't be empty")
}

return &Server{
config: config,
callback: callback,
clients: make(map[*client]struct{}, 0),
clients: make(map[ConnectionHandler]struct{}, 0),
done: make(chan struct{}),
splitFunc: splitFunc,
factory: factory,
log: logp.NewLogger("tcp").With("address", config.Host),
tlsConfig: tlsConfig,
}, nil
Expand All @@ -91,7 +87,11 @@ func (s *Server) Start() error {
return nil
}

// Run start and run a new TCP listener to receive new data
// Run start and run a new TCP listener to receive new data. When a new connection is accepted, the factory is used
// to create a ConnectionHandler. The ConnectionHandler takes the connection as input and handles the data that is
// being received via tha io.Reader. Most clients use the splitHandler which can take a bufio.SplitFunc and parse
// out each message into an appropriate event. The Close() of the ConnectionHandler can be used to clean up the
// connection either by client or server based on need.
func (s *Server) run() {
for {
conn, err := s.Listener.Accept()
Expand All @@ -105,14 +105,7 @@ func (s *Server) run() {
}
}

client := newClient(
conn,
s.log,
s.callback,
s.splitFunc,
uint64(s.config.MaxMessageSize),
s.config.Timeout,
)
client := s.factory(*s.config)

s.wg.Add(1)
go func() {
Expand All @@ -124,13 +117,13 @@ func (s *Server) run() {
defer s.unregisterClient(client)
s.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", s.clientsCount())

err := client.handle()
err := client.Handle(conn)
if err != nil {
s.log.Debugw("Client error", "error", err)
s.log.Debugw("client error", "error", err)
}

defer s.log.Debugw(
"Client disconnected",
"client disconnected",
"remote_address",
conn.RemoteAddr(),
"total",
Expand All @@ -140,34 +133,34 @@ func (s *Server) run() {
}
}

// Stop stops accepting new incoming TCP connection and close any active clients
// Stop stops accepting new incoming TCP connection and Close any active clients
func (s *Server) Stop() {
s.log.Info("Stopping TCP server")
close(s.done)
s.Listener.Close()
for _, client := range s.allClients() {
client.close()
client.Close()
}
s.wg.Wait()
s.log.Info("TCP server stopped")
}

func (s *Server) registerClient(client *client) {
func (s *Server) registerClient(client ConnectionHandler) {
s.Lock()
defer s.Unlock()
s.clients[client] = struct{}{}
}

func (s *Server) unregisterClient(client *client) {
func (s *Server) unregisterClient(client ConnectionHandler) {
s.Lock()
defer s.Unlock()
delete(s.clients, client)
}

func (s *Server) allClients() []*client {
func (s *Server) allClients() []ConnectionHandler {
s.RLock()
defer s.RUnlock()
currentClients := make([]*client, len(s.clients))
currentClients := make([]ConnectionHandler, len(s.clients))
idx := 0
for client := range s.clients {
currentClients[idx] = client
Expand Down
9 changes: 7 additions & 2 deletions filebeat/inputsource/tcp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ func TestReceiveEventsAndMetadata(t *testing.T) {
if !assert.NoError(t, err) {
return
}
server, err := New(&config, test.splitFunc, to)

factory := SplitHandlerFactory(to, test.splitFunc)
server, err := New(&config, factory)
if !assert.NoError(t, err) {
return
}
Expand Down Expand Up @@ -217,7 +219,10 @@ func TestReceiveNewEventsConcurrently(t *testing.T) {
if !assert.NoError(t, err) {
return
}
server, err := New(&config, bufio.ScanLines, to)

factory := SplitHandlerFactory(to, bufio.ScanLines)

server, err := New(&config, factory)
if !assert.NoError(t, err) {
return
}
Expand Down