Skip to content

Commit

Permalink
add test to check deadlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
dmachard committed Jan 25, 2024
1 parent 83be5c3 commit 7516f7e
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 29 deletions.
133 changes: 104 additions & 29 deletions collectors/dnsmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"reflect"
"regexp"
"strings"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
Expand All @@ -30,31 +31,37 @@ type MatchSource struct {
}

type DNSMessage struct {
doneRun chan bool
doneMonitor chan bool
stopRun chan bool
stopMonitor chan bool
config *pkgconfig.Config
configChan chan *pkgconfig.Config
inputChan chan dnsutils.DNSMessage
logger *logger.Logger
name string
RoutingHandler pkgutils.RoutingHandler
doneRun chan bool
doneMonitor chan bool
stopRun chan bool
stopMonitor chan bool
config *pkgconfig.Config
configChan chan *pkgconfig.Config
inputChan chan dnsutils.DNSMessage
logger *logger.Logger
name string
// RoutingHandler pkgutils.RoutingHandler
droppedRoutes []pkgutils.Worker
defaultRoutes []pkgutils.Worker
dropped chan string
droppedCount map[string]int
}

func NewDNSMessage(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *DNSMessage {
logger.Info("[%s] collector=dnsmessage - enabled", name)
s := &DNSMessage{
doneRun: make(chan bool),
doneMonitor: make(chan bool),
stopRun: make(chan bool),
stopMonitor: make(chan bool),
config: config,
configChan: make(chan *pkgconfig.Config),
inputChan: make(chan dnsutils.DNSMessage, config.Collectors.DNSMessage.ChannelBufferSize),
logger: logger,
name: name,
RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name),
doneRun: make(chan bool),
doneMonitor: make(chan bool),
stopRun: make(chan bool),
stopMonitor: make(chan bool),
config: config,
configChan: make(chan *pkgconfig.Config),
inputChan: make(chan dnsutils.DNSMessage, config.Collectors.DNSMessage.ChannelBufferSize),
logger: logger,
name: name,
//RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name),
dropped: make(chan string),
droppedCount: map[string]int{},
}
s.ReadConfig()
return s
Expand All @@ -63,11 +70,13 @@ func NewDNSMessage(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *
func (c *DNSMessage) GetName() string { return c.name }

func (c *DNSMessage) AddDroppedRoute(wrk pkgutils.Worker) {
c.RoutingHandler.AddDroppedRoute(wrk)
//c.RoutingHandler.AddDroppedRoute(wrk)

Check failure on line 73 in collectors/dnsmessage.go

View workflow job for this annotation

GitHub Actions / linter

commentFormatting: put a space between `//` and comment text (gocritic)
c.droppedRoutes = append(c.droppedRoutes, wrk)
}

func (c *DNSMessage) AddDefaultRoute(wrk pkgutils.Worker) {

Check failure on line 77 in collectors/dnsmessage.go

View workflow job for this annotation

GitHub Actions / linter

ST1016: methods on the same type should have the same receiver name (seen 16x "c", 1x "p") (stylecheck)
c.RoutingHandler.AddDefaultRoute(wrk)
//c.RoutingHandler.AddDefaultRoute(wrk)

Check failure on line 78 in collectors/dnsmessage.go

View workflow job for this annotation

GitHub Actions / linter

commentFormatting: put a space between `//` and comment text (gocritic)
c.defaultRoutes = append(c.defaultRoutes, wrk)
}

// deprecated function
Expand Down Expand Up @@ -217,26 +226,37 @@ func (c *DNSMessage) LogError(msg string, v ...interface{}) {
}

func (c *DNSMessage) Stop() {
c.LogInfo("stopping routing handler...")
c.RoutingHandler.Stop()
// c.LogInfo("stopping routing handler...")
// c.RoutingHandler.Stop()

// read done channel and block until run is terminated
c.LogInfo("stopping run...")
c.stopRun <- true
<-c.doneRun

c.LogInfo("stopping monitor...")
c.stopMonitor <- true
<-c.doneMonitor
}

func (c *DNSMessage) Run() {
c.LogInfo("starting collector...")
var err error

// prepare next channels
defaultRoutes, defaultNames := c.RoutingHandler.GetDefaultRoutes()
droppedRoutes, droppedNames := c.RoutingHandler.GetDroppedRoutes()
//defaultRoutes, defaultNames := c.RoutingHandler.GetDefaultRoutes()

Check failure on line 247 in collectors/dnsmessage.go

View workflow job for this annotation

GitHub Actions / linter

commentFormatting: put a space between `//` and comment text (gocritic)
//droppedRoutes, droppedNames := c.RoutingHandler.GetDroppedRoutes()
defaultRoutes, defaultNames := pkgutils.GetRoutes(c.defaultRoutes)
droppedRoutes, droppedNames := pkgutils.GetRoutes(c.droppedRoutes)

// prepare transforms
subprocessors := transformers.NewTransforms(&c.config.IngoingTransformers, c.logger, c.name, defaultRoutes, 0)

// start goroutine to count dropped messsages
go c.MonitorNextStanzas()

// read incoming dns message
c.LogInfo("waiting dns message to process...")
RUN_LOOP:
for {
select {
Expand Down Expand Up @@ -290,22 +310,77 @@ RUN_LOOP:
if matched {
subprocessors.InitDNSMessageFormat(&dm)
if subprocessors.ProcessMessage(&dm) == transformers.ReturnDrop {
c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
//c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
for i := range droppedRoutes {
select {
case droppedRoutes[i] <- dm:
default:
c.dropped <- droppedNames[i]
}
}
continue
}
}

// drop packet ?
if !matched {
c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
//c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
for i := range droppedRoutes {
select {
case droppedRoutes[i] <- dm:
default:
c.dropped <- droppedNames[i]
}
}
continue
}

// send to next
c.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm)
//c.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm)
for i := range defaultRoutes {
select {
case defaultRoutes[i] <- dm:
default:
c.dropped <- defaultNames[i]
}
}

}

}
c.LogInfo("run terminated")
}

func (p *DNSMessage) MonitorNextStanzas() {
watchInterval := 10 * time.Second
bufferFull := time.NewTimer(watchInterval)
FOLLOW_LOOP:
for {
select {
case <-p.stopMonitor:
close(p.dropped)
bufferFull.Stop()
p.doneMonitor <- true
break FOLLOW_LOOP

case loggerName := <-p.dropped:
if _, ok := p.droppedCount[loggerName]; !ok {
p.droppedCount[loggerName] = 1
} else {
p.droppedCount[loggerName]++
}

case <-bufferFull.C:

for v, k := range p.droppedCount {
if k > 0 {
p.LogError("stanza[%s] buffer is full, %d dnsmessage(s) dropped", v, k)
p.droppedCount[v] = 0
}
}
bufferFull.Reset(watchInterval)

}
}
p.LogInfo("monitor terminated")
}
80 changes: 80 additions & 0 deletions collectors/dnsmessage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package collectors

import (
"fmt"
"regexp"
"testing"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/loggers"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-logger"
)

func Test_DnsMessage_BufferLoggerIsFull(t *testing.T) {
// redirect stdout output to bytes buffer
logsChan := make(chan logger.LogEntry, 10)
lg := logger.New(true)
lg.SetOutputChannel((logsChan))

// init the collector and run-it
config := pkgconfig.GetFakeConfig()
c := NewDNSMessage(nil, config, lg, "test")

// init next logger with a buffer of one element
nxt := loggers.NewFakeLoggerWithBufferSize(1)
c.AddDefaultRoute(nxt)

// run collector
go c.Run()

// add a shot of dnsmessages to collector
dmIn := dnsutils.GetFakeDNSMessage()
for i := 0; i < 512; i++ {
c.GetInputChannel() <- dmIn
}

// waiting monitor to run in consumer
time.Sleep(12 * time.Second)

for entry := range logsChan {
fmt.Println(entry)
pattern := regexp.MustCompile(processors.ExpectedBufferMsg511)
if pattern.MatchString(entry.Message) {
break
}
}

// read dnsmessage from next logger
dmOut := <-nxt.GetInputChannel()
if dmOut.DNS.Qname != "dns.collector" {

Check failure on line 52 in collectors/dnsmessage_test.go

View workflow job for this annotation

GitHub Actions / linter

string `dns.collector` has 3 occurrences, make it a constant (goconst)
t.Errorf("invalid qname in dns message: %s", dmOut.DNS.Qname)
}

// send second shot of packets to consumer
for i := 0; i < 1024; i++ {
c.GetInputChannel() <- dmIn
}

// waiting monitor to run in consumer
time.Sleep(12 * time.Second)

for entry := range logsChan {
fmt.Println(entry)
pattern := regexp.MustCompile(processors.ExpectedBufferMsg1023)
if pattern.MatchString(entry.Message) {
break
}
}
// read dnsmessage from next logger
dm2 := <-nxt.GetInputChannel()
if dm2.DNS.Qname != "dns.collector" {
t.Errorf("invalid qname in dns message: %s", dm2.DNS.Qname)
}

// stop all
c.Stop()
nxt.Stop()
}
4 changes: 4 additions & 0 deletions processors/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (d *DNSProcessor) Stop() {
d.LogInfo("stopping to process...")
d.stopRun <- true
<-d.doneRun

d.LogInfo("stopping monitor...")
d.stopMonitor <- true
<-d.doneMonitor
}

func (d *DNSProcessor) Run(defaultWorkers []pkgutils.Worker, droppedworkers []pkgutils.Worker) {
Expand Down
4 changes: 4 additions & 0 deletions processors/dnstap.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (d *DNSTapProcessor) Stop() {
d.LogInfo("stopping to process...")
d.stopRun <- true
<-d.doneRun

d.LogInfo("stopping monitor...")
d.stopMonitor <- true
<-d.doneMonitor
}

func (d *DNSTapProcessor) Run(defaultWorkers []pkgutils.Worker, droppedworkers []pkgutils.Worker) {
Expand Down

0 comments on commit 7516f7e

Please sign in to comment.