Skip to content

Commit

Permalink
fix to avoid deadlock situations in multiplexer mode - regression (#569)
Browse files Browse the repository at this point in the history
* fix to avoid deadlock situations
  • Loading branch information
dmachard authored Jan 24, 2024
1 parent 1c20809 commit 83be5c3
Show file tree
Hide file tree
Showing 9 changed files with 423 additions and 33 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ tests: check-go
@go test ./pkglinker/ -race -cover -v
@go test ./dnsutils/ -race -cover -v
@go test ./netlib/ -race -cover -v
@go test -timeout 30s ./transformers/ -race -cover -v
@go test -timeout 30s ./collectors/ -race -cover -v
@go test -timeout 60s ./transformers/ -race -cover -v
@go test -timeout 60s ./collectors/ -race -cover -v
@go test -timeout 90s ./loggers/ -race -cover -v
@go test -timeout 30s ./processors/ -race -cover -v
@go test -timeout 60s ./processors/ -race -cover -v

# Cleans the project using go clean.
clean: check-go
Expand Down
9 changes: 9 additions & 0 deletions loggers/fakelogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ func NewFakeLogger() *FakeLogger {
return o
}

func NewFakeLoggerWithBufferSize(bufferSize int) *FakeLogger {
o := &FakeLogger{
inputChan: make(chan dnsutils.DNSMessage, bufferSize),
outputChan: make(chan dnsutils.DNSMessage, bufferSize),
name: "fake",
}
return o
}

func (c *FakeLogger) GetName() string { return c.name }

func (c *FakeLogger) AddDefaultRoute(wrk pkgutils.Worker) {}
Expand Down
8 changes: 8 additions & 0 deletions processors/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package processors

const (
ExpectedQname = "dnscollector.dev"
ExpectedBufferMsg511 = ".*buffer is full, 511.*"
ExpectedBufferMsg1023 = ".*buffer is full, 1023.*"
ExpectedIdentity = "powerdnspb"
)
57 changes: 55 additions & 2 deletions processors/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type DNSProcessor struct {
ConfigChan chan *pkgconfig.Config
name string
RoutingHandler pkgutils.RoutingHandler
dropped chan string
droppedCount map[string]int
}

func NewDNSProcessor(config *pkgconfig.Config, logger *logger.Logger, name string, size int) DNSProcessor {
Expand All @@ -43,6 +45,8 @@ func NewDNSProcessor(config *pkgconfig.Config, logger *logger.Logger, name strin
config: config,
ConfigChan: make(chan *pkgconfig.Config),
name: name,
dropped: make(chan string),
droppedCount: map[string]int{},
RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name),
}
return d
Expand Down Expand Up @@ -83,6 +87,9 @@ func (d *DNSProcessor) Run(defaultWorkers []pkgutils.Worker, droppedworkers []pk
// prepare enabled transformers
transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, defaultRoutes, 0)

// start goroutine to count dropped messsages
go d.MonitorLoggers()

// read incoming dns message
d.LogInfo("waiting dns message to process...")
RUN_LOOP:
Expand Down Expand Up @@ -145,17 +152,63 @@ RUN_LOOP:

// apply all enabled transformers
if transforms.ProcessMessage(&dm) == transformers.ReturnDrop {
d.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
for i := range droppedRoutes {
select {
case droppedRoutes[i] <- dm: // Successful send to logger channel
default:
d.dropped <- droppedNames[i]
}
}
continue
}

// convert latency to human
dm.DNSTap.LatencySec = fmt.Sprintf("%.6f", dm.DNSTap.Latency)

// dispatch dns message to all generators
d.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm)
for i := range defaultRoutes {
select {
case defaultRoutes[i] <- dm: // Successful send to logger channel
default:
d.dropped <- defaultNames[i]
}
}

}
}
d.LogInfo("processing terminated")
}

func (d *DNSProcessor) MonitorLoggers() {
watchInterval := 10 * time.Second
bufferFull := time.NewTimer(watchInterval)
FOLLOW_LOOP:
for {
select {
case <-d.stopMonitor:
close(d.dropped)
bufferFull.Stop()
d.doneMonitor <- true
break FOLLOW_LOOP

case loggerName := <-d.dropped:
if _, ok := d.droppedCount[loggerName]; !ok {
d.droppedCount[loggerName] = 1
} else {
d.droppedCount[loggerName]++
}

case <-bufferFull.C:

for v, k := range d.droppedCount {
if k > 0 {
d.LogError("logger[%s] buffer is full, %d packet(s) dropped", v, k)
d.droppedCount[v] = 0
}
}
bufferFull.Reset(watchInterval)

}
}
d.LogInfo("monitor terminated")
}
63 changes: 62 additions & 1 deletion processors/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package processors

import (
"bytes"
"fmt"
"regexp"
"testing"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/loggers"
Expand All @@ -27,8 +30,66 @@ func Test_DnsProcessor(t *testing.T) {

// read dns message from dnstap consumer
dmOut := <-fl.GetInputChannel()
if dmOut.DNS.Qname != "dnscollector.dev" {
if dmOut.DNS.Qname != ExpectedQname {
t.Errorf("invalid qname in dns message: %s", dm.DNS.Qname)
}
}

func Test_DnsProcessor_BufferLoggerIsFull(t *testing.T) {
// redirect stdout output to bytes buffer
logsChan := make(chan logger.LogEntry, 10)
lg := logger.New(true)
lg.SetOutputChannel((logsChan))

// init and run the dns processor
consumer := NewDNSProcessor(pkgconfig.GetFakeConfig(), lg, "test", 512)

fl := loggers.NewFakeLoggerWithBufferSize(1)
go consumer.Run([]pkgutils.Worker{fl}, []pkgutils.Worker{fl})

dm := dnsutils.GetFakeDNSMessageWithPayload()

// add packets to consumer
for i := 0; i < 512; i++ {
consumer.GetChannel() <- dm
}

// waiting monitor to run in consumer
time.Sleep(12 * time.Second)

for entry := range logsChan {
fmt.Println(entry)
pattern := regexp.MustCompile(ExpectedBufferMsg511)
if pattern.MatchString(entry.Message) {
break
}
}

// read dns message from dnstap consumer
dmOut := <-fl.GetInputChannel()
if dmOut.DNS.Qname != ExpectedQname {
t.Errorf("invalid qname in dns message: %s", dmOut.DNS.Qname)
}

// send second shot of packets to consumer
for i := 0; i < 1024; i++ {
consumer.GetChannel() <- dm
}

// waiting monitor to run in consumer
time.Sleep(12 * time.Second)

for entry := range logsChan {
fmt.Println(entry)
pattern := regexp.MustCompile(ExpectedBufferMsg1023)
if pattern.MatchString(entry.Message) {
break
}
}

// read dns message from dnstap consumer
dm2 := <-fl.GetInputChannel()
if dm2.DNS.Qname != ExpectedQname {
t.Errorf("invalid qname in second dns message: %s", dm2.DNS.Qname)
}
}
80 changes: 65 additions & 15 deletions processors/dnstap.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,25 +64,27 @@ type DNSTapProcessor struct {
name string
chanSize int
RoutingHandler pkgutils.RoutingHandler
dropped chan string
droppedCount map[string]int
}

func NewDNSTapProcessor(connID int, config *pkgconfig.Config, logger *logger.Logger, name string, size int) DNSTapProcessor {
logger.Info("[%s] processor=dnstap#%d - initialization...", name, connID)

d := DNSTapProcessor{
ConnID: connID,
doneMonitor: make(chan bool),
doneRun: make(chan bool),
stopMonitor: make(chan bool),
stopRun: make(chan bool),
recvFrom: make(chan []byte, size),
chanSize: size,
logger: logger,
config: config,
ConfigChan: make(chan *pkgconfig.Config),
name: name,
// dropped: make(chan string),
// droppedCount: map[string]int{},
ConnID: connID,
doneMonitor: make(chan bool),
doneRun: make(chan bool),
stopMonitor: make(chan bool),
stopRun: make(chan bool),
recvFrom: make(chan []byte, size),
chanSize: size,
logger: logger,
config: config,
ConfigChan: make(chan *pkgconfig.Config),
name: name,
dropped: make(chan string),
droppedCount: map[string]int{},
RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name),
}

Expand Down Expand Up @@ -133,6 +135,9 @@ func (d *DNSTapProcessor) Run(defaultWorkers []pkgutils.Worker, droppedworkers [
// prepare enabled transformers
transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, defaultRoutes, d.ConnID)

// start goroutine to count dropped messsages
go d.MonitorLoggers()

// read incoming dns message
d.LogInfo("waiting dns message to process...")
RUN_LOOP:
Expand Down Expand Up @@ -293,18 +298,63 @@ RUN_LOOP:

// apply all enabled transformers
if transforms.ProcessMessage(&dm) == transformers.ReturnDrop {
d.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
for i := range droppedRoutes {
select {
case droppedRoutes[i] <- dm: // Successful send to logger channel
default:
d.dropped <- droppedNames[i]
}
}
continue
}

// convert latency to human
dm.DNSTap.LatencySec = fmt.Sprintf("%.6f", dm.DNSTap.Latency)

// dispatch dns message to connected routes
d.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm)
for i := range defaultRoutes {
select {
case defaultRoutes[i] <- dm: // Successful send to logger channel
default:
d.dropped <- defaultNames[i]
}
}

}
}

d.LogInfo("processing terminated")
}

func (d *DNSTapProcessor) MonitorLoggers() {
watchInterval := 10 * time.Second
bufferFull := time.NewTimer(watchInterval)
MONITOR_LOOP:
for {
select {
case <-d.stopMonitor:
close(d.dropped)
bufferFull.Stop()
d.doneMonitor <- true
break MONITOR_LOOP

case loggerName := <-d.dropped:
if _, ok := d.droppedCount[loggerName]; !ok {
d.droppedCount[loggerName] = 1
} else {
d.droppedCount[loggerName]++
}

case <-bufferFull.C:
for v, k := range d.droppedCount {
if k > 0 {
d.LogError("logger[%s] buffer is full, %d packet(s) dropped", v, k)
d.droppedCount[v] = 0
}
}
bufferFull.Reset(watchInterval)

}
}
d.LogInfo("monitor terminated")
}
Loading

0 comments on commit 83be5c3

Please sign in to comment.