Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

preserve buffer on reconnect() AND reconnect on connection reset by peer #291

Merged
merged 9 commits into from
Apr 14, 2017
19 changes: 15 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@

NAME=logspout
VERSION=$(shell cat VERSION)
# max image size of 40MB
MAX_IMAGE_SIZE := 40000000

ifeq ($(shell uname), Darwin)
XARGS_ARG="-L1"
endif
GOPACKAGES ?= $(shell go list ./... | egrep -v 'custom|vendor')
GOLINT := go list ./... | egrep -v '/custom/|/vendor/' | xargs $(XARGS_ARG) golint | egrep -v 'extpoints.go|types.go'
# max image size of 40MB
MAX_IMAGE_SIZE := 40000000
TEST_ARGS ?= -race

ifdef TEST_RUN
TESTRUN := -run ${TEST_RUN}
endif

build-dev:
docker build -f Dockerfile.dev -t $(NAME):dev .
Expand All @@ -29,15 +36,19 @@ build:
lint:
test -x $(GOPATH)/bin/golint || go get github.com/golang/lint/golint
go get \
&& go install \
&& go install $(GOPACKAGES) \
&& ls -d */ | egrep -v 'custom/|vendor/' | xargs $(XARGS_ARG) go tool vet -v
@if [ -n "$(shell $(GOLINT) | cut -d ':' -f 1)" ]; then $(GOLINT) && exit 1 ; fi

test: build-dev
docker run \
-v /var/run/docker.sock:/var/run/docker.sock \
-v $(PWD):/go/src/github.com/gliderlabs/logspout \
$(NAME):dev go test -v ./router/...
-e TEST_ARGS="" \
$(NAME):dev make -e test-direct

test-direct:
go test -p 1 -v $(TEST_ARGS) $(GOPACKAGES) $(TESTRUN)

test-image-size:
@if [ $(shell docker inspect -f '{{ .Size }}' $(NAME):$(VERSION)) -gt $(MAX_IMAGE_SIZE) ]; then \
Expand Down
31 changes: 18 additions & 13 deletions adapters/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"os"
"strconv"
"syscall"
"text/template"
"time"

Expand All @@ -18,12 +19,14 @@ import (
const defaultRetryCount = 10

var (
hostname string
retryCount uint
hostname string
retryCount uint
econnResetErrStr string
)

func init() {
hostname, _ = os.Hostname()
econnResetErrStr = fmt.Sprintf("write: %s", syscall.ECONNRESET.Error())
router.AdapterFactories.Register(NewSyslogAdapter, "syslog")
setRetryCount()
}
Expand Down Expand Up @@ -120,16 +123,14 @@ func (a *Adapter) Stream(logstream chan *router.Message) {
log.Println("syslog:", err)
return
}
_, err = a.conn.Write(buf)
if err != nil {
if _, err = a.conn.Write(buf); err != nil {
log.Println("syslog:", err)
switch a.conn.(type) {
case *net.UDPConn:
continue
default:
err = a.retry(buf, err)
if err != nil {
log.Println("syslog:", err)
if err = a.retry(buf, err); err != nil {
log.Println("syslog retry err:", err)
return
}
}
Expand All @@ -139,15 +140,22 @@ func (a *Adapter) Stream(logstream chan *router.Message) {

func (a *Adapter) retry(buf []byte, err error) error {
if opError, ok := err.(*net.OpError); ok {
if opError.Temporary() || opError.Timeout() {
if (opError.Temporary() && opError.Err.Error() != econnResetErrStr) || opError.Timeout() {
retryErr := a.retryTemporary(buf)
if retryErr == nil {
return nil
}
}
}

return a.reconnect()
if reconnErr := a.reconnect(); reconnErr != nil {
return reconnErr
}
if _, err = a.conn.Write(buf); err != nil {
log.Println("syslog: reconnect failed")
return err
}
log.Println("syslog: reconnect successful")
return nil
}

func (a *Adapter) retryTemporary(buf []byte) error {
Expand Down Expand Up @@ -177,16 +185,13 @@ func (a *Adapter) reconnect() error {
if err != nil {
return err
}

a.conn = conn
return nil
}, retryCount)

if err != nil {
log.Println("syslog: reconnect failed")
return err
}

return nil
}

Expand Down
163 changes: 158 additions & 5 deletions adapters/syslog/syslog_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,175 @@
package syslog

import (
"bufio"
"fmt"
"io"
"log"
"net"
"os"
"strconv"
"sync"
"testing"
"text/template"
"time"

docker "github.com/fsouza/go-dockerclient"
"github.com/gliderlabs/logspout/router"

_ "github.com/gliderlabs/logspout/transports/tcp"
_ "github.com/gliderlabs/logspout/transports/tls"
_ "github.com/gliderlabs/logspout/transports/udp"
)

func TestSyslogRetryCount(t *testing.T) {
setRetryCount()
if retryCount != defaultRetryCount {
t.Errorf("expected %v got %v", defaultRetryCount, retryCount)
const (
testPriority = "{{.Priority}}"
testTimestamp = "{{.Timestamp}}"
testHostname = "{{.Container.Config.Hostname}}"
testTag = "{{.ContainerName}}"
testPid = "{{.Container.State.Pid}}"
testData = "{{.Data}}"
connCloseIdx = 5
)

var (
container = &docker.Container{
ID: "8dfafdbc3a40",
Name: "\x00container",
Config: &docker.Config{
Hostname: "8dfafdbc3a40",
},
}
testTmplStr = fmt.Sprintf("<%s>%s %s %s[%s]: %s\n",
testPriority, testTimestamp, testHostname, testTag, testPid, testData)
)

func TestSyslogRetryCount(t *testing.T) {
newRetryCount := uint(20)
os.Setenv("RETRY_COUNT", strconv.Itoa(int(newRetryCount)))
defer os.Unsetenv("RETRY_COUNT")
setRetryCount()
if retryCount != newRetryCount {
t.Errorf("expected %v got %v", newRetryCount, retryCount)
}

os.Unsetenv("RETRY_COUNT")
setRetryCount()
if retryCount != defaultRetryCount {
t.Errorf("expected %v got %v", defaultRetryCount, retryCount)
}
}

func TestSyslogReconnectOnClose(t *testing.T) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is pretty intense.
I'd recommend breaking it down a little. Maybe move the go func() to named functions so they're just go someReallyClearFuncName()
Would make things a little more readable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

done := make(chan string)
addr, sock, srvWG := startServer("tcp", "", done)
defer srvWG.Wait()
defer os.Remove(addr)
defer sock.Close()
route := &router.Route{Adapter: "syslog+tcp", Address: addr}
adapter, err := NewSyslogAdapter(route)
if err != nil {
t.Fatal(err)
}

stream := make(chan *router.Message)
go adapter.Stream(stream)

count := 100
messages := make(chan string, count)
go sendLogstream(stream, messages, adapter, count)

timeout := time.After(6 * time.Second)
msgnum := 1
for {
select {
case msg := <-done:
// Don't check a message that we know was dropped
if msgnum%connCloseIdx == 0 {
_ = <-messages
msgnum++
}
check(t, adapter.(*Adapter).tmpl, <-messages, msg)
msgnum++
case <-timeout:
adapter.(*Adapter).conn.Close()
t.Fatal("timeout after", msgnum, "messages")
return
default:
if msgnum == count {
adapter.(*Adapter).conn.Close()
return
}
}
}
}

func startServer(n, la string, done chan<- string) (addr string, sock io.Closer, wg *sync.WaitGroup) {
if n == "udp" || n == "tcp" {
la = "127.0.0.1:0"
}
wg = new(sync.WaitGroup)

l, err := net.Listen(n, la)
if err != nil {
log.Fatalf("startServer failed: %v", err)
}
addr = l.Addr().String()
sock = l
wg.Add(1)
go func() {
defer wg.Done()
runStreamSyslog(l, done, wg)
}()

return
}

func runStreamSyslog(l net.Listener, done chan<- string, wg *sync.WaitGroup) {
for {
c, err := l.Accept()
if err != nil {
return
}
wg.Add(1)
go func(c net.Conn) {
defer wg.Done()
c.SetReadDeadline(time.Now().Add(5 * time.Second))
b := bufio.NewReader(c)
var i = 1
for {
i++
s, err := b.ReadString('\n')
if err != nil {
break
}
done <- s
if i%connCloseIdx == 0 {
break
}
}
c.Close()
}(c)
}
}

func sendLogstream(stream chan *router.Message, messages chan string, adapter router.LogAdapter, count int) {
for i := 1; i <= count; i++ {
msg := &Message{
Message: &router.Message{
Container: container,
Data: "test " + strconv.Itoa(i),
Time: time.Now(),
Source: "stdout",
},
}
stream <- msg.Message
b, _ := msg.Render(adapter.(*Adapter).tmpl)
messages <- string(b)
time.Sleep(10 * time.Millisecond)
}
}

func check(t *testing.T, tmpl *template.Template, in string, out string) {
if in != out {
t.Errorf("expected: %s\ngot: %s\n", in, out)
}
}