Skip to content

Commit

Permalink
Allow TCP helper to support delimiters (elastic#8278)
Browse files Browse the repository at this point in the history
(cherry picked from commit eeb1d3b)
  • Loading branch information
vjsamuel authored and jsoriano committed Sep 13, 2018
1 parent e6220c2 commit 872989f
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Add fields for memory fragmentation, memory allocator stats, copy on write, master-slave status, and active defragmentation to `info` metricset of Redis module. {pull}7695[7695]
- Increase ignore_above for system.process.cmdline to 2048. {pull}8101[8100]
- Add support to renamed fields planned for redis 5.0. {pull}8167[8167]
- Allow TCP helper to support delimiters. {pull}8278[8278]


- Add `metrics` metricset to MongoDB module. {pull}7611[7611]
Expand Down
15 changes: 14 additions & 1 deletion metricbeat/helper/server/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,29 @@

package tcp

import "fmt"

type TcpConfig struct {
Host string `config:"host"`
Port int `config:"port"`
ReceiveBufferSize int `config:"receive_buffer_size"`
Delimiter string `config:"delimiter"`
}

func defaultTcpConfig() TcpConfig {
return TcpConfig{
Host: "localhost",
Port: 2003,
ReceiveBufferSize: 1024,
ReceiveBufferSize: 4096,
Delimiter: "\n",
}
}

// Validate ensures that the configured delimiter has only one character
func (t *TcpConfig) Validate() error {
if len(t.Delimiter) != 1 {
return fmt.Errorf("length of delimiter is expected to be 1 but is %v", len(t.Delimiter))
}

return nil
}
52 changes: 39 additions & 13 deletions metricbeat/helper/server/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package tcp

import (
"bufio"
"fmt"
"net"

Expand All @@ -35,6 +36,7 @@ type TcpServer struct {
receiveBufferSize int
done chan struct{}
eventQueue chan server.Event
delimiter byte
}

type TcpEvent struct {
Expand Down Expand Up @@ -67,6 +69,7 @@ func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) {
receiveBufferSize: config.ReceiveBufferSize,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
delimiter: byte(config.Delimiter[0]),
}, nil
}

Expand All @@ -83,7 +86,6 @@ func (g *TcpServer) Start() error {
}

func (g *TcpServer) watchMetrics() {
buffer := make([]byte, g.receiveBufferSize)
for {
select {
case <-g.done:
Expand All @@ -96,22 +98,46 @@ func (g *TcpServer) watchMetrics() {
logp.Err("Unable to accept connection due to error: %v", err)
continue
}
defer func() {
if conn != nil {
conn.Close()
}
}()

length, err := conn.Read(buffer)
if conn != nil {
go g.handle(conn)
}
}
}

func (g *TcpServer) handle(conn net.Conn) {
logp.Debug("tcp", "Handling new connection...")

// Close connection when this function ends
defer func() {
conn.Close()
}()

// Get a new reader with buffer size as the same as receiveBufferSize
bufReader := bufio.NewReaderSize(conn, g.receiveBufferSize)

for {
// Read tokens delimited by delimiter
bytes, err := bufReader.ReadBytes(g.delimiter)
if err != nil {
logp.Err("Error reading from buffer: %v", err.Error())
continue
logp.Debug("tcp", "unable to read bytes due to error: %v", err)
return
}

// Truncate to max buffer size if too big of a payload
if len(bytes) > g.receiveBufferSize {
bytes = bytes[:g.receiveBufferSize]
}
g.eventQueue <- &TcpEvent{
event: common.MapStr{
server.EventDataKey: buffer[:length],
},

// Drop the delimiter and send the data
if len(bytes) > 0 {
g.eventQueue <- &TcpEvent{
event: common.MapStr{
server.EventDataKey: bytes[:len(bytes)-1],
},
}
}

}
}

Expand Down
3 changes: 2 additions & 1 deletion metricbeat/helper/server/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func GetTestTcpServer(host string, port int) (server.Server, error) {
receiveBufferSize: 1024,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
delimiter: '\n',
}, nil
}

Expand All @@ -62,7 +63,7 @@ func TestTcpServer(t *testing.T) {
}

defer svc.Stop()
writeToServer(t, "test1", host, port)
writeToServer(t, "test1\n", host, port)
msg := <-svc.GetEvents()

assert.True(t, msg.GetEvent() != nil)
Expand Down

0 comments on commit 872989f

Please sign in to comment.