Skip to content

Commit

Permalink
enable reload on collectors
Browse files Browse the repository at this point in the history
  • Loading branch information
dmachard committed Oct 26, 2023
1 parent 38f48b9 commit afa24f9
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 63 deletions.
52 changes: 32 additions & 20 deletions collectors/dns_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,33 @@ func GetFakeDns() ([]byte, error) {
}

type DnsProcessor struct {
doneRun chan bool
stopRun chan bool
doneMonitor chan bool
stopMonitor chan bool
recvFrom chan dnsutils.DnsMessage
logger *logger.Logger
config *dnsutils.Config
name string
dropped chan string
droppedCount map[string]int
doneRun chan bool
stopRun chan bool
doneMonitor chan bool
stopMonitor chan bool
recvFrom chan dnsutils.DnsMessage
logger *logger.Logger
config *dnsutils.Config
configTransformers chan *dnsutils.ConfigTransformers
name string
dropped chan string
droppedCount map[string]int
}

func NewDnsProcessor(config *dnsutils.Config, logger *logger.Logger, name string, size int) DnsProcessor {
logger.Info("[%s] processor=dns - initialization...", name)
d := DnsProcessor{
doneMonitor: make(chan bool),
doneRun: make(chan bool),
stopMonitor: make(chan bool),
stopRun: make(chan bool),
recvFrom: make(chan dnsutils.DnsMessage, size),
logger: logger,
config: config,
name: name,
dropped: make(chan string),
droppedCount: map[string]int{},
doneMonitor: make(chan bool),
doneRun: make(chan bool),
stopMonitor: make(chan bool),
stopRun: make(chan bool),
recvFrom: make(chan dnsutils.DnsMessage, size),
logger: logger,
config: config,
configTransformers: make(chan *dnsutils.ConfigTransformers),
name: name,
dropped: make(chan string),
droppedCount: map[string]int{},
}

d.ReadConfig()
Expand All @@ -51,6 +53,13 @@ func NewDnsProcessor(config *dnsutils.Config, logger *logger.Logger, name string

func (d *DnsProcessor) ReadConfig() {}

func (d *DnsProcessor) ReloadConfig(config *dnsutils.Config) {
d.config = config
d.ReadConfig()

d.configTransformers <- &config.IngoingTransformers
}

func (c *DnsProcessor) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] processor=dns - "+msg, v...)
}
Expand Down Expand Up @@ -125,6 +134,9 @@ func (d *DnsProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggersNam
RUN_LOOP:
for {
select {
case cfg := <-d.configTransformers:
transforms.ReloadConfig(cfg)

case <-d.stopRun:
transforms.Reset()
close(d.recvFrom)
Expand Down
7 changes: 5 additions & 2 deletions collectors/powerdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,12 @@ func (c *ProtobufPowerDNS) ReloadConfig(config *dnsutils.Config) {

// save the new config
c.config = config

// read again
c.ReadConfig()

// refresh config for all conns
for i := range c.pdnsProcessors {
c.pdnsProcessors[i].ReloadConfig(config)
}
}

func (c *ProtobufPowerDNS) LogInfo(msg string, v ...interface{}) {
Expand Down
60 changes: 36 additions & 24 deletions collectors/powerdns_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,37 @@ var (
)

type PdnsProcessor struct {
connId int
doneRun chan bool
stopRun chan bool
doneMonitor chan bool
stopMonitor chan bool
recvFrom chan []byte
logger *logger.Logger
config *dnsutils.Config
name string
chanSize int
dropped chan string
droppedCount map[string]int
connId int
doneRun chan bool
stopRun chan bool
doneMonitor chan bool
stopMonitor chan bool
recvFrom chan []byte
logger *logger.Logger
config *dnsutils.Config
configTransformers chan *dnsutils.ConfigTransformers
name string
chanSize int
dropped chan string
droppedCount map[string]int
}

func NewPdnsProcessor(connId int, config *dnsutils.Config, logger *logger.Logger, name string, size int) PdnsProcessor {
logger.Info("[%s] processor=pdns#%d - initialization...", name, connId)
d := PdnsProcessor{
connId: connId,
doneMonitor: make(chan bool),
doneRun: make(chan bool),
stopMonitor: make(chan bool),
stopRun: make(chan bool),
recvFrom: make(chan []byte, size),
chanSize: size,
logger: logger,
config: config,
name: name,
dropped: make(chan string),
droppedCount: map[string]int{},
connId: connId,
doneMonitor: make(chan bool),
doneRun: make(chan bool),
stopMonitor: make(chan bool),
stopRun: make(chan bool),
recvFrom: make(chan []byte, size),
chanSize: size,
logger: logger,
config: config,
configTransformers: make(chan *dnsutils.ConfigTransformers),
name: name,
dropped: make(chan string),
droppedCount: map[string]int{},
}

d.ReadConfig()
Expand All @@ -65,6 +67,13 @@ func (c *PdnsProcessor) ReadConfig() {
// nothing to read
}

func (d *PdnsProcessor) ReloadConfig(config *dnsutils.Config) {
d.config = config
d.ReadConfig()

d.configTransformers <- &config.IngoingTransformers
}

func (c *PdnsProcessor) LogInfo(msg string, v ...interface{}) {
var log string
if c.connId == 0 {
Expand Down Expand Up @@ -147,6 +156,9 @@ func (d *PdnsProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggersNa
RUN_LOOP:
for {
select {
case cfg := <-d.configTransformers:
transforms.ReloadConfig(cfg)

case <-d.stopRun:
transforms.Reset()
//close(d.recvFrom)
Expand Down
34 changes: 17 additions & 17 deletions collectors/sniffer_afpacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,17 @@ func RemoveBpfFilter(fd int) (err error) {
}

type AfpacketSniffer struct {
done chan bool
exit chan bool
fd int
port int
device string
identity string
loggers []dnsutils.Worker
config *dnsutils.Config
logger *logger.Logger
name string
done chan bool
exit chan bool
fd int
port int
device string
identity string
loggers []dnsutils.Worker
config *dnsutils.Config
logger *logger.Logger
dnsProcessor DnsProcessor
name string
}

func NewAfpacketSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *AfpacketSniffer {
Expand Down Expand Up @@ -206,11 +207,10 @@ func (c *AfpacketSniffer) ReadConfig() {
func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) {
c.LogInfo("reload config...")

// save the new config
c.config = config

// read again
c.ReadConfig()

c.dnsProcessor.ReloadConfig(config)
}

func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage {
Expand Down Expand Up @@ -284,8 +284,8 @@ func (c *AfpacketSniffer) Run() {
}
}

dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.AfpacketLiveCapture.ChannelBufferSize)
go dnsProcessor.Run(c.Loggers())
c.dnsProcessor = NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.AfpacketLiveCapture.ChannelBufferSize)
go c.dnsProcessor.Run(c.Loggers())

dnsChan := make(chan netlib.DnsPacket)
udpChan := make(chan gopacket.Packet)
Expand Down Expand Up @@ -332,7 +332,7 @@ func (c *AfpacketSniffer) Run() {
dm.DnsTap.TimeNsec = int(timestamp - seconds*int64(time.Second)*int64(time.Nanosecond))

// send DNS message to DNS processor
dnsProcessor.GetChannel() <- dm
c.dnsProcessor.GetChannel() <- dm
}
}()

Expand Down Expand Up @@ -427,7 +427,7 @@ func (c *AfpacketSniffer) Run() {
close(dnsChan)

// stop dns processor
dnsProcessor.Stop()
c.dnsProcessor.Stop()

c.LogInfo("run terminated")
c.done <- true
Expand Down

0 comments on commit afa24f9

Please sign in to comment.