-
Notifications
You must be signed in to change notification settings - Fork 681
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
d12e81e
commit bf6dbd3
Showing
1 changed file
with
219 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,236 @@ | ||
package syslog | ||
|
||
import ( | ||
"fmt" | ||
"net" | ||
"os" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"testing" | ||
"text/template" | ||
"time" | ||
|
||
docker "github.com/fsouza/go-dockerclient" | ||
"github.com/gliderlabs/logspout/router" | ||
|
||
_ "github.com/gliderlabs/logspout/transports/tcp" | ||
_ "github.com/gliderlabs/logspout/transports/tls" | ||
_ "github.com/gliderlabs/logspout/transports/udp" | ||
) | ||
|
||
func TestSyslogRetryCount(t *testing.T) { | ||
setRetryCount() | ||
if retryCount != defaultRetryCount { | ||
t.Errorf("expected %v got %v", defaultRetryCount, retryCount) | ||
const ( | ||
closeOnMsgIdx = 5 | ||
maxMsgCount = 10 | ||
) | ||
|
||
var ( | ||
container = &docker.Container{ | ||
ID: "8dfafdbc3a40", | ||
Name: "0michaelshobbs", | ||
Config: &docker.Config{ | ||
Hostname: "8dfafdbc3a40", | ||
}, | ||
} | ||
testPriority = "{{.Priority}}" | ||
testTimestamp = "{{.Timestamp}}" | ||
testHostname = "{{.Container.Config.Hostname}}" | ||
testTag = "{{.ContainerName}}" | ||
testPid = "{{.Container.State.Pid}}" | ||
testData = getopt("SYSLOG_DATA", "{{.Data}}") | ||
testTmplStr = fmt.Sprintf("<%s>%s %s %s[%s]: %s\n", | ||
testPriority, testTimestamp, testHostname, testTag, testPid, testData) | ||
) | ||
|
||
type localTCPServer struct { | ||
lnmu sync.RWMutex | ||
net.Listener | ||
} | ||
|
||
func (ls *localTCPServer) teardown() error { | ||
ls.lnmu.Lock() | ||
if ls.Listener != nil { | ||
ls.Listener.Close() | ||
ls.Listener = nil | ||
} | ||
ls.lnmu.Unlock() | ||
return nil | ||
} | ||
|
||
func TestSyslogRetryCount(t *testing.T) { | ||
newRetryCount := uint(20) | ||
os.Setenv("RETRY_COUNT", strconv.Itoa(int(newRetryCount))) | ||
defer os.Unsetenv("RETRY_COUNT") | ||
setRetryCount() | ||
if retryCount != newRetryCount { | ||
t.Errorf("expected %v got %v", newRetryCount, retryCount) | ||
} | ||
|
||
os.Unsetenv("RETRY_COUNT") | ||
setRetryCount() | ||
if retryCount != defaultRetryCount { | ||
t.Errorf("expected %v got %v", defaultRetryCount, retryCount) | ||
} | ||
} | ||
|
||
func TestSyslogReconnectOnClose(t *testing.T) { | ||
os.Setenv("RETRY_COUNT", strconv.Itoa(int(1))) | ||
setRetryCount() | ||
defer func() { | ||
os.Unsetenv("RETRY_COUNT") | ||
setRetryCount() | ||
}() | ||
tmpl, err := template.New("syslog").Parse(testTmplStr) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
ls, err := newLocalTCPServer() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer ls.teardown() | ||
|
||
route := &router.Route{ | ||
ID: "0", | ||
Adapter: "syslog", | ||
Address: ls.Listener.Addr().String(), | ||
} | ||
transport, found := router.AdapterTransports.Lookup(route.AdapterTransport("tcp")) | ||
if !found { | ||
t.Errorf("bad transport: " + route.Adapter) | ||
} | ||
|
||
datac := make(chan []byte, maxMsgCount) | ||
errc := make(chan error, 1) | ||
go acceptAndCloseConn(ls, datac, errc) | ||
|
||
// Dial connection for adapter | ||
conn, err := net.Dial(ls.Listener.Addr().Network(), ls.Listener.Addr().String()) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
adapter := &Adapter{ | ||
route: route, | ||
conn: conn, | ||
tmpl: tmpl, | ||
transport: transport, | ||
} | ||
|
||
logstream := make(chan *router.Message) | ||
done := make(chan bool) | ||
sentMsgs := [][]byte{} | ||
// Send msgs to logstream | ||
go sendLogstream(logstream, done, &sentMsgs, tmpl) | ||
|
||
// Stream logstream to conn | ||
go adapter.Stream(logstream) | ||
|
||
for err := range errc { | ||
t.Errorf("%v", err) | ||
} | ||
|
||
readMsgs := [][]byte{} | ||
for { | ||
select { | ||
case <-done: | ||
if maxMsgCount-1 != len(datac) { | ||
t.Errorf("expected %v got %v", maxMsgCount-1, len(datac)) | ||
} | ||
for msg := range datac { | ||
readMsgs = append(readMsgs, msg) | ||
} | ||
sentMsgs = append(sentMsgs[:closeOnMsgIdx], sentMsgs[closeOnMsgIdx+1:]...) | ||
for i, v := range sentMsgs { | ||
sent := strings.Trim(fmt.Sprintf("%s", v), "\n") | ||
read := strings.Trim(fmt.Sprintf("%s", readMsgs[i]), "\x00\n") | ||
if sent != read { | ||
t.Errorf("expected %+q got %+q", sent, read) | ||
} | ||
} | ||
} | ||
return | ||
} | ||
} | ||
|
||
func newLocalTCPServer() (*localTCPServer, error) { | ||
ln, err := newLocalListener() | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &localTCPServer{Listener: ln}, nil | ||
} | ||
|
||
func newLocalListener() (net.Listener, error) { | ||
ln, err := net.Listen("tcp4", "127.0.0.1:0") | ||
if err != nil { | ||
return nil, err | ||
} | ||
return ln, nil | ||
} | ||
|
||
func acceptAndCloseConn(ls *localTCPServer, datac chan []byte, errc chan error) { | ||
defer func() { | ||
close(datac) | ||
close(errc) | ||
}() | ||
c, err := ls.Accept() | ||
if err != nil { | ||
errc <- err | ||
return | ||
} | ||
count := 0 | ||
for { | ||
switch count { | ||
case maxMsgCount - closeOnMsgIdx: | ||
c.Close() | ||
c, err = ls.Accept() | ||
if err != nil { | ||
errc <- err | ||
return | ||
} | ||
c.SetReadDeadline(time.Now().Add(5 * time.Second)) | ||
readConn(c, datac) | ||
count++ | ||
case maxMsgCount: | ||
return | ||
default: | ||
readConn(c, datac) | ||
count++ | ||
} | ||
} | ||
} | ||
|
||
func readConn(c net.Conn, ch chan []byte) error { | ||
b := make([]byte, 256) | ||
_, err := c.Read(b) | ||
if err != nil { | ||
return err | ||
} | ||
ch <- b | ||
return nil | ||
} | ||
|
||
func sendLogstream(logstream chan *router.Message, done chan bool, msgs *[][]byte, tmpl *template.Template) { | ||
defer func() { | ||
close(logstream) | ||
close(done) | ||
}() | ||
var count int | ||
for { | ||
if count == maxMsgCount { | ||
done <- true | ||
return | ||
} | ||
msg := &router.Message{ | ||
Container: container, | ||
Data: "hellooo", | ||
Time: time.Now(), | ||
} | ||
m := &Message{msg} | ||
buf, _ := m.Render(tmpl) | ||
*msgs = append(*msgs, buf) | ||
logstream <- msg | ||
count++ | ||
time.Sleep(250 * time.Millisecond) | ||
} | ||
} |