Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

preserve buffer on reconnect() AND reconnect on connection reset by peer #291

Merged
merged 9 commits into from
Apr 14, 2017
19 changes: 15 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@

NAME=logspout
VERSION=$(shell cat VERSION)
# max image size of 40MB
MAX_IMAGE_SIZE := 40000000

ifeq ($(shell uname), Darwin)
XARGS_ARG="-L1"
endif
GOPACKAGES ?= $(shell go list ./... | egrep -v 'custom|vendor')
GOLINT := go list ./... | egrep -v '/custom/|/vendor/' | xargs $(XARGS_ARG) golint | egrep -v 'extpoints.go|types.go'
# max image size of 40MB
MAX_IMAGE_SIZE := 40000000
TEST_ARGS ?= -race

ifdef TEST_RUN
TESTRUN := -run ${TEST_RUN}
endif

build-dev:
docker build -f Dockerfile.dev -t $(NAME):dev .
Expand All @@ -29,15 +36,19 @@ build:
lint:
test -x $(GOPATH)/bin/golint || go get github.com/golang/lint/golint
go get \
&& go install \
&& go install $(GOPACKAGES) \
&& ls -d */ | egrep -v 'custom/|vendor/' | xargs $(XARGS_ARG) go tool vet -v
@if [ -n "$(shell $(GOLINT) | cut -d ':' -f 1)" ]; then $(GOLINT) && exit 1 ; fi

test: build-dev
docker run \
-v /var/run/docker.sock:/var/run/docker.sock \
-v $(PWD):/go/src/github.com/gliderlabs/logspout \
$(NAME):dev go test -v ./router/...
-e TEST_ARGS="" \
$(NAME):dev make -e test-direct

test-direct:
go test -p 1 -v $(TEST_ARGS) $(GOPACKAGES) $(TESTRUN)

test-image-size:
@if [ $(shell docker inspect -f '{{ .Size }}' $(NAME):$(VERSION)) -gt $(MAX_IMAGE_SIZE) ]; then \
Expand Down
25 changes: 13 additions & 12 deletions adapters/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,14 @@ func (a *Adapter) Stream(logstream chan *router.Message) {
log.Println("syslog:", err)
return
}
_, err = a.conn.Write(buf)
if err != nil {
if _, err = a.conn.Write(buf); err != nil {
log.Println("syslog:", err)
switch a.conn.(type) {
case *net.UDPConn:
continue
default:
err = a.retry(buf, err)
if err != nil {
log.Println("syslog:", err)
if err = a.retry(buf, err); err != nil {
log.Println("syslog retry err:", err)
return
}
}
Expand All @@ -140,14 +138,20 @@ func (a *Adapter) Stream(logstream chan *router.Message) {
func (a *Adapter) retry(buf []byte, err error) error {
if opError, ok := err.(*net.OpError); ok {
if opError.Temporary() || opError.Timeout() {
retryErr := a.retryTemporary(buf)
if retryErr == nil {
if retryErr := a.retryTemporary(buf); retryErr == nil {
return nil
}
}
}

return a.reconnect()
if reconnErr := a.reconnect(); reconnErr != nil {
return reconnErr
}
if _, err = a.conn.Write(buf); err != nil {
log.Println("syslog: reconnect failed")
return err
}
log.Println("syslog: reconnect successful")
return nil
}

func (a *Adapter) retryTemporary(buf []byte) error {
Expand Down Expand Up @@ -177,16 +181,13 @@ func (a *Adapter) reconnect() error {
if err != nil {
return err
}

a.conn = conn
return nil
}, retryCount)

if err != nil {
log.Println("syslog: reconnect failed")
return err
}

return nil
}

Expand Down
139 changes: 134 additions & 5 deletions adapters/syslog/syslog_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,151 @@
package syslog

import (
"fmt"
"os"
"strconv"
"strings"
"testing"
"text/template"
"time"

docker "github.com/fsouza/go-dockerclient"
"github.com/gliderlabs/logspout/router"
"github.com/gliderlabs/logspout/testutil"

_ "github.com/gliderlabs/logspout/transports/tcp"
_ "github.com/gliderlabs/logspout/transports/tls"
_ "github.com/gliderlabs/logspout/transports/udp"
)

func TestSyslogRetryCount(t *testing.T) {
setRetryCount()
if retryCount != defaultRetryCount {
t.Errorf("expected %v got %v", defaultRetryCount, retryCount)
const (
testPriority = "{{.Priority}}"
testTimestamp = "{{.Timestamp}}"
testHostname = "{{.Container.Config.Hostname}}"
testTag = "{{.ContainerName}}"
testPid = "{{.Container.State.Pid}}"
testData = "{{.Data}}"
)

var (
container = &docker.Container{
ID: "8dfafdbc3a40",
Name: "\x00michaelshobbs",
Config: &docker.Config{
Hostname: "8dfafdbc3a40",
},
}
testTmplStr = fmt.Sprintf("<%s>%s %s %s[%s]: %s\n",
testPriority, testTimestamp, testHostname, testTag, testPid, testData)
)

func TestSyslogRetryCount(t *testing.T) {
newRetryCount := uint(20)
os.Setenv("RETRY_COUNT", strconv.Itoa(int(newRetryCount)))
defer os.Unsetenv("RETRY_COUNT")
setRetryCount()
if retryCount != newRetryCount {
t.Errorf("expected %v got %v", newRetryCount, retryCount)
}

os.Unsetenv("RETRY_COUNT")
setRetryCount()
if retryCount != defaultRetryCount {
t.Errorf("expected %v got %v", defaultRetryCount, retryCount)
}
}

func TestSyslogReconnectOnClose(t *testing.T) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is pretty intense.
I'd recommend breaking it down a little. Maybe move the go func() to named functions so they're just go someReallyClearFuncName()
Would make things a little more readable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

os.Setenv("RETRY_COUNT", strconv.Itoa(int(1)))
setRetryCount()
defer func() {
os.Unsetenv("RETRY_COUNT")
setRetryCount()
}()
tmpl, err := template.New("syslog").Parse(testTmplStr)
if err != nil {
t.Fatal(err)
}
ls, err := testutil.NewLocalTCPServer()
if err != nil {
t.Fatal(err)
}
defer ls.Teardown()

route := &router.Route{
ID: "0",
Adapter: "syslog",
Address: ls.Listener.Addr().String(),
}

datac := make(chan []byte, testutil.MaxMsgCount)
errc := make(chan error, 1)
go testutil.AcceptAndCloseConn(ls, datac, errc)

adapter := &Adapter{
route: route,
conn: testutil.Dial(ls),
tmpl: tmpl,
transport: testutil.MockTransport{Listener: ls},
}

logstream := make(chan *router.Message)
done := make(chan bool)
sentMsgs := [][]byte{}
// Send msgs to logstream
go sendLogstream(logstream, done, &sentMsgs, tmpl)

// Stream logstream to conn
go adapter.Stream(logstream)

// Check for errs from goroutines
for err := range errc {
t.Errorf("%v", err)
}

readMsgs := [][]byte{}
for {
select {
case <-done:
if testutil.MaxMsgCount-1 != len(datac) {
t.Errorf("expected %v got %v", testutil.MaxMsgCount-1, len(datac))
}
for msg := range datac {
readMsgs = append(readMsgs, msg)
}
sentMsgs = append(sentMsgs[:testutil.CloseOnMsgIdx], sentMsgs[testutil.CloseOnMsgIdx+1:]...)
for i, v := range sentMsgs {
sent := strings.Trim(fmt.Sprintf("%s", v), "\n")
read := strings.Trim(fmt.Sprintf("%s", readMsgs[i]), "\x00\n")
if sent != read {
t.Errorf("expected %+q got %+q", sent, read)
}
}
}
return
}
}

func sendLogstream(logstream chan *router.Message, done chan bool, msgs *[][]byte, tmpl *template.Template) {
defer func() {
close(logstream)
close(done)
}()
var count int
for {
if count == testutil.MaxMsgCount {
done <- true
return
}
msg := &router.Message{
Container: container,
Data: "hellooo",
Time: time.Now(),
}
m := &Message{msg}
buf, _ := m.Render(tmpl)
*msgs = append(*msgs, buf)
logstream <- msg
count++
time.Sleep(1000 * time.Millisecond)
}
}
Loading