Skip to content

Commit

Permalink
major code refacto (#667)
Browse files Browse the repository at this point in the history
* refacto all collectors
* cleanup config
  • Loading branch information
dmachard authored Apr 22, 2024
1 parent c9ebf8d commit cd12734
Show file tree
Hide file tree
Showing 62 changed files with 1,729 additions and 2,937 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/testing-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- 'collectors'
- 'loggers'
- 'transformers'
- 'netlib'
- 'netutils'
- 'processors'
# exclude:
# - os-version: macos-latest
Expand Down Expand Up @@ -148,7 +148,7 @@ jobs:

- id: count_tests
run: |
data=$(sudo go test -timeout 360s -v ./collectors ./processors ./dnsutils ./netlib ./loggers ./transformers ./pkgconfig ./pkglinker ./pkgutils ././ 2>&1 | grep -c RUN)
data=$(sudo go test -timeout 360s -v ./collectors ./processors ./dnsutils ./netutils ./loggers ./transformers ./pkgconfig ./pkglinker ./pkgutils ././ 2>&1 | grep -c RUN)
echo "Count of Tests: $data"
echo "data=$data" >> $GITHUB_OUTPUT
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ tests: check-go
@go test -race -cover -v
@go test ./pkgconfig/ -race -cover -v
@go test ./pkglinker/ -race -cover -v
@go test ./netlib/ -race -cover -v
@go test ./netutils/ -race -cover -v
@go test -timeout 90s ./dnsutils/ -race -cover -v
@go test -timeout 90s ./transformers/ -race -cover -v
@go test -timeout 90s ./collectors/ -race -cover -v
Expand Down
184 changes: 38 additions & 146 deletions collectors/dnsmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"reflect"
"regexp"
"strings"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
Expand All @@ -31,58 +30,20 @@ type MatchSource struct {
}

type DNSMessage struct {
doneRun, stopRun chan bool
doneMonitor, stopMonitor chan bool
config *pkgconfig.Config
configChan chan *pkgconfig.Config
inputChan chan dnsutils.DNSMessage
logger *logger.Logger
name string
droppedRoutes, defaultRoutes []pkgutils.Worker
dropped chan string
droppedCount map[string]int
*pkgutils.Collector
inputChan chan dnsutils.DNSMessage
}

func NewDNSMessage(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *DNSMessage {
logger.Info(pkgutils.PrefixLogCollector+"[%s] dnsmessage - enabled", name)
func NewDNSMessage(next []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *DNSMessage {
s := &DNSMessage{
doneRun: make(chan bool),
doneMonitor: make(chan bool),
stopRun: make(chan bool),
stopMonitor: make(chan bool),
config: config,
configChan: make(chan *pkgconfig.Config),
inputChan: make(chan dnsutils.DNSMessage, config.Collectors.DNSMessage.ChannelBufferSize),
logger: logger,
name: name,
// RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name),
dropped: make(chan string),
droppedCount: map[string]int{},
Collector: pkgutils.NewCollector(config, logger, name, "dnsmessage"),
inputChan: make(chan dnsutils.DNSMessage, config.Collectors.DNSMessage.ChannelBufferSize),
}
s.SetDefaultRoutes(next)
s.ReadConfig()
return s
}

func (c *DNSMessage) GetName() string { return c.name }

func (c *DNSMessage) AddDroppedRoute(wrk pkgutils.Worker) {
// c.RoutingHandler.AddDroppedRoute(wrk)
c.droppedRoutes = append(c.droppedRoutes, wrk)
}

func (c *DNSMessage) AddDefaultRoute(wrk pkgutils.Worker) {
// c.RoutingHandler.AddDefaultRoute(wrk)
c.defaultRoutes = append(c.defaultRoutes, wrk)
}

// deprecated function
func (c *DNSMessage) SetLoggers(loggers []pkgutils.Worker) {}

// deprecated function
func (c *DNSMessage) Loggers() ([]chan dnsutils.DNSMessage, []string) {
return nil, nil
}

func (c *DNSMessage) ReadConfigMatching(value interface{}) {
reflectedValue := reflect.ValueOf(value)
if reflectedValue.Kind() == reflect.Map {
Expand All @@ -101,7 +62,7 @@ func (c *DNSMessage) ReadConfigMatching(value interface{}) {
if len(matchSrc) > 0 {
sourceData, err := c.LoadData(matchSrc, srcKind)
if err != nil {
c.logger.Fatal(err)
c.LogFatal(err)
}
if len(sourceData.regexList) > 0 {
value.(map[interface{}]interface{})[srcKind] = sourceData.regexList
Expand All @@ -113,36 +74,36 @@ func (c *DNSMessage) ReadConfigMatching(value interface{}) {
}
}

func (c *DNSMessage) GetInputChannel() chan dnsutils.DNSMessage {
return c.inputChan
}

func (c *DNSMessage) ReadConfig() {
// load external file for include
if len(c.config.Collectors.DNSMessage.Matching.Include) > 0 {
for _, value := range c.config.Collectors.DNSMessage.Matching.Include {
if len(c.GetConfig().Collectors.DNSMessage.Matching.Include) > 0 {
for _, value := range c.GetConfig().Collectors.DNSMessage.Matching.Include {
c.ReadConfigMatching(value)
}
}
// load external file for exclude
if len(c.config.Collectors.DNSMessage.Matching.Exclude) > 0 {
for _, value := range c.config.Collectors.DNSMessage.Matching.Exclude {
if len(c.GetConfig().Collectors.DNSMessage.Matching.Exclude) > 0 {
for _, value := range c.GetConfig().Collectors.DNSMessage.Matching.Exclude {
c.ReadConfigMatching(value)
}
}
}

func (c *DNSMessage) GetInputChannel() chan dnsutils.DNSMessage {
return c.inputChan
}

func (c *DNSMessage) LoadData(matchSource string, srcKind string) (MatchSource, error) {
if isFileSource(matchSource) {
dataSource, err := c.LoadFromFile(matchSource, srcKind)
if err != nil {
c.logger.Fatal(err)
c.LogFatal(err)
}
return dataSource, nil
} else if isURLSource(matchSource) {
dataSource, err := c.LoadFromURL(matchSource, srcKind)
if err != nil {
c.logger.Fatal(err)
c.LogFatal(err)
}
return dataSource, nil
}
Expand Down Expand Up @@ -208,61 +169,32 @@ func (c *DNSMessage) LoadFromFile(filePath string, srcKind string) (MatchSource,
return matchSources, nil
}

func (c *DNSMessage) ReloadConfig(config *pkgconfig.Config) {
c.LogInfo("reload configuration...")
c.configChan <- config
}

func (c *DNSMessage) LogInfo(msg string, v ...interface{}) {
c.logger.Info(pkgutils.PrefixLogCollector+"["+c.name+"] dnsmessage - "+msg, v...)
}

func (c *DNSMessage) LogError(msg string, v ...interface{}) {
c.logger.Error(pkgutils.PrefixLogCollector+"["+c.name+"] dnsmessage - "+msg, v...)
}

func (c *DNSMessage) Stop() {
c.LogInfo("stopping collector...")

// read done channel and block until run is terminated
c.LogInfo("stopping to run...")
c.stopRun <- true
<-c.doneRun

c.LogInfo("stopping monitor...")
c.stopMonitor <- true
<-c.doneMonitor
}

func (c *DNSMessage) Run() {
c.LogInfo("starting collector...")
c.LogInfo("running collector...")
defer func() {
c.LogInfo("run terminated")
c.StopIsDone()
}()

var err error

// prepare next channels
// defaultRoutes, defaultNames := c.RoutingHandler.GetDefaultRoutes()
// droppedRoutes, droppedNames := c.RoutingHandler.GetDroppedRoutes()
defaultRoutes, defaultNames := pkgutils.GetRoutes(c.defaultRoutes)
droppedRoutes, droppedNames := pkgutils.GetRoutes(c.droppedRoutes)
defaultRoutes, defaultNames := pkgutils.GetRoutes(c.GetDefaultRoutes())
droppedRoutes, droppedNames := pkgutils.GetRoutes(c.GetDefaultRoutes())

// prepare transforms
subprocessors := transformers.NewTransforms(&c.config.IngoingTransformers, c.logger, c.name, defaultRoutes, 0)

// start goroutine to count dropped messsages
go c.MonitorNextStanzas()
subprocessors := transformers.NewTransforms(&c.GetConfig().IngoingTransformers, c.GetLogger(), c.GetName(), defaultRoutes, 0)

// read incoming dns message
c.LogInfo("waiting dns message to process...")
RUN_LOOP:
for {
select {
case <-c.stopRun:
c.doneRun <- true
break RUN_LOOP
case <-c.OnStop():
return

case cfg := <-c.configChan:

// save the new config
c.config = cfg
// save the new config
case cfg := <-c.NewConfig():
c.SetConfig(cfg)
c.ReadConfig()

case dm, opened := <-c.inputChan:
Expand All @@ -276,8 +208,8 @@ RUN_LOOP:
matchedInclude := false
matchedExclude := false

if len(c.config.Collectors.DNSMessage.Matching.Include) > 0 {
err, matchedInclude = dm.Matching(c.config.Collectors.DNSMessage.Matching.Include)
if len(c.GetConfig().Collectors.DNSMessage.Matching.Include) > 0 {
err, matchedInclude = dm.Matching(c.GetConfig().Collectors.DNSMessage.Matching.Include)
if err != nil {
c.LogError(err.Error())
}
Expand All @@ -288,8 +220,8 @@ RUN_LOOP:
}
}

if len(c.config.Collectors.DNSMessage.Matching.Exclude) > 0 {
err, matchedExclude = dm.Matching(c.config.Collectors.DNSMessage.Matching.Exclude)
if len(c.GetConfig().Collectors.DNSMessage.Matching.Exclude) > 0 {
err, matchedExclude = dm.Matching(c.GetConfig().Collectors.DNSMessage.Matching.Exclude)
if err != nil {
c.LogError(err.Error())
}
Expand All @@ -305,12 +237,11 @@ RUN_LOOP:
if matched {
subprocessors.InitDNSMessageFormat(&dm)
if subprocessors.ProcessMessage(&dm) == transformers.ReturnDrop {
// c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
for i := range droppedRoutes {
select {
case droppedRoutes[i] <- dm:
default:
c.dropped <- droppedNames[i]
c.NextStanzaIsBusy(droppedNames[i])
}
}
continue
Expand All @@ -319,63 +250,24 @@ RUN_LOOP:

// drop packet ?
if !matched {
// c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
for i := range droppedRoutes {
select {
case droppedRoutes[i] <- dm:
default:
c.dropped <- droppedNames[i]
c.NextStanzaIsBusy(droppedNames[i])
}
}
continue
}

// send to next
// c.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm)
for i := range defaultRoutes {
select {
case defaultRoutes[i] <- dm:
default:
c.dropped <- defaultNames[i]
c.NextStanzaIsBusy(defaultNames[i])
}
}

}

}
c.LogInfo("run terminated")
}

func (c *DNSMessage) MonitorNextStanzas() {
watchInterval := 10 * time.Second
bufferFull := time.NewTimer(watchInterval)
FOLLOW_LOOP:
for {
select {
case <-c.stopMonitor:
close(c.dropped)
bufferFull.Stop()
c.doneMonitor <- true
break FOLLOW_LOOP

case loggerName := <-c.dropped:
if _, ok := c.droppedCount[loggerName]; !ok {
c.droppedCount[loggerName] = 1
} else {
c.droppedCount[loggerName]++
}

case <-bufferFull.C:

for v, k := range c.droppedCount {
if k > 0 {
c.LogError("stanza[%s] buffer is full, %d dnsmessage(s) dropped", v, k)
c.droppedCount[v] = 0
}
}
bufferFull.Reset(watchInterval)

}
}
c.LogInfo("monitor terminated")
}
2 changes: 1 addition & 1 deletion collectors/dnsmessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func Test_DnsMessage_BufferLoggerIsFull(t *testing.T) {
// redirect stdout output to bytes buffer
logsChan := make(chan logger.LogEntry, 10)
logsChan := make(chan logger.LogEntry, 50)
lg := logger.New(true)
lg.SetOutputChannel((logsChan))

Expand Down
Loading

0 comments on commit cd12734

Please sign in to comment.