diff --git a/loggers/redispub.go b/loggers/redispub.go index 5f95142a..85ac63a8 100644 --- a/loggers/redispub.go +++ b/loggers/redispub.go @@ -5,7 +5,7 @@ import ( "bytes" "crypto/tls" "encoding/json" - "fmt" + "errors" "net" "strconv" "strings" @@ -21,6 +21,8 @@ type RedisPub struct { doneProcess chan bool stopRun chan bool doneRun chan bool + stopReceive chan bool + doneReceive chan bool inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config @@ -92,6 +94,10 @@ func (o *RedisPub) Stop() { o.LogInfo("stopping to process...") o.stopProcess <- true <-o.doneProcess + + o.LogInfo("stopping to receive...") + o.stopReceive <- true + <-o.doneReceive } func (o *RedisPub) Disconnect() { @@ -101,6 +107,30 @@ func (o *RedisPub) Disconnect() { } } +func (o *RedisPub) ReadFromConnection() { + buf := make([]byte, 4096) + + for { + select { + // Stop signal received, exit the goroutine + case <-o.stopReceive: + o.doneReceive <- true + return + default: + _, err := o.transportConn.Read(buf) + if err != nil { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + continue + } + o.LogError("Error reading from connection: %s", err.Error()) + return + } + // We just discard the data + } + } +} + func (o *RedisPub) ConnectToRemote() { // prepare the address var address string @@ -154,7 +184,13 @@ func (o *RedisPub) ConnectToRemote() { } func (o *RedisPub) FlushBuffer(buf *[]dnsutils.DnsMessage) { + // create escaping buffer + escape_buffer := new(bytes.Buffer) + // create a new encoder that writes to the buffer + encoder := json.NewEncoder(escape_buffer) + for _, dm := range *buf { + escape_buffer.Reset() cmd := "PUBLISH " + strconv.Quote(o.config.Loggers.RedisPub.RedisChannel) + " " o.transportWriter.WriteString(cmd) @@ -164,15 +200,9 @@ func (o *RedisPub) FlushBuffer(buf *[]dnsutils.DnsMessage) { o.transportWriter.WriteString(o.config.Loggers.RedisPub.PayloadDelimiter) } - // Create escaping buffer - buf := new(bytes.Buffer) - // Create a new encoder that writes to the buffer - encoder := json.NewEncoder(buf) - if o.config.Loggers.RedisPub.Mode == dnsutils.MODE_JSON { encoder.Encode(dm) - escapedData := strconv.Quote(buf.String()) - o.transportWriter.WriteString(escapedData) + o.transportWriter.WriteString(strconv.Quote(escape_buffer.String())) o.transportWriter.WriteString(o.config.Loggers.RedisPub.PayloadDelimiter) } @@ -183,8 +213,7 @@ func (o *RedisPub) FlushBuffer(buf *[]dnsutils.DnsMessage) { continue } encoder.Encode(flat) - escapedData := strconv.Quote(buf.String()) - o.transportWriter.WriteString(escapedData) + o.transportWriter.WriteString(strconv.Quote(escape_buffer.String())) o.transportWriter.WriteString(o.config.Loggers.RedisPub.PayloadDelimiter) } @@ -268,6 +297,8 @@ PROCESS_LOOP: o.LogInfo("transport connected with success") o.transportWriter = bufio.NewWriter(o.transportConn) o.writerReady = true + // read from the connection until we stop + go o.ReadFromConnection() // incoming dns message to process case dm, opened := <-o.outputChan: @@ -293,7 +324,7 @@ PROCESS_LOOP: // flush the buffer case <-flushTimer.C: if !o.writerReady { - fmt.Println("buffer cleared!") + o.LogInfo("Buffer cleared!") bufferDm = nil continue }