Skip to content

Commit

Permalink
filtering reload support
Browse files Browse the repository at this point in the history
  • Loading branch information
dmachard committed Oct 26, 2023
1 parent 1fe61dd commit 1ee05ed
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 76 deletions.
2 changes: 2 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,8 @@ multiplexer:
# log-queries: true
# # forward received replies to configured loggers ?
# log-replies: true
# # only keep 1 out of every `downsample` records, e.g. if set to 20, only every 20th record is kept, dropping 95% of queries
# downsample: 0

# # GeoIP maxmind support, more information on https://www.maxmind.com/en/geoip-demo
# # this feature can be used to append additional informations like country, city, asn
Expand Down
2 changes: 1 addition & 1 deletion testsdata/filtering_keep_domains.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
google.fr
test.github.com
test.github.com
179 changes: 108 additions & 71 deletions transformers/filtering.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,13 @@ func NewFilteringProcessor(config *dnsutils.ConfigTransformers, logger *logger.L
logInfo: logInfo,
logError: logError,
}

d.LoadRcodes()
d.LoadDomainsList()
d.LoadQueryIpList()
d.LoadrDataIpList()

d.LoadActiveFilters()

return d
}

// ReloadConfig swaps in a new transformers configuration.
// It only replaces the config pointer: the rcode map, domain lists, IP sets
// and active-filter chain are rebuilt separately by the Load* methods, which
// the caller (see Transforms.Prepare) is expected to invoke after a reload.
func (p *FilteringProcessor) ReloadConfig(config *dnsutils.ConfigTransformers) {
	p.config = config
}

func (p *FilteringProcessor) LogInfo(msg string, v ...interface{}) {
log := fmt.Sprintf("transformer=filtering#%d - ", p.instance)
p.logInfo(log+msg, v...)
Expand All @@ -90,12 +86,17 @@ func (p *FilteringProcessor) LogError(msg string, v ...interface{}) {
func (p *FilteringProcessor) LoadActiveFilters() {
// TODO: Change to iteration through Filtering to add filters in custom order.

// clean the slice
p.activeFilters = p.activeFilters[:0]

if !p.config.Filtering.LogQueries {
p.activeFilters = append(p.activeFilters, p.ignoreQueryFilter)
p.LogInfo("drop queries subprocessor is enabled")
}

if !p.config.Filtering.LogReplies {
p.activeFilters = append(p.activeFilters, p.ignoreReplyFilter)
p.LogInfo("drop replies subprocessor is enabled")
}

if len(p.mapRcodes) > 0 {
Expand Down Expand Up @@ -135,76 +136,20 @@ func (p *FilteringProcessor) LoadActiveFilters() {
p.downsample = p.config.Filtering.Downsample
p.downsampleCount = 0
p.activeFilters = append(p.activeFilters, p.downsampleFilter)
p.LogInfo("down sampling subprocessor is enabled")
}
}

func (p *FilteringProcessor) LoadRcodes() {
for _, v := range p.config.Filtering.DropRcodes {
p.mapRcodes[v] = true
// empty
for key := range p.mapRcodes {
delete(p.mapRcodes, key)
}
}

// loadQueryIpList loads a list of IP addresses and/or CIDR prefixes from the
// file fname, one entry per line. Invalid lines are logged and skipped.
// The resulting set is stored in p.ipsetDrop when drop is true, otherwise in
// p.ipsetKeep. It returns the number of lines read and any error from opening
// or reading the file, or from building the IP set.
func (p *FilteringProcessor) loadQueryIpList(fname string, drop bool) (uint64, error) {
	file, err := os.Open(fname)
	if err != nil {
		return 0, err
	}
	// close the file on every exit path (was leaked before)
	defer file.Close()

	scanner := bufio.NewScanner(file)
	var read uint64
	var ipsetbuilder netaddr.IPSetBuilder
	for scanner.Scan() {
		read++
		ipOrPrefix := strings.ToLower(scanner.Text())
		prefix, err := netaddr.ParseIPPrefix(ipOrPrefix)
		if err != nil {
			// not a prefix; try a plain IP address
			ip, err := netaddr.ParseIP(ipOrPrefix)
			if err != nil {
				// fixed duplicated word ("in in") in the log message
				p.LogError("%s in %s is neither an IP address nor a prefix", ipOrPrefix, fname)
				continue
			}
			ipsetbuilder.Add(ip)
			continue
		}
		ipsetbuilder.AddPrefix(prefix)
	}
	// surface scanner read errors (e.g. over-long lines) instead of ignoring them
	if err := scanner.Err(); err != nil {
		return read, err
	}

	if drop {
		p.ipsetDrop, err = ipsetbuilder.IPSet()
	} else {
		p.ipsetKeep, err = ipsetbuilder.IPSet()
	}

	return read, err
}

func (p *FilteringProcessor) loadKeepRdataIpList(fname string) (uint64, error) {
file, err := os.Open(fname)
if err != nil {
return 0, err
}

scanner := bufio.NewScanner(file)
var read uint64
var ipsetbuilder netaddr.IPSetBuilder
for scanner.Scan() {
read++
ipOrPrefix := strings.ToLower(scanner.Text())
prefix, err := netaddr.ParseIPPrefix(ipOrPrefix)
if err != nil {
ip, err := netaddr.ParseIP(ipOrPrefix)
if err != nil {
p.LogError("%s in in %s is neither an IP address nor a prefix", ipOrPrefix, fname)
continue
}
ipsetbuilder.Add(ip)
continue
}
ipsetbuilder.AddPrefix(prefix)
// add
for _, v := range p.config.Filtering.DropRcodes {
p.mapRcodes[v] = true
}

p.rDataIpsetKeep, err = ipsetbuilder.IPSet()

return read, err
}

func (p *FilteringProcessor) LoadQueryIpList() {
Expand Down Expand Up @@ -236,6 +181,23 @@ func (p *FilteringProcessor) LoadrDataIpList() {
}

func (p *FilteringProcessor) LoadDomainsList() {
// before to start, reset all maps
p.dropDomains = false
p.keepDomains = false

for key := range p.listFqdns {
delete(p.listFqdns, key)
}
for key := range p.listDomainsRegex {
delete(p.listDomainsRegex, key)
}
for key := range p.listKeepFqdns {
delete(p.listKeepFqdns, key)
}
for key := range p.listKeepDomainsRegex {
delete(p.listKeepDomainsRegex, key)
}

if len(p.config.Filtering.DropFqdnFile) > 0 {
file, err := os.Open(p.config.Filtering.DropFqdnFile)
if err != nil {
Expand Down Expand Up @@ -304,6 +266,81 @@ func (p *FilteringProcessor) LoadDomainsList() {
}
}

// loadQueryIpList (re)loads the query IP filter list from the file fname.
// Each line may be a single IP address or a CIDR prefix; invalid lines are
// logged and skipped. Both sets are reset up front so a reload never keeps
// stale entries. The resulting set is stored in p.ipsetDrop when drop is
// true, otherwise in p.ipsetKeep. It returns the number of lines read and
// any error from opening or reading the file, or from building the IP set.
func (p *FilteringProcessor) loadQueryIpList(fname string, drop bool) (uint64, error) {
	// reset both sets first so a failed/partial reload leaves no stale data
	var emptyIPSet *netaddr.IPSet
	p.ipsetDrop = emptyIPSet
	p.ipsetKeep = emptyIPSet

	file, err := os.Open(fname)
	if err != nil {
		return 0, err
	}
	// defer so the file is closed on every exit path, not only the happy one
	defer file.Close()

	scanner := bufio.NewScanner(file)
	var read uint64
	var ipsetbuilder netaddr.IPSetBuilder
	for scanner.Scan() {
		read++
		ipOrPrefix := strings.ToLower(scanner.Text())
		prefix, err := netaddr.ParseIPPrefix(ipOrPrefix)
		if err != nil {
			// not a prefix; try a plain IP address
			ip, err := netaddr.ParseIP(ipOrPrefix)
			if err != nil {
				// fixed duplicated word ("in in") in the log message
				p.LogError("%s in %s is neither an IP address nor a prefix", ipOrPrefix, fname)
				continue
			}
			ipsetbuilder.Add(ip)
			continue
		}
		ipsetbuilder.AddPrefix(prefix)
	}
	// surface scanner read errors (e.g. over-long lines) instead of ignoring them
	if err := scanner.Err(); err != nil {
		return read, err
	}

	if drop {
		p.ipsetDrop, err = ipsetbuilder.IPSet()
	} else {
		p.ipsetKeep, err = ipsetbuilder.IPSet()
	}

	return read, err
}

// loadKeepRdataIpList (re)loads the keep-list of RDATA IP addresses from the
// file fname. Each line may be a single IP address or a CIDR prefix; invalid
// lines are logged and skipped. p.rDataIpsetKeep is reset up front so a
// reload never keeps stale entries. It returns the number of lines read and
// any error from opening or reading the file, or from building the IP set.
func (p *FilteringProcessor) loadKeepRdataIpList(fname string) (uint64, error) {
	// reset first so a failed/partial reload leaves no stale data
	var emptyIPSet *netaddr.IPSet
	p.rDataIpsetKeep = emptyIPSet

	file, err := os.Open(fname)
	if err != nil {
		return 0, err
	}
	// defer so the file is closed on every exit path, not only the happy one
	defer file.Close()

	scanner := bufio.NewScanner(file)
	var read uint64
	var ipsetbuilder netaddr.IPSetBuilder
	for scanner.Scan() {
		read++
		ipOrPrefix := strings.ToLower(scanner.Text())
		prefix, err := netaddr.ParseIPPrefix(ipOrPrefix)
		if err != nil {
			// not a prefix; try a plain IP address
			ip, err := netaddr.ParseIP(ipOrPrefix)
			if err != nil {
				// fixed duplicated word ("in in") in the log message
				p.LogError("%s in %s is neither an IP address nor a prefix", ipOrPrefix, fname)
				continue
			}
			ipsetbuilder.Add(ip)
			continue
		}
		ipsetbuilder.AddPrefix(prefix)
	}
	// surface scanner read errors (e.g. over-long lines) instead of ignoring them
	if err := scanner.Err(); err != nil {
		return read, err
	}

	p.rDataIpsetKeep, err = ipsetbuilder.IPSet()

	return read, err
}

func (p *FilteringProcessor) Run() {
for {
select {
Expand Down
16 changes: 12 additions & 4 deletions transformers/subprocessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (p *Transforms) ReloadConfig(config *dnsutils.ConfigTransformers) {
p.config = config
p.NormalizeTransform.ReloadConfig(config)
p.GeoipTransform.ReloadConfig(config)
p.FilteringTransform.ReloadConfig(config)

p.Prepare()
}
Expand All @@ -69,15 +70,15 @@ func (p *Transforms) Prepare() error {

if p.config.Normalize.Enable {
prefixlog := fmt.Sprintf("transformer=normalize#%d ", p.instance)
p.LogInfo(prefixlog + "loaded")
p.LogInfo(prefixlog + "enabled")

p.NormalizeTransform.LoadActiveProcessors()
}

if p.config.GeoIP.Enable {
p.activeTransforms = append(p.activeTransforms, p.geoipTransform)
prefixlog := fmt.Sprintf("transformer=geoip#%d ", p.instance)
p.LogInfo(prefixlog + "loaded")
p.LogInfo(prefixlog + "enabled")

if err := p.GeoipTransform.Open(); err != nil {
p.LogError(prefixlog+"open error %v", err)
Expand Down Expand Up @@ -106,8 +107,15 @@ func (p *Transforms) Prepare() error {
}

if p.config.Filtering.Enable {
prefixlog := fmt.Sprintf("transformer=filtering#%d - ", p.instance)
p.LogInfo(prefixlog + "is enabled")
prefixlog := fmt.Sprintf("transformer=filtering#%d ", p.instance)
p.LogInfo(prefixlog + "enabled")

p.FilteringTransform.LoadRcodes()
p.FilteringTransform.LoadDomainsList()
p.FilteringTransform.LoadQueryIpList()
p.FilteringTransform.LoadrDataIpList()

p.FilteringTransform.LoadActiveFilters()
}

if p.config.Latency.Enable {
Expand Down

0 comments on commit 1ee05ed

Please sign in to comment.