Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first shot for pipelining support #503

Merged
merged 69 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
591775c
first shot for pipelling support
dmachard Dec 10, 2023
75d4f1b
make linters happy
dmachard Dec 10, 2023
3bc0a99
regex support
dmachard Dec 10, 2023
ac18c1d
fix condition
dmachard Dec 10, 2023
ad38bdb
support operator greater-than for int
dmachard Dec 11, 2023
fdb8e31
test atags transform
dmachard Dec 11, 2023
47151c4
fix regex support
dmachard Dec 12, 2023
82ef7b8
Merge branch 'main' into pipeline_mode
dmachard Dec 13, 2023
58f4bbc
update config
dmachard Dec 13, 2023
e69d5ce
Merge branch 'main' into pipeline_mode
dmachard Dec 14, 2023
7147464
Merge branch 'main' into pipeline_mode
dmachard Dec 15, 2023
be59999
check routes defintion
dmachard Dec 17, 2023
aebecc8
Merge branch 'main' into pipeline_mode
dmachard Dec 17, 2023
a68487c
add slice support on matching
dmachard Dec 17, 2023
67f2a7c
fix linter
dmachard Dec 17, 2023
37b8355
fix slice support
dmachard Dec 17, 2023
b4a0f9a
support list of int
dmachard Dec 17, 2023
5855703
support external file and include/exclude
dmachard Dec 17, 2023
78da0be
implement match-source
dmachard Dec 18, 2023
9a4d963
make linter happy
dmachard Dec 18, 2023
85de089
rename drop-policy
dmachard Dec 18, 2023
89262b7
support boolean value in matching
dmachard Dec 18, 2023
c564d38
support list maching
dmachard Dec 22, 2023
c0d7efd
make linter happy again
dmachard Dec 22, 2023
cacb8e7
Merge branch 'main' into pipeline_mode
dmachard Dec 24, 2023
352c799
check config in pipeline mode
dmachard Dec 26, 2023
56f1ba1
Merge branch 'main' into pipeline_mode
dmachard Dec 26, 2023
4e0b439
check config
dmachard Dec 26, 2023
34bd902
add test
dmachard Dec 26, 2023
e1d5fe4
Merge branch 'main' into pipeline_mode
dmachard Dec 27, 2023
a74f441
add golang-lru
dmachard Dec 28, 2023
abfdbe0
counter to gauge
dmachard Dec 28, 2023
b91c002
make linter happy
dmachard Dec 28, 2023
4c84d6a
rename metrics
dmachard Dec 28, 2023
6630ce7
rename metrics
dmachard Dec 29, 2023
3645b2e
revert config
dmachard Dec 29, 2023
7c94d71
reverse conf
dmachard Dec 29, 2023
d06d608
Update config
dmachard Dec 29, 2023
552f5b4
Merge branch 'prom_fix_potential_memory_leak' into pipeline_mode
dmachard Dec 29, 2023
c7f54ac
Limit memory usage for prom
dmachard Dec 29, 2023
3f6e8b2
support stream_global as selector
dmachard Dec 31, 2023
8ad2f89
set default value if no label provided
dmachard Dec 31, 2023
07dd7da
Merge branch 'prom_fix_potential_memory_leak' into pipeline_mode
dmachard Dec 31, 2023
578ccb8
Update docs
dmachard Dec 31, 2023
1570634
Update docs
dmachard Dec 31, 2023
7069277
update and fix tests
dmachard Jan 2, 2024
cca2f67
Merge branch 'prom_fix_potential_memory_leak' into pipeline_mode
dmachard Jan 2, 2024
05ed745
new routing policy syntax
dmachard Jan 3, 2024
6f33f09
fix linter
dmachard Jan 3, 2024
829d9be
fix test config
dmachard Jan 3, 2024
a7be5dd
fix regression
dmachard Jan 3, 2024
53dae4d
fix regression
dmachard Jan 3, 2024
3356b19
fix regression
dmachard Jan 3, 2024
8eec845
fix tzsp regression
dmachard Jan 3, 2024
e4740df
fix reg in tzsp
dmachard Jan 3, 2024
76e860c
Merge branch 'main' into pipeline_mode
dmachard Jan 3, 2024
1efaadb
code cleanup
dmachard Jan 3, 2024
564a7e8
code cleanup
dmachard Jan 3, 2024
f1e10fb
fix GetDefaultRoutes
dmachard Jan 4, 2024
9f58ee3
Update routing mode
dmachard Jan 4, 2024
3ef1fcc
support routing on loggers
dmachard Jan 4, 2024
05986f7
fix return loggers
dmachard Jan 4, 2024
999f84a
code factory
dmachard Jan 5, 2024
beb73d6
coe factory
dmachard Jan 5, 2024
0fe6451
final support of routing on all loggers
dmachard Jan 5, 2024
81ac8e5
config to default
dmachard Jan 5, 2024
6a9f4dc
Update config
dmachard Jan 5, 2024
a3a76cc
Merge branch 'main' into pipeline_mode
dmachard Jan 5, 2024
d104945
Update README.md
dmachard Jan 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .github/workflows/testing-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,17 @@ jobs:
matrix:
os-version: ['ubuntu-22.04', 'macos-latest' ]
go-version: [ '1.20', '1.21' ]
package: ['.', 'pkgconfig', 'dnsutils', 'collectors', 'loggers', 'transformers', 'netlib', 'processors']
package:
- '.'
- 'pkgconfig'
- 'pkglinker'
- 'pkgutils'
- 'dnsutils'
- 'collectors'
- 'loggers'
- 'transformers'
- 'netlib'
- 'processors'
exclude:
- os-version: macos-latest
go-version: '1.20'
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ dep: check-go

# Builds the project using go build.
build: check-go
CGO_ENABLED=0 go build -v -ldflags="$(LD_FLAGS)" -o ${BINARY_NAME} dnscollector.go multiplexer.go
CGO_ENABLED=0 go build -v -ldflags="$(LD_FLAGS)" -o ${BINARY_NAME} dnscollector.go

# Builds and runs the project.
run: build
Expand All @@ -68,7 +68,9 @@ lint:
tests: check-go
@go test -race -cover -v
@go test ./pkgconfig/ -race -cover -v
@go test ./pkgutils/ -race -cover -v
@go test ./dnsutils/ -race -cover -v
@go test ./routing/ -race -cover -v
@go test ./netlib/ -race -cover -v
@go test -timeout 30s ./transformers/ -race -cover -v
@go test -timeout 30s ./collectors/ -race -cover -v
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

[![Go Report Card](https://goreportcard.com/badge/github.com/dmachard/go-dns-collector)](https://goreportcard.com/report/dmachard/go-dns-collector)
![Go version](https://img.shields.io/badge/go%20version-min%201.20-blue)
![Go tests](https://img.shields.io/badge/go%20tests-370-green)
![Go lines](https://img.shields.io/badge/go%20lines-32932-red)
![Go tests](https://img.shields.io/badge/go%20tests-377-green)
![Go lines](https://img.shields.io/badge/go%20lines-36222-red)
![Go Tests](https://github.com/dmachard/go-dns-collector/actions/workflows/testing-go.yml/badge.svg)
![Github Actions](https://github.com/dmachard/go-dns-collector/actions/workflows/testing-dnstap.yml/badge.svg)
![Github Actions PDNS](https://github.com/dmachard/go-dns-collector/actions/workflows/testing-powerdns.yml/badge.svg)
Expand Down
311 changes: 311 additions & 0 deletions collectors/dnsmessage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
package collectors

import (
"bufio"
"fmt"
"net/http"
"os"
"reflect"
"regexp"
"strings"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-dnscollector/pkgutils"
"github.com/dmachard/go-dnscollector/transformers"
"github.com/dmachard/go-logger"
)

func isFileSource(matchSource string) bool {
return strings.HasPrefix(matchSource, "file://")
}

func isURLSource(matchSource string) bool {
return strings.HasPrefix(matchSource, "http://") || strings.HasPrefix(matchSource, "https://")
}

type MatchSource struct {
regexList []*regexp.Regexp
stringList []string
}

type DNSMessage struct {
doneRun chan bool
doneMonitor chan bool
stopRun chan bool
stopMonitor chan bool
config *pkgconfig.Config
configChan chan *pkgconfig.Config
inputChan chan dnsutils.DNSMessage
logger *logger.Logger
name string
RoutingHandler pkgutils.RoutingHandler
}

func NewDNSMessage(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *DNSMessage {
logger.Info("[%s] collector=dnsmessage - enabled", name)
s := &DNSMessage{
doneRun: make(chan bool),
doneMonitor: make(chan bool),
stopRun: make(chan bool),
stopMonitor: make(chan bool),
config: config,
configChan: make(chan *pkgconfig.Config),
inputChan: make(chan dnsutils.DNSMessage, config.Collectors.DNSMessage.ChannelBufferSize),
logger: logger,
name: name,
RoutingHandler: pkgutils.NewRoutingHandler(config, logger, name),
}
s.ReadConfig()
return s
}

func (c *DNSMessage) GetName() string { return c.name }

func (c *DNSMessage) AddDroppedRoute(wrk pkgutils.Worker) {
c.RoutingHandler.AddDroppedRoute(wrk)
}

func (c *DNSMessage) AddDefaultRoute(wrk pkgutils.Worker) {
c.RoutingHandler.AddDefaultRoute(wrk)
}

// deprecated function
func (c *DNSMessage) SetLoggers(loggers []pkgutils.Worker) {}

// deprecated function
func (c *DNSMessage) Loggers() ([]chan dnsutils.DNSMessage, []string) {
return nil, nil
}

func (c *DNSMessage) ReadConfigMatching(value interface{}) {
reflectedValue := reflect.ValueOf(value)
if reflectedValue.Kind() == reflect.Map {
keys := reflectedValue.MapKeys()
matchSrc := ""
srcKind := dnsutils.MatchingKindString
for _, k := range keys {
v := reflectedValue.MapIndex(k)
if k.Interface().(string) == "match-source" {
matchSrc = v.Interface().(string)
}
if k.Interface().(string) == "source-kind" {
srcKind = v.Interface().(string)
}
}
if len(matchSrc) > 0 {
sourceData, err := c.LoadData(matchSrc, srcKind)
if err != nil {
c.logger.Fatal(err)
}
if len(sourceData.regexList) > 0 {
value.(map[interface{}]interface{})[srcKind] = sourceData.regexList
}
if len(sourceData.stringList) > 0 {
value.(map[interface{}]interface{})[srcKind] = sourceData.stringList
}
}
}
}

func (c *DNSMessage) GetInputChannel() chan dnsutils.DNSMessage {
return c.inputChan
}

func (c *DNSMessage) ReadConfig() {
// load external file for include
if len(c.config.Collectors.DNSMessage.Matching.Include) > 0 {
for _, value := range c.config.Collectors.DNSMessage.Matching.Include {
c.ReadConfigMatching(value)
}
}
// load external file for exclude
if len(c.config.Collectors.DNSMessage.Matching.Exclude) > 0 {
for _, value := range c.config.Collectors.DNSMessage.Matching.Exclude {
c.ReadConfigMatching(value)
}
}
}

func (c *DNSMessage) LoadData(matchSource string, srcKind string) (MatchSource, error) {
if isFileSource(matchSource) {
dataSource, err := c.LoadFromFile(matchSource, srcKind)
if err != nil {
c.logger.Fatal(err)
}
return dataSource, nil
} else if isURLSource(matchSource) {
dataSource, err := c.LoadFromURL(matchSource, srcKind)
if err != nil {
c.logger.Fatal(err)
}
return dataSource, nil
}
return MatchSource{}, fmt.Errorf("match source not supported %s", matchSource)
}

func (c *DNSMessage) LoadFromURL(matchSource string, srcKind string) (MatchSource, error) {
c.LogInfo("loading matching source from url=%s", matchSource)
resp, err := http.Get(matchSource)
if err != nil {
return MatchSource{}, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return MatchSource{}, fmt.Errorf("invalid status code: %d", resp.StatusCode)
}

matchSources := MatchSource{}
scanner := bufio.NewScanner(resp.Body)

switch srcKind {
case dnsutils.MatchingKindRegexp:
for scanner.Scan() {
matchSources.regexList = append(matchSources.regexList, regexp.MustCompile(scanner.Text()))
}
c.LogInfo("remote source loaded with %d entries kind=%s", len(matchSources.regexList), srcKind)
case dnsutils.MatchingKindString:
for scanner.Scan() {
matchSources.stringList = append(matchSources.stringList, scanner.Text())
}
c.LogInfo("remote source loaded with %d entries kind=%s", len(matchSources.stringList), srcKind)
}

return matchSources, nil
}

func (c *DNSMessage) LoadFromFile(filePath string, srcKind string) (MatchSource, error) {
localFile := strings.TrimPrefix(filePath, "file://")

c.LogInfo("loading matching source from file=%s", localFile)
file, err := os.Open(localFile)
if err != nil {
return MatchSource{}, fmt.Errorf("unable to open file: %w", err)
}

matchSources := MatchSource{}
scanner := bufio.NewScanner(file)

switch srcKind {
case dnsutils.MatchingKindRegexp:
for scanner.Scan() {
matchSources.regexList = append(matchSources.regexList, regexp.MustCompile(scanner.Text()))
}
c.LogInfo("file loaded with %d entries kind=%s", len(matchSources.regexList), srcKind)
case dnsutils.MatchingKindString:
for scanner.Scan() {
matchSources.stringList = append(matchSources.stringList, scanner.Text())
}
c.LogInfo("file loaded with %d entries kind=%s", len(matchSources.stringList), srcKind)
}

return matchSources, nil
}

func (c *DNSMessage) ReloadConfig(config *pkgconfig.Config) {
c.LogInfo("reload configuration...")
c.configChan <- config
}

func (c *DNSMessage) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] collector=dnsmessage - "+msg, v...)
}

func (c *DNSMessage) LogError(msg string, v ...interface{}) {
c.logger.Error("["+c.name+"] collector=dnsmessage - "+msg, v...)
}

func (c *DNSMessage) Stop() {
c.LogInfo("stopping routing handler...")
c.RoutingHandler.Stop()

// read done channel and block until run is terminated
c.LogInfo("stopping run...")
c.stopRun <- true
<-c.doneRun
}

func (c *DNSMessage) Run() {
c.LogInfo("starting collector...")
var err error

// prepare next channels
defaultRoutes, defaultNames := c.RoutingHandler.GetDefaultRoutes()
droppedRoutes, droppedNames := c.RoutingHandler.GetDroppedRoutes()

// prepare transforms
subprocessors := transformers.NewTransforms(&c.config.IngoingTransformers, c.logger, c.name, defaultRoutes, 0)

RUN_LOOP:
for {
select {
case <-c.stopRun:
c.doneRun <- true
break RUN_LOOP

case cfg := <-c.configChan:

// save the new config
c.config = cfg
c.ReadConfig()

case dm, opened := <-c.inputChan:
if !opened {
c.LogInfo("channel closed, exit")
return
}

// matching enabled, filtering DNS messages ?
matched := true
matchedInclude := false
matchedExclude := false

if len(c.config.Collectors.DNSMessage.Matching.Include) > 0 {
err, matchedInclude = dm.Matching(c.config.Collectors.DNSMessage.Matching.Include)
if err != nil {
c.LogError(err.Error())
}
if matched && matchedInclude {
matched = true
} else {
matched = false
}
}

if len(c.config.Collectors.DNSMessage.Matching.Exclude) > 0 {
err, matchedExclude = dm.Matching(c.config.Collectors.DNSMessage.Matching.Exclude)
if err != nil {
c.LogError(err.Error())
}
if matched && !matchedExclude {
matched = true
} else {
matched = false
}
}

// apply tranforms on matched packets only
// init dns message with additionnals parts if necessary
if matched {
subprocessors.InitDNSMessageFormat(&dm)
if subprocessors.ProcessMessage(&dm) == transformers.ReturnDrop {
c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
continue
}
}

// drop packet ?
if !matched {
c.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
continue
}

// send to next
c.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm)

}

}
c.LogInfo("run terminated")
}
Loading
Loading