From 872989fb1caefcd861b345cee308ae9d2b016a0e Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Wed, 12 Sep 2018 11:13:56 -0700 Subject: [PATCH] Allow TCP helper to support delimiters (#8278) (cherry picked from commit eeb1d3b6a9b7bdaaa7c440e294b923c38ec339f6) --- CHANGELOG.asciidoc | 1 + metricbeat/helper/server/tcp/config.go | 15 ++++++- metricbeat/helper/server/tcp/tcp.go | 52 ++++++++++++++++++------ metricbeat/helper/server/tcp/tcp_test.go | 3 +- 4 files changed, 56 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index dba8d0d49f7..6d64a248c4c 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -105,6 +105,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Add fields for memory fragmentation, memory allocator stats, copy on write, master-slave status, and active defragmentation to `info` metricset of Redis module. {pull}7695[7695] - Increase ignore_above for system.process.cmdline to 2048. {pull}8101[8100] - Add support to renamed fields planned for redis 5.0. {pull}8167[8167] +- Allow TCP helper to support delimiters. {pull}8278[8278] - Add `metrics` metricset to MongoDB module. {pull}7611[7611] diff --git a/metricbeat/helper/server/tcp/config.go b/metricbeat/helper/server/tcp/config.go index 88c039706a2..def34d1502b 100644 --- a/metricbeat/helper/server/tcp/config.go +++ b/metricbeat/helper/server/tcp/config.go @@ -17,16 +17,29 @@ package tcp +import "fmt" + type TcpConfig struct { Host string `config:"host"` Port int `config:"port"` ReceiveBufferSize int `config:"receive_buffer_size"` + Delimiter string `config:"delimiter"` } func defaultTcpConfig() TcpConfig { return TcpConfig{ Host: "localhost", Port: 2003, - ReceiveBufferSize: 1024, + ReceiveBufferSize: 4096, + Delimiter: "\n", } } + +// Validate ensures that the configured delimiter has only one character +func (t *TcpConfig) Validate() error { + if len(t.Delimiter) != 1 { + return fmt.Errorf("length of delimiter is expected to be 1 but is %v", len(t.Delimiter)) + } + + return nil +} diff --git a/metricbeat/helper/server/tcp/tcp.go b/metricbeat/helper/server/tcp/tcp.go index 7b8529d7023..d8a2a48b2f5 100644 --- a/metricbeat/helper/server/tcp/tcp.go +++ b/metricbeat/helper/server/tcp/tcp.go @@ -18,6 +18,7 @@ package tcp import ( + "bufio" "fmt" "net" @@ -35,6 +36,7 @@ type TcpServer struct { receiveBufferSize int done chan struct{} eventQueue chan server.Event + delimiter byte } type TcpEvent struct { @@ -67,6 +69,7 @@ func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) { receiveBufferSize: config.ReceiveBufferSize, done: make(chan struct{}), eventQueue: make(chan server.Event), + delimiter: byte(config.Delimiter[0]), }, nil } @@ -83,7 +86,6 @@ func (g *TcpServer) Start() error { } func (g *TcpServer) watchMetrics() { - buffer := make([]byte, g.receiveBufferSize) for { select { case <-g.done: @@ -96,22 +98,46 @@ func (g *TcpServer) watchMetrics() { logp.Err("Unable to accept connection due to error: %v", err) continue } - defer func() { - if conn != nil { - conn.Close() - } - }() - length, err := conn.Read(buffer) + if conn != nil { + go g.handle(conn) + } + } +} + +func (g *TcpServer) handle(conn net.Conn) { + logp.Debug("tcp", "Handling new connection...") + + // Close connection when this function ends + defer func() { + conn.Close() + }() + + // Get a new reader with buffer size as the same as receiveBufferSize + bufReader := bufio.NewReaderSize(conn, g.receiveBufferSize) + + for { + // Read tokens delimited by delimiter + bytes, err := bufReader.ReadBytes(g.delimiter) if err != nil { - logp.Err("Error reading from buffer: %v", err.Error()) - continue + logp.Debug("tcp", "unable to read bytes due to error: %v", err) + return + } + + // Truncate to max buffer size if too big of a payload + if len(bytes) > g.receiveBufferSize { + bytes = bytes[:g.receiveBufferSize] } - g.eventQueue <- &TcpEvent{ - event: common.MapStr{ - server.EventDataKey: buffer[:length], - }, + + // Drop the delimiter and send the data + if len(bytes) > 0 { + g.eventQueue <- &TcpEvent{ + event: common.MapStr{ + server.EventDataKey: bytes[:len(bytes)-1], + }, + } } + } } diff --git a/metricbeat/helper/server/tcp/tcp_test.go b/metricbeat/helper/server/tcp/tcp_test.go index 018ef0d1f71..27ad81a060f 100644 --- a/metricbeat/helper/server/tcp/tcp_test.go +++ b/metricbeat/helper/server/tcp/tcp_test.go @@ -43,6 +43,7 @@ func GetTestTcpServer(host string, port int) (server.Server, error) { receiveBufferSize: 1024, done: make(chan struct{}), eventQueue: make(chan server.Event), + delimiter: '\n', }, nil } @@ -62,7 +63,7 @@ func TestTcpServer(t *testing.T) { } defer svc.Stop() - writeToServer(t, "test1", host, port) + writeToServer(t, "test1\n", host, port) msg := <-svc.GetEvents() assert.True(t, msg.GetEvent() != nil)