diff --git a/daemon/logger/syslog/conn.go b/daemon/logger/syslog/conn.go new file mode 100644 index 000000000..bc813b2b3 --- /dev/null +++ b/daemon/logger/syslog/conn.go @@ -0,0 +1,67 @@ +package syslog + +import ( + "net" + + "github.com/RackSec/srslog" +) + +// localConn implements the serverConn interface, used to send syslog messages +// to the remote syslog daemon. +type remoteConn struct { + conn net.Conn +} + +// writeString will use Framer/Formatter to format the content before write. +// +// NOTE: writeString implements serverConn.writeString() methods. +func (n *remoteConn) writeString(framer Framer, formatter Formatter, p Priority, hostname, tag, msg string) error { + if framer == nil { + framer = srslog.DefaultFramer + } + + if formatter == nil { + formatter = srslog.DefaultFormatter + } + + formattedMessage := framer(formatter(p, hostname, tag, msg)) + _, err := n.conn.Write([]byte(formattedMessage)) + return err +} + +// close closes the connection. +// +// NOTE:close implements serverConn.close() methods. +func (n *remoteConn) close() error { + return n.conn.Close() +} + +// localConn implements the serverConn interface, used to send syslog messages +// to the local syslog daemon over a Unix domain socket. +type localConn struct { + conn net.Conn +} + +// writeString will use Framer/Formatter to format the content before write. +// +// NOTE: writeString implements serverConn.writeString() methods. +func (n *localConn) writeString(framer Framer, formatter Formatter, p Priority, hostname, tag, msg string) error { + if framer == nil { + framer = srslog.DefaultFramer + } + + if formatter == nil { + formatter = srslog.UnixFormatter + } + + formattedMessage := framer(formatter(p, hostname, tag, msg)) + _, err := n.conn.Write([]byte(formattedMessage)) + return err +} + +// close closes the connection. +// +// NOTE:close implements serverConn.close() methods. +func (n *localConn) close() error { + return n.conn.Close() +} diff --git a/daemon/logger/syslog/const.go b/daemon/logger/syslog/const.go index 43af806c8..d50817aa6 100644 --- a/daemon/logger/syslog/const.go +++ b/daemon/logger/syslog/const.go @@ -6,7 +6,22 @@ import ( "github.com/RackSec/srslog" ) +// Priority is alias srslog.Priority. +type Priority = srslog.Priority + +// Framer is alias srslog.Framer. +type Framer = srslog.Framer + +// Formatter is alias srslog.Formatter. +type Formatter = srslog.Formatter + +const ( + severityMask = 0x07 + facilityMask = 0xf8 +) + var ( + // rfc5424 provides millisecond resolution. timeRfc5424fmt = "2006-01-02T15:04:05.999999Z07:00" @@ -16,7 +31,7 @@ var ( defaultSyslogPriority = srslog.LOG_DAEMON // facilityAliasMap allows user to use alias to set the syslog priority. - facilityAliasMap = map[string]srslog.Priority{ + facilityAliasMap = map[string]Priority{ "kern": srslog.LOG_KERN, "user": srslog.LOG_USER, "mail": srslog.LOG_MAIL, @@ -47,6 +62,9 @@ var ( "unixgram://", } + unixDialerTypes = []string{"unixgram", "unix"} + unixDialerLocalPaths = []string{"/dev/log", "/var/run/syslog", "/var/run/log"} + // tls client cipher suites defaultCipherSuites = []uint16{ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, diff --git a/daemon/logger/syslog/dialer.go b/daemon/logger/syslog/dialer.go new file mode 100644 index 000000000..97bd179ba --- /dev/null +++ b/daemon/logger/syslog/dialer.go @@ -0,0 +1,67 @@ +package syslog + +import ( + "crypto/tls" + "errors" + "net" +) + +type serverConn interface { + writeString(framer Framer, formatter Formatter, p Priority, hostname, tag, s string) error + close() error +} + +func makeDialer(proto string, addr string, cfg *tls.Config) (serverConn, string, error) { + switch proto { + case "": + return unixLocalDialer() + case secureProto: + return tlsDialer(addr, cfg) + default: + return commonDialer(proto, addr) + } +} + +// commonDialer is the most common dialer for TCP/UDP/Unix connections. +func commonDialer(network string, addr string) (serverConn, string, error) { + var ( + sc serverConn + hostname string + ) + + c, err := net.Dial(network, addr) + if err == nil { + sc = &remoteConn{conn: c} + hostname = c.LocalAddr().String() + } + return sc, hostname, err +} + +// tlsDialer connects to TLS over TCP, and is used for the "tcp+tls" network. +func tlsDialer(addr string, cfg *tls.Config) (serverConn, string, error) { + var ( + sc serverConn + hostname string + ) + + c, err := tls.Dial("tcp", addr, cfg) + if err == nil { + sc = &remoteConn{conn: c} + hostname = c.LocalAddr().String() + } + return sc, hostname, err +} + +// unixLocalDialer opens a Unix domain socket connection to the syslog daemon +// running on the local machine. +func unixLocalDialer() (serverConn, string, error) { + for _, network := range unixDialerTypes { + for _, path := range unixDialerLocalPaths { + conn, err := net.Dial(network, path) + if err == nil { + return &localConn{conn: conn}, "localhost", nil + } + } + } + return nil, "", errors.New("unix local syslog delivery error") +} diff --git a/daemon/logger/syslog/syslog.go b/daemon/logger/syslog/syslog.go index da61c2eab..6dea715e6 100644 --- a/daemon/logger/syslog/syslog.go +++ b/daemon/logger/syslog/syslog.go @@ -2,6 +2,9 @@ package syslog import ( "crypto/tls" + "os" + "strings" + "sync" "github.com/alibaba/pouch/daemon/logger" "github.com/alibaba/pouch/daemon/logger/loggerutils" @@ -11,16 +14,20 @@ import ( // Syslog writes the log data into syslog. type Syslog struct { - writer *srslog.Writer + mu sync.RWMutex + + opt *options + conn serverConn } type options struct { tag string proto string address string - priority srslog.Priority - formatter srslog.Formatter - framer srslog.Framer + hostname string + priority Priority + formatter Formatter + framer Framer tlsCfg *tls.Config } @@ -32,39 +39,121 @@ func defaultOptions() *options { // NewSyslog returns new Syslog based on the log config. func NewSyslog(info logger.Info) (*Syslog, error) { - opts, err := parseOptions(info) + opt, err := parseOptions(info) if err != nil { return nil, err } - var w *srslog.Writer - if opts.proto == secureProto { - w, err = srslog.DialWithTLSConfig(opts.proto, opts.address, opts.priority, opts.tag, opts.tlsCfg) - } else { - w, err = srslog.Dial(opts.proto, opts.address, opts.priority, opts.tag) - } - - if err != nil { - return nil, err - } - - w.SetFormatter(opts.formatter) - w.SetFramer(opts.framer) - return &Syslog{writer: w}, nil + opt.hostname, _ = os.Hostname() + return &Syslog{ + opt: opt, + conn: nil, + }, nil } // WriteLogMessage will write the LogMessage. func (s *Syslog) WriteLogMessage(msg *logger.LogMessage) error { line := string(msg.Line) if msg.Source == "stderr" { - return s.writer.Err(line) + return s.logError(line) } - return s.writer.Info(line) + return s.logInfo(line) } // Close closes the Syslog. func (s *Syslog) Close() error { - return s.writer.Close() + var err error + s.mu.Lock() + defer s.mu.Unlock() + if s.conn != nil { + err = s.conn.close() + s.conn = nil + } + return err +} + +// logInfo logs a content with severity LOG_INFO. +func (s *Syslog) logInfo(content string) error { + _, err := s.writeAndRetry(srslog.LOG_INFO, content) + return err +} + +// logError logs a content with severity LOG_ERR. +func (s *Syslog) logError(content string) error { + _, err := s.writeAndRetry(srslog.LOG_ERR, content) + return err +} + +// writeAndRetry takes a severity and the content to write. +// +// NOTE: Any facility passed to it as part of the severity Priority will be ignored. +func (s *Syslog) writeAndRetry(severity Priority, content string) (int, error) { + p := (s.opt.priority & facilityMask) | (severity & severityMask) + + conn := s.getConn() + if conn != nil { + if n, err := s.write(conn, p, content); err == nil { + return n, nil + } + } + + var err error + if conn, err = s.connect(); err != nil { + return 0, err + } + return s.write(conn, p, content) +} + +// write writes a syslog formatted string. +func (s *Syslog) write(conn serverConn, p Priority, content string) (int, error) { + // ensure it ends with a \n + if !strings.HasSuffix(content, "\n") { + content += "\n" + } + + err := conn.writeString(s.opt.framer, s.opt.formatter, p, s.opt.hostname, s.opt.tag, content) + if err != nil { + return 0, err + } + + return len(content), nil +} + +// connect uses current option to connect the remote host. +func (s *Syslog) connect() (serverConn, error) { + sc, hostname, err := makeDialer(s.opt.proto, s.opt.address, s.opt.tlsCfg) + if err != nil { + return nil, err + } + + s.setConn(sc, hostname) + return sc, nil +} + +// getConn returns the current serverConn. +func (s *Syslog) getConn() serverConn { + s.mu.RLock() + c := s.conn + s.mu.RUnlock() + return c +} + +// setConn updates the connection. +// +// NOTE: the Syslog takes lazy mode for connection. It might have more goroutines +// which try to connect the same remote host. If there is no close existing +// connection, it will be connection leak. +func (s *Syslog) setConn(c serverConn, hostname string) { + s.mu.Lock() + if s.conn != nil { + s.conn.close() + } + + s.conn = c + if s.opt.hostname == "" { + s.opt.hostname = hostname + } + s.mu.Unlock() } // parseOptions parses the log config into options. diff --git a/daemon/logger/syslog/syslog_test.go b/daemon/logger/syslog/syslog_test.go index 710ba9324..922e19cd3 100644 --- a/daemon/logger/syslog/syslog_test.go +++ b/daemon/logger/syslog/syslog_test.go @@ -1,13 +1,29 @@ package syslog import ( + "bufio" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "os" + "strings" + "sync" "testing" + "time" "github.com/alibaba/pouch/daemon/logger" "github.com/RackSec/srslog" ) +type testingTB interface { + Fatalf(format string, args ...interface{}) +} + func TestParseOptions(t *testing.T) { info := logger.Info{ LogConfig: map[string]string{ @@ -48,3 +64,213 @@ func TestParseOptions(t *testing.T) { ) } } + +func TestConnectUnixSocket(t *testing.T) { + msgCh := make(chan string) + + addr, conn, wg := startStreamServer("unix", 2, msgCh) + defer func() { + conn.Close() + wg.Wait() + }() + + info := logger.Info{ + LogConfig: map[string]string{ + "syslog-address": "unix://" + addr, + }, + } + + slog, err := NewSyslog(info) + if err != nil { + t.Fatalf("failed to create Syslog: %v", err) + } + + msg := "hi" + if err := slog.logInfo(msg); err != nil { + t.Fatalf("failed to logInfo: %v", err) + } + checkUnixFormatterMessage(t, srslog.LOG_INFO|srslog.LOG_DAEMON, slog.opt.tag, msg, msgCh) +} + +func TestLazyAndRetryConnect(t *testing.T) { + msgCh := make(chan string) + + addr, conn, wg := startStreamServer("tcp", 3, msgCh) + defer func() { + conn.Close() + wg.Wait() + }() + + info := logger.Info{ + LogConfig: map[string]string{ + "syslog-address": "tcp://" + addr, + }, + } + slog, err := NewSyslog(info) + if err != nil { + t.Fatalf("failed to create Syslog: %v", err) + } + + // try to connect to the stream server + { + msg := "hi" + if err := slog.logInfo(msg); err != nil { + t.Fatalf("failed to logInfo: %v", err) + } + checkUnixFormatterMessage(t, srslog.LOG_INFO|srslog.LOG_DAEMON, slog.opt.tag, msg, msgCh) + + msg = "oops" + if err := slog.logError(msg); err != nil { + t.Fatalf("failed to logError: %v", err) + } + checkUnixFormatterMessage(t, srslog.LOG_ERR|srslog.LOG_DAEMON, slog.opt.tag, msg, msgCh) + } + + // stop the connection and retry to connect to the stream server + slog.getConn().close() + { + msg := "again+log-alert" + if _, err := slog.writeAndRetry(srslog.LOG_ALERT, msg); err != nil { + t.Fatalf("should reconnect, but got unexpected error here: %v", err) + } + checkUnixFormatterMessage(t, srslog.LOG_DAEMON|srslog.LOG_ALERT, slog.opt.tag, msg, msgCh) + } +} + +func checkUnixFormatterMessage(t testingTB, p Priority, tag, content string, msgCh <-chan string) { + var ( + msg string + ok bool + ) + + tc := time.NewTimer(1000 * time.Millisecond) + defer tc.Stop() + + select { + case msg, ok = <-msgCh: + if !ok { + t.Fatalf("failed to get message from msgCh``") + } + case <-tc.C: + t.Fatalf("failed to get message by timeout") + } + + var ( + prefixTmpl = fmt.Sprintf("<%d>", p) + suffixTmpl = fmt.Sprintf("%s[%d]: %s\n", tag, os.Getpid(), content) + ) + + if !strings.HasPrefix(msg, prefixTmpl) { + t.Fatalf("should contains prefix %s, but got %v", prefixTmpl, msg) + } + + if !strings.HasSuffix(msg, suffixTmpl) { + t.Fatalf("should contains suffix %s, but got %v", suffixTmpl, msg) + } +} + +func TestTLSDialer(t *testing.T) { + msgCh := make(chan string) + + addr, conn, _ := startStreamServer("tcp+tls", 3, msgCh) + defer conn.Close() + + pool := x509.NewCertPool() + cert, err := ioutil.ReadFile("test/ca.pem") + if err != nil { + t.Errorf("failed to read cert file: %v", err) + } + + pool.AppendCertsFromPEM(cert) + config := tls.Config{ + RootCAs: pool, + } + + _, _, err = tlsDialer(addr, &config) + if err != nil { + t.Errorf("failed to dial: %v", err) + } +} + +// startStreamServer starts stream server which holds the connection after timeout. +func startStreamServer(proto string, readTimeout int, msgCh chan<- string) (addr string, conn io.Closer, drainWg *sync.WaitGroup) { + if proto != "tcp" && proto != "tcp+tls" && proto != "unix" { + log.Fatalf("not support %s", proto) + } + + var ( + li net.Listener + err error + cert tls.Certificate + ) + + // 127.0.0.1:0 will use random available port + addr = "127.0.0.1:0" + if proto == "unix" { + addr = randomUnixSocketName() + } + + if proto == "tcp+tls" { + cert, err = tls.LoadX509KeyPair("test/ca.pem", "test/ca-key.pem") + if err != nil { + log.Fatalf("failed to load TLS keypair: %v", err) + } + + config := tls.Config{Certificates: []tls.Certificate{cert}} + li, err = tls.Listen("tcp", addr, &config) + if err != nil { + log.Fatalf("failed to listen on %s: %v", addr, err) + } + } else { + li, err = net.Listen(proto, addr) + if err != nil { + log.Fatalf("failed to listen on %s: %v", addr, err) + } + } + + addr = li.Addr().String() + conn = li + drainWg = new(sync.WaitGroup) + + go func() { + for { + var c net.Conn + var err error + + if c, err = li.Accept(); err != nil { + return + } + + drainWg.Add(1) + go func(c net.Conn) { + defer drainWg.Done() + + c.SetReadDeadline(time.Now().Add((time.Duration(readTimeout) * time.Second))) + b := bufio.NewReader(c) + + for { + s, err := b.ReadString('\n') + if err != nil { + break + } + msgCh <- s + } + c.Close() + }(c) + } + }() + return +} + +// randomUnixSocketName uses TempFile to create random file name. +func randomUnixSocketName() (name string) { + f, err := ioutil.TempFile("", "syslog-test-") + if err != nil { + log.Fatal("TempFile: ", err) + } + + name = f.Name() + f.Close() + os.Remove(name) + return +} diff --git a/daemon/logger/syslog/test/ca-key.pem b/daemon/logger/syslog/test/ca-key.pem new file mode 100644 index 000000000..a60cbccfe --- /dev/null +++ b/daemon/logger/syslog/test/ca-key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQC7A6isTSj2GbZm +K9M677OdHDKxb/2dNsojOrpqUXpzLjInvevWl3l86BP3ZmTesifPpwCZeHiHqCDL +Yq3lXc+P1ChxvYG2HJ6uj1U4su+NY1MRsYKaYrFEvcy6XwJgx45el9YmZd8LRZ3X +eaJJBELi5xXAAJeQO29OHnZK85+s8nMqofMQ9ADBTugCikQ2EzbaDHHA0SNFbvnd +3C13t+b6oh97JOFs61jxAQN4Jylqe8v3+3JDw/XOjVHRWSNVYJzszb4YfnbyLh2+ +3Zo/DAPHjBa7x7sKNe7zIqKLlspHegBBBerOLPlkTzoD4jHkoyDKhyzboV6sLuyU +L5JYxhEXAgMBAAECggEALu98JcI3xYwuevYEzYXbTlMFZgL1Y7+ce7sBji1kzgOK +Eu9XgUQC+ZZFbJIGPt33n+YKAHp7xZAsqt7ZRfeFYFOe0uz84PfN24Gdod1GwfVr +o8UpQhYC5327JAs0TZXH5XW2CW0HzYxrUOj2Ed0/7DKTAt0jLM+9351FkwoRyRWh +nwqRezDHhwJK8BNkD7J+5yLGqy061w43iBwBxliy0u46c0Z5dawsQs5ZjMy0P+2k +6sWpEkODPUIfQas0tVzDf0yZu5K7s1ZK9Mk1OY5mNiWIKJi9yR2gSH6T0Xj+D2dQ +zxxNo3WnWi1lfcPhrIOEQfOqCOv6lciYAQO82OjMAQKBgQD4Gw1JJq0ij663RRHj +88Z0dI80aWwqq5OgYIIcigSOyjTn4zv2P+PUfMazG8M8j27fNUGk6hgoJdeT1wP0 +dCtyRXsnkDWOrXLPEdGqhokGxdXXkH9O2mFcuAnSHQsZGWYPiaSyMurbYJSsk6iy +LW5YsKsUd9/MRTqKIYQeoILqaQKBgQDA9vx//SrvQWkgBLfQRe7l24CC2hKyzoJ4 +BYO/tYGlNOSKvr5ixNkrEbJm2U3+lsmcsSEWsKKA8laq/ieDCsW2qw52h5e8WF+x +HkoJk5lnhQP4ObZkr+baWn36BMI8gStYde44nDQA4Ym/kXLIrKqPPNlrHq6XWvOs +Aqx7PiavfwKBgDmVtciztF2gMbIR0uwwiXBAGXjVuyhQOGxx2eEb26D0p8DmieVG +wO9xooCxwefNdrNR3hGTz9WJqvaCYkWbrPXR0JrHKtcZxNrwzNQic61hv5dPz2yS +3SQzBvyAhzHlJj4W+WXu0XnnebpwbygjyUfGgX63r/buXn7u9oZRRPF5AoGAetwG +8nq/3mvYQiGe40XTkJhMX4P3ic9AdKiWNPac4BIoPUqrL3nEB88Rtrg/F4zsOw7w +UWEPZmhl1CNUBvRR5DdPUxqL+siNzNjLLQPEFYw0ddRRFBSHOJmLQifLf/iCH6We +kn6qAbGYIde6rJh7VA3bJH/5i7bGFMvzA9kTioMCgYAm3hwQTZrE/w3p2m58Nko+ +dPxppJQWuWGakESVZdDEHSTPxLnQc1WSfz/wDcf9mw+T3QEkw9rF/Ji2yVkwZxQU +Z91ErLjpq9wuU7QRjnCSIHdo5TODTBE2uB6BmC/ifyta6UVI1UBD1WY+PLwHioZQ +Ja/1jATMJ+q671L0NDJffA== +-----END PRIVATE KEY----- diff --git a/daemon/logger/syslog/test/ca.pem b/daemon/logger/syslog/test/ca.pem new file mode 100644 index 000000000..bceabab0f --- /dev/null +++ b/daemon/logger/syslog/test/ca.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC4TCCAcmgAwIBAgIRAOKBgOdGz0PNmDDiKrSWDkswDQYJKoZIhvcNAQELBQAw +FDESMBAGA1UEAwwJMTI3LjAuMC4xMB4XDTE4MDgwODE0MzEyMFoXDTE4MTIzMTAw +MDAwMFowFDESMBAGA1UEAwwJMTI3LjAuMC4xMIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEAuwOorE0o9hm2ZivTOu+znRwysW/9nTbKIzq6alF6cy4yJ73r +1pd5fOgT92Zk3rInz6cAmXh4h6ggy2Kt5V3Pj9Qocb2Bthyero9VOLLvjWNTEbGC +mmKxRL3Mul8CYMeOXpfWJmXfC0Wd13miSQRC4ucVwACXkDtvTh52SvOfrPJzKqHz +EPQAwU7oAopENhM22gxxwNEjRW753dwtd7fm+qIfeyThbOtY8QEDeCcpanvL9/ty +Q8P1zo1R0VkjVWCc7M2+GH528i4dvt2aPwwDx4wWu8e7CjXu8yKii5bKR3oAQQXq +ziz5ZE86A+Ix5KMgyocs26FerC7slC+SWMYRFwIDAQABoy4wLDAPBgNVHREECDAG +hwR/AAABMAsGA1UdDwQEAwIBrjAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBCwUA +A4IBAQCUNiaaHp9A5Ls0Sk8Xm3K0CzWoaZFofHfrRAl+9Walyl11miKiWXXX2c9J +TtrqGQHM0vmbgS1DEqkPY8HNDkgtU6Q4Q2F2lOyiB98Zs1n5bAq4P5t/5Pu9OAft +QwPKDyOZKKX2RRg4y04m55Bl22l+Gk7xso6Il+jU3jjRzO5yiTTJ/i3HUz6AZ4oV +o174HluFMx32I/fu4E1lpQBQN8aw+380Y342ljRDTrZBJo89cwBwbJNmV1Eqqd8P +DLiBQFDdRC9qEK+0huFVH+yyJfIb+3AR9GwJ3esQqfjTH1ZccQKi8KIBG7/X0dgv +1KRviUTwb+/4AtzX4NGVjYxfFE6M +-----END CERTIFICATE----- diff --git a/test/cli_run_syslog_test.go b/test/cli_run_syslog_test.go new file mode 100644 index 000000000..a7444d86a --- /dev/null +++ b/test/cli_run_syslog_test.go @@ -0,0 +1,143 @@ +package main + +import ( + "bufio" + "fmt" + "io" + "net" + "strings" + "time" + + "github.com/alibaba/pouch/test/command" + "github.com/alibaba/pouch/test/environment" + "github.com/gotestyourself/gotestyourself/icmd" + + "github.com/go-check/check" +) + +// PouchRunSyslogSuite is the test suite for run CLI. +type PouchRunSyslogSuite struct{} + +func init() { + check.Suite(&PouchRunSyslogSuite{}) +} + +// SetUpSuite does common setup in the beginning of each test suite. +func (suite *PouchRunSyslogSuite) SetUpSuite(c *check.C) { + SkipIfFalse(c, environment.IsLinux) + + PullImage(c, busyboxImage) +} + +func (suite *PouchRunSyslogSuite) TestRunRFC5424MicroSeq(c *check.C) { + msgCh := make(chan string) + addr, conn := suite.startTCPServer(c, msgCh) + defer conn.Close() + + type tCase struct { + env string // for container + optTag string + optEnv string + expectedTag string + } + + cname := "test-syslog-Basic" + for i, tc := range []tCase{ + { + env: "POUCH_VERSION=ga", + optTag: "{{.POUCH_VERSION}}", + optEnv: "POUCH_VERSION", + expectedTag: "ga", + }, + { + env: "POUCH_BUILD=unknow", + optTag: "{{.POUCH_VERSION}}", + optEnv: "POUCH_BUILD", + expectedTag: "", + }, + } { + name := fmt.Sprintf("%s-%d", cname, i+1) + + command.PouchRun("run", "-d", + "--name", name, + "--log-driver", "syslog", + "--log-opt", "syslog-address=tcp://"+addr, + "--log-opt", fmt.Sprintf("tag={{with .ExtraAttributes nil}}%s{{end}}", tc.optTag), + "--log-opt", fmt.Sprintf("env=%s", tc.optEnv), + "--log-opt", "syslog-format=rfc5424micro-seq", + "--env", tc.env, + busyboxImage, "echo", name, + ).Assert(c, icmd.Success) + defer DelContainerForceMultyTime(c, name) + + // rfc5424micro-seq will has the suffix template like "{{tag}} - {{content}} + c.Assert(suite.checkMessage(fmt.Sprintf("%s - %s\n", tc.expectedTag, name), msgCh), check.IsNil) + } +} + +func (suite *PouchRunSyslogSuite) checkMessage(expected string, msgCh <-chan string) error { + var ( + msg string + ok bool + ) + + tc := time.NewTimer(1000 * time.Millisecond) + defer tc.Stop() + + select { + case msg, ok = <-msgCh: + if !ok { + return fmt.Errorf("failed to get message from msgCh") + } + case <-tc.C: + return fmt.Errorf("failed to get message by timeout") + } + + if !strings.HasSuffix(msg, expected) { + return fmt.Errorf("expected has suffix %s, but got %s", expected, msg) + } + return nil +} + +func (suite *PouchRunSyslogSuite) startTCPServer(t testingTB, msgCh chan<- string) (addr string, conn io.Closer) { + var ( + li net.Listener + err error + ) + + // 127.0.0.1:0 will use random available port + addr = "127.0.0.1:0" + li, err = net.Listen("tcp", addr) + if err != nil { + t.Fatalf("failed to listen on %s: %v", addr, err) + } + + addr = li.Addr().String() + conn = li + + go func() { + for { + var c net.Conn + var err error + + if c, err = li.Accept(); err != nil { + return + } + + go func(c net.Conn) { + c.SetReadDeadline(time.Now().Add(5 * time.Second)) + b := bufio.NewReader(c) + + for { + s, err := b.ReadString('\n') + if err != nil { + break + } + msgCh <- s + } + c.Close() + }(c) + } + }() + return +}