From d8ef0812ffdacdb10bc98af8e4cd340891cd6e7e Mon Sep 17 00:00:00 2001 From: Denis Machard <5562930+dmachard@users.noreply.github.com> Date: Sat, 20 Jan 2024 20:42:03 +0100 Subject: [PATCH] kafka, redis, fluentd and tcpclient loggers: no flush occurred on specific conditions for on connection attempts (#552) * Fix flush buffer properly * Add test conn attempt * Update README.md --- README.md | 2 +- loggers/fluentd.go | 1 - loggers/kafkaproducer.go | 2 -- loggers/redispub.go | 2 -- loggers/tcpclient.go | 2 -- loggers/tcpclient_test.go | 58 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 59 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 8fa56d0d..6c97af5e 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@
- +
diff --git a/loggers/fluentd.go b/loggers/fluentd.go index 3553ed5c..c2ce6722 100644 --- a/loggers/fluentd.go +++ b/loggers/fluentd.go @@ -362,7 +362,6 @@ PROCESS_LOOP: case <-flushTimer.C: if !fc.writerReady { bufferDm = nil - continue } if len(bufferDm) > 0 { diff --git a/loggers/kafkaproducer.go b/loggers/kafkaproducer.go index 89d5ecb3..b68c4652 100644 --- a/loggers/kafkaproducer.go +++ b/loggers/kafkaproducer.go @@ -380,9 +380,7 @@ PROCESS_LOOP: // flush the buffer case <-flushTimer.C: if !k.kafkaConnected { - k.LogInfo("buffer cleared!") bufferDm = nil - continue } if len(bufferDm) > 0 { diff --git a/loggers/redispub.go b/loggers/redispub.go index 5b4e8c29..d949d30e 100644 --- a/loggers/redispub.go +++ b/loggers/redispub.go @@ -389,9 +389,7 @@ PROCESS_LOOP: // flush the buffer case <-flushTimer.C: if !c.writerReady { - c.LogInfo("Buffer cleared!") bufferDm = nil - continue } if len(bufferDm) > 0 { diff --git a/loggers/tcpclient.go b/loggers/tcpclient.go index d8c9cac1..70ac854d 100644 --- a/loggers/tcpclient.go +++ b/loggers/tcpclient.go @@ -378,9 +378,7 @@ PROCESS_LOOP: // flush the buffer case <-flushTimer.C: if !c.writerReady { - c.LogInfo("buffer cleared!") bufferDm = nil - continue } if len(bufferDm) > 0 { diff --git a/loggers/tcpclient_test.go b/loggers/tcpclient_test.go index 8e2c0371..27fb5354 100644 --- a/loggers/tcpclient_test.go +++ b/loggers/tcpclient_test.go @@ -86,3 +86,61 @@ func Test_TcpClientRun(t *testing.T) { }) } } + +func Test_TcpClient_ConnectionAttempt(t *testing.T) { + // init logger + cfg := pkgconfig.GetFakeConfig() + cfg.Loggers.TCPClient.FlushInterval = 1 + cfg.Loggers.TCPClient.Mode = pkgconfig.ModeText + cfg.Loggers.TCPClient.RemoteAddress = "127.0.0.1" + cfg.Loggers.TCPClient.RemotePort = 9999 + cfg.Loggers.TCPClient.ConnectTimeout = 1 + cfg.Loggers.TCPClient.RetryInterval = 2 + + g := NewTCPClient(cfg, logger.New(true), "test") + + // start the logger + go g.Run() + + // just way to get connect attempt + time.Sleep(time.Second * 3) + + // start receiver + fakeRcvr, err := net.Listen(netlib.SocketTCP, ":9999") + if err != nil { + t.Fatal(err) + } + defer fakeRcvr.Close() + + // accept conn from logger + conn, err := fakeRcvr.Accept() + if err != nil { + return + } + defer conn.Close() + + // wait connection on logger + time.Sleep(time.Second) + + // send fake dns message to logger + dm := dnsutils.GetFakeDNSMessage() + g.GetInputChannel() <- dm + + // read data on server side and decode-it + reader := bufio.NewReader(conn) + line, err := reader.ReadString('\n') + if err != nil { + t.Error(err) + return + } + + pattern := regexp.MustCompile("dns.collector") + if !pattern.MatchString(line) { + t.Errorf("tcp error want dns.collector, got: %s", line) + } + + // stop all + fakeRcvr.Close() + g.Stop() + +}