Skip to content

Commit

Permalink
add test for reconnect()
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelshobbs committed Apr 12, 2017
1 parent 68e3094 commit f65caa1
Showing 1 changed file with 193 additions and 5 deletions.
198 changes: 193 additions & 5 deletions adapters/syslog/syslog_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,210 @@
package syslog

import (
"fmt"
"net"
"os"
"strconv"
"sync"
"testing"
"text/template"
"time"

docker "github.com/fsouza/go-dockerclient"
"github.com/gliderlabs/logspout/router"

_ "github.com/gliderlabs/logspout/transports/tcp"
_ "github.com/gliderlabs/logspout/transports/tls"
_ "github.com/gliderlabs/logspout/transports/udp"
)

func TestSyslogRetryCount(t *testing.T) {
setRetryCount()
if retryCount != defaultRetryCount {
t.Errorf("expected %v got %v", defaultRetryCount, retryCount)
const maxMsgCount = 10

var (
container = &docker.Container{
ID: "8dfafdbc3a40",
Name: "0michaelshobbs",
Config: &docker.Config{
Hostname: "8dfafdbc3a40",
},
}
testPriority = "{{.Priority}}"
testTimestamp = "{{.Timestamp}}"
testHostname = "{{.Container.Config.Hostname}}"
testTag = "{{.ContainerName}}"
testPid = "{{.Container.State.Pid}}"
testData = getopt("SYSLOG_DATA", "{{.Data}}")
testTmplStr = fmt.Sprintf("<%s>%s %s %s[%s]: %s\n",
testPriority, testTimestamp, testHostname, testTag, testPid, testData)
)

type localTCPServer struct {
lnmu sync.RWMutex
net.Listener
}

func (ls *localTCPServer) teardown() error {
ls.lnmu.Lock()
if ls.Listener != nil {
ls.Listener.Close()
ls.Listener = nil
}
ls.lnmu.Unlock()
return nil
}

func newLocalTCPServer() (*localTCPServer, error) {
ln, err := newLocalListener()
if err != nil {
return nil, err
}
return &localTCPServer{Listener: ln}, nil
}

func newLocalListener() (net.Listener, error) {
ln, err := net.Listen("tcp4", "127.0.0.1:0")
if err != nil {
return nil, err
}
return ln, nil
}

func readConn(c net.Conn, ch chan []byte) error {
b := make([]byte, 256)
_, err := c.Read(b)
if err != nil {
return err
}
ch <- b
return nil
}

func TestSyslogRetryCount(t *testing.T) {
newRetryCount := uint(20)
os.Setenv("RETRY_COUNT", strconv.Itoa(int(newRetryCount)))
defer os.Unsetenv("RETRY_COUNT")
setRetryCount()
if retryCount != newRetryCount {
t.Errorf("expected %v got %v", newRetryCount, retryCount)
}

os.Unsetenv("RETRY_COUNT")
setRetryCount()
if retryCount != defaultRetryCount {
t.Errorf("expected %v got %v", defaultRetryCount, retryCount)
}
}

func TestSyslogRetryClose(t *testing.T) {
os.Setenv("RETRY_COUNT", strconv.Itoa(int(1)))
setRetryCount()
defer func() {
os.Unsetenv("RETRY_COUNT")
setRetryCount()
}()
tmpl, err := template.New("syslog").Parse(testTmplStr)
if err != nil {
t.Fatal(err)
}

ls, err := newLocalTCPServer()
if err != nil {
t.Fatal(err)
}
defer ls.teardown()
route := &router.Route{
ID: "0",
Adapter: "syslog",
Address: ls.Listener.Addr().String(),
}
transport, found := router.AdapterTransports.Lookup(route.AdapterTransport("tcp"))
if !found {
t.Errorf("bad transport: " + route.Adapter)
}

ch := make(chan error, 1)
dchan := make(chan []byte, maxMsgCount)

// Read from connection
go func() {
defer close(ch)
defer close(dchan)
c, err := ls.Accept()
if err != nil {
ch <- err
return
}
count := 0
for {
switch count {
case maxMsgCount - 5:
c.Close()
c, err = ls.Accept()
if err != nil {
ch <- err
return
}
c.SetReadDeadline(time.Now().Add(5 * time.Second))
readConn(c, dchan)
count++
case maxMsgCount:
return
default:
readConn(c, dchan)
count++
}
}
}()

// Dial connection
conn, err := net.Dial(ls.Listener.Addr().Network(), ls.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}

adapter := &Adapter{
route: route,
conn: conn,
tmpl: tmpl,
transport: transport,
}

// Send logstream
logstream := make(chan *router.Message)
done := make(chan bool)
go func() {
defer close(logstream)
defer close(done)
var count int
for {
if count == maxMsgCount {
done <- true
return
}
msg := router.Message{
Container: container,
Data: "hellooo",
Time: time.Now(),
}
logstream <- &msg
count++
}
}()

// Stream logstream to conn
go adapter.Stream(logstream)

for err := range ch {
t.Errorf("%v", err)
}

mcount := 0
for {
select {
case <-done:
if len(dchan) != maxMsgCount-1 {
t.Errorf("expected %v got %v", maxMsgCount, mcount)
}
}
return
}
}

0 comments on commit f65caa1

Please sign in to comment.