Skip to content

Commit

Permalink
support config reload on all
Browse files Browse the repository at this point in the history
loggers and collectors
major code factory
fix some race condition
  • Loading branch information
dmachard committed Oct 29, 2023
1 parent afa24f9 commit b7bdcfc
Show file tree
Hide file tree
Showing 46 changed files with 819 additions and 707 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testing-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
matrix:
os-version: ['ubuntu-22.04', 'macos-latest' ]
go-version: [ '1.20', '1.21' ]
package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib']
package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib', 'processors']
exclude:
- os-version: macos-latest
go-version: '1.20'
Expand Down
96 changes: 61 additions & 35 deletions collectors/dnstap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,29 @@ import (

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/netlib"
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
)

type Dnstap struct {
doneRun chan bool
doneMonitor chan bool
stopRun chan bool
stopMonitor chan bool
listen net.Listener
conns []net.Conn
sockPath string
loggers []dnsutils.Worker
config *dnsutils.Config
configChan chan *dnsutils.Config
logger *logger.Logger
name string
connMode string
connId int
droppedCount int
dropped chan int
tapProcessors []DnstapProcessor
tapProcessors []processors.DnstapProcessor
sync.RWMutex
}

Expand All @@ -42,9 +45,11 @@ func NewDnstap(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logge
s := &Dnstap{
doneRun: make(chan bool),
doneMonitor: make(chan bool),
stopRun: make(chan bool),
stopMonitor: make(chan bool),
dropped: make(chan int),
config: config,
configChan: make(chan *dnsutils.Config),
loggers: loggers,
logger: logger,
name: name,
Expand Down Expand Up @@ -86,16 +91,8 @@ func (c *Dnstap) ReadConfig() {
}

func (c *Dnstap) ReloadConfig(config *dnsutils.Config) {
c.LogInfo("reload config...")

// save the new config
c.config = config
c.ReadConfig()

// refresh config for all conns
for i := range c.tapProcessors {
c.tapProcessors[i].ReloadConfig(config)
}
c.LogInfo("reload configuration...")
c.configChan <- config
}

func (c *Dnstap) LogInfo(msg string, v ...interface{}) {
Expand Down Expand Up @@ -131,7 +128,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
c.LogConnInfo(connId, "new connection from %s", peer)

// start dnstap subprocessor
dnstapProcessor := NewDnstapProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.Dnstap.ChannelBufferSize)
dnstapProcessor := processors.NewDnstapProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.Dnstap.ChannelBufferSize)
c.Lock()
c.tapProcessors = append(c.tapProcessors, dnstapProcessor)
c.Unlock()
Expand Down Expand Up @@ -190,7 +187,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
// then removes the current tap processor from the list
c.Lock()
for i, t := range c.tapProcessors {
if t.connId == connId {
if t.ConnId == connId {
c.tapProcessors = append(c.tapProcessors[:i], c.tapProcessors[i+1:]...)
}
}
Expand Down Expand Up @@ -238,8 +235,8 @@ func (c *Dnstap) Stop() {

// read done channel and block until run is terminated
c.LogInfo("stopping run...")
c.stopRun <- true
<-c.doneRun
close(c.doneRun)
}

func (c *Dnstap) Listen() error {
Expand Down Expand Up @@ -333,32 +330,61 @@ func (c *Dnstap) Run() {
// start goroutine to count dropped messsages
go c.MonitorCollector()

for {
// Accept() blocks waiting for new connection.
conn, err := c.listen.Accept()
if err != nil {
break
// goroutine to Accept() blocks waiting for new connection.
acceptChan := make(chan net.Conn)
go func() {
for {
conn, err := c.listen.Accept()
if err != nil {
return
}
acceptChan <- conn
}
}()

if (c.connMode == "tls" || c.connMode == "tcp") && c.config.Collectors.Dnstap.RcvBufSize > 0 {
before, actual, err := netlib.SetSock_RCVBUF(
conn,
c.config.Collectors.Dnstap.RcvBufSize,
c.config.Collectors.Dnstap.TlsSupport,
)
if err != nil {
c.logger.Fatal("Unable to set SO_RCVBUF: ", err)
RUN_LOOP:
for {
select {
case <-c.stopRun:
close(acceptChan)
c.doneRun <- true
break RUN_LOOP

case cfg := <-c.configChan:

// save the new config
c.config = cfg
c.ReadConfig()

// refresh config for all conns
for i := range c.tapProcessors {
c.tapProcessors[i].ConfigChan <- cfg
}

case conn, opened := <-acceptChan:
if !opened {
return
}
c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before,
c.config.Collectors.Dnstap.RcvBufSize, actual)

if (c.connMode == "tls" || c.connMode == "tcp") && c.config.Collectors.Dnstap.RcvBufSize > 0 {
before, actual, err := netlib.SetSock_RCVBUF(
conn,
c.config.Collectors.Dnstap.RcvBufSize,
c.config.Collectors.Dnstap.TlsSupport,
)
if err != nil {
c.logger.Fatal("Unable to set SO_RCVBUF: ", err)
}
c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before,
c.config.Collectors.Dnstap.RcvBufSize, actual)
}

c.Lock()
c.conns = append(c.conns, conn)
c.Unlock()
go c.HandleConn(conn)
}

c.Lock()
c.conns = append(c.conns, conn)
c.Unlock()
go c.HandleConn(conn)
}

c.LogInfo("run terminated")
c.doneRun <- true
}
86 changes: 55 additions & 31 deletions collectors/dnstap_proxifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,29 @@ import (
)

type DnstapProxifier struct {
done chan bool
listen net.Listener
conns []net.Conn
sockPath string
loggers []dnsutils.Worker
config *dnsutils.Config
logger *logger.Logger
name string
stopping bool
doneRun chan bool
stopRun chan bool
listen net.Listener
conns []net.Conn
sockPath string
loggers []dnsutils.Worker
config *dnsutils.Config
configChan chan *dnsutils.Config
logger *logger.Logger
name string
stopping bool
}

func NewDnstapProxifier(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *DnstapProxifier {
logger.Info("[%s] collector=dnstaprelay - enabled", name)
s := &DnstapProxifier{
done: make(chan bool),
config: config,
loggers: loggers,
logger: logger,
name: name,
doneRun: make(chan bool),
stopRun: make(chan bool),
config: config,
configChan: make(chan *dnsutils.Config),
loggers: loggers,
logger: logger,
name: name,
}
s.ReadConfig()
return s
Expand Down Expand Up @@ -61,13 +65,8 @@ func (c *DnstapProxifier) ReadConfig() {
}

func (c *DnstapProxifier) ReloadConfig(config *dnsutils.Config) {
c.LogInfo("reload config...")

// save the new config
c.config = config

// read again
c.ReadConfig()
c.LogInfo("reload configuration...")
c.configChan <- config
}

func (c *DnstapProxifier) LogInfo(msg string, v ...interface{}) {
Expand Down Expand Up @@ -148,8 +147,8 @@ func (c *DnstapProxifier) Stop() {
c.listen.Close()

// read done channel and block until run is terminated
<-c.done
close(c.done)
c.stopRun <- true
<-c.doneRun
}

func (c *DnstapProxifier) Listen() error {
Expand Down Expand Up @@ -211,17 +210,42 @@ func (c *DnstapProxifier) Run() {
c.logger.Fatal("collector dnstap listening failed: ", err)
}
}
for {
// Accept() blocks waiting for new connection.
conn, err := c.listen.Accept()
if err != nil {
break

// goroutine to Accept() blocks waiting for new connection.
acceptChan := make(chan net.Conn)
go func() {
for {
conn, err := c.listen.Accept()
if err != nil {
return
}
acceptChan <- conn
}
}()

c.conns = append(c.conns, conn)
go c.HandleConn(conn)
RUN_LOOP:
for {
select {
case <-c.stopRun:
close(acceptChan)
c.doneRun <- true
break RUN_LOOP

case cfg := <-c.configChan:

// save the new config
c.config = cfg
c.ReadConfig()

case conn, opened := <-acceptChan:
if !opened {
return
}

c.conns = append(c.conns, conn)
go c.HandleConn(conn)
}
}

c.LogInfo("run terminated")
c.done <- true
}
5 changes: 3 additions & 2 deletions collectors/dnstap_proxifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/loggers"
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -75,13 +76,13 @@ func Test_DnstapProxifier(t *testing.T) {
frame := &framestream.Frame{}

// get fake dns question
dnsquery, err := GetFakeDns()
dnsquery, err := processors.GetFakeDns()
if err != nil {
t.Fatalf("dns question pack error")
}

// get fake dnstap message
dt_query := GetFakeDnstap(dnsquery)
dt_query := processors.GetFakeDnstap(dnsquery)

// serialize to bytes
data, err := proto.Marshal(dt_query)
Expand Down
5 changes: 3 additions & 2 deletions collectors/dnstap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/loggers"
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -79,13 +80,13 @@ func Test_DnstapCollector(t *testing.T) {
frame := &framestream.Frame{}

// get fake dns question
dnsquery, err := GetFakeDns()
dnsquery, err := processors.GetFakeDns()
if err != nil {
t.Fatalf("dns question pack error")
}

// get fake dnstap message
dt_query := GetFakeDnstap(dnsquery)
dt_query := processors.GetFakeDnstap(dnsquery)

// serialize to bytes
data, err := proto.Marshal(dt_query)
Expand Down
Loading

0 comments on commit b7bdcfc

Please sign in to comment.