Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dnstap collector: avoid to stuck on stop in specific situation #597

Merged
merged 5 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions collectors/dnsmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type DNSMessage struct {
}

func NewDNSMessage(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *DNSMessage {
logger.Info("[%s] collector=dnsmessage - enabled", name)
logger.Info(pkgutils.PrefixLogCollector+"[%s] dnsmessage - enabled", name)
s := &DNSMessage{
doneRun: make(chan bool),
doneMonitor: make(chan bool),
Expand Down Expand Up @@ -218,19 +218,18 @@ func (c *DNSMessage) ReloadConfig(config *pkgconfig.Config) {
}

func (c *DNSMessage) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] collector=dnsmessage - "+msg, v...)
c.logger.Info(pkgutils.PrefixLogCollector+"["+c.name+"] dnsmessage - "+msg, v...)
}

func (c *DNSMessage) LogError(msg string, v ...interface{}) {
c.logger.Error("["+c.name+"] collector=dnsmessage - "+msg, v...)
c.logger.Error(pkgutils.PrefixLogCollector+"["+c.name+"] dnsmessage - "+msg, v...)
}

func (c *DNSMessage) Stop() {
// c.LogInfo("stopping routing handler...")
// c.RoutingHandler.Stop()
c.LogInfo("stopping collector...")

// read done channel and block until run is terminated
c.LogInfo("stopping run...")
c.LogInfo("stopping to run...")
c.stopRun <- true
<-c.doneRun

Expand Down
69 changes: 43 additions & 26 deletions collectors/dnstap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"os"
Expand All @@ -26,6 +25,7 @@ type Dnstap struct {
doneMonitor chan bool
stopRun chan bool
stopMonitor chan bool
stopCalled bool
listen net.Listener
conns []net.Conn
sockPath string
Expand All @@ -44,7 +44,7 @@ type Dnstap struct {
}

func NewDnstap(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *Dnstap {
logger.Info("[%s] collector=dnstap - enabled", name)
logger.Info(pkgutils.PrefixLogCollector+"[%s] dnstap - enabled", name)
s := &Dnstap{
doneRun: make(chan bool),
doneMonitor: make(chan bool),
Expand Down Expand Up @@ -100,22 +100,22 @@ func (c *Dnstap) ReloadConfig(config *pkgconfig.Config) {
}

func (c *Dnstap) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] collector=dnstap - "+msg, v...)
c.logger.Info(pkgutils.PrefixLogCollector+"["+c.name+"] dnstap - "+msg, v...)
}

func (c *Dnstap) LogError(msg string, v ...interface{}) {
c.logger.Error("["+c.name+" collector=dnstap - "+msg, v...)
c.logger.Error(pkgutils.PrefixLogCollector+"["+c.name+" dnstap - "+msg, v...)
}

func (c *Dnstap) LogConnInfo(connID int, msg string, v ...interface{}) {
prefix := fmt.Sprintf("[%s] collector=dnstap#%d - ", c.name, connID)
c.logger.Info(prefix+msg, v...)
}
// func (c *Dnstap) LogConnInfo(connID int, msg string, v ...interface{}) {
// prefix := fmt.Sprintf(pkgutils.PrefixLogCollector+"[%s] dnstap#%d - ", c.name, connID)
// c.logger.Info(prefix+msg, v...)
// }

func (c *Dnstap) LogConnError(connID int, msg string, v ...interface{}) {
prefix := fmt.Sprintf("[%s] collector=dnstap#%d - ", c.name, connID)
c.logger.Error(prefix+msg, v...)
}
// func (c *Dnstap) LogConnError(connID int, msg string, v ...interface{}) {
// prefix := fmt.Sprintf(pkgutils.PrefixLogCollector+"[%s] dnstap#%d - ", c.name, connID)
// c.logger.Error(prefix+msg, v...)
// }

func (c *Dnstap) HandleConn(conn net.Conn) {
// close connection on function exit
Expand All @@ -129,7 +129,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) {

// get peer address
peer := conn.RemoteAddr().String()
c.LogConnInfo(connID, "new connection from %s", peer)
c.LogInfo("new connection #%d from %s", connID, peer)

// start dnstap processor
dnstapProcessor := processors.NewDNSTapProcessor(
Expand All @@ -153,9 +153,9 @@ func (c *Dnstap) HandleConn(conn net.Conn) {

// init framestream receiver
if err := fs.InitReceiver(); err != nil {
c.LogConnError(connID, "stream initialization: %s", err)
c.LogError("conn #%d - stream initialization: %s", connID, err)
} else {
c.LogConnInfo(connID, "receiver framestream initialized")
c.LogInfo("conn #%d - receiver framestream initialized", connID)
}

// process incoming frame and send it to dnstap consumer channel
Expand All @@ -177,22 +177,24 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
}

if connClosed {
c.LogConnInfo(connID, "connection closed with peer %s", peer)
c.LogInfo("conn #%d - connection closed with peer %s", connID, peer)
} else {
c.LogConnError(connID, "framestream reader error: %s", err)
c.LogError("conn #%d - framestream reader error: %s", connID, err)
}

// stop processor and exit the loop
dnstapProcessor.Stop()
// the Stop function is already called, don't stop again
if !c.stopCalled {
dnstapProcessor.Stop()
}
break
}

if frame.IsControl() {
if err := fs.ResetReceiver(frame); err != nil {
if errors.Is(err, io.EOF) {
c.LogConnInfo(connID, "framestream reseted by sender")
c.LogInfo("conn #%d - framestream reseted by sender", connID)
} else {
c.LogConnError(connID, "unexpected control framestream - %s", err)
c.LogError("conn #%d - unexpected control framestream: %s", connID, err)
}

}
Expand All @@ -207,6 +209,12 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
}
}

// to avoid lock if the Stop function is already called
if c.stopCalled {
c.LogInfo("conn #%d - connection handler exited", connID)
return
}

// here the connection is closed,
// then removes the current tap processor from the list
c.Lock()
Expand All @@ -225,7 +233,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
}
c.Unlock()

c.LogConnInfo(connID, "connection handler terminated")
c.LogInfo("conn #%d - connection handler terminated", connID)
}

func (c *Dnstap) GetInputChannel() chan dnsutils.DNSMessage {
Expand All @@ -236,8 +244,13 @@ func (c *Dnstap) Stop() {
c.Lock()
defer c.Unlock()

// to avoid some lock situations when the remose side closes
// the connection at the same time of this Stop function
c.stopCalled = true
c.LogInfo("stopping collector...")

// stop all powerdns processors
c.LogInfo("stopping processors...")
c.LogInfo("cleanup all active processors...")
for _, tapProc := range c.tapProcessors {
tapProc.Stop()
}
Expand Down Expand Up @@ -346,8 +359,7 @@ func (c *Dnstap) Run() {
c.LogInfo("starting collector...")
if c.listen == nil {
if err := c.Listen(); err != nil {
prefixlog := fmt.Sprintf("[%s] ", c.name)
c.logger.Fatal(prefixlog+"collector=dnstap listening failed: ", err)
c.logger.Fatal(pkgutils.PrefixLogCollector+"["+c.name+"] dnstap listening failed: ", err)
}
}

Expand Down Expand Up @@ -397,12 +409,17 @@ RUN_LOOP:
c.config.Collectors.Dnstap.TLSSupport,
)
if err != nil {
c.logger.Fatal("Unable to set SO_RCVBUF: ", err)
c.logger.Fatal(pkgutils.PrefixLogCollector+"["+c.name+"] dnstap - unable to set SO_RCVBUF: ", err)
}
c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before,
c.config.Collectors.Dnstap.RcvBufSize, actual)
}

// to avoid lock if the Stop function is already called
if c.stopCalled {
continue
}

c.Lock()
c.conns = append(c.conns, conn)
c.Unlock()
Expand Down
10 changes: 5 additions & 5 deletions collectors/dnstap_proxifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type DnstapProxifier struct {
}

func NewDnstapProxifier(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *DnstapProxifier {
logger.Info("[%s] collector=dnstaprelay - enabled", name)
logger.Info(pkgutils.PrefixLogCollector+"[%s] dnstaprelay - enabled", name)
s := &DnstapProxifier{
doneRun: make(chan bool),
stopRun: make(chan bool),
Expand Down Expand Up @@ -72,7 +72,7 @@ func (c *DnstapProxifier) Loggers() []chan dnsutils.DNSMessage {

func (c *DnstapProxifier) ReadConfig() {
if !pkgconfig.IsValidTLS(c.config.Collectors.DnstapProxifier.TLSMinVersion) {
c.logger.Fatal("collector=dnstaprelay - invalid tls min version")
c.logger.Fatal(pkgutils.PrefixLogCollector + "[" + c.name + "] dnstaprelay - invalid tls min version")
}

c.sockPath = c.config.Collectors.DnstapProxifier.SockPath
Expand All @@ -84,11 +84,11 @@ func (c *DnstapProxifier) ReloadConfig(config *pkgconfig.Config) {
}

func (c *DnstapProxifier) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] collector=dnstaprelay - "+msg, v...)
c.logger.Info(pkgutils.PrefixLogCollector+"["+c.name+"] dnstaprelay - "+msg, v...)
}

func (c *DnstapProxifier) LogError(msg string, v ...interface{}) {
c.logger.Error("["+c.name+"] collector=dnstaprelay - "+msg, v...)
c.logger.Error(pkgutils.PrefixLogCollector+"["+c.name+"] dnstaprelay - "+msg, v...)
}

func (c *DnstapProxifier) HandleFrame(recvFrom chan []byte, sendTo []chan dnsutils.DNSMessage) {
Expand Down Expand Up @@ -147,7 +147,7 @@ func (c *DnstapProxifier) GetInputChannel() chan dnsutils.DNSMessage {
}

func (c *DnstapProxifier) Stop() {
c.LogInfo("stopping...")
c.LogInfo("stopping collector...")
c.stopping = true

// closing properly current connections if exists
Expand Down
10 changes: 5 additions & 5 deletions collectors/file_ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type FileIngestor struct {
}

func NewFileIngestor(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *FileIngestor {
logger.Info("[%s] collector=fileingestor - enabled", name)
logger.Info(pkgutils.PrefixLogCollector+"[%s] fileingestor - enabled", name)
s := &FileIngestor{
done: make(chan bool),
exit: make(chan bool),
Expand Down Expand Up @@ -90,7 +90,7 @@ func (c *FileIngestor) Loggers() ([]chan dnsutils.DNSMessage, []string) {

func (c *FileIngestor) ReadConfig() {
if !IsValidMode(c.config.Collectors.FileIngestor.WatchMode) {
c.logger.Fatal("collector file ingestor - invalid mode: ", c.config.Collectors.FileIngestor.WatchMode)
c.logger.Fatal(pkgutils.PrefixLogCollector+"["+c.name+"]fileingestor - invalid mode: ", c.config.Collectors.FileIngestor.WatchMode)
}

c.identity = c.config.GetServerIdentity()
Expand All @@ -107,19 +107,19 @@ func (c *FileIngestor) ReloadConfig(config *pkgconfig.Config) {
}

func (c *FileIngestor) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] collector=fileingestor - "+msg, v...)
c.logger.Info(pkgutils.PrefixLogCollector+"["+c.name+"] fileingestor - "+msg, v...)
}

func (c *FileIngestor) LogError(msg string, v ...interface{}) {
c.logger.Error("["+c.name+"] collector=fileingestor - "+msg, v...)
c.logger.Error(pkgutils.PrefixLogCollector+"["+c.name+"] fileingestor - "+msg, v...)
}

func (c *FileIngestor) GetInputChannel() chan dnsutils.DNSMessage {
return nil
}

func (c *FileIngestor) Stop() {
c.LogInfo("stopping...")
c.LogInfo("stopping collector...")

// exit to close properly
c.exit <- true
Expand Down
8 changes: 4 additions & 4 deletions collectors/file_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Tail struct {
}

func NewTail(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *Tail {
logger.Info("[%s] collector=tail - enabled", name)
logger.Info(pkgutils.PrefixLogCollector+"[%s] tail - enabled", name)
s := &Tail{
doneRun: make(chan bool),
stopRun: make(chan bool),
Expand Down Expand Up @@ -74,19 +74,19 @@ func (c *Tail) ReloadConfig(config *pkgconfig.Config) {
}

func (c *Tail) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] collector=tail - "+msg, v...)
c.logger.Info(pkgutils.PrefixLogCollector+"["+c.name+"] tail - "+msg, v...)
}

func (c *Tail) LogError(msg string, v ...interface{}) {
c.logger.Error("["+c.name+"] collector=tail - "+msg, v...)
c.logger.Error(pkgutils.PrefixLogCollector+"["+c.name+"] tail - "+msg, v...)
}

func (c *Tail) GetInputChannel() chan dnsutils.DNSMessage {
return nil
}

func (c *Tail) Stop() {
c.LogInfo("stopping...")
c.LogInfo("stopping collector...")

// Stop to follow file
c.LogInfo("stop following file...")
Expand Down
Loading
Loading