Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chg: [redispub] reading redis responses #411

Merged
merged 2 commits into from
Oct 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions loggers/redispub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"errors"
"net"
"strconv"
"strings"
Expand All @@ -21,6 +21,8 @@ type RedisPub struct {
doneProcess chan bool
stopRun chan bool
doneRun chan bool
stopReceive chan bool
doneReceive chan bool
inputChan chan dnsutils.DnsMessage
outputChan chan dnsutils.DnsMessage
config *dnsutils.Config
Expand Down Expand Up @@ -92,6 +94,10 @@ func (o *RedisPub) Stop() {
o.LogInfo("stopping to process...")
o.stopProcess <- true
<-o.doneProcess

o.LogInfo("stopping to receive...")
o.stopReceive <- true
<-o.doneReceive
}

func (o *RedisPub) Disconnect() {
Expand All @@ -101,6 +107,30 @@ func (o *RedisPub) Disconnect() {
}
}

func (o *RedisPub) ReadFromConnection() {
buf := make([]byte, 4096)

for {
select {
// Stop signal received, exit the goroutine
case <-o.stopReceive:
o.doneReceive <- true
return
default:
_, err := o.transportConn.Read(buf)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
continue
}
o.LogError("Error reading from connection: %s", err.Error())
return
}
// We just discard the data
}
}
}

func (o *RedisPub) ConnectToRemote() {
// prepare the address
var address string
Expand Down Expand Up @@ -154,7 +184,13 @@ func (o *RedisPub) ConnectToRemote() {
}

func (o *RedisPub) FlushBuffer(buf *[]dnsutils.DnsMessage) {
// create escaping buffer
escape_buffer := new(bytes.Buffer)
// create a new encoder that writes to the buffer
encoder := json.NewEncoder(escape_buffer)

for _, dm := range *buf {
escape_buffer.Reset()

cmd := "PUBLISH " + strconv.Quote(o.config.Loggers.RedisPub.RedisChannel) + " "
o.transportWriter.WriteString(cmd)
Expand All @@ -164,15 +200,9 @@ func (o *RedisPub) FlushBuffer(buf *[]dnsutils.DnsMessage) {
o.transportWriter.WriteString(o.config.Loggers.RedisPub.PayloadDelimiter)
}

// Create escaping buffer
buf := new(bytes.Buffer)
// Create a new encoder that writes to the buffer
encoder := json.NewEncoder(buf)

if o.config.Loggers.RedisPub.Mode == dnsutils.MODE_JSON {
encoder.Encode(dm)
escapedData := strconv.Quote(buf.String())
o.transportWriter.WriteString(escapedData)
o.transportWriter.WriteString(strconv.Quote(escape_buffer.String()))
o.transportWriter.WriteString(o.config.Loggers.RedisPub.PayloadDelimiter)
}

Expand All @@ -183,8 +213,7 @@ func (o *RedisPub) FlushBuffer(buf *[]dnsutils.DnsMessage) {
continue
}
encoder.Encode(flat)
escapedData := strconv.Quote(buf.String())
o.transportWriter.WriteString(escapedData)
o.transportWriter.WriteString(strconv.Quote(escape_buffer.String()))
o.transportWriter.WriteString(o.config.Loggers.RedisPub.PayloadDelimiter)
}

Expand Down Expand Up @@ -268,6 +297,8 @@ PROCESS_LOOP:
o.LogInfo("transport connected with success")
o.transportWriter = bufio.NewWriter(o.transportConn)
o.writerReady = true
// read from the connection until we stop
go o.ReadFromConnection()

// incoming dns message to process
case dm, opened := <-o.outputChan:
Expand All @@ -293,7 +324,7 @@ PROCESS_LOOP:
// flush the buffer
case <-flushTimer.C:
if !o.writerReady {
fmt.Println("buffer cleared!")
o.LogInfo("Buffer cleared!")
bufferDm = nil
continue
}
Expand Down