Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transformers reload config on SIGHUP and major refacto #440

Merged
merged 13 commits into from
Oct 29, 2023
2 changes: 1 addition & 1 deletion .github/workflows/testing-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
matrix:
os-version: ['ubuntu-22.04', 'macos-latest' ]
go-version: [ '1.20', '1.21' ]
package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib']
package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib', 'processors']
exclude:
- os-version: macos-latest
go-version: '1.20'
Expand Down
89 changes: 64 additions & 25 deletions collectors/dnstap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,29 @@ import (

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/netlib"
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
)

type Dnstap struct {
doneRun chan bool
doneMonitor chan bool
stopRun chan bool
stopMonitor chan bool
listen net.Listener
conns []net.Conn
sockPath string
loggers []dnsutils.Worker
config *dnsutils.Config
configChan chan *dnsutils.Config
logger *logger.Logger
name string
connMode string
connId int
droppedCount int
dropped chan int
tapProcessors []DnstapProcessor
tapProcessors []processors.DnstapProcessor
sync.RWMutex
}

Expand All @@ -42,9 +45,11 @@ func NewDnstap(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logge
s := &Dnstap{
doneRun: make(chan bool),
doneMonitor: make(chan bool),
stopRun: make(chan bool),
stopMonitor: make(chan bool),
dropped: make(chan int),
config: config,
configChan: make(chan *dnsutils.Config),
loggers: loggers,
logger: logger,
name: name,
Expand Down Expand Up @@ -85,6 +90,11 @@ func (c *Dnstap) ReadConfig() {
}
}

func (c *Dnstap) ReloadConfig(config *dnsutils.Config) {
c.LogInfo("reload configuration...")
c.configChan <- config
}

func (c *Dnstap) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] collector=dnstap - "+msg, v...)
}
Expand Down Expand Up @@ -118,7 +128,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
c.LogConnInfo(connId, "new connection from %s", peer)

// start dnstap subprocessor
dnstapProcessor := NewDnstapProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.Dnstap.ChannelBufferSize)
dnstapProcessor := processors.NewDnstapProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.Dnstap.ChannelBufferSize)
c.Lock()
c.tapProcessors = append(c.tapProcessors, dnstapProcessor)
c.Unlock()
Expand Down Expand Up @@ -177,7 +187,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
// then removes the current tap processor from the list
c.Lock()
for i, t := range c.tapProcessors {
if t.connId == connId {
if t.ConnId == connId {
c.tapProcessors = append(c.tapProcessors[:i], c.tapProcessors[i+1:]...)
}
}
Expand Down Expand Up @@ -225,8 +235,8 @@ func (c *Dnstap) Stop() {

// read done channel and block until run is terminated
c.LogInfo("stopping run...")
c.stopRun <- true
<-c.doneRun
close(c.doneRun)
}

func (c *Dnstap) Listen() error {
Expand Down Expand Up @@ -320,32 +330,61 @@ func (c *Dnstap) Run() {
// start goroutine to count dropped messsages
go c.MonitorCollector()

for {
// Accept() blocks waiting for new connection.
conn, err := c.listen.Accept()
if err != nil {
break
// goroutine to Accept() blocks waiting for new connection.
acceptChan := make(chan net.Conn)
go func() {
for {
conn, err := c.listen.Accept()
if err != nil {
return
}
acceptChan <- conn
}
}()

if (c.connMode == "tls" || c.connMode == "tcp") && c.config.Collectors.Dnstap.RcvBufSize > 0 {
before, actual, err := netlib.SetSock_RCVBUF(
conn,
c.config.Collectors.Dnstap.RcvBufSize,
c.config.Collectors.Dnstap.TlsSupport,
)
if err != nil {
c.logger.Fatal("Unable to set SO_RCVBUF: ", err)
RUN_LOOP:
for {
select {
case <-c.stopRun:
close(acceptChan)
c.doneRun <- true
break RUN_LOOP

case cfg := <-c.configChan:

// save the new config
c.config = cfg
c.ReadConfig()

// refresh config for all conns
for i := range c.tapProcessors {
c.tapProcessors[i].ConfigChan <- cfg
}

case conn, opened := <-acceptChan:
if !opened {
return
}
c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before,
c.config.Collectors.Dnstap.RcvBufSize, actual)

if (c.connMode == "tls" || c.connMode == "tcp") && c.config.Collectors.Dnstap.RcvBufSize > 0 {
before, actual, err := netlib.SetSock_RCVBUF(
conn,
c.config.Collectors.Dnstap.RcvBufSize,
c.config.Collectors.Dnstap.TlsSupport,
)
if err != nil {
c.logger.Fatal("Unable to set SO_RCVBUF: ", err)
}
c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before,
c.config.Collectors.Dnstap.RcvBufSize, actual)
}

c.Lock()
c.conns = append(c.conns, conn)
c.Unlock()
go c.HandleConn(conn)
}

c.Lock()
c.conns = append(c.conns, conn)
c.Unlock()
go c.HandleConn(conn)
}

c.LogInfo("run terminated")
c.doneRun <- true
}
82 changes: 58 additions & 24 deletions collectors/dnstap_proxifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,29 @@ import (
)

type DnstapProxifier struct {
done chan bool
listen net.Listener
conns []net.Conn
sockPath string
loggers []dnsutils.Worker
config *dnsutils.Config
logger *logger.Logger
name string
stopping bool
doneRun chan bool
stopRun chan bool
listen net.Listener
conns []net.Conn
sockPath string
loggers []dnsutils.Worker
config *dnsutils.Config
configChan chan *dnsutils.Config
logger *logger.Logger
name string
stopping bool
}

func NewDnstapProxifier(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *DnstapProxifier {
logger.Info("[%s] collector=dnstaprelay - enabled", name)
s := &DnstapProxifier{
done: make(chan bool),
config: config,
loggers: loggers,
logger: logger,
name: name,
doneRun: make(chan bool),
stopRun: make(chan bool),
config: config,
configChan: make(chan *dnsutils.Config),
loggers: loggers,
logger: logger,
name: name,
}
s.ReadConfig()
return s
Expand Down Expand Up @@ -60,6 +64,11 @@ func (c *DnstapProxifier) ReadConfig() {
c.sockPath = c.config.Collectors.DnstapProxifier.SockPath
}

func (c *DnstapProxifier) ReloadConfig(config *dnsutils.Config) {
c.LogInfo("reload configuration...")
c.configChan <- config
}

func (c *DnstapProxifier) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] collector=dnstaprelay - "+msg, v...)
}
Expand Down Expand Up @@ -138,8 +147,8 @@ func (c *DnstapProxifier) Stop() {
c.listen.Close()

// read done channel and block until run is terminated
<-c.done
close(c.done)
c.stopRun <- true
<-c.doneRun
}

func (c *DnstapProxifier) Listen() error {
Expand Down Expand Up @@ -201,17 +210,42 @@ func (c *DnstapProxifier) Run() {
c.logger.Fatal("collector dnstap listening failed: ", err)
}
}
for {
// Accept() blocks waiting for new connection.
conn, err := c.listen.Accept()
if err != nil {
break

// goroutine to Accept() blocks waiting for new connection.
acceptChan := make(chan net.Conn)
go func() {
for {
conn, err := c.listen.Accept()
if err != nil {
return
}
acceptChan <- conn
}
}()

c.conns = append(c.conns, conn)
go c.HandleConn(conn)
RUN_LOOP:
for {
select {
case <-c.stopRun:
close(acceptChan)
c.doneRun <- true
break RUN_LOOP

case cfg := <-c.configChan:

// save the new config
c.config = cfg
c.ReadConfig()

case conn, opened := <-acceptChan:
if !opened {
return
}

c.conns = append(c.conns, conn)
go c.HandleConn(conn)
}
}

c.LogInfo("run terminated")
c.done <- true
}
5 changes: 3 additions & 2 deletions collectors/dnstap_proxifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/loggers"
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -75,13 +76,13 @@ func Test_DnstapProxifier(t *testing.T) {
frame := &framestream.Frame{}

// get fake dns question
dnsquery, err := GetFakeDns()
dnsquery, err := processors.GetFakeDns()
if err != nil {
t.Fatalf("dns question pack error")
}

// get fake dnstap message
dt_query := GetFakeDnstap(dnsquery)
dt_query := processors.GetFakeDnstap(dnsquery)

// serialize to bytes
data, err := proto.Marshal(dt_query)
Expand Down
5 changes: 3 additions & 2 deletions collectors/dnstap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/loggers"
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -79,13 +80,13 @@ func Test_DnstapCollector(t *testing.T) {
frame := &framestream.Frame{}

// get fake dns question
dnsquery, err := GetFakeDns()
dnsquery, err := processors.GetFakeDns()
if err != nil {
t.Fatalf("dns question pack error")
}

// get fake dnstap message
dt_query := GetFakeDnstap(dnsquery)
dt_query := processors.GetFakeDnstap(dnsquery)

// serialize to bytes
data, err := proto.Marshal(dt_query)
Expand Down
Loading