From 1905f40322cf032ea24b126b6c02beeb296ab6a9 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Thu, 5 Oct 2023 15:59:38 +0200 Subject: [PATCH 1/2] chg: [redispub] reading redis responses --- loggers/redispub.go | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/loggers/redispub.go b/loggers/redispub.go index 5f95142a..7c08b2bd 100644 --- a/loggers/redispub.go +++ b/loggers/redispub.go @@ -5,7 +5,7 @@ import ( "bytes" "crypto/tls" "encoding/json" - "fmt" + "errors" "net" "strconv" "strings" @@ -21,6 +21,8 @@ type RedisPub struct { doneProcess chan bool stopRun chan bool doneRun chan bool + stopReceive chan bool + doneReceive chan bool inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config @@ -92,6 +94,10 @@ func (o *RedisPub) Stop() { o.LogInfo("stopping to process...") o.stopProcess <- true <-o.doneProcess + + o.LogInfo("stopping to receive...") + o.stopReceive <- true + <-o.doneReceive } func (o *RedisPub) Disconnect() { @@ -101,6 +107,30 @@ func (o *RedisPub) Disconnect() { } } +func (o *RedisPub) ReadFromConnection() { + buf := make([]byte, 4096) + + for { + select { + // Stop signal received, exit the goroutine + case <-o.stopReceive: + o.doneReceive <- true + return + default: + _, err := o.transportConn.Read(buf) + if err != nil { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + continue + } + o.LogError("Error reading from connection: %s", err.Error()) + return + } + // We just discard the data + } + } +} + func (o *RedisPub) ConnectToRemote() { // prepare the address var address string @@ -268,6 +298,8 @@ PROCESS_LOOP: o.LogInfo("transport connected with success") o.transportWriter = bufio.NewWriter(o.transportConn) o.writerReady = true + // read from the connection until we stop + go o.ReadFromConnection() // incoming dns message to process case dm, opened := <-o.outputChan: @@ -293,7 +325,7 @@ PROCESS_LOOP: // flush the buffer case <-flushTimer.C: if !o.writerReady { - fmt.Println("buffer cleared!") + o.LogInfo("Buffer cleared!") bufferDm = nil continue } From 5634d1d660e7a35e64db4483704952e52135ad93 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 11 Oct 2023 08:59:54 +0200 Subject: [PATCH 2/2] chg: [redispub] avoid allocating in flusher loop --- loggers/redispub.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/loggers/redispub.go b/loggers/redispub.go index 7c08b2bd..85ac63a8 100644 --- a/loggers/redispub.go +++ b/loggers/redispub.go @@ -184,7 +184,13 @@ func (o *RedisPub) ConnectToRemote() { } func (o *RedisPub) FlushBuffer(buf *[]dnsutils.DnsMessage) { + // create escaping buffer + escape_buffer := new(bytes.Buffer) + // create a new encoder that writes to the buffer + encoder := json.NewEncoder(escape_buffer) + for _, dm := range *buf { + escape_buffer.Reset() cmd := "PUBLISH " + strconv.Quote(o.config.Loggers.RedisPub.RedisChannel) + " " o.transportWriter.WriteString(cmd) @@ -194,15 +200,9 @@ func (o *RedisPub) FlushBuffer(buf *[]dnsutils.DnsMessage) { o.transportWriter.WriteString(o.config.Loggers.RedisPub.PayloadDelimiter) } - // Create escaping buffer - buf := new(bytes.Buffer) - // Create a new encoder that writes to the buffer - encoder := json.NewEncoder(buf) - if o.config.Loggers.RedisPub.Mode == dnsutils.MODE_JSON { encoder.Encode(dm) - escapedData := strconv.Quote(buf.String()) - o.transportWriter.WriteString(escapedData) + o.transportWriter.WriteString(strconv.Quote(escape_buffer.String())) o.transportWriter.WriteString(o.config.Loggers.RedisPub.PayloadDelimiter) } @@ -213,8 +213,7 @@ func (o *RedisPub) FlushBuffer(buf *[]dnsutils.DnsMessage) { continue } encoder.Encode(flat) - escapedData := strconv.Quote(buf.String()) - o.transportWriter.WriteString(escapedData) + o.transportWriter.WriteString(strconv.Quote(escape_buffer.String())) o.transportWriter.WriteString(o.config.Loggers.RedisPub.PayloadDelimiter) }