From bfb5a2a15ec0af887c2c888ff245130ba7eddaf6 Mon Sep 17 00:00:00 2001 From: Michael Hobbs Date: Mon, 10 Apr 2017 14:36:10 -0700 Subject: [PATCH 1/9] make test target more flexible --- Makefile | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 212d78bb..e9765635 100644 --- a/Makefile +++ b/Makefile @@ -2,12 +2,17 @@ NAME=logspout VERSION=$(shell cat VERSION) +# max image size of 40MB +MAX_IMAGE_SIZE := 40000000 + ifeq ($(shell uname), Darwin) XARGS_ARG="-L1" endif GOLINT := go list ./... | egrep -v '/custom/|/vendor/' | xargs $(XARGS_ARG) golint | egrep -v 'extpoints.go|types.go' -# max image size of 40MB -MAX_IMAGE_SIZE := 40000000 +TEST_MODULES ?= $(shell go list ./... | egrep -v '/vendor|/custom') +ifdef TEST_RUN + TESTRUN := -run ${TEST_RUN} +endif build-dev: docker build -f Dockerfile.dev -t $(NAME):dev . @@ -37,7 +42,10 @@ test: build-dev docker run \ -v /var/run/docker.sock:/var/run/docker.sock \ -v $(PWD):/go/src/github.com/gliderlabs/logspout \ - $(NAME):dev go test -v ./router/... + $(NAME):dev make -e test-direct + +test-direct: + go test -p 1 -v -race $(TEST_MODULES) $(TESTRUN) test-image-size: @if [ $(shell docker inspect -f '{{ .Size }}' $(NAME):$(VERSION)) -gt $(MAX_IMAGE_SIZE) ]; then \ From 68e309465d2e688acd8c93c19a99da920d6cca09 Mon Sep 17 00:00:00 2001 From: Michael Hobbs Date: Tue, 11 Apr 2017 21:49:43 -0700 Subject: [PATCH 2/9] attempt to preserve buffer on reconnect() --- adapters/syslog/syslog.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/adapters/syslog/syslog.go b/adapters/syslog/syslog.go index e89479e0..f24ad101 100644 --- a/adapters/syslog/syslog.go +++ b/adapters/syslog/syslog.go @@ -120,16 +120,14 @@ func (a *Adapter) Stream(logstream chan *router.Message) { log.Println("syslog:", err) return } - _, err = a.conn.Write(buf) - if err != nil { + if _, err = a.conn.Write(buf); err != nil { log.Println("syslog:", err) switch a.conn.(type) { case *net.UDPConn: continue default: - err = a.retry(buf, err) - if err != nil { - log.Println("syslog:", err) + if err = a.retry(buf, err); err != nil { + log.Println("syslog retry err:", err) return } } @@ -140,14 +138,20 @@ func (a *Adapter) Stream(logstream chan *router.Message) { func (a *Adapter) retry(buf []byte, err error) error { if opError, ok := err.(*net.OpError); ok { if opError.Temporary() || opError.Timeout() { - retryErr := a.retryTemporary(buf) - if retryErr == nil { + if retryErr := a.retryTemporary(buf); retryErr == nil { return nil } } } - - return a.reconnect() + if reconnErr := a.reconnect(); reconnErr != nil { + return reconnErr + } + if _, err = a.conn.Write(buf); err != nil { + log.Println("syslog: reconnect failed") + return err + } + log.Println("syslog: reconnect successful") + return nil } func (a *Adapter) retryTemporary(buf []byte) error { @@ -177,16 +181,13 @@ func (a *Adapter) reconnect() error { if err != nil { return err } - a.conn = conn return nil }, retryCount) if err != nil { - log.Println("syslog: reconnect failed") return err } - return nil } From d12e81e65585aa91011301697f20157edb172b81 Mon Sep 17 00:00:00 2001 From: Michael Hobbs Date: Wed, 12 Apr 2017 12:46:11 -0700 Subject: [PATCH 3/9] race detector for alpine is broken. disable it for now --- Makefile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index e9765635..2fde7469 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,8 @@ ifeq ($(shell uname), Darwin) endif GOLINT := go list ./... | egrep -v '/custom/|/vendor/' | xargs $(XARGS_ARG) golint | egrep -v 'extpoints.go|types.go' TEST_MODULES ?= $(shell go list ./... | egrep -v '/vendor|/custom') +TEST_ARGS ?= -race + ifdef TEST_RUN TESTRUN := -run ${TEST_RUN} endif @@ -42,10 +44,11 @@ test: build-dev docker run \ -v /var/run/docker.sock:/var/run/docker.sock \ -v $(PWD):/go/src/github.com/gliderlabs/logspout \ + -e TEST_ARGS="" \ $(NAME):dev make -e test-direct test-direct: - go test -p 1 -v -race $(TEST_MODULES) $(TESTRUN) + go test -p 1 -v $(TEST_ARGS) $(TEST_MODULES) $(TESTRUN) test-image-size: @if [ $(shell docker inspect -f '{{ .Size }}' $(NAME):$(VERSION)) -gt $(MAX_IMAGE_SIZE) ]; then \ From 43c0db002ae867dfc98ea1c5d191f78bd0d36e05 Mon Sep 17 00:00:00 2001 From: Michael Hobbs Date: Wed, 12 Apr 2017 12:46:25 -0700 Subject: [PATCH 4/9] add test for reconnect() --- adapters/syslog/syslog_test.go | 224 ++++++++++++++++++++++++++++++++- 1 file changed, 219 insertions(+), 5 deletions(-) diff --git a/adapters/syslog/syslog_test.go b/adapters/syslog/syslog_test.go index f8c49625..e6ac923f 100644 --- a/adapters/syslog/syslog_test.go +++ b/adapters/syslog/syslog_test.go @@ -1,22 +1,236 @@ package syslog import ( + "fmt" + "net" "os" "strconv" + "strings" + "sync" "testing" + "text/template" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/gliderlabs/logspout/router" + + _ "github.com/gliderlabs/logspout/transports/tcp" + _ "github.com/gliderlabs/logspout/transports/tls" + _ "github.com/gliderlabs/logspout/transports/udp" ) -func TestSyslogRetryCount(t *testing.T) { - setRetryCount() - if retryCount != defaultRetryCount { - t.Errorf("expected %v got %v", defaultRetryCount, retryCount) +const ( + closeOnMsgIdx = 5 + maxMsgCount = 10 + testPriority = "{{.Priority}}" + testTimestamp = "{{.Timestamp}}" + testHostname = "{{.Container.Config.Hostname}}" + testTag = "{{.ContainerName}}" + testPid = "{{.Container.State.Pid}}" + testData = "{{.Data}}" +) + +var ( + container = &docker.Container{ + ID: "8dfafdbc3a40", + Name: "0michaelshobbs", + Config: &docker.Config{ + Hostname: "8dfafdbc3a40", + }, } + testTmplStr = fmt.Sprintf("<%s>%s %s %s[%s]: %s\n", + testPriority, testTimestamp, testHostname, testTag, testPid, testData) +) + +type localTCPServer struct { + lnmu sync.RWMutex + net.Listener +} + +func (ls *localTCPServer) teardown() error { + ls.lnmu.Lock() + if ls.Listener != nil { + ls.Listener.Close() + ls.Listener = nil + } + ls.lnmu.Unlock() + return nil +} +func TestSyslogRetryCount(t *testing.T) { newRetryCount := uint(20) os.Setenv("RETRY_COUNT", strconv.Itoa(int(newRetryCount))) - defer os.Unsetenv("RETRY_COUNT") setRetryCount() if retryCount != newRetryCount { t.Errorf("expected %v got %v", newRetryCount, retryCount) } + + os.Unsetenv("RETRY_COUNT") + setRetryCount() + if retryCount != defaultRetryCount { + t.Errorf("expected %v got %v", defaultRetryCount, retryCount) + } +} + +func TestSyslogReconnectOnClose(t *testing.T) { + os.Setenv("RETRY_COUNT", strconv.Itoa(int(1))) + setRetryCount() + defer func() { + os.Unsetenv("RETRY_COUNT") + setRetryCount() + }() + tmpl, err := template.New("syslog").Parse(testTmplStr) + if err != nil { + t.Fatal(err) + } + ls, err := newLocalTCPServer() + if err != nil { + t.Fatal(err) + } + defer ls.teardown() + + route := &router.Route{ + ID: "0", + Adapter: "syslog", + Address: ls.Listener.Addr().String(), + } + transport, found := router.AdapterTransports.Lookup(route.AdapterTransport("tcp")) + if !found { + t.Errorf("bad transport: " + route.Adapter) + } + + datac := make(chan []byte, maxMsgCount) + errc := make(chan error, 1) + go acceptAndCloseConn(ls, datac, errc) + + // Dial connection for adapter + conn, err := net.Dial(ls.Listener.Addr().Network(), ls.Listener.Addr().String()) + if err != nil { + t.Fatal(err) + } + + adapter := &Adapter{ + route: route, + conn: conn, + tmpl: tmpl, + transport: transport, + } + + logstream := make(chan *router.Message) + done := make(chan bool) + sentMsgs := [][]byte{} + // Send msgs to logstream + go sendLogstream(logstream, done, &sentMsgs, tmpl) + + // Stream logstream to conn + go adapter.Stream(logstream) + + for err := range errc { + t.Errorf("%v", err) + } + + readMsgs := [][]byte{} + for { + select { + case <-done: + if maxMsgCount-1 != len(datac) { + t.Errorf("expected %v got %v", maxMsgCount-1, len(datac)) + } + for msg := range datac { + readMsgs = append(readMsgs, msg) + } + sentMsgs = append(sentMsgs[:closeOnMsgIdx], sentMsgs[closeOnMsgIdx+1:]...) + for i, v := range sentMsgs { + sent := strings.Trim(fmt.Sprintf("%s", v), "\n") + read := strings.Trim(fmt.Sprintf("%s", readMsgs[i]), "\x00\n") + if sent != read { + t.Errorf("expected %+q got %+q", sent, read) + } + } + } + return + } +} + +func newLocalTCPServer() (*localTCPServer, error) { + ln, err := newLocalListener() + if err != nil { + return nil, err + } + return &localTCPServer{Listener: ln}, nil +} + +func newLocalListener() (net.Listener, error) { + ln, err := net.Listen("tcp4", "127.0.0.1:0") + if err != nil { + return nil, err + } + return ln, nil +} + +func acceptAndCloseConn(ls *localTCPServer, datac chan []byte, errc chan error) { + defer func() { + close(datac) + close(errc) + }() + c, err := ls.Accept() + if err != nil { + errc <- err + return + } + count := 0 + for { + switch count { + case maxMsgCount - closeOnMsgIdx: + c.Close() + c, err = ls.Accept() + if err != nil { + errc <- err + return + } + c.SetReadDeadline(time.Now().Add(5 * time.Second)) + readConn(c, datac) + count++ + case maxMsgCount: + return + default: + readConn(c, datac) + count++ + } + } +} + +func readConn(c net.Conn, ch chan []byte) error { + b := make([]byte, 256) + _, err := c.Read(b) + if err != nil { + return err + } + ch <- b + return nil +} + +func sendLogstream(logstream chan *router.Message, done chan bool, msgs *[][]byte, tmpl *template.Template) { + defer func() { + close(logstream) + close(done) + }() + var count int + for { + if count == maxMsgCount { + done <- true + return + } + msg := &router.Message{ + Container: container, + Data: "hellooo", + Time: time.Now(), + } + m := &Message{msg} + buf, _ := m.Render(tmpl) + *msgs = append(*msgs, buf) + logstream <- msg + count++ + time.Sleep(250 * time.Millisecond) + } } From 00668f8f69f316042313484577768e8fb141e53a Mon Sep 17 00:00:00 2001 From: Michael Hobbs Date: Thu, 13 Apr 2017 09:21:26 -0700 Subject: [PATCH 5/9] use mock conn for syslog tests --- adapters/syslog/syslog_test.go | 111 +++-------------------- testutil/mockconn.go | 155 +++++++++++++++++++++++++++++++++ testutil/testutil.go | 98 +++++++++++++++++++++ testutil/transport.go | 13 +++ 4 files changed, 279 insertions(+), 98 deletions(-) create mode 100644 testutil/mockconn.go create mode 100644 testutil/testutil.go create mode 100644 testutil/transport.go diff --git a/adapters/syslog/syslog_test.go b/adapters/syslog/syslog_test.go index e6ac923f..5f2b3f31 100644 --- a/adapters/syslog/syslog_test.go +++ b/adapters/syslog/syslog_test.go @@ -2,17 +2,16 @@ package syslog import ( "fmt" - "net" "os" "strconv" "strings" - "sync" "testing" "text/template" "time" docker "github.com/fsouza/go-dockerclient" "github.com/gliderlabs/logspout/router" + "github.com/gliderlabs/logspout/testutil" _ "github.com/gliderlabs/logspout/transports/tcp" _ "github.com/gliderlabs/logspout/transports/tls" @@ -20,8 +19,6 @@ import ( ) const ( - closeOnMsgIdx = 5 - maxMsgCount = 10 testPriority = "{{.Priority}}" testTimestamp = "{{.Timestamp}}" testHostname = "{{.Container.Config.Hostname}}" @@ -42,21 +39,6 @@ var ( testPriority, testTimestamp, testHostname, testTag, testPid, testData) ) -type localTCPServer struct { - lnmu sync.RWMutex - net.Listener -} - -func (ls *localTCPServer) teardown() error { - ls.lnmu.Lock() - if ls.Listener != nil { - ls.Listener.Close() - ls.Listener = nil - } - ls.lnmu.Unlock() - return nil -} - func TestSyslogRetryCount(t *testing.T) { newRetryCount := uint(20) os.Setenv("RETRY_COUNT", strconv.Itoa(int(newRetryCount))) @@ -83,37 +65,27 @@ func TestSyslogReconnectOnClose(t *testing.T) { if err != nil { t.Fatal(err) } - ls, err := newLocalTCPServer() + ls, err := testutil.NewLocalTCPServer() if err != nil { t.Fatal(err) } - defer ls.teardown() + defer ls.Teardown() route := &router.Route{ ID: "0", Adapter: "syslog", Address: ls.Listener.Addr().String(), } - transport, found := router.AdapterTransports.Lookup(route.AdapterTransport("tcp")) - if !found { - t.Errorf("bad transport: " + route.Adapter) - } - datac := make(chan []byte, maxMsgCount) + datac := make(chan []byte, testutil.MaxMsgCount) errc := make(chan error, 1) - go acceptAndCloseConn(ls, datac, errc) - - // Dial connection for adapter - conn, err := net.Dial(ls.Listener.Addr().Network(), ls.Listener.Addr().String()) - if err != nil { - t.Fatal(err) - } + go testutil.AcceptAndCloseConn(ls, datac, errc) adapter := &Adapter{ route: route, - conn: conn, + conn: testutil.Dial(ls), tmpl: tmpl, - transport: transport, + transport: testutil.MockTransport{Listener: ls}, } logstream := make(chan *router.Message) @@ -125,6 +97,7 @@ func TestSyslogReconnectOnClose(t *testing.T) { // Stream logstream to conn go adapter.Stream(logstream) + // Check for errs from goroutines for err := range errc { t.Errorf("%v", err) } @@ -133,13 +106,13 @@ func TestSyslogReconnectOnClose(t *testing.T) { for { select { case <-done: - if maxMsgCount-1 != len(datac) { - t.Errorf("expected %v got %v", maxMsgCount-1, len(datac)) + if testutil.MaxMsgCount-1 != len(datac) { + t.Errorf("expected %v got %v", testutil.MaxMsgCount-1, len(datac)) } for msg := range datac { readMsgs = append(readMsgs, msg) } - sentMsgs = append(sentMsgs[:closeOnMsgIdx], sentMsgs[closeOnMsgIdx+1:]...) + sentMsgs = append(sentMsgs[:testutil.CloseOnMsgIdx], sentMsgs[testutil.CloseOnMsgIdx+1:]...) for i, v := range sentMsgs { sent := strings.Trim(fmt.Sprintf("%s", v), "\n") read := strings.Trim(fmt.Sprintf("%s", readMsgs[i]), "\x00\n") @@ -152,64 +125,6 @@ func TestSyslogReconnectOnClose(t *testing.T) { } } -func newLocalTCPServer() (*localTCPServer, error) { - ln, err := newLocalListener() - if err != nil { - return nil, err - } - return &localTCPServer{Listener: ln}, nil -} - -func newLocalListener() (net.Listener, error) { - ln, err := net.Listen("tcp4", "127.0.0.1:0") - if err != nil { - return nil, err - } - return ln, nil -} - -func acceptAndCloseConn(ls *localTCPServer, datac chan []byte, errc chan error) { - defer func() { - close(datac) - close(errc) - }() - c, err := ls.Accept() - if err != nil { - errc <- err - return - } - count := 0 - for { - switch count { - case maxMsgCount - closeOnMsgIdx: - c.Close() - c, err = ls.Accept() - if err != nil { - errc <- err - return - } - c.SetReadDeadline(time.Now().Add(5 * time.Second)) - readConn(c, datac) - count++ - case maxMsgCount: - return - default: - readConn(c, datac) - count++ - } - } -} - -func readConn(c net.Conn, ch chan []byte) error { - b := make([]byte, 256) - _, err := c.Read(b) - if err != nil { - return err - } - ch <- b - return nil -} - func sendLogstream(logstream chan *router.Message, done chan bool, msgs *[][]byte, tmpl *template.Template) { defer func() { close(logstream) @@ -217,7 +132,7 @@ func sendLogstream(logstream chan *router.Message, done chan bool, msgs *[][]byt }() var count int for { - if count == maxMsgCount { + if count == testutil.MaxMsgCount { done <- true return } @@ -231,6 +146,6 @@ func sendLogstream(logstream chan *router.Message, done chan bool, msgs *[][]byt *msgs = append(*msgs, buf) logstream <- msg count++ - time.Sleep(250 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) } } diff --git a/testutil/mockconn.go b/testutil/mockconn.go new file mode 100644 index 00000000..000bdc6e --- /dev/null +++ b/testutil/mockconn.go @@ -0,0 +1,155 @@ +package testutil + +import ( + "fmt" + "net" + "sync" + "time" +) + +var ( + connIsClosed = &mutexBool{ + mutex: &sync.Mutex{}, + state: false, + } + connIsReset = &mutexBool{ + mutex: &sync.Mutex{}, + state: false, + } +) + +type mutexBool struct { + mutex *sync.Mutex + state bool +} + +// Conn facilitates testing by providing two connected ReadWriteClosers +// each of which can be used in place of a net.Conn +type Conn struct { + Server *End + Client *End +} + +// Close closes server/client pipes +func (c *Conn) Close() error { + if err := c.Server.Close(); err != nil { + return err + } + if err := c.Client.Close(); err != nil { + return err + } + return nil +} + +// NewConn returns a new testutil.Conn +func NewConn() *Conn { + // A connection consists of two pipes: + // Client | Server + // writes ===> reads + // reads <=== writes + + serverRead, clientWrite := net.Pipe() + clientRead, serverWrite := net.Pipe() + + return &Conn{ + Server: &End{ + Reader: serverRead, + Writer: serverWrite, + }, + Client: &End{ + Reader: clientRead, + Writer: clientWrite, + }, + } +} + +// End is one 'end' of a simulated connection. +type End struct { + Reader net.Conn + Writer net.Conn +} + +// type End struct { +// Reader *io.PipeReader +// Writer *io.PipeWriter +// } + +// CloseTest simulates closing a net.Conn +func (e End) CloseTest() { + connIsClosed.mutex.Lock() + connIsClosed.state = true + connIsClosed.mutex.Unlock() + return +} + +// Close closes the Reader/Writer pipes +func (e End) Close() error { + if err := e.Writer.Close(); err != nil { + return err + } + if err := e.Reader.Close(); err != nil { + return err + } + return nil +} + +func (e End) Read(data []byte) (n int, err error) { + e.Reader.SetReadDeadline(time.Now().Add(5 * time.Second)) + return e.Reader.Read(data) +} +func (e End) Write(data []byte) (n int, err error) { + connIsClosed.mutex.Lock() + defer connIsClosed.mutex.Unlock() + if connIsClosed.state { + connIsClosed.state = false + return 0, &net.OpError{ + Op: "write", + Net: e.RemoteAddr().Network(), + Source: e.LocalAddr(), + Addr: e.RemoteAddr(), + Err: fmt.Errorf("write: broken pipe"), + } + } + return e.Writer.Write(data) +} + +// LocalAddr satisfies the net.Conn interface +func (e End) LocalAddr() net.Addr { + return Addr{ + NetworkString: "tcp", + AddrString: "127.0.0.1", + } +} + +// RemoteAddr satisfies the net.Conn interface +func (e End) RemoteAddr() net.Addr { + return Addr{ + NetworkString: "tcp", + AddrString: "127.0.0.1", + } +} + +// SetDeadline satisfies the net.Conn interface +func (e End) SetDeadline(t time.Time) error { return nil } + +// SetReadDeadline satisfies the net.Conn interface +func (e End) SetReadDeadline(t time.Time) error { return nil } + +// SetWriteDeadline satisfies the net.Conn interface +func (e End) SetWriteDeadline(t time.Time) error { return nil } + +// Addr is a fake network interface which implements the net.Addr interface +type Addr struct { + NetworkString string + AddrString string +} + +// Network satisfies the net.Addr interface +func (a Addr) Network() string { + return a.NetworkString +} + +// Network satisfies the net.Addr interface +func (a Addr) String() string { + return a.AddrString +} diff --git a/testutil/testutil.go b/testutil/testutil.go new file mode 100644 index 00000000..c32f4797 --- /dev/null +++ b/testutil/testutil.go @@ -0,0 +1,98 @@ +package testutil + +import ( + "net" + "sync" +) + +// Test constants +const ( + CloseOnMsgIdx = 5 + MaxMsgCount = 10 +) + +// LocalTCPServer tcp server for testing specific network errors +type LocalTCPServer struct { + lnmu sync.RWMutex + MockConn *Conn + net.Listener +} + +// Accept returns server side of the LocalTCPServer MockConn +func (ls *LocalTCPServer) Accept() (*End, error) { + return ls.MockConn.Server, nil +} + +// Teardown locks and tears down a LocalTCPServer +func (ls *LocalTCPServer) Teardown() error { + ls.lnmu.Lock() + if ls.Listener != nil { + ls.Listener.Close() + ls.Listener = nil + } + ls.lnmu.Unlock() + return nil +} + +// NewLocalTCPServer return a new LocalTCPServer +func NewLocalTCPServer() (*LocalTCPServer, error) { + ln, err := newLocalListener() + if err != nil { + return nil, err + } + return &LocalTCPServer{Listener: ln, MockConn: NewConn()}, nil +} + +func newLocalListener() (net.Listener, error) { + ln, err := net.Listen("tcp4", "127.0.0.1:0") + if err != nil { + return nil, err + } + return ln, nil +} + +// Dial returns the client side of our MockConn +func Dial(ls *LocalTCPServer) net.Conn { + return ls.MockConn.Client +} + +// AcceptAndCloseConn opens the listener side of a server, reads data and closes the connection after CloseOnMsgIdx +func AcceptAndCloseConn(ls *LocalTCPServer, datac chan []byte, errc chan error) { + defer func() { + close(datac) + close(errc) + }() + c, err := ls.Accept() + if err != nil { + errc <- err + return + } + + count := 0 + for { + switch count { + case MaxMsgCount - CloseOnMsgIdx: + c.CloseTest() + readConn(count, c, datac) + count++ + case MaxMsgCount: + return + default: + readConn(count, c, datac) + count++ + } + } +} + +func readConn(count int, c net.Conn, ch chan []byte) error { + b := make([]byte, 256) + _, err := c.Read(b) + if err != nil { + return err + } + // Simulate real-life network drop + if count != MaxMsgCount-CloseOnMsgIdx { + ch <- b + } + return nil +} diff --git a/testutil/transport.go b/testutil/transport.go new file mode 100644 index 00000000..a78de9bc --- /dev/null +++ b/testutil/transport.go @@ -0,0 +1,13 @@ +package testutil + +import "net" + +// MockTransport allows us to dial our test listener +type MockTransport struct { + Listener *LocalTCPServer +} + +// Dial always returns the client from our test listener +func (mt MockTransport) Dial(addr string, opt map[string]string) (net.Conn, error) { + return mt.Listener.MockConn.Client, nil +} From ea5f535b86ddd16a7aa6aae2c9617d57ca52e969 Mon Sep 17 00:00:00 2001 From: Michael Hobbs Date: Thu, 13 Apr 2017 10:17:12 -0700 Subject: [PATCH 6/9] make vet more reliable --- Makefile | 6 +++--- adapters/syslog/syslog_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 2fde7469..e0fc64e5 100644 --- a/Makefile +++ b/Makefile @@ -8,8 +8,8 @@ MAX_IMAGE_SIZE := 40000000 ifeq ($(shell uname), Darwin) XARGS_ARG="-L1" endif +GOPACKAGES ?= $(shell go list ./... | egrep -v 'custom|vendor') GOLINT := go list ./... | egrep -v '/custom/|/vendor/' | xargs $(XARGS_ARG) golint | egrep -v 'extpoints.go|types.go' -TEST_MODULES ?= $(shell go list ./... | egrep -v '/vendor|/custom') TEST_ARGS ?= -race ifdef TEST_RUN @@ -36,7 +36,7 @@ build: lint: test -x $(GOPATH)/bin/golint || go get github.com/golang/lint/golint go get \ - && go install \ + && go install $(GOPACKAGES) \ && ls -d */ | egrep -v 'custom/|vendor/' | xargs $(XARGS_ARG) go tool vet -v @if [ -n "$(shell $(GOLINT) | cut -d ':' -f 1)" ]; then $(GOLINT) && exit 1 ; fi @@ -48,7 +48,7 @@ test: build-dev $(NAME):dev make -e test-direct test-direct: - go test -p 1 -v $(TEST_ARGS) $(TEST_MODULES) $(TESTRUN) + go test -p 1 -v $(TEST_ARGS) $(GOPACKAGES) $(TESTRUN) test-image-size: @if [ $(shell docker inspect -f '{{ .Size }}' $(NAME):$(VERSION)) -gt $(MAX_IMAGE_SIZE) ]; then \ diff --git a/adapters/syslog/syslog_test.go b/adapters/syslog/syslog_test.go index 5f2b3f31..7c9b7d89 100644 --- a/adapters/syslog/syslog_test.go +++ b/adapters/syslog/syslog_test.go @@ -30,7 +30,7 @@ const ( var ( container = &docker.Container{ ID: "8dfafdbc3a40", - Name: "0michaelshobbs", + Name: "\x00michaelshobbs", Config: &docker.Config{ Hostname: "8dfafdbc3a40", }, From 2ad7bbfa04d4dde51a4cc5839f0f3b5814e71714 Mon Sep 17 00:00:00 2001 From: Michael Hobbs Date: Thu, 13 Apr 2017 14:58:07 -0700 Subject: [PATCH 7/9] don't use a mock conn yet. thanks @mattaitchison --- adapters/syslog/syslog_test.go | 175 +++++++++++++++++++-------------- testutil/mockconn.go | 155 ----------------------------- testutil/testutil.go | 98 ------------------ testutil/transport.go | 13 --- 4 files changed, 99 insertions(+), 342 deletions(-) delete mode 100644 testutil/mockconn.go delete mode 100644 testutil/testutil.go delete mode 100644 testutil/transport.go diff --git a/adapters/syslog/syslog_test.go b/adapters/syslog/syslog_test.go index 7c9b7d89..966b4012 100644 --- a/adapters/syslog/syslog_test.go +++ b/adapters/syslog/syslog_test.go @@ -1,17 +1,20 @@ package syslog import ( + "bufio" "fmt" + "io" + "log" + "net" "os" "strconv" - "strings" + "sync" "testing" "text/template" "time" docker "github.com/fsouza/go-dockerclient" "github.com/gliderlabs/logspout/router" - "github.com/gliderlabs/logspout/testutil" _ "github.com/gliderlabs/logspout/transports/tcp" _ "github.com/gliderlabs/logspout/transports/tls" @@ -25,12 +28,13 @@ const ( testTag = "{{.ContainerName}}" testPid = "{{.Container.State.Pid}}" testData = "{{.Data}}" + connCloseIdx = 5 ) var ( container = &docker.Container{ ID: "8dfafdbc3a40", - Name: "\x00michaelshobbs", + Name: "\x00container", Config: &docker.Config{ Hostname: "8dfafdbc3a40", }, @@ -55,97 +59,116 @@ func TestSyslogRetryCount(t *testing.T) { } func TestSyslogReconnectOnClose(t *testing.T) { - os.Setenv("RETRY_COUNT", strconv.Itoa(int(1))) - setRetryCount() - defer func() { - os.Unsetenv("RETRY_COUNT") - setRetryCount() - }() - tmpl, err := template.New("syslog").Parse(testTmplStr) - if err != nil { - t.Fatal(err) - } - ls, err := testutil.NewLocalTCPServer() + done := make(chan string) + addr, sock, srvWG := startServer("tcp", "", done) + defer srvWG.Wait() + defer os.Remove(addr) + defer sock.Close() + route := &router.Route{Adapter: "syslog+tcp", Address: addr} + adapter, err := NewSyslogAdapter(route) if err != nil { t.Fatal(err) } - defer ls.Teardown() - - route := &router.Route{ - ID: "0", - Adapter: "syslog", - Address: ls.Listener.Addr().String(), - } - - datac := make(chan []byte, testutil.MaxMsgCount) - errc := make(chan error, 1) - go testutil.AcceptAndCloseConn(ls, datac, errc) - - adapter := &Adapter{ - route: route, - conn: testutil.Dial(ls), - tmpl: tmpl, - transport: testutil.MockTransport{Listener: ls}, - } - logstream := make(chan *router.Message) - done := make(chan bool) - sentMsgs := [][]byte{} - // Send msgs to logstream - go sendLogstream(logstream, done, &sentMsgs, tmpl) + stream := make(chan *router.Message) + go adapter.Stream(stream) - // Stream logstream to conn - go adapter.Stream(logstream) - - // Check for errs from goroutines - for err := range errc { - t.Errorf("%v", err) - } + count := 100 + messages := make(chan string, count) + go sendLogstream(stream, messages, adapter, count) - readMsgs := [][]byte{} + timeout := time.After(6 * time.Second) + msgnum := 1 for { select { - case <-done: - if testutil.MaxMsgCount-1 != len(datac) { - t.Errorf("expected %v got %v", testutil.MaxMsgCount-1, len(datac)) + case msg := <-done: + // Don't check a message that we know was dropped + if msgnum%connCloseIdx == 0 { + _ = <-messages + msgnum++ } - for msg := range datac { - readMsgs = append(readMsgs, msg) - } - sentMsgs = append(sentMsgs[:testutil.CloseOnMsgIdx], sentMsgs[testutil.CloseOnMsgIdx+1:]...) - for i, v := range sentMsgs { - sent := strings.Trim(fmt.Sprintf("%s", v), "\n") - read := strings.Trim(fmt.Sprintf("%s", readMsgs[i]), "\x00\n") - if sent != read { - t.Errorf("expected %+q got %+q", sent, read) - } + check(t, adapter.(*Adapter).tmpl, <-messages, msg) + msgnum++ + case <-timeout: + adapter.(*Adapter).conn.Close() + t.Fatal("timeout after", msgnum, "messages") + return + default: + if msgnum == count { + adapter.(*Adapter).conn.Close() + return } } - return } } -func sendLogstream(logstream chan *router.Message, done chan bool, msgs *[][]byte, tmpl *template.Template) { - defer func() { - close(logstream) - close(done) +func startServer(n, la string, done chan<- string) (addr string, sock io.Closer, wg *sync.WaitGroup) { + if n == "udp" || n == "tcp" { + la = "127.0.0.1:0" + } + wg = new(sync.WaitGroup) + + l, err := net.Listen(n, la) + if err != nil { + log.Fatalf("startServer failed: %v", err) + } + addr = l.Addr().String() + sock = l + wg.Add(1) + go func() { + defer wg.Done() + runStreamSyslog(l, done, wg) }() - var count int + + return +} + +func runStreamSyslog(l net.Listener, done chan<- string, wg *sync.WaitGroup) { for { - if count == testutil.MaxMsgCount { - done <- true + c, err := l.Accept() + if err != nil { return } - msg := &router.Message{ - Container: container, - Data: "hellooo", - Time: time.Now(), + wg.Add(1) + go func(c net.Conn) { + defer wg.Done() + c.SetReadDeadline(time.Now().Add(5 * time.Second)) + b := bufio.NewReader(c) + var i = 1 + for { + i++ + s, err := b.ReadString('\n') + if err != nil { + break + } + done <- s + if i%connCloseIdx == 0 { + break + } + } + c.Close() + }(c) + } +} + +func sendLogstream(stream chan *router.Message, messages chan string, adapter router.LogAdapter, count int) { + for i := 1; i <= count; i++ { + msg := &Message{ + Message: &router.Message{ + Container: container, + Data: "test " + strconv.Itoa(i), + Time: time.Now(), + Source: "stdout", + }, } - m := &Message{msg} - buf, _ := m.Render(tmpl) - *msgs = append(*msgs, buf) - logstream <- msg - count++ - time.Sleep(1000 * time.Millisecond) + stream <- msg.Message + b, _ := msg.Render(adapter.(*Adapter).tmpl) + messages <- string(b) + } +} + +func check(t *testing.T, tmpl *template.Template, in string, out string) { + if in != out { + t.Errorf("expected: %s\ngot: %s\n", in, out) } } diff --git a/testutil/mockconn.go b/testutil/mockconn.go deleted file mode 100644 index 000bdc6e..00000000 --- a/testutil/mockconn.go +++ /dev/null @@ -1,155 +0,0 @@ -package testutil - -import ( - "fmt" - "net" - "sync" - "time" -) - -var ( - connIsClosed = &mutexBool{ - mutex: &sync.Mutex{}, - state: false, - } - connIsReset = &mutexBool{ - mutex: &sync.Mutex{}, - state: false, - } -) - -type mutexBool struct { - mutex *sync.Mutex - state bool -} - -// Conn facilitates testing by providing two connected ReadWriteClosers -// each of which can be used in place of a net.Conn -type Conn struct { - Server *End - Client *End -} - -// Close closes server/client pipes -func (c *Conn) Close() error { - if err := c.Server.Close(); err != nil { - return err - } - if err := c.Client.Close(); err != nil { - return err - } - return nil -} - -// NewConn returns a new testutil.Conn -func NewConn() *Conn { - // A connection consists of two pipes: - // Client | Server - // writes ===> reads - // reads <=== writes - - serverRead, clientWrite := net.Pipe() - clientRead, serverWrite := net.Pipe() - - return &Conn{ - Server: &End{ - Reader: serverRead, - Writer: serverWrite, - }, - Client: &End{ - Reader: clientRead, - Writer: clientWrite, - }, - } -} - -// End is one 'end' of a simulated connection. -type End struct { - Reader net.Conn - Writer net.Conn -} - -// type End struct { -// Reader *io.PipeReader -// Writer *io.PipeWriter -// } - -// CloseTest simulates closing a net.Conn -func (e End) CloseTest() { - connIsClosed.mutex.Lock() - connIsClosed.state = true - connIsClosed.mutex.Unlock() - return -} - -// Close closes the Reader/Writer pipes -func (e End) Close() error { - if err := e.Writer.Close(); err != nil { - return err - } - if err := e.Reader.Close(); err != nil { - return err - } - return nil -} - -func (e End) Read(data []byte) (n int, err error) { - e.Reader.SetReadDeadline(time.Now().Add(5 * time.Second)) - return e.Reader.Read(data) -} -func (e End) Write(data []byte) (n int, err error) { - connIsClosed.mutex.Lock() - defer connIsClosed.mutex.Unlock() - if connIsClosed.state { - connIsClosed.state = false - return 0, &net.OpError{ - Op: "write", - Net: e.RemoteAddr().Network(), - Source: e.LocalAddr(), - Addr: e.RemoteAddr(), - Err: fmt.Errorf("write: broken pipe"), - } - } - return e.Writer.Write(data) -} - -// LocalAddr satisfies the net.Conn interface -func (e End) LocalAddr() net.Addr { - return Addr{ - NetworkString: "tcp", - AddrString: "127.0.0.1", - } -} - -// RemoteAddr satisfies the net.Conn interface -func (e End) RemoteAddr() net.Addr { - return Addr{ - NetworkString: "tcp", - AddrString: "127.0.0.1", - } -} - -// SetDeadline satisfies the net.Conn interface -func (e End) SetDeadline(t time.Time) error { return nil } - -// SetReadDeadline satisfies the net.Conn interface -func (e End) SetReadDeadline(t time.Time) error { return nil } - -// SetWriteDeadline satisfies the net.Conn interface -func (e End) SetWriteDeadline(t time.Time) error { return nil } - -// Addr is a fake network interface which implements the net.Addr interface -type Addr struct { - NetworkString string - AddrString string -} - -// Network satisfies the net.Addr interface -func (a Addr) Network() string { - return a.NetworkString -} - -// Network satisfies the net.Addr interface -func (a Addr) String() string { - return a.AddrString -} diff --git a/testutil/testutil.go b/testutil/testutil.go deleted file mode 100644 index c32f4797..00000000 --- a/testutil/testutil.go +++ /dev/null @@ -1,98 +0,0 @@ -package testutil - -import ( - "net" - "sync" -) - -// Test constants -const ( - CloseOnMsgIdx = 5 - MaxMsgCount = 10 -) - -// LocalTCPServer tcp server for testing specific network errors -type LocalTCPServer struct { - lnmu sync.RWMutex - MockConn *Conn - net.Listener -} - -// Accept returns server side of the LocalTCPServer MockConn -func (ls *LocalTCPServer) Accept() (*End, error) { - return ls.MockConn.Server, nil -} - -// Teardown locks and tears down a LocalTCPServer -func (ls *LocalTCPServer) Teardown() error { - ls.lnmu.Lock() - if ls.Listener != nil { - ls.Listener.Close() - ls.Listener = nil - } - ls.lnmu.Unlock() - return nil -} - -// NewLocalTCPServer return a new LocalTCPServer -func NewLocalTCPServer() (*LocalTCPServer, error) { - ln, err := newLocalListener() - if err != nil { - return nil, err - } - return &LocalTCPServer{Listener: ln, MockConn: NewConn()}, nil -} - -func newLocalListener() (net.Listener, error) { - ln, err := net.Listen("tcp4", "127.0.0.1:0") - if err != nil { - return nil, err - } - return ln, nil -} - -// Dial returns the client side of our MockConn -func Dial(ls *LocalTCPServer) net.Conn { - return ls.MockConn.Client -} - -// AcceptAndCloseConn opens the listener side of a server, reads data and closes the connection after CloseOnMsgIdx -func AcceptAndCloseConn(ls *LocalTCPServer, datac chan []byte, errc chan error) { - defer func() { - close(datac) - close(errc) - }() - c, err := ls.Accept() - if err != nil { - errc <- err - return - } - - count := 0 - for { - switch count { - case MaxMsgCount - CloseOnMsgIdx: - c.CloseTest() - readConn(count, c, datac) - count++ - case MaxMsgCount: - return - default: - readConn(count, c, datac) - count++ - } - } -} - -func readConn(count int, c net.Conn, ch chan []byte) error { - b := make([]byte, 256) - _, err := c.Read(b) - if err != nil { - return err - } - // Simulate real-life network drop - if count != MaxMsgCount-CloseOnMsgIdx { - ch <- b - } - return nil -} diff --git a/testutil/transport.go b/testutil/transport.go deleted file mode 100644 index a78de9bc..00000000 --- a/testutil/transport.go +++ /dev/null @@ -1,13 +0,0 @@ -package testutil - -import "net" - -// MockTransport allows us to dial our test listener -type MockTransport struct { - Listener *LocalTCPServer -} - -// Dial always returns the client from our test listener -func (mt MockTransport) Dial(addr string, opt map[string]string) (net.Conn, error) { - return mt.Listener.MockConn.Client, nil -} From 78c6d3d851fb4239c075e1a46fa5aa8d8446c19c Mon Sep 17 00:00:00 2001 From: Luke Turner Date: Tue, 11 Apr 2017 09:50:32 -0700 Subject: [PATCH 8/9] Don't retry sending on ECONNRESET --- adapters/syslog/syslog.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/adapters/syslog/syslog.go b/adapters/syslog/syslog.go index f24ad101..e44f7872 100644 --- a/adapters/syslog/syslog.go +++ b/adapters/syslog/syslog.go @@ -9,6 +9,7 @@ import ( "net" "os" "strconv" + "syscall" "text/template" "time" @@ -18,12 +19,14 @@ import ( const defaultRetryCount = 10 var ( - hostname string - retryCount uint + hostname string + retryCount uint + econnResetErrStr string ) func init() { hostname, _ = os.Hostname() + econnResetErrStr = fmt.Sprintf("write: %s", syscall.ECONNRESET.Error()) router.AdapterFactories.Register(NewSyslogAdapter, "syslog") setRetryCount() } @@ -137,8 +140,9 @@ func (a *Adapter) Stream(logstream chan *router.Message) { func (a *Adapter) retry(buf []byte, err error) error { if opError, ok := err.(*net.OpError); ok { - if opError.Temporary() || opError.Timeout() { - if retryErr := a.retryTemporary(buf); retryErr == nil { + if (opError.Temporary() && opError.Err.Error() != econnResetErrStr) || opError.Timeout() { + retryErr := a.retryTemporary(buf) + if retryErr == nil { return nil } } From bb848d5fbbf284e21d2ababe96b6b46114e5f25b Mon Sep 17 00:00:00 2001 From: Michael Hobbs Date: Thu, 13 Apr 2017 17:40:32 -0700 Subject: [PATCH 9/9] add some reasonable delay to log stream --- adapters/syslog/syslog_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/adapters/syslog/syslog_test.go b/adapters/syslog/syslog_test.go index 966b4012..479acdec 100644 --- a/adapters/syslog/syslog_test.go +++ b/adapters/syslog/syslog_test.go @@ -164,6 +164,7 @@ func sendLogstream(stream chan *router.Message, messages chan string, adapter ro stream <- msg.Message b, _ := msg.Render(adapter.(*Adapter).tmpl) messages <- string(b) + time.Sleep(10 * time.Millisecond) } }