From 83be5c345c38c3077f7e46f5c56785552514fdfe Mon Sep 17 00:00:00 2001 From: Denis Machard <5562930+dmachard@users.noreply.github.com> Date: Wed, 24 Jan 2024 23:12:32 +0100 Subject: [PATCH] fix to avoid deadlock situations in multiplexer mode - regression (#569) * fix to avoid deadlock situations --- Makefile | 6 +-- loggers/fakelogger.go | 9 ++++ processors/constants.go | 8 ++++ processors/dns.go | 57 ++++++++++++++++++++++- processors/dns_test.go | 63 ++++++++++++++++++++++++- processors/dnstap.go | 80 ++++++++++++++++++++++++++------ processors/dnstap_test.go | 81 ++++++++++++++++++++++++++++++++- processors/powerdns.go | 61 ++++++++++++++++++++++++- processors/powerdns_test.go | 91 +++++++++++++++++++++++++++++++++---- 9 files changed, 423 insertions(+), 33 deletions(-) create mode 100644 processors/constants.go diff --git a/Makefile b/Makefile index 62d3b92e..67144811 100644 --- a/Makefile +++ b/Makefile @@ -72,10 +72,10 @@ tests: check-go @go test ./pkglinker/ -race -cover -v @go test ./dnsutils/ -race -cover -v @go test ./netlib/ -race -cover -v - @go test -timeout 30s ./transformers/ -race -cover -v - @go test -timeout 30s ./collectors/ -race -cover -v + @go test -timeout 60s ./transformers/ -race -cover -v + @go test -timeout 60s ./collectors/ -race -cover -v @go test -timeout 90s ./loggers/ -race -cover -v - @go test -timeout 30s ./processors/ -race -cover -v + @go test -timeout 60s ./processors/ -race -cover -v # Cleans the project using go clean. clean: check-go diff --git a/loggers/fakelogger.go b/loggers/fakelogger.go index 970c963d..c60d975a 100644 --- a/loggers/fakelogger.go +++ b/loggers/fakelogger.go @@ -21,6 +21,15 @@ func NewFakeLogger() *FakeLogger { return o } +func NewFakeLoggerWithBufferSize(bufferSize int) *FakeLogger { + o := &FakeLogger{ + inputChan: make(chan dnsutils.DNSMessage, bufferSize), + outputChan: make(chan dnsutils.DNSMessage, bufferSize), + name: "fake", + } + return o +} + func (c *FakeLogger) GetName() string { return c.name } func (c *FakeLogger) AddDefaultRoute(wrk pkgutils.Worker) {} diff --git a/processors/constants.go b/processors/constants.go new file mode 100644 index 00000000..d124a807 --- /dev/null +++ b/processors/constants.go @@ -0,0 +1,8 @@ +package processors + +const ( + ExpectedQname = "dnscollector.dev" + ExpectedBufferMsg511 = ".*buffer is full, 511.*" + ExpectedBufferMsg1023 = ".*buffer is full, 1023.*" + ExpectedIdentity = "powerdnspb" +) diff --git a/processors/dns.go b/processors/dns.go index f1691192..825d7754 100644 --- a/processors/dns.go +++ b/processors/dns.go @@ -29,6 +29,8 @@ type DNSProcessor struct { ConfigChan chan *pkgconfig.Config name string RoutingHandler pkgutils.RoutingHandler + dropped chan string + droppedCount map[string]int } func NewDNSProcessor(config *pkgconfig.Config, logger *logger.Logger, name string, size int) DNSProcessor { @@ -43,6 +45,8 @@ func NewDNSProcessor(config *pkgconfig.Config, logger *logger.Logger, name strin config: config, ConfigChan: make(chan *pkgconfig.Config), name: name, + dropped: make(chan string), + droppedCount: map[string]int{}, RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name), } return d @@ -83,6 +87,9 @@ func (d *DNSProcessor) Run(defaultWorkers []pkgutils.Worker, droppedworkers []pk // prepare enabled transformers transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, defaultRoutes, 0) + // start goroutine to count dropped messsages + go d.MonitorLoggers() + // read incoming dns message d.LogInfo("waiting dns message to process...") RUN_LOOP: @@ -145,7 +152,13 @@ RUN_LOOP: // apply all enabled transformers if transforms.ProcessMessage(&dm) == transformers.ReturnDrop { - d.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm) + for i := range droppedRoutes { + select { + case droppedRoutes[i] <- dm: // Successful send to logger channel + default: + d.dropped <- droppedNames[i] + } + } continue } @@ -153,9 +166,49 @@ RUN_LOOP: dm.DNSTap.LatencySec = fmt.Sprintf("%.6f", dm.DNSTap.Latency) // dispatch dns message to all generators - d.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm) + for i := range defaultRoutes { + select { + case defaultRoutes[i] <- dm: // Successful send to logger channel + default: + d.dropped <- defaultNames[i] + } + } } } d.LogInfo("processing terminated") } + +func (d *DNSProcessor) MonitorLoggers() { + watchInterval := 10 * time.Second + bufferFull := time.NewTimer(watchInterval) +FOLLOW_LOOP: + for { + select { + case <-d.stopMonitor: + close(d.dropped) + bufferFull.Stop() + d.doneMonitor <- true + break FOLLOW_LOOP + + case loggerName := <-d.dropped: + if _, ok := d.droppedCount[loggerName]; !ok { + d.droppedCount[loggerName] = 1 + } else { + d.droppedCount[loggerName]++ + } + + case <-bufferFull.C: + + for v, k := range d.droppedCount { + if k > 0 { + d.LogError("logger[%s] buffer is full, %d packet(s) dropped", v, k) + d.droppedCount[v] = 0 + } + } + bufferFull.Reset(watchInterval) + + } + } + d.LogInfo("monitor terminated") +} diff --git a/processors/dns_test.go b/processors/dns_test.go index fc5927fc..f3d37ab3 100644 --- a/processors/dns_test.go +++ b/processors/dns_test.go @@ -2,7 +2,10 @@ package processors import ( "bytes" + "fmt" + "regexp" "testing" + "time" "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/loggers" @@ -27,8 +30,66 @@ func Test_DnsProcessor(t *testing.T) { // read dns message from dnstap consumer dmOut := <-fl.GetInputChannel() - if dmOut.DNS.Qname != "dnscollector.dev" { + if dmOut.DNS.Qname != ExpectedQname { t.Errorf("invalid qname in dns message: %s", dm.DNS.Qname) } +} + +func Test_DnsProcessor_BufferLoggerIsFull(t *testing.T) { + // redirect stdout output to bytes buffer + logsChan := make(chan logger.LogEntry, 10) + lg := logger.New(true) + lg.SetOutputChannel((logsChan)) + + // init and run the dns processor + consumer := NewDNSProcessor(pkgconfig.GetFakeConfig(), lg, "test", 512) + + fl := loggers.NewFakeLoggerWithBufferSize(1) + go consumer.Run([]pkgutils.Worker{fl}, []pkgutils.Worker{fl}) + + dm := dnsutils.GetFakeDNSMessageWithPayload() + + // add packets to consumer + for i := 0; i < 512; i++ { + consumer.GetChannel() <- dm + } + + // waiting monitor to run in consumer + time.Sleep(12 * time.Second) + + for entry := range logsChan { + fmt.Println(entry) + pattern := regexp.MustCompile(ExpectedBufferMsg511) + if pattern.MatchString(entry.Message) { + break + } + } + + // read dns message from dnstap consumer + dmOut := <-fl.GetInputChannel() + if dmOut.DNS.Qname != ExpectedQname { + t.Errorf("invalid qname in dns message: %s", dmOut.DNS.Qname) + } + + // send second shot of packets to consumer + for i := 0; i < 1024; i++ { + consumer.GetChannel() <- dm + } + + // waiting monitor to run in consumer + time.Sleep(12 * time.Second) + + for entry := range logsChan { + fmt.Println(entry) + pattern := regexp.MustCompile(ExpectedBufferMsg1023) + if pattern.MatchString(entry.Message) { + break + } + } + // read dns message from dnstap consumer + dm2 := <-fl.GetInputChannel() + if dm2.DNS.Qname != ExpectedQname { + t.Errorf("invalid qname in second dns message: %s", dm2.DNS.Qname) + } } diff --git a/processors/dnstap.go b/processors/dnstap.go index 506e0918..34c11c9c 100644 --- a/processors/dnstap.go +++ b/processors/dnstap.go @@ -64,25 +64,27 @@ type DNSTapProcessor struct { name string chanSize int RoutingHandler pkgutils.RoutingHandler + dropped chan string + droppedCount map[string]int } func NewDNSTapProcessor(connID int, config *pkgconfig.Config, logger *logger.Logger, name string, size int) DNSTapProcessor { logger.Info("[%s] processor=dnstap#%d - initialization...", name, connID) d := DNSTapProcessor{ - ConnID: connID, - doneMonitor: make(chan bool), - doneRun: make(chan bool), - stopMonitor: make(chan bool), - stopRun: make(chan bool), - recvFrom: make(chan []byte, size), - chanSize: size, - logger: logger, - config: config, - ConfigChan: make(chan *pkgconfig.Config), - name: name, - // dropped: make(chan string), - // droppedCount: map[string]int{}, + ConnID: connID, + doneMonitor: make(chan bool), + doneRun: make(chan bool), + stopMonitor: make(chan bool), + stopRun: make(chan bool), + recvFrom: make(chan []byte, size), + chanSize: size, + logger: logger, + config: config, + ConfigChan: make(chan *pkgconfig.Config), + name: name, + dropped: make(chan string), + droppedCount: map[string]int{}, RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name), } @@ -133,6 +135,9 @@ func (d *DNSTapProcessor) Run(defaultWorkers []pkgutils.Worker, droppedworkers [ // prepare enabled transformers transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, defaultRoutes, d.ConnID) + // start goroutine to count dropped messsages + go d.MonitorLoggers() + // read incoming dns message d.LogInfo("waiting dns message to process...") RUN_LOOP: @@ -293,7 +298,13 @@ RUN_LOOP: // apply all enabled transformers if transforms.ProcessMessage(&dm) == transformers.ReturnDrop { - d.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm) + for i := range droppedRoutes { + select { + case droppedRoutes[i] <- dm: // Successful send to logger channel + default: + d.dropped <- droppedNames[i] + } + } continue } @@ -301,10 +312,49 @@ RUN_LOOP: dm.DNSTap.LatencySec = fmt.Sprintf("%.6f", dm.DNSTap.Latency) // dispatch dns message to connected routes - d.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm) + for i := range defaultRoutes { + select { + case defaultRoutes[i] <- dm: // Successful send to logger channel + default: + d.dropped <- defaultNames[i] + } + } } } d.LogInfo("processing terminated") } + +func (d *DNSTapProcessor) MonitorLoggers() { + watchInterval := 10 * time.Second + bufferFull := time.NewTimer(watchInterval) +MONITOR_LOOP: + for { + select { + case <-d.stopMonitor: + close(d.dropped) + bufferFull.Stop() + d.doneMonitor <- true + break MONITOR_LOOP + + case loggerName := <-d.dropped: + if _, ok := d.droppedCount[loggerName]; !ok { + d.droppedCount[loggerName] = 1 + } else { + d.droppedCount[loggerName]++ + } + + case <-bufferFull.C: + for v, k := range d.droppedCount { + if k > 0 { + d.LogError("logger[%s] buffer is full, %d packet(s) dropped", v, k) + d.droppedCount[v] = 0 + } + } + bufferFull.Reset(watchInterval) + + } + } + d.LogInfo("monitor terminated") +} diff --git a/processors/dnstap_test.go b/processors/dnstap_test.go index 24e9a993..d45ed2d6 100644 --- a/processors/dnstap_test.go +++ b/processors/dnstap_test.go @@ -2,7 +2,10 @@ package processors import ( "bytes" + "fmt" + "regexp" "testing" + "time" "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/loggers" @@ -24,7 +27,7 @@ func Test_DnstapProcessor(t *testing.T) { // prepare dns query dnsmsg := new(dns.Msg) - dnsmsg.SetQuestion("www.google.fr.", dns.TypeA) + dnsmsg.SetQuestion(ExpectedQname+".", dns.TypeA) dnsquestion, _ := dnsmsg.Pack() // prepare dnstap @@ -46,7 +49,7 @@ func Test_DnstapProcessor(t *testing.T) { // read dns message from dnstap consumer dm := <-fl.GetInputChannel() - if dm.DNS.Qname != "www.google.fr" { + if dm.DNS.Qname != ExpectedQname { t.Errorf("invalid qname in dns message: %s", dm.DNS.Qname) } } @@ -274,3 +277,77 @@ func Test_DnstapProcessor_Extended(t *testing.T) { t.Errorf("invalid sample rate: %d", dm.Filtering.SampleRate) } } + +// test for issue https://github.com/dmachard/go-dnscollector/issues/568 +func Test_DnstapProcessor_BufferLoggerIsFull(t *testing.T) { + // redirect stdout output to bytes buffer + logsChan := make(chan logger.LogEntry, 10) + lg := logger.New(true) + lg.SetOutputChannel((logsChan)) + + // init the dnstap consumer + consumer := NewDNSTapProcessor(0, pkgconfig.GetFakeConfig(), lg, "test", 512) + + // prepare dns query + dnsmsg := new(dns.Msg) + dnsmsg.SetQuestion(ExpectedQname+".", dns.TypeA) + dnsquestion, _ := dnsmsg.Pack() + + // prepare dnstap + dt := &dnstap.Dnstap{} + dt.Type = dnstap.Dnstap_Type.Enum(1) + + dt.Message = &dnstap.Message{} + dt.Message.Type = dnstap.Message_Type.Enum(5) + dt.Message.QueryMessage = dnsquestion + + data, _ := proto.Marshal(dt) + + // run the consumer with a fake logger + fl := loggers.NewFakeLoggerWithBufferSize(1) + go consumer.Run([]pkgutils.Worker{fl}, []pkgutils.Worker{fl}) + + // add packets to consumer + for i := 0; i < 512; i++ { + consumer.GetChannel() <- data + } + + // waiting monitor to run in consumer + time.Sleep(12 * time.Second) + + for entry := range logsChan { + fmt.Println(entry) + pattern := regexp.MustCompile(ExpectedBufferMsg511) + if pattern.MatchString(entry.Message) { + break + } + } + + // read dns message from dnstap consumer + dm := <-fl.GetInputChannel() + if dm.DNS.Qname != ExpectedQname { + t.Errorf("invalid qname in dns message: %s", dm.DNS.Qname) + } + + // send second shot of packets to consumer + for i := 0; i < 1024; i++ { + consumer.GetChannel() <- data + } + + // waiting monitor to run in consumer + time.Sleep(12 * time.Second) + + for entry := range logsChan { + fmt.Println(entry) + pattern := regexp.MustCompile(ExpectedBufferMsg1023) + if pattern.MatchString(entry.Message) { + break + } + } + + // read dns message from dnstap consumer + dm2 := <-fl.GetInputChannel() + if dm2.DNS.Qname != ExpectedQname { + t.Errorf("invalid qname in second dns message: %s", dm2.DNS.Qname) + } +} diff --git a/processors/powerdns.go b/processors/powerdns.go index 9d2b317c..d56c04ec 100644 --- a/processors/powerdns.go +++ b/processors/powerdns.go @@ -40,6 +40,8 @@ type PdnsProcessor struct { name string chanSize int RoutingHandler pkgutils.RoutingHandler + dropped chan string + droppedCount map[string]int } func NewPdnsProcessor(connID int, config *pkgconfig.Config, logger *logger.Logger, name string, size int) PdnsProcessor { @@ -57,6 +59,8 @@ func NewPdnsProcessor(connID int, config *pkgconfig.Config, logger *logger.Logge ConfigChan: make(chan *pkgconfig.Config), name: name, RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name), + dropped: make(chan string), + droppedCount: map[string]int{}, } return d } @@ -92,6 +96,10 @@ func (p *PdnsProcessor) Stop() { p.LogInfo("stopping to process...") p.stopRun <- true <-p.doneRun + + p.LogInfo("stopping to monitor loggers...") + p.stopMonitor <- true + <-p.doneMonitor } func (p *PdnsProcessor) Run(defaultWorkers []pkgutils.Worker, droppedworkers []pkgutils.Worker) { @@ -104,6 +112,9 @@ func (p *PdnsProcessor) Run(defaultWorkers []pkgutils.Worker, droppedworkers []p // prepare enabled transformers transforms := transformers.NewTransforms(&p.config.IngoingTransformers, p.logger, p.name, defaultRoutes, p.ConnID) + // start goroutine to count dropped messsages + go p.MonitorLoggers() + // read incoming dns message p.LogInfo("waiting dns message to process...") RUN_LOOP: @@ -319,13 +330,59 @@ RUN_LOOP: // apply all enabled transformers if transforms.ProcessMessage(&dm) == transformers.ReturnDrop { - p.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm) + for i := range droppedRoutes { + select { + case droppedRoutes[i] <- dm: // Successful send to logger channel + default: + p.dropped <- droppedNames[i] + } + } continue } // dispatch dns messages to connected loggers - p.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm) + for i := range defaultRoutes { + select { + case defaultRoutes[i] <- dm: // Successful send to logger channel + default: + p.dropped <- defaultNames[i] + } + } } } p.LogInfo("processing terminated") } + +func (p *PdnsProcessor) MonitorLoggers() { + watchInterval := 10 * time.Second + bufferFull := time.NewTimer(watchInterval) +FOLLOW_LOOP: + for { + select { + case <-p.stopMonitor: + close(p.dropped) + bufferFull.Stop() + p.doneMonitor <- true + break FOLLOW_LOOP + + case loggerName := <-p.dropped: + if _, ok := p.droppedCount[loggerName]; !ok { + p.droppedCount[loggerName] = 1 + } else { + p.droppedCount[loggerName]++ + } + + case <-bufferFull.C: + + for v, k := range p.droppedCount { + if k > 0 { + p.LogError("logger[%s] buffer is full, %d packet(s) dropped", v, k) + p.droppedCount[v] = 0 + } + } + bufferFull.Reset(watchInterval) + + } + } + p.LogInfo("monitor terminated") +} diff --git a/processors/powerdns_test.go b/processors/powerdns_test.go index 2c34bcda..a11dc430 100644 --- a/processors/powerdns_test.go +++ b/processors/powerdns_test.go @@ -1,7 +1,10 @@ package processors import ( + "fmt" + "regexp" "testing" + "time" "github.com/dmachard/go-dnscollector/loggers" "github.com/dmachard/go-dnscollector/pkgconfig" @@ -12,7 +15,7 @@ import ( "google.golang.org/protobuf/proto" ) -func TestPowerDNS_Processor(t *testing.T) { +func Test_PowerDNSProcessor(t *testing.T) { // init the dnstap consumer consumer := NewPdnsProcessor(0, pkgconfig.GetFakeConfig(), logger.New(false), "test", 512) @@ -21,7 +24,7 @@ func TestPowerDNS_Processor(t *testing.T) { dnsQuestion := powerdns_protobuf.PBDNSMessage_DNSQuestion{QName: &dnsQname} dm := &powerdns_protobuf.PBDNSMessage{} - dm.ServerIdentity = []byte("powerdnspb") + dm.ServerIdentity = []byte(ExpectedIdentity) dm.Type = powerdns_protobuf.PBDNSMessage_DNSQueryType.Enum() dm.SocketProtocol = powerdns_protobuf.PBDNSMessage_DNSCryptUDP.Enum() dm.SocketFamily = powerdns_protobuf.PBDNSMessage_INET.Enum() @@ -38,12 +41,12 @@ func TestPowerDNS_Processor(t *testing.T) { // read dns message from dnstap consumer msg := <-fl.GetInputChannel() - if msg.DNSTap.Identity != "powerdnspb" { + if msg.DNSTap.Identity != ExpectedIdentity { t.Errorf("invalid identity in dns message: %s", msg.DNSTap.Identity) } } -func TestPowerDNS_Processor_AddDNSPayload_Valid(t *testing.T) { +func Test_PowerDNSProcessor_AddDNSPayload_Valid(t *testing.T) { cfg := pkgconfig.GetFakeConfig() cfg.Collectors.PowerDNS.AddDNSPayload = true @@ -55,7 +58,7 @@ func TestPowerDNS_Processor_AddDNSPayload_Valid(t *testing.T) { dnsQuestion := powerdns_protobuf.PBDNSMessage_DNSQuestion{QName: &dnsQname} dm := &powerdns_protobuf.PBDNSMessage{} - dm.ServerIdentity = []byte("powerdnspb") + dm.ServerIdentity = []byte(ExpectedIdentity) dm.Id = proto.Uint32(2000) dm.Type = powerdns_protobuf.PBDNSMessage_DNSQueryType.Enum() dm.SocketProtocol = powerdns_protobuf.PBDNSMessage_DNSCryptUDP.Enum() @@ -93,7 +96,7 @@ func TestPowerDNS_Processor_AddDNSPayload_Valid(t *testing.T) { } } -func TestPowerDNS_Processor_AddDNSPayload_InvalidLabelLength(t *testing.T) { +func Test_PowerDNSProcessor_AddDNSPayload_InvalidLabelLength(t *testing.T) { cfg := pkgconfig.GetFakeConfig() cfg.Collectors.PowerDNS.AddDNSPayload = true @@ -128,7 +131,7 @@ func TestPowerDNS_Processor_AddDNSPayload_InvalidLabelLength(t *testing.T) { } } -func TestPowerDNS_Processor_AddDNSPayload_QnameTooLongDomain(t *testing.T) { +func Test_PowerDNSProcessor_AddDNSPayload_QnameTooLongDomain(t *testing.T) { cfg := pkgconfig.GetFakeConfig() cfg.Collectors.PowerDNS.AddDNSPayload = true @@ -162,7 +165,7 @@ func TestPowerDNS_Processor_AddDNSPayload_QnameTooLongDomain(t *testing.T) { } } -func TestPowerDNS_Processor_AddDNSPayload_AnswersTooLongDomain(t *testing.T) { +func Test_PowerDNSProcessor_AddDNSPayload_AnswersTooLongDomain(t *testing.T) { cfg := pkgconfig.GetFakeConfig() cfg.Collectors.PowerDNS.AddDNSPayload = true @@ -208,3 +211,75 @@ func TestPowerDNS_Processor_AddDNSPayload_AnswersTooLongDomain(t *testing.T) { t.Errorf("DNS message is not malformed") } } + +// test for issue https://github.com/dmachard/go-dnscollector/issues/568 +func Test_PowerDNSProcessor_BufferLoggerIsFull(t *testing.T) { + // redirect stdout output to bytes buffer + logsChan := make(chan logger.LogEntry, 10) + lg := logger.New(true) + lg.SetOutputChannel((logsChan)) + + // init the dnstap consumer + cfg := pkgconfig.GetFakeConfig() + consumer := NewPdnsProcessor(0, cfg, lg, "test", 512) + + // init the powerdns processor + dnsQname := pkgconfig.ValidDomain + dnsQuestion := powerdns_protobuf.PBDNSMessage_DNSQuestion{QName: &dnsQname} + + dm := &powerdns_protobuf.PBDNSMessage{} + dm.ServerIdentity = []byte(ExpectedIdentity) + dm.Type = powerdns_protobuf.PBDNSMessage_DNSQueryType.Enum() + dm.SocketProtocol = powerdns_protobuf.PBDNSMessage_DNSCryptUDP.Enum() + dm.SocketFamily = powerdns_protobuf.PBDNSMessage_INET.Enum() + dm.Question = &dnsQuestion + + data, _ := proto.Marshal(dm) + + // run the consumer with a fake logger + fl := loggers.NewFakeLoggerWithBufferSize(1) + go consumer.Run([]pkgutils.Worker{fl}, []pkgutils.Worker{fl}) + + // add packets to consumer + for i := 0; i < 512; i++ { + consumer.GetChannel() <- data + } + + // waiting monitor to run in consumer + time.Sleep(12 * time.Second) + + for entry := range logsChan { + fmt.Println(entry) + pattern := regexp.MustCompile(ExpectedBufferMsg511) + if pattern.MatchString(entry.Message) { + break + } + } + + // read dns message from dnstap consumer + msg := <-fl.GetInputChannel() + if msg.DNSTap.Identity != ExpectedIdentity { + t.Errorf("invalid identity in dns message: %s", msg.DNSTap.Identity) + } + + // send second shot of packets to consumer + for i := 0; i < 1024; i++ { + consumer.GetChannel() <- data + } + + // waiting monitor to run in consumer + time.Sleep(12 * time.Second) + for entry := range logsChan { + fmt.Println(entry) + pattern := regexp.MustCompile(ExpectedBufferMsg1023) + if pattern.MatchString(entry.Message) { + break + } + } + + // read just one dns message from dnstap consumer + msg2 := <-fl.GetInputChannel() + if msg2.DNSTap.Identity != ExpectedIdentity { + t.Errorf("invalid identity in second dns message: %s", msg2.DNSTap.Identity) + } +}