Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix gzip support in socket_listener with tcp sockets #7446

Merged
merged 2 commits into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion internal/content_coding.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,78 @@
package internal

import (
"bufio"
"bytes"
"compress/gzip"
"errors"
"io"
)

// NewStreamContentDecoder selects a streaming decoder for the given content
// encoding and layers it over r.
//
// For "gzip" the result comes from NewGzipReader. For "identity" or the empty
// string r is handed back as-is, without any wrapping. Every other value is
// rejected with an error.
func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) {
	if encoding == "gzip" {
		return NewGzipReader(r)
	}
	if encoding == "identity" || encoding == "" {
		// Nothing to decode: the caller reads straight from the source.
		return r, nil
	}
	return nil, errors.New("invalid value for content_encoding")
}

// GzipReader is similar to gzip.Reader but reads only a single gzip stream per
// read: a single call to Read never returns bytes from more than one gzip
// member, and the header of the following member is consumed only when the
// next bytes are actually requested.
type GzipReader struct {
	r           io.Reader    // buffered source that every gzip member is read from
	z           *gzip.Reader // decompressor for the current gzip member
	endOfStream bool         // set once the current member has been fully read
}

// Compile-time check that *GzipReader satisfies io.Reader.
var _ io.Reader = (*GzipReader)(nil)

// NewGzipReader returns a reader that decompresses a sequence of concatenated
// gzip streams read from r, one stream at a time.
//
// The header of the first stream is read immediately; an error is returned if
// it cannot be read or is not a valid gzip header.
func NewGzipReader(r io.Reader) (io.Reader, error) {
	// We need a reader that implements ByteReader in order to line up the next
	// stream.
	br := bufio.NewReader(r)

	// Reads the first gzip stream header.
	z, err := gzip.NewReader(br)
	if err != nil {
		return nil, err
	}

	// Prevent future calls to Read from reading the following gzip header.
	z.Multistream(false)

	return &GzipReader{r: br, z: z}, nil
}

// Read decompresses data from the current gzip stream into b.
//
// The end of an individual gzip stream is not reported to the caller; the
// following call transparently starts on the next stream. When a stream ends
// without producing any bytes for a non-empty b, Read moves on to the next
// stream right away instead of returning (0, nil), which the io.Reader
// contract discourages. Errors from reading the next header, including io.EOF
// when the source has no further streams, are returned as-is.
func (r *GzipReader) Read(b []byte) (int, error) {
	for {
		if r.endOfStream {
			// Reads the next gzip header and prepares for the next stream.
			if err := r.z.Reset(r.r); err != nil {
				return 0, err
			}
			r.z.Multistream(false)
			r.endOfStream = false
		}

		n, err := r.z.Read(b)
		if err != io.EOF {
			return n, err
		}

		// Since multistream is disabled, io.EOF indicates the end of the
		// current gzip stream. The next read must start with the next header.
		r.endOfStream = true

		// Hand back whatever was produced. A zero-length b is answered with
		// (0, nil) without touching the source again.
		if n > 0 || len(b) == 0 {
			return n, nil
		}
		// Nothing was produced for a non-empty b: continue with the next
		// stream rather than returning a zero count with a nil error.
	}
}

// NewContentEncoder returns a ContentEncoder for the encoding type.
func NewContentEncoder(encoding string) (ContentEncoder, error) {
switch encoding {
case "gzip":
return NewGzipEncoder()

case "identity", "":
return NewIdentityEncoder(), nil
default:
Expand Down
36 changes: 36 additions & 0 deletions internal/content_coding_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package internal

import (
"bytes"
"io/ioutil"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -56,3 +58,37 @@ func TestIdentityEncodeDecode(t *testing.T) {

require.Equal(t, "howdy", string(actual))
}

// TestStreamIdentityDecode verifies that the "identity" stream decoder yields
// the source bytes unchanged.
func TestStreamIdentityDecode(t *testing.T) {
	// Seed the source buffer with a known payload.
	var src bytes.Buffer
	written, err := src.Write([]byte("howdy"))
	require.NoError(t, err)
	require.Equal(t, 5, written)

	reader, err := NewStreamContentDecoder("identity", &src)
	require.NoError(t, err)

	// Draining the decoder must return exactly what was written.
	got, err := ioutil.ReadAll(reader)
	require.NoError(t, err)
	require.Equal(t, []byte("howdy"), got)
}

// TestStreamGzipDecode verifies that a payload compressed with the gzip
// encoder can be read back through the "gzip" stream decoder in one Read.
func TestStreamGzipDecode(t *testing.T) {
	encoder, err := NewGzipEncoder()
	require.NoError(t, err)

	compressed, err := encoder.Encode([]byte("howdy"))
	require.NoError(t, err)

	// Feed the compressed bytes to the stream decoder through a buffer.
	src := bytes.NewBuffer(compressed)
	reader, err := NewStreamContentDecoder("gzip", src)
	require.NoError(t, err)

	// The destination is larger than the payload so one Read is enough.
	out := make([]byte, 10)
	count, err := reader.Read(out)
	require.NoError(t, err)
	require.Equal(t, 5, count)
	require.Equal(t, []byte("howdy"), out[:count])
}
33 changes: 15 additions & 18 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ func (ssl *streamSocketListener) read(c net.Conn) {
defer ssl.removeConnection(c)
defer c.Close()

scnr := bufio.NewScanner(c)
decoder, err := internal.NewStreamContentDecoder(ssl.ContentEncoding, c)
if err != nil {
ssl.Log.Error("Read error: %v", err)
}

scnr := bufio.NewScanner(decoder)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always wrap with decoder here. Wrapping makes sense to me for gzip, but not for identity. If identity, why not pass in c and not wrap at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the 'identity' case, internal.NewStreamContentDecoder is just returning the reader unwrapped. I thought it would be a bit nicer to push the switch logic into this factory function.

for {
if ssl.ReadTimeout != nil && ssl.ReadTimeout.Duration > 0 {
c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration))
Expand All @@ -120,11 +125,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
break
}

body, err := ssl.decoder.Decode(scnr.Bytes())
if err != nil {
ssl.Log.Errorf("Unable to decode incoming line: %s", err.Error())
continue
}
body := scnr.Bytes()

metrics, err := ssl.Parse(body)
if err != nil {
Expand All @@ -149,6 +150,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
type packetSocketListener struct {
net.PacketConn
*SocketListener
decoder internal.ContentDecoder
}

func (psl *packetSocketListener) listen() {
Expand Down Expand Up @@ -196,7 +198,6 @@ type SocketListener struct {
parsers.Parser
telegraf.Accumulator
io.Closer
decoder internal.ContentDecoder
}

func (sl *SocketListener) Description() string {
Expand Down Expand Up @@ -283,12 +284,6 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
protocol := spl[0]
addr := spl[1]

var err error
sl.decoder, err = internal.NewContentDecoder(sl.ContentEncoding)
if err != nil {
return err
}

if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" {
// no good way of testing for "file does not exist".
// Instead just ignore error and blow up when we try to listen, which will
Expand All @@ -298,16 +293,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {

switch protocol {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
var (
err error
l net.Listener
)

tlsCfg, err := sl.ServerConfig.TLSConfig()
if err != nil {
return err
}

var l net.Listener
if tlsCfg == nil {
l, err = net.Listen(protocol, addr)
} else {
Expand Down Expand Up @@ -344,6 +335,11 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
ssl.listen()
}()
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
decoder, err := internal.NewContentDecoder(sl.ContentEncoding)
if err != nil {
return err
}

pc, err := udpListen(protocol, addr)
if err != nil {
return err
Expand Down Expand Up @@ -373,6 +369,7 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
psl := &packetSocketListener{
PacketConn: pc,
SocketListener: sl,
decoder: decoder,
}

sl.Closer = psl
Expand Down
6 changes: 1 addition & 5 deletions plugins/inputs/socket_listener/socket_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestSocketListenerDecode_udp(t *testing.T) {

func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n")
mstr3 := []byte("test,foo=zab v=3i 123456791")
mstr3 := []byte("test,foo=zab v=3i 123456791\n")

if sl.ContentEncoding == "gzip" {
encoder, err := internal.NewContentEncoder(sl.ContentEncoding)
Expand All @@ -238,10 +238,6 @@ func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {

client.Write(mstr12)
client.Write(mstr3)
if client.LocalAddr().Network() != "udp" {
// stream connection. needs trailing newline to terminate mstr3
client.Write([]byte{'\n'})
}

acc := sl.Accumulator.(*testutil.Accumulator)

Expand Down