From f0b97a6a59a2c99dc12aa7d90b260e6fae0fb5cf Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Thu, 30 Apr 2020 18:06:41 -0700 Subject: [PATCH 1/2] Fix gzip support in socket_listener with tcp sockets --- internal/content_coding.go | 99 ++++++++++++++++++- internal/content_coding_test.go | 67 +++++++++++++ .../inputs/socket_listener/socket_listener.go | 33 +++---- .../socket_listener/socket_listener_test.go | 6 +- 4 files changed, 181 insertions(+), 24 deletions(-) diff --git a/internal/content_coding.go b/internal/content_coding.go index 936dd95620a58..b6a1706c76b15 100644 --- a/internal/content_coding.go +++ b/internal/content_coding.go @@ -1,18 +1,115 @@ package internal import ( + "bufio" "bytes" "compress/gzip" "errors" "io" ) +// NewStreamContentEncoder returns a writer that will encode the stream +// according to the encoding type. +func NewStreamContentEncoder(encoding string, w io.Writer) (io.Writer, error) { + switch encoding { + case "gzip": + return NewGzipWriter(w) + case "identity", "": + return w, nil + default: + return nil, errors.New("invalid value for content_encoding") + } +} + +// NewStreamContentDecoder returns a reader that will decode the stream +// according to the encoding type. +func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) { + switch encoding { + case "gzip": + return NewGzipReader(r) + case "identity", "": + return r, nil + default: + return nil, errors.New("invalid value for content_encoding") + } +} + +// GzipWriter is similar to the gzip.Writer but flushes after each call to +// write, this ensure that the written data can be decompressed on the read +// size immediately. +type GzipWriter struct { + w io.Writer + z *gzip.Writer +} + +func NewGzipWriter(w io.Writer) (io.Writer, error) { + return &GzipWriter{w: w, z: new(gzip.Writer)}, nil +} + +func (w *GzipWriter) Write(p []byte) (int, error) { + w.z.Reset(w.w) + + n, err := w.z.Write(p) + if err != nil { + return n, err + } + + err = w.z.Close() + return n, err +} + +// GzipReader is similar to gzip.Reader but reads only a single gzip stream per read. +type GzipReader struct { + r io.Reader + z *gzip.Reader + endOfStream bool +} + +func NewGzipReader(r io.Reader) (io.Reader, error) { + // We need a read that implements ByteReader in order to line up the next + // stream. + br := bufio.NewReader(r) + + // Reads the first gzip stream header. + z, err := gzip.NewReader(br) + if err != nil { + return nil, err + } + + // Prevent future calls to Read from reading the following gzip header. + z.Multistream(false) + + return &GzipReader{r: br, z: z}, nil +} + +func (r *GzipReader) Read(b []byte) (int, error) { + if r.endOfStream { + // Reads the next gzip header and prepares for the next stream. + err := r.z.Reset(r.r) + if err != nil { + return 0, err + } + r.z.Multistream(false) + r.endOfStream = false + } + + n, err := r.z.Read(b) + + // Since multistream is disabled, io.EOF indicates the end of the gzip + // sequence. On the next read we must read the next gzip header. + if err == io.EOF { + r.endOfStream = true + return n, nil + } + return n, err + +} + // NewContentEncoder returns a ContentEncoder for the encoding type. func NewContentEncoder(encoding string) (ContentEncoder, error) { switch encoding { case "gzip": return NewGzipEncoder() - case "identity", "": return NewIdentityEncoder(), nil default: diff --git a/internal/content_coding_test.go b/internal/content_coding_test.go index 0316331127f3b..b2714b06fdbb7 100644 --- a/internal/content_coding_test.go +++ b/internal/content_coding_test.go @@ -1,6 +1,8 @@ package internal import ( + "bytes" + "io/ioutil" "testing" "github.com/stretchr/testify/require" @@ -56,3 +58,68 @@ func TestIdentityEncodeDecode(t *testing.T) { require.Equal(t, "howdy", string(actual)) } + +func TestStreamIdentityEncode(t *testing.T) { + var w bytes.Buffer + + enc, err := NewStreamContentEncoder("identity", &w) + require.NoError(t, err) + + n, err := enc.Write([]byte("howdy")) + require.NoError(t, err) + require.Equal(t, 5, n) + + require.Equal(t, []byte("howdy"), w.Bytes()) +} + +func TestStreamIdentityDecode(t *testing.T) { + var r bytes.Buffer + n, err := r.Write([]byte("howdy")) + require.NoError(t, err) + require.Equal(t, 5, n) + + dec, err := NewStreamContentDecoder("identity", &r) + require.NoError(t, err) + + data, err := ioutil.ReadAll(dec) + require.NoError(t, err) + + require.Equal(t, []byte("howdy"), data) +} + +func TestStreamGzipEncode(t *testing.T) { + var w bytes.Buffer + + enc, err := NewStreamContentEncoder("gzip", &w) + require.NoError(t, err) + + n, err := enc.Write([]byte("howdy")) + require.NoError(t, err) + require.Equal(t, 5, n) + + dec, err := NewGzipDecoder() + require.NoError(t, err) + + actual, err := dec.Decode(w.Bytes()) + require.NoError(t, err) + require.Equal(t, []byte("howdy"), actual) +} + +func TestStreamGzipDecode(t *testing.T) { + enc, err := NewGzipEncoder() + require.NoError(t, err) + written, err := enc.Encode([]byte("howdy")) + require.NoError(t, err) + + w := bytes.NewBuffer(written) + + dec, err := NewStreamContentDecoder("gzip", w) + require.NoError(t, err) + + b := make([]byte, 10) + n, err := dec.Read(b) + require.NoError(t, err) + require.Equal(t, 5, n) + + require.Equal(t, []byte("howdy"), b[:n]) +} diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index b1e9338510d73..d79030f664b2c 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -111,7 +111,12 @@ func (ssl *streamSocketListener) read(c net.Conn) { defer ssl.removeConnection(c) defer c.Close() - scnr := bufio.NewScanner(c) + decoder, err := internal.NewStreamContentDecoder(ssl.ContentEncoding, c) + if err != nil { + ssl.Log.Error("Read error: %v", err) + } + + scnr := bufio.NewScanner(decoder) for { if ssl.ReadTimeout != nil && ssl.ReadTimeout.Duration > 0 { c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration)) @@ -120,11 +125,7 @@ func (ssl *streamSocketListener) read(c net.Conn) { break } - body, err := ssl.decoder.Decode(scnr.Bytes()) - if err != nil { - ssl.Log.Errorf("Unable to decode incoming line: %s", err.Error()) - continue - } + body := scnr.Bytes() metrics, err := ssl.Parse(body) if err != nil { @@ -149,6 +150,7 @@ func (ssl *streamSocketListener) read(c net.Conn) { type packetSocketListener struct { net.PacketConn *SocketListener + decoder internal.ContentDecoder } func (psl *packetSocketListener) listen() { @@ -196,7 +198,6 @@ type SocketListener struct { parsers.Parser telegraf.Accumulator io.Closer - decoder internal.ContentDecoder } func (sl *SocketListener) Description() string { @@ -283,12 +284,6 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { protocol := spl[0] addr := spl[1] - var err error - sl.decoder, err = internal.NewContentDecoder(sl.ContentEncoding) - if err != nil { - return err - } - if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" { // no good way of testing for "file does not exist". // Instead just ignore error and blow up when we try to listen, which will @@ -298,16 +293,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { switch protocol { case "tcp", "tcp4", "tcp6", "unix", "unixpacket": - var ( - err error - l net.Listener - ) - tlsCfg, err := sl.ServerConfig.TLSConfig() if err != nil { return err } + var l net.Listener if tlsCfg == nil { l, err = net.Listen(protocol, addr) } else { @@ -344,6 +335,11 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { ssl.listen() }() case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": + decoder, err := internal.NewContentDecoder(sl.ContentEncoding) + if err != nil { + return err + } + pc, err := udpListen(protocol, addr) if err != nil { return err @@ -373,6 +369,7 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { psl := &packetSocketListener{ PacketConn: pc, SocketListener: sl, + decoder: decoder, } sl.Closer = psl diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index c6adf4cdebe7f..a46add15cf61b 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -222,7 +222,7 @@ func TestSocketListenerDecode_udp(t *testing.T) { func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) { mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n") - mstr3 := []byte("test,foo=zab v=3i 123456791") + mstr3 := []byte("test,foo=zab v=3i 123456791\n") if sl.ContentEncoding == "gzip" { encoder, err := internal.NewContentEncoder(sl.ContentEncoding) @@ -238,10 +238,6 @@ func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) { client.Write(mstr12) client.Write(mstr3) - if client.LocalAddr().Network() != "udp" { - // stream connection. needs trailing newline to terminate mstr3 - client.Write([]byte{'\n'}) - } acc := sl.Accumulator.(*testutil.Accumulator) From bc9e35e72d2b42b283702847210c364ef44028e2 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 4 May 2020 15:41:42 -0700 Subject: [PATCH 2/2] Remove GzipWriter since it isn't needed yet --- internal/content_coding.go | 37 --------------------------------- internal/content_coding_test.go | 31 --------------------------- 2 files changed, 68 deletions(-) diff --git a/internal/content_coding.go b/internal/content_coding.go index b6a1706c76b15..daefa20eea633 100644 --- a/internal/content_coding.go +++ b/internal/content_coding.go @@ -8,19 +8,6 @@ import ( "io" ) -// NewStreamContentEncoder returns a writer that will encode the stream -// according to the encoding type. -func NewStreamContentEncoder(encoding string, w io.Writer) (io.Writer, error) { - switch encoding { - case "gzip": - return NewGzipWriter(w) - case "identity", "": - return w, nil - default: - return nil, errors.New("invalid value for content_encoding") - } -} - // NewStreamContentDecoder returns a reader that will decode the stream // according to the encoding type. func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) { @@ -34,30 +21,6 @@ func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) { } } -// GzipWriter is similar to the gzip.Writer but flushes after each call to -// write, this ensure that the written data can be decompressed on the read -// size immediately. -type GzipWriter struct { - w io.Writer - z *gzip.Writer -} - -func NewGzipWriter(w io.Writer) (io.Writer, error) { - return &GzipWriter{w: w, z: new(gzip.Writer)}, nil -} - -func (w *GzipWriter) Write(p []byte) (int, error) { - w.z.Reset(w.w) - - n, err := w.z.Write(p) - if err != nil { - return n, err - } - - err = w.z.Close() - return n, err -} - // GzipReader is similar to gzip.Reader but reads only a single gzip stream per read. type GzipReader struct { r io.Reader diff --git a/internal/content_coding_test.go b/internal/content_coding_test.go index b2714b06fdbb7..85496df59c5b6 100644 --- a/internal/content_coding_test.go +++ b/internal/content_coding_test.go @@ -59,19 +59,6 @@ func TestIdentityEncodeDecode(t *testing.T) { require.Equal(t, "howdy", string(actual)) } -func TestStreamIdentityEncode(t *testing.T) { - var w bytes.Buffer - - enc, err := NewStreamContentEncoder("identity", &w) - require.NoError(t, err) - - n, err := enc.Write([]byte("howdy")) - require.NoError(t, err) - require.Equal(t, 5, n) - - require.Equal(t, []byte("howdy"), w.Bytes()) -} - func TestStreamIdentityDecode(t *testing.T) { var r bytes.Buffer n, err := r.Write([]byte("howdy")) @@ -87,24 +74,6 @@ func TestStreamIdentityDecode(t *testing.T) { require.Equal(t, []byte("howdy"), data) } -func TestStreamGzipEncode(t *testing.T) { - var w bytes.Buffer - - enc, err := NewStreamContentEncoder("gzip", &w) - require.NoError(t, err) - - n, err := enc.Write([]byte("howdy")) - require.NoError(t, err) - require.Equal(t, 5, n) - - dec, err := NewGzipDecoder() - require.NoError(t, err) - - actual, err := dec.Decode(w.Bytes()) - require.NoError(t, err) - require.Equal(t, []byte("howdy"), actual) -} - func TestStreamGzipDecode(t *testing.T) { enc, err := NewGzipEncoder() require.NoError(t, err)