diff --git a/.github/workflows/testing-go.yml b/.github/workflows/testing-go.yml index fbb46c10..95226535 100644 --- a/.github/workflows/testing-go.yml +++ b/.github/workflows/testing-go.yml @@ -20,7 +20,7 @@ jobs: matrix: os-version: ['ubuntu-22.04', 'macos-latest' ] go-version: [ '1.20', '1.21' ] - package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib'] + package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib', 'processors'] exclude: - os-version: macos-latest go-version: '1.20' diff --git a/collectors/dnstap.go b/collectors/dnstap.go index 2566dc45..827ff2ba 100644 --- a/collectors/dnstap.go +++ b/collectors/dnstap.go @@ -14,6 +14,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" ) @@ -21,19 +22,21 @@ import ( type Dnstap struct { doneRun chan bool doneMonitor chan bool + stopRun chan bool stopMonitor chan bool listen net.Listener conns []net.Conn sockPath string loggers []dnsutils.Worker config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string connMode string connId int droppedCount int dropped chan int - tapProcessors []DnstapProcessor + tapProcessors []processors.DnstapProcessor sync.RWMutex } @@ -42,9 +45,11 @@ func NewDnstap(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logge s := &Dnstap{ doneRun: make(chan bool), doneMonitor: make(chan bool), + stopRun: make(chan bool), stopMonitor: make(chan bool), dropped: make(chan int), config: config, + configChan: make(chan *dnsutils.Config), loggers: loggers, logger: logger, name: name, @@ -85,6 +90,11 @@ func (c *Dnstap) ReadConfig() { } } +func (c *Dnstap) ReloadConfig(config *dnsutils.Config) { + c.LogInfo("reload configuration...") + c.configChan <- config +} + func (c *Dnstap) LogInfo(msg string, v ...interface{}) { c.logger.Info("["+c.name+"] collector=dnstap - "+msg, v...) } @@ -118,7 +128,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) { c.LogConnInfo(connId, "new connection from %s", peer) // start dnstap subprocessor - dnstapProcessor := NewDnstapProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.Dnstap.ChannelBufferSize) + dnstapProcessor := processors.NewDnstapProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.Dnstap.ChannelBufferSize) c.Lock() c.tapProcessors = append(c.tapProcessors, dnstapProcessor) c.Unlock() @@ -177,7 +187,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) { // then removes the current tap processor from the list c.Lock() for i, t := range c.tapProcessors { - if t.connId == connId { + if t.ConnId == connId { c.tapProcessors = append(c.tapProcessors[:i], c.tapProcessors[i+1:]...) } } @@ -225,8 +235,8 @@ func (c *Dnstap) Stop() { // read done channel and block until run is terminated c.LogInfo("stopping run...") + c.stopRun <- true <-c.doneRun - close(c.doneRun) } func (c *Dnstap) Listen() error { @@ -320,32 +330,61 @@ func (c *Dnstap) Run() { // start goroutine to count dropped messsages go c.MonitorCollector() - for { - // Accept() blocks waiting for new connection. - conn, err := c.listen.Accept() - if err != nil { - break + // goroutine to Accept() blocks waiting for new connection. 
+ acceptChan := make(chan net.Conn) + go func() { + for { + conn, err := c.listen.Accept() + if err != nil { + return + } + acceptChan <- conn } + }() - if (c.connMode == "tls" || c.connMode == "tcp") && c.config.Collectors.Dnstap.RcvBufSize > 0 { - before, actual, err := netlib.SetSock_RCVBUF( - conn, - c.config.Collectors.Dnstap.RcvBufSize, - c.config.Collectors.Dnstap.TlsSupport, - ) - if err != nil { - c.logger.Fatal("Unable to set SO_RCVBUF: ", err) +RUN_LOOP: + for { + select { + case <-c.stopRun: + close(acceptChan) + c.doneRun <- true + break RUN_LOOP + + case cfg := <-c.configChan: + + // save the new config + c.config = cfg + c.ReadConfig() + + // refresh config for all conns + for i := range c.tapProcessors { + c.tapProcessors[i].ConfigChan <- cfg + } + + case conn, opened := <-acceptChan: + if !opened { + return } - c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before, - c.config.Collectors.Dnstap.RcvBufSize, actual) + + if (c.connMode == "tls" || c.connMode == "tcp") && c.config.Collectors.Dnstap.RcvBufSize > 0 { + before, actual, err := netlib.SetSock_RCVBUF( + conn, + c.config.Collectors.Dnstap.RcvBufSize, + c.config.Collectors.Dnstap.TlsSupport, + ) + if err != nil { + c.logger.Fatal("Unable to set SO_RCVBUF: ", err) + } + c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before, + c.config.Collectors.Dnstap.RcvBufSize, actual) + } + + c.Lock() + c.conns = append(c.conns, conn) + c.Unlock() + go c.HandleConn(conn) } - c.Lock() - c.conns = append(c.conns, conn) - c.Unlock() - go c.HandleConn(conn) } - c.LogInfo("run terminated") - c.doneRun <- true } diff --git a/collectors/dnstap_proxifier.go b/collectors/dnstap_proxifier.go index 7f23d832..0f07ea04 100644 --- a/collectors/dnstap_proxifier.go +++ b/collectors/dnstap_proxifier.go @@ -14,25 +14,29 @@ import ( ) type DnstapProxifier struct { - done chan bool - listen net.Listener - conns []net.Conn - sockPath string - loggers []dnsutils.Worker - config *dnsutils.Config - logger *logger.Logger - name string - stopping bool + doneRun chan bool + stopRun chan bool + listen net.Listener + conns []net.Conn + sockPath string + loggers []dnsutils.Worker + config *dnsutils.Config + configChan chan *dnsutils.Config + logger *logger.Logger + name string + stopping bool } func NewDnstapProxifier(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *DnstapProxifier { logger.Info("[%s] collector=dnstaprelay - enabled", name) s := &DnstapProxifier{ - done: make(chan bool), - config: config, - loggers: loggers, - logger: logger, - name: name, + doneRun: make(chan bool), + stopRun: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + loggers: loggers, + logger: logger, + name: name, } s.ReadConfig() return s @@ -60,6 +64,11 @@ func (c *DnstapProxifier) ReadConfig() { c.sockPath = c.config.Collectors.DnstapProxifier.SockPath } +func (c *DnstapProxifier) ReloadConfig(config *dnsutils.Config) { + c.LogInfo("reload configuration...") + c.configChan <- config +} + func (c *DnstapProxifier) LogInfo(msg string, v ...interface{}) { c.logger.Info("["+c.name+"] collector=dnstaprelay - "+msg, v...) 
} @@ -138,8 +147,8 @@ func (c *DnstapProxifier) Stop() { c.listen.Close() // read done channel and block until run is terminated - <-c.done - close(c.done) + c.stopRun <- true + <-c.doneRun } func (c *DnstapProxifier) Listen() error { @@ -201,17 +210,42 @@ func (c *DnstapProxifier) Run() { c.logger.Fatal("collector dnstap listening failed: ", err) } } - for { - // Accept() blocks waiting for new connection. - conn, err := c.listen.Accept() - if err != nil { - break + + // goroutine to Accept() blocks waiting for new connection. + acceptChan := make(chan net.Conn) + go func() { + for { + conn, err := c.listen.Accept() + if err != nil { + return + } + acceptChan <- conn } + }() - c.conns = append(c.conns, conn) - go c.HandleConn(conn) +RUN_LOOP: + for { + select { + case <-c.stopRun: + close(acceptChan) + c.doneRun <- true + break RUN_LOOP + + case cfg := <-c.configChan: + + // save the new config + c.config = cfg + c.ReadConfig() + + case conn, opened := <-acceptChan: + if !opened { + return + } + + c.conns = append(c.conns, conn) + go c.HandleConn(conn) + } } c.LogInfo("run terminated") - c.done <- true } diff --git a/collectors/dnstap_proxifier_test.go b/collectors/dnstap_proxifier_test.go index 7845a902..364cd8f2 100644 --- a/collectors/dnstap_proxifier_test.go +++ b/collectors/dnstap_proxifier_test.go @@ -9,6 +9,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/loggers" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" "google.golang.org/protobuf/proto" @@ -75,13 +76,13 @@ func Test_DnstapProxifier(t *testing.T) { frame := &framestream.Frame{} // get fake dns question - dnsquery, err := GetFakeDns() + dnsquery, err := processors.GetFakeDns() if err != nil { t.Fatalf("dns question pack error") } // get fake dnstap message - dt_query := GetFakeDnstap(dnsquery) + dt_query := processors.GetFakeDnstap(dnsquery) // serialize to bytes data, err := proto.Marshal(dt_query) diff --git a/collectors/dnstap_test.go b/collectors/dnstap_test.go index d1d092e9..dbab6bed 100644 --- a/collectors/dnstap_test.go +++ b/collectors/dnstap_test.go @@ -9,6 +9,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/loggers" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" "google.golang.org/protobuf/proto" @@ -79,13 +80,13 @@ func Test_DnstapCollector(t *testing.T) { frame := &framestream.Frame{} // get fake dns question - dnsquery, err := GetFakeDns() + dnsquery, err := processors.GetFakeDns() if err != nil { t.Fatalf("dns question pack error") } // get fake dnstap message - dt_query := GetFakeDnstap(dnsquery) + dt_query := processors.GetFakeDnstap(dnsquery) // serialize to bytes data, err := proto.Marshal(dt_query) diff --git a/collectors/file_ingestor.go b/collectors/file_ingestor.go index 2ae60bf4..636f00ac 100644 --- a/collectors/file_ingestor.go +++ b/collectors/file_ingestor.go @@ -13,6 +13,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" framestream "github.com/farsightsec/golang-framestream" "github.com/fsnotify/fsnotify" @@ -38,11 +39,12 @@ type FileIngestor struct { exit chan bool loggers []dnsutils.Worker 
config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger watcher *fsnotify.Watcher watcherTimers map[string]*time.Timer - dnsProcessor DnsProcessor - dnstapProcessor DnstapProcessor + dnsProcessor processors.DnsProcessor + dnstapProcessor processors.DnstapProcessor filterDnsPort int identity string name string @@ -55,6 +57,7 @@ func NewFileIngestor(loggers []dnsutils.Worker, config *dnsutils.Config, logger done: make(chan bool), exit: make(chan bool), config: config, + configChan: make(chan *dnsutils.Config), loggers: loggers, logger: logger, name: name, @@ -93,6 +96,11 @@ func (c *FileIngestor) ReadConfig() { c.config.Collectors.FileIngestor.WatchMode) } +func (c *FileIngestor) ReloadConfig(config *dnsutils.Config) { + c.LogInfo("reload configuration...") + c.configChan <- config +} + func (c *FileIngestor) LogInfo(msg string, v ...interface{}) { c.logger.Info("["+c.name+"] collector=fileingestor - "+msg, v...) } @@ -366,11 +374,11 @@ func (c *FileIngestor) RemoveEvent(filePath string) { func (c *FileIngestor) Run() { c.LogInfo("starting collector...") - c.dnsProcessor = NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.FileIngestor.ChannelBufferSize) + c.dnsProcessor = processors.NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.FileIngestor.ChannelBufferSize) go c.dnsProcessor.Run(c.Loggers()) // start dnstap subprocessor - c.dnstapProcessor = NewDnstapProcessor(0, c.config, c.logger, c.name, c.config.Collectors.FileIngestor.ChannelBufferSize) + c.dnstapProcessor = processors.NewDnstapProcessor(0, c.config, c.logger, c.name, c.config.Collectors.FileIngestor.ChannelBufferSize) go c.dnstapProcessor.Run(c.Loggers()) // read current folder content @@ -416,6 +424,17 @@ func (c *FileIngestor) Run() { go func() { for { select { + // new config provided? + case cfg, opened := <-c.configChan: + if !opened { + return + } + c.config = cfg + c.ReadConfig() + + c.dnsProcessor.ConfigChan <- cfg + c.dnstapProcessor.ConfigChan <- cfg + case event, ok := <-c.watcher.Events: if !ok { // Channel was closed (i.e. Watcher.Close() was called). 
return diff --git a/collectors/file_tail.go b/collectors/file_tail.go index a16061ce..6ac04643 100644 --- a/collectors/file_tail.go +++ b/collectors/file_tail.go @@ -16,22 +16,26 @@ import ( ) type Tail struct { - done chan bool - tailf *tail.Tail - loggers []dnsutils.Worker - config *dnsutils.Config - logger *logger.Logger - name string + doneRun chan bool + stopRun chan bool + tailf *tail.Tail + loggers []dnsutils.Worker + config *dnsutils.Config + configChan chan *dnsutils.Config + logger *logger.Logger + name string } func NewTail(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *Tail { logger.Info("[%s] collector=tail - enabled", name) s := &Tail{ - done: make(chan bool), - config: config, - loggers: loggers, - logger: logger, - name: name, + doneRun: make(chan bool), + stopRun: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + loggers: loggers, + logger: logger, + name: name, } s.ReadConfig() return s @@ -51,8 +55,11 @@ func (c *Tail) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *Tail) ReadConfig() { - //tbc +func (c *Tail) ReadConfig() {} + +func (c *Tail) ReloadConfig(config *dnsutils.Config) { + c.LogInfo("reload configuration...") + c.configChan <- config } func (c *Tail) LogInfo(msg string, v ...interface{}) { @@ -75,8 +82,8 @@ func (c *Tail) Stop() { c.tailf.Stop() // read done channel and block until run is terminated - <-c.done - close(c.done) + c.stopRun <- true + <-c.doneRun } func (c *Tail) Follow() error { @@ -114,162 +121,179 @@ func (c *Tail) Run() { dm.DnsTap.Identity = "undefined" } - for line := range c.tailf.Lines { - var matches []string - var re *regexp.Regexp +RUN_LOOP: + for { + select { + // new config provided? + case cfg, opened := <-c.configChan: + if !opened { + return + } + c.config = cfg + c.ReadConfig() - if len(c.config.Collectors.Tail.PatternQuery) > 0 { - re = regexp.MustCompile(c.config.Collectors.Tail.PatternQuery) - matches = re.FindStringSubmatch(line.Text) - dm.DNS.Type = dnsutils.DnsQuery - dm.DnsTap.Operation = dnsutils.DNSTAP_OPERATION_QUERY - } + subprocessors.ReloadConfig(&cfg.IngoingTransformers) + case <-c.stopRun: - if len(c.config.Collectors.Tail.PatternReply) > 0 && len(matches) == 0 { - re = regexp.MustCompile(c.config.Collectors.Tail.PatternReply) - matches = re.FindStringSubmatch(line.Text) - dm.DNS.Type = dnsutils.DnsReply - dm.DnsTap.Operation = dnsutils.DNSTAP_OPERATION_REPLY - } + // cleanup transformers + subprocessors.Reset() - if len(matches) == 0 { - continue - } + c.doneRun <- true + break RUN_LOOP - qrIndex := re.SubexpIndex("qr") - if qrIndex != -1 { - dm.DnsTap.Operation = matches[qrIndex] - } + case line := <-c.tailf.Lines: + var matches []string + var re *regexp.Regexp - var t time.Time - timestampIndex := re.SubexpIndex("timestamp") - if timestampIndex != -1 { - t, err = time.Parse(c.config.Collectors.Tail.TimeLayout, matches[timestampIndex]) - if err != nil { + if len(c.config.Collectors.Tail.PatternQuery) > 0 { + re = regexp.MustCompile(c.config.Collectors.Tail.PatternQuery) + matches = re.FindStringSubmatch(line.Text) + dm.DNS.Type = dnsutils.DnsQuery + dm.DnsTap.Operation = dnsutils.DNSTAP_OPERATION_QUERY + } + + if len(c.config.Collectors.Tail.PatternReply) > 0 && len(matches) == 0 { + re = regexp.MustCompile(c.config.Collectors.Tail.PatternReply) + matches = re.FindStringSubmatch(line.Text) + dm.DNS.Type = dnsutils.DnsReply + dm.DnsTap.Operation = dnsutils.DNSTAP_OPERATION_REPLY + } + + if len(matches) == 0 { continue } - } 
else { - t = time.Now() - } - dm.DnsTap.TimeSec = int(t.Unix()) - dm.DnsTap.TimeNsec = int(t.UnixNano() - t.Unix()*1e9) - identityIndex := re.SubexpIndex("identity") - if identityIndex != -1 { - dm.DnsTap.Identity = matches[identityIndex] - } + qrIndex := re.SubexpIndex("qr") + if qrIndex != -1 { + dm.DnsTap.Operation = matches[qrIndex] + } - rcodeIndex := re.SubexpIndex("rcode") - if rcodeIndex != -1 { - dm.DNS.Rcode = matches[rcodeIndex] - } + var t time.Time + timestampIndex := re.SubexpIndex("timestamp") + if timestampIndex != -1 { + t, err = time.Parse(c.config.Collectors.Tail.TimeLayout, matches[timestampIndex]) + if err != nil { + continue + } + } else { + t = time.Now() + } + dm.DnsTap.TimeSec = int(t.Unix()) + dm.DnsTap.TimeNsec = int(t.UnixNano() - t.Unix()*1e9) - queryipIndex := re.SubexpIndex("queryip") - if queryipIndex != -1 { - dm.NetworkInfo.QueryIp = matches[queryipIndex] - } + identityIndex := re.SubexpIndex("identity") + if identityIndex != -1 { + dm.DnsTap.Identity = matches[identityIndex] + } - queryportIndex := re.SubexpIndex("queryport") - if queryportIndex != -1 { - dm.NetworkInfo.QueryPort = matches[queryportIndex] - } else { - dm.NetworkInfo.ResponsePort = "0" - } + rcodeIndex := re.SubexpIndex("rcode") + if rcodeIndex != -1 { + dm.DNS.Rcode = matches[rcodeIndex] + } - responseipIndex := re.SubexpIndex("responseip") - if responseipIndex != -1 { - dm.NetworkInfo.ResponseIp = matches[responseipIndex] - } + queryipIndex := re.SubexpIndex("queryip") + if queryipIndex != -1 { + dm.NetworkInfo.QueryIp = matches[queryipIndex] + } - responseportIndex := re.SubexpIndex("responseport") - if responseportIndex != -1 { - dm.NetworkInfo.ResponsePort = matches[responseportIndex] - } else { - dm.NetworkInfo.ResponsePort = "0" - } + queryportIndex := re.SubexpIndex("queryport") + if queryportIndex != -1 { + dm.NetworkInfo.QueryPort = matches[queryportIndex] + } else { + dm.NetworkInfo.ResponsePort = "0" + } - familyIndex := re.SubexpIndex("family") - if familyIndex != -1 { - dm.NetworkInfo.Family = matches[familyIndex] - } else { - dm.NetworkInfo.Family = dnsutils.PROTO_IPV4 - } + responseipIndex := re.SubexpIndex("responseip") + if responseipIndex != -1 { + dm.NetworkInfo.ResponseIp = matches[responseipIndex] + } - protocolIndex := re.SubexpIndex("protocol") - if protocolIndex != -1 { - dm.NetworkInfo.Protocol = matches[protocolIndex] - } else { - dm.NetworkInfo.Protocol = dnsutils.PROTO_UDP - } + responseportIndex := re.SubexpIndex("responseport") + if responseportIndex != -1 { + dm.NetworkInfo.ResponsePort = matches[responseportIndex] + } else { + dm.NetworkInfo.ResponsePort = "0" + } - lengthIndex := re.SubexpIndex("length") - if lengthIndex != -1 { - length, err := strconv.Atoi(matches[lengthIndex]) - if err == nil { - dm.DNS.Length = length + familyIndex := re.SubexpIndex("family") + if familyIndex != -1 { + dm.NetworkInfo.Family = matches[familyIndex] + } else { + dm.NetworkInfo.Family = dnsutils.PROTO_IPV4 } - } - domainIndex := re.SubexpIndex("domain") - if domainIndex != -1 { - dm.DNS.Qname = matches[domainIndex] - } + protocolIndex := re.SubexpIndex("protocol") + if protocolIndex != -1 { + dm.NetworkInfo.Protocol = matches[protocolIndex] + } else { + dm.NetworkInfo.Protocol = dnsutils.PROTO_UDP + } - qtypeIndex := re.SubexpIndex("qtype") - if qtypeIndex != -1 { - dm.DNS.Qtype = matches[qtypeIndex] - } + lengthIndex := re.SubexpIndex("length") + if lengthIndex != -1 { + length, err := strconv.Atoi(matches[lengthIndex]) + if err == nil { + dm.DNS.Length = length + } + } - 
latencyIndex := re.SubexpIndex("latency") - if latencyIndex != -1 { - dm.DnsTap.LatencySec = matches[latencyIndex] - } + domainIndex := re.SubexpIndex("domain") + if domainIndex != -1 { + dm.DNS.Qname = matches[domainIndex] + } - // compute timestamp - ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec)) - dm.DnsTap.Timestamp = ts.UnixNano() - dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano) - - // fake dns packet - dnspkt := new(dns.Msg) - var dnstype uint16 - dnstype = dns.TypeA - if dm.DNS.Qtype == "AAAA" { - dnstype = dns.TypeAAAA - } - dnspkt.SetQuestion(dm.DNS.Qname, dnstype) + qtypeIndex := re.SubexpIndex("qtype") + if qtypeIndex != -1 { + dm.DNS.Qtype = matches[qtypeIndex] + } - if dm.DNS.Type == dnsutils.DnsReply { - rr, _ := dns.NewRR(fmt.Sprintf("%s %s 0.0.0.0", dm.DNS.Qname, dm.DNS.Qtype)) - if err == nil { - dnspkt.Answer = append(dnspkt.Answer, rr) + latencyIndex := re.SubexpIndex("latency") + if latencyIndex != -1 { + dm.DnsTap.LatencySec = matches[latencyIndex] } - var rcode int - rcode = 0 - if dm.DNS.Rcode == "NXDOMAIN" { - rcode = 3 + + // compute timestamp + ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec)) + dm.DnsTap.Timestamp = ts.UnixNano() + dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano) + + // fake dns packet + dnspkt := new(dns.Msg) + var dnstype uint16 + dnstype = dns.TypeA + if dm.DNS.Qtype == "AAAA" { + dnstype = dns.TypeAAAA + } + dnspkt.SetQuestion(dm.DNS.Qname, dnstype) + + if dm.DNS.Type == dnsutils.DnsReply { + rr, _ := dns.NewRR(fmt.Sprintf("%s %s 0.0.0.0", dm.DNS.Qname, dm.DNS.Qtype)) + if err == nil { + dnspkt.Answer = append(dnspkt.Answer, rr) + } + var rcode int + rcode = 0 + if dm.DNS.Rcode == "NXDOMAIN" { + rcode = 3 + } + dnspkt.Rcode = rcode } - dnspkt.Rcode = rcode - } - dm.DNS.Payload, _ = dnspkt.Pack() - dm.DNS.Length = len(dm.DNS.Payload) + dm.DNS.Payload, _ = dnspkt.Pack() + dm.DNS.Length = len(dm.DNS.Payload) - // apply all enabled transformers - if subprocessors.ProcessMessage(&dm) == transformers.RETURN_DROP { - continue - } + // apply all enabled transformers + if subprocessors.ProcessMessage(&dm) == transformers.RETURN_DROP { + continue + } - // dispatch dns message to connected loggers - chanLoggers := c.Loggers() - for i := range chanLoggers { - chanLoggers[i] <- dm + // dispatch dns message to connected loggers + chanLoggers := c.Loggers() + for i := range chanLoggers { + chanLoggers[i] <- dm + } } } - // cleanup transformers - subprocessors.Reset() - c.LogInfo("run terminated") - c.done <- true } diff --git a/collectors/powerdns.go b/collectors/powerdns.go index 30279139..b5109e18 100644 --- a/collectors/powerdns.go +++ b/collectors/powerdns.go @@ -13,12 +13,14 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" powerdns_protobuf "github.com/dmachard/go-powerdns-protobuf" ) type ProtobufPowerDNS struct { doneRun chan bool + stopRun chan bool doneMonitor chan bool stopMonitor chan bool cleanup chan bool @@ -27,11 +29,12 @@ type ProtobufPowerDNS struct { conns []net.Conn loggers []dnsutils.Worker config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string droppedCount int dropped chan int - pdnsProcessors []*PdnsProcessor + pdnsProcessors []*processors.PdnsProcessor sync.RWMutex } @@ -40,10 +43,12 @@ func NewProtobufPowerDNS(loggers []dnsutils.Worker, config 
*dnsutils.Config, log s := &ProtobufPowerDNS{ doneRun: make(chan bool), doneMonitor: make(chan bool), + stopRun: make(chan bool), stopMonitor: make(chan bool), cleanup: make(chan bool), dropped: make(chan int), config: config, + configChan: make(chan *dnsutils.Config), loggers: loggers, logger: logger, name: name, @@ -74,6 +79,11 @@ func (c *ProtobufPowerDNS) ReadConfig() { } } +func (c *ProtobufPowerDNS) ReloadConfig(config *dnsutils.Config) { + c.LogInfo("reload configuration...") + c.configChan <- config +} + func (c *ProtobufPowerDNS) LogInfo(msg string, v ...interface{}) { c.logger.Info("["+c.name+"] collector=powerdns - "+msg, v...) } @@ -107,7 +117,7 @@ func (c *ProtobufPowerDNS) HandleConn(conn net.Conn) { c.LogConnInfo(connId, "new connection from %s", peer) // start protobuf subprocessor - pdnsProc := NewPdnsProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.PowerDNS.ChannelBufferSize) + pdnsProc := processors.NewPdnsProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.PowerDNS.ChannelBufferSize) c.Lock() c.pdnsProcessors = append(c.pdnsProcessors, &pdnsProc) c.Unlock() @@ -118,6 +128,7 @@ func (c *ProtobufPowerDNS) HandleConn(conn net.Conn) { var err error var payload *powerdns_protobuf.ProtoPayload + for { payload, err = pbs.RecvPayload(false) if err != nil { @@ -150,13 +161,14 @@ func (c *ProtobufPowerDNS) HandleConn(conn net.Conn) { default: c.dropped <- 1 } + // } } // here the connection is closed, // then removes the current tap processor from the list c.Lock() for i, t := range c.pdnsProcessors { - if t.connId == connId { + if t.ConnId == connId { c.pdnsProcessors = append(c.pdnsProcessors[:i], c.pdnsProcessors[i+1:]...) } } @@ -208,8 +220,8 @@ func (c *ProtobufPowerDNS) Stop() { // read done channel and block until run is terminated c.LogInfo("stopping run...") + c.stopRun <- true <-c.doneRun - close(c.doneRun) } func (c *ProtobufPowerDNS) Listen() error { @@ -264,8 +276,10 @@ MONITOR_LOOP: bufferFull.Stop() c.doneMonitor <- true break MONITOR_LOOP + case <-c.dropped: c.droppedCount++ + case <-bufferFull.C: if c.droppedCount > 0 { c.LogError("recv buffer is full, %d packet(s) dropped", c.droppedCount) @@ -290,35 +304,63 @@ func (c *ProtobufPowerDNS) Run() { // start goroutine to count dropped messsages go c.MonitorCollector() - for { - // Accept() blocks waiting for new connection. - conn, err := c.listen.Accept() - if err != nil { - break - } - - if c.config.Collectors.Dnstap.RcvBufSize > 0 { - before, actual, err := netlib.SetSock_RCVBUF( - conn, - c.config.Collectors.Dnstap.RcvBufSize, - c.config.Collectors.Dnstap.TlsSupport, - ) + // goroutine to Accept() blocks waiting for new connection. 
+ acceptChan := make(chan net.Conn) + go func() { + for { + conn, err := c.listen.Accept() if err != nil { - c.logger.Fatal("Unable to set SO_RCVBUF: ", err) + return } - c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", - before, - c.config.Collectors.Dnstap.RcvBufSize, - actual) + acceptChan <- conn } + }() - c.Lock() - c.conns = append(c.conns, conn) - c.Unlock() - go c.HandleConn(conn) +RUN_LOOP: + for { + select { + case <-c.stopRun: + close(acceptChan) + c.doneRun <- true + break RUN_LOOP + + case cfg := <-c.configChan: + // save the new config + c.config = cfg + c.ReadConfig() + + // refresh config for all conns + for i := range c.pdnsProcessors { + c.pdnsProcessors[i].ConfigChan <- cfg + } + + case conn, opened := <-acceptChan: + if !opened { + return + } + if c.config.Collectors.Dnstap.RcvBufSize > 0 { + before, actual, err := netlib.SetSock_RCVBUF( + conn, + c.config.Collectors.Dnstap.RcvBufSize, + c.config.Collectors.Dnstap.TlsSupport, + ) + if err != nil { + c.logger.Fatal("Unable to set SO_RCVBUF: ", err) + } + c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", + before, + c.config.Collectors.Dnstap.RcvBufSize, + actual) + } + + c.Lock() + c.conns = append(c.conns, conn) + c.Unlock() + go c.HandleConn(conn) + + } } c.LogInfo("run terminated") - c.doneRun <- true } diff --git a/collectors/sniffer_afpacket.go b/collectors/sniffer_afpacket.go index 168e665a..2f553349 100644 --- a/collectors/sniffer_afpacket.go +++ b/collectors/sniffer_afpacket.go @@ -14,6 +14,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -147,27 +148,27 @@ func RemoveBpfFilter(fd int) (err error) { } type AfpacketSniffer struct { - done chan bool - exit chan bool - fd int - port int - device string - identity string - loggers []dnsutils.Worker - config *dnsutils.Config - logger *logger.Logger - name string + done chan bool + exit chan bool + fd int + identity string + loggers []dnsutils.Worker + config *dnsutils.Config + configChan chan *dnsutils.Config + logger *logger.Logger + name string } func NewAfpacketSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *AfpacketSniffer { logger.Info("[%s] collector=afpacket - enabled", name) s := &AfpacketSniffer{ - done: make(chan bool), - exit: make(chan bool), - config: config, - loggers: loggers, - logger: logger, - name: name, + done: make(chan bool), + exit: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + loggers: loggers, + logger: logger, + name: name, } s.ReadConfig() return s @@ -198,9 +199,12 @@ func (c *AfpacketSniffer) Loggers() ([]chan dnsutils.DnsMessage, []string) { } func (c *AfpacketSniffer) ReadConfig() { - c.port = c.config.Collectors.AfpacketLiveCapture.Port c.identity = c.config.GetServerIdentity() - c.device = c.config.Collectors.AfpacketLiveCapture.Device +} + +func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) { + c.LogInfo("reload configuration...") + c.configChan <- config } func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage { @@ -226,8 +230,8 @@ func (c *AfpacketSniffer) Listen() error { } // bind to device ? 
- if c.device != "" { - iface, err := net.InterfaceByName(c.device) + if c.config.Collectors.AfpacketLiveCapture.Device != "" { + iface, err := net.InterfaceByName(c.config.Collectors.AfpacketLiveCapture.Device) if err != nil { return err } @@ -249,8 +253,7 @@ func (c *AfpacketSniffer) Listen() error { return err } - filter := GetBpfFilter(c.port) - //filter := GetBpfFilter_Ingress(c.port) + filter := GetBpfFilter(c.config.Collectors.AfpacketLiveCapture.Port) err = ApplyBpfFilter(filter, fd) if err != nil { return err @@ -274,7 +277,7 @@ func (c *AfpacketSniffer) Run() { } } - dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.AfpacketLiveCapture.ChannelBufferSize) + dnsProcessor := processors.NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.AfpacketLiveCapture.ChannelBufferSize) go dnsProcessor.Run(c.Loggers()) dnsChan := make(chan netlib.DnsPacket) @@ -299,30 +302,44 @@ func (c *AfpacketSniffer) Run() { // prepare dns message dm := dnsutils.DnsMessage{} - // for { - for dnsPacket := range dnsChan { - // reset - dm.Init() + for { + select { + // new config provided? + case cfg, opened := <-c.configChan: + if !opened { + return + } + c.config = cfg + c.ReadConfig() - dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String() - dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String() - dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String() - dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String() - dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String() - dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String() + // send the config to the dns processor + dnsProcessor.ConfigChan <- cfg - dm.DNS.Payload = dnsPacket.Payload - dm.DNS.Length = len(dnsPacket.Payload) + // dns message to read ? 
+ case dnsPacket := <-dnsChan: + // reset + dm.Init() - dm.DnsTap.Identity = c.identity + dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String() + dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String() + dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String() + dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String() + dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String() + dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String() - timestamp := dnsPacket.Timestamp.UnixNano() - seconds := timestamp / int64(time.Second) - dm.DnsTap.TimeSec = int(seconds) - dm.DnsTap.TimeNsec = int(timestamp - seconds*int64(time.Second)*int64(time.Nanosecond)) + dm.DNS.Payload = dnsPacket.Payload + dm.DNS.Length = len(dnsPacket.Payload) - // send DNS message to DNS processor - dnsProcessor.GetChannel() <- dm + dm.DnsTap.Identity = c.identity + + timestamp := dnsPacket.Timestamp.UnixNano() + seconds := timestamp / int64(time.Second) + dm.DnsTap.TimeSec = int(seconds) + dm.DnsTap.TimeNsec = int(timestamp - seconds*int64(time.Second)*int64(time.Nanosecond)) + + // send DNS message to DNS processor + dnsProcessor.GetChannel() <- dm + } } }() @@ -415,6 +432,7 @@ func (c *AfpacketSniffer) Run() { <-c.exit close(dnsChan) + close(c.configChan) // stop dns processor dnsProcessor.Stop() diff --git a/collectors/sniffer_afpacket_darwin.go b/collectors/sniffer_afpacket_darwin.go index 58d909af..31721386 100644 --- a/collectors/sniffer_afpacket_darwin.go +++ b/collectors/sniffer_afpacket_darwin.go @@ -54,8 +54,9 @@ func (c *AfpacketSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *AfpacketSniffer) ReadConfig() { -} +func (c *AfpacketSniffer) ReadConfig() {} + +func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/sniffer_afpacket_freebsd.go b/collectors/sniffer_afpacket_freebsd.go index e9faffae..4e744bdc 100644 --- a/collectors/sniffer_afpacket_freebsd.go +++ b/collectors/sniffer_afpacket_freebsd.go @@ -54,8 +54,9 @@ func (c *AfpacketSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *AfpacketSniffer) ReadConfig() { -} +func (c *AfpacketSniffer) ReadConfig() {} + +func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/sniffer_afpacket_windows.go b/collectors/sniffer_afpacket_windows.go index 2a38d093..e284f3f2 100644 --- a/collectors/sniffer_afpacket_windows.go +++ b/collectors/sniffer_afpacket_windows.go @@ -54,8 +54,9 @@ func (c *AfpacketSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *AfpacketSniffer) ReadConfig() { -} +func (c *AfpacketSniffer) ReadConfig() {} + +func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/sniffer_xdp.go b/collectors/sniffer_xdp.go index b243b96d..eb32bad9 100644 --- a/collectors/sniffer_xdp.go +++ b/collectors/sniffer_xdp.go @@ -14,6 +14,7 @@ import ( "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-dnscollector/xdp" "github.com/dmachard/go-logger" "golang.org/x/sys/unix" @@ -39,24 +40,26 @@ func ConvertIp6(ip [4]uint32) net.IP { } type XdpSniffer 
struct { - done chan bool - exit chan bool - identity string - loggers []dnsutils.Worker - config *dnsutils.Config - logger *logger.Logger - name string + done chan bool + exit chan bool + identity string + loggers []dnsutils.Worker + config *dnsutils.Config + configChan chan *dnsutils.Config + logger *logger.Logger + name string } func NewXdpSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *XdpSniffer { logger.Info("[%s] collector=xdp - enabled", name) s := &XdpSniffer{ - done: make(chan bool), - exit: make(chan bool), - config: config, - loggers: loggers, - logger: logger, - name: name, + done: make(chan bool), + exit: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + loggers: loggers, + logger: logger, + name: name, } s.ReadConfig() return s @@ -90,6 +93,11 @@ func (c *XdpSniffer) ReadConfig() { c.identity = c.config.GetServerIdentity() } +func (c *XdpSniffer) ReloadConfig(config *dnsutils.Config) { + c.LogInfo("reload configuration...") + c.configChan <- config +} + func (c *XdpSniffer) Channel() chan dnsutils.DnsMessage { return nil } @@ -108,7 +116,7 @@ func (c *XdpSniffer) Stop() { func (c *XdpSniffer) Run() { c.LogInfo("starting collector...") - dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.XdpLiveCapture.ChannelBufferSize) + dnsProcessor := processors.NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.XdpLiveCapture.ChannelBufferSize) go dnsProcessor.Run(c.Loggers()) iface, err := net.InterfaceByName(c.config.Collectors.XdpLiveCapture.Device) @@ -143,6 +151,35 @@ func (c *XdpSniffer) Run() { panic(err) } + dnsChan := make(chan dnsutils.DnsMessage) + + // goroutine to read all packets reassembled + go func() { + for { + select { + // new config provided? + case cfg, opened := <-c.configChan: + if !opened { + return + } + c.config = cfg + c.ReadConfig() + + // send the config to the dns processor + dnsProcessor.ConfigChan <- cfg + + //dns message to read ? + case dm := <-dnsChan: + + // update identity with config ? 
+ dm.DnsTap.Identity = c.identity + + dnsProcessor.GetChannel() <- dm + + } + } + }() + go func() { var pkt xdp.BpfPktEvent for { @@ -189,7 +226,6 @@ func (c *XdpSniffer) Run() { dm.DnsTap.TimeSec = int(tsAdjusted.Unix()) dm.DnsTap.TimeNsec = int(tsAdjusted.UnixNano() - tsAdjusted.Unix()*1e9) - dm.DnsTap.Identity = c.identity if pkt.SrcPort == 53 { dm.DnsTap.Operation = dnsutils.DNSTAP_CLIENT_RESPONSE } else { @@ -217,11 +253,13 @@ func (c *XdpSniffer) Run() { dm.DNS.Length = len(dm.DNS.Payload) } - dnsProcessor.GetChannel() <- dm - + dnsChan <- dm } }() + <-c.exit + close(dnsChan) + close(c.configChan) // stop dns processor dnsProcessor.Stop() diff --git a/collectors/sniffer_xdp_windows.go b/collectors/sniffer_xdp_windows.go index 45402972..1a48a0e1 100644 --- a/collectors/sniffer_xdp_windows.go +++ b/collectors/sniffer_xdp_windows.go @@ -54,9 +54,9 @@ func (c *XdpSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *XdpSniffer) ReadConfig() { - c.identity = c.config.GetServerIdentity() -} +func (c *XdpSniffer) ReadConfig() {} + +func (c *XdpSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *XdpSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/tzsp.go b/collectors/tzsp.go index 26f53042..f711d3f4 100644 --- a/collectors/tzsp.go +++ b/collectors/tzsp.go @@ -12,6 +12,7 @@ import ( "syscall" "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -27,8 +28,6 @@ type TzspSniffer struct { logger *logger.Logger name string identity string - port int - ip string } func NewTzsp(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *TzspSniffer { @@ -70,17 +69,17 @@ func (c *TzspSniffer) LogError(msg string, v ...interface{}) { } func (c *TzspSniffer) ReadConfig() { - - c.port = c.config.Collectors.Tzsp.ListenPort - c.ip = c.config.Collectors.Tzsp.ListenIp c.identity = c.config.GetServerIdentity() - // TODO: Implement +} + +func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) { + // TODO implement reload configuration } func (c *TzspSniffer) Listen() error { c.logger.Info("running in background...") - ServerAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", c.ip, c.port)) + ServerAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", c.config.Collectors.Tzsp.ListenIp, c.config.Collectors.Tzsp.ListenPort)) if err != nil { return err } @@ -127,8 +126,7 @@ func (c *TzspSniffer) Run() { c.logger.Fatal("collector=tzsp listening failed: ", err) } - dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.Tzsp.ChannelBufferSize) - + dnsProcessor := processors.NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.Tzsp.ChannelBufferSize) go dnsProcessor.Run(c.Loggers()) go func() { diff --git a/collectors/tzsp_darwin.go b/collectors/tzsp_darwin.go index 5cfc7866..6c80948f 100644 --- a/collectors/tzsp_darwin.go +++ b/collectors/tzsp_darwin.go @@ -54,8 +54,9 @@ func (c *TzspSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *TzspSniffer) ReadConfig() { -} +func (c *TzspSniffer) ReadConfig() {} + +func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *TzspSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/tzsp_freebsd.go b/collectors/tzsp_freebsd.go index 3622acd1..d10bdd01 100644 --- a/collectors/tzsp_freebsd.go 
+++ b/collectors/tzsp_freebsd.go @@ -54,8 +54,9 @@ func (c *TzspSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *TzspSniffer) ReadConfig() { -} +func (c *TzspSniffer) ReadConfig() {} + +func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *TzspSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/tzsp_windows.go b/collectors/tzsp_windows.go index 55ea3d6a..0fbd49e5 100644 --- a/collectors/tzsp_windows.go +++ b/collectors/tzsp_windows.go @@ -54,8 +54,9 @@ func (c *TzspSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *TzspSniffer) ReadConfig() { -} +func (c *TzspSniffer) ReadConfig() {} + +func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *TzspSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/config.yml b/config.yml index 7d65fffc..acaf3877 100644 --- a/config.yml +++ b/config.yml @@ -70,6 +70,9 @@ multiplexer: dnstap: listen-ip: 0.0.0.0 listen-port: 6000 + transforms: + normalize: + qname-lowercase: false loggers: - name: console @@ -195,12 +198,12 @@ multiplexer: # # Channel buffer size for incoming packets, number of packet before to drop it. # chan-buffer-size: 65535 -# # ztsp (TaZmen Sniffer Protocol) -# ztsp: -# # listen on ip -# listen-ip: 0.0.0.0 -# # listen on port -# listen-port: 10000 +# # tzsp (TaZmen Sniffer Protocol) +# tzsp: +# # listen on ip +# listen-ip: 0.0.0.0 +# # listen on port +# listen-port: 10000 # # Channel buffer size for incoming packets, number of packet before to drop it. # chan-buffer-size: 65535 @@ -680,6 +683,8 @@ multiplexer: # log-queries: true # # forward received replies to configured loggers ? # log-replies: true +# # only keep 1 out of every downsample records, e.g. if set to 20, then this will return every 20th record, dropping 95% of queries +# downsample: 0 # # GeoIP maxmind support, more information on https://www.maxmind.com/en/geoip-demo # # this feature can be used to append additional informations like country, city, asn diff --git a/dnscollector.go b/dnscollector.go index fc8d73c7..56f6ed42 100644 --- a/dnscollector.go +++ b/dnscollector.go @@ -61,6 +61,49 @@ func AreRoutesValid(config *dnsutils.Config) (ret error) { return } +func InitLogger(logger *logger.Logger, config *dnsutils.Config) { + // redirect app logs to file ? + if len(config.Global.Trace.Filename) > 0 { + logger.SetOutput(&lumberjack.Logger{ + Filename: config.Global.Trace.Filename, + MaxSize: config.Global.Trace.MaxSize, + MaxBackups: config.Global.Trace.MaxBackups, + }) + } + + // enable the verbose mode ? 
+ logger.SetVerbose(config.Global.Trace.Verbose) +} + +func GetItemConfig(section string, config *dnsutils.Config, item dnsutils.MultiplexInOut) *dnsutils.Config { + // load config + cfg := make(map[string]interface{}) + cfg[section] = item.Params + cfg[section+"-transformers"] = make(map[string]interface{}) + for _, p := range item.Params { + p.(map[string]interface{})["enable"] = true + } + + // get config with default values + subcfg := &dnsutils.Config{} + subcfg.SetDefault() + + // add transformer + for k, v := range item.Transforms { + v.(map[string]interface{})["enable"] = true + cfg[section+"-transformers"].(map[string]interface{})[k] = v + } + + // copy global config + subcfg.Global = config.Global + + yamlcfg, _ := yaml.Marshal(cfg) + if err := yaml.Unmarshal(yamlcfg, subcfg); err != nil { + panic(fmt.Sprintf("main - yaml logger config error: %v", err)) + } + return subcfg +} + func main() { args := os.Args[1:] // Ignore the first argument (the program name) @@ -111,51 +154,25 @@ func main() { panic(fmt.Sprintf("main - config error: %v", err)) } - // redirect app logs to file ? - if len(config.Global.Trace.Filename) > 0 { - logger.SetOutput(&lumberjack.Logger{ - Filename: config.Global.Trace.Filename, - MaxSize: config.Global.Trace.MaxSize, - MaxBackups: config.Global.Trace.MaxBackups, - }) - } - - // enable the verbose mode ? - logger.SetVerbose(config.Global.Trace.Verbose) + // init logger + InitLogger(logger, config) logger.Info("main - version=%s revision=%s", version.Version, version.Revision) logger.Info("main - starting dns-collector...") + // checking all routes before to continue + if err := AreRoutesValid(config); err != nil { + panic(fmt.Sprintf("main - configuration error: %e", err)) + } + // load loggers logger.Info("main - loading loggers...") mapLoggers := make(map[string]dnsutils.Worker) for _, output := range config.Multiplexer.Loggers { - // load config - cfg := make(map[string]interface{}) - cfg["loggers"] = output.Params - cfg["outgoing-transformers"] = make(map[string]interface{}) - for _, p := range output.Params { - p.(map[string]interface{})["enable"] = true - } - - // get config with default values - subcfg := &dnsutils.Config{} - subcfg.SetDefault() - - // add transformer - for k, v := range output.Transforms { - v.(map[string]interface{})["enable"] = true - cfg["outgoing-transformers"].(map[string]interface{})[k] = v - } - - // copy global config - subcfg.Global = config.Global - - yamlcfg, _ := yaml.Marshal(cfg) - if err := yaml.Unmarshal(yamlcfg, subcfg); err != nil { - panic(fmt.Sprintf("main - yaml logger config error: %v", err)) - } + // prepare restructured config for the current logger + subcfg := GetItemConfig("loggers", config, output) + // registor the logger if enabled if subcfg.Loggers.RestAPI.Enable && IsLoggerRouted(config, output.Name) { mapLoggers[output.Name] = loggers.NewRestAPI(subcfg, logger, output.Name) } @@ -210,36 +227,10 @@ func main() { logger.Info("main - loading collectors...") mapCollectors := make(map[string]dnsutils.Worker) for _, input := range config.Multiplexer.Collectors { - // load config - cfg := make(map[string]interface{}) - cfg["collectors"] = input.Params - cfg["ingoing-transformers"] = make(map[string]interface{}) - for _, p := range input.Params { - p.(map[string]interface{})["enable"] = true - } - - // get config with default values - subcfg := &dnsutils.Config{} - subcfg.SetDefault() - - // add transformer - for k, v := range input.Transforms { - v.(map[string]interface{})["enable"] = true - 
cfg["ingoing-transformers"].(map[string]interface{})[k] = v - } - - // copy global config - subcfg.Global = config.Global - - yamlcfg, _ := yaml.Marshal(cfg) - if err := yaml.Unmarshal(yamlcfg, subcfg); err != nil { - panic(fmt.Sprintf("main - yaml collector config error: %v", err)) - } - - if err := AreRoutesValid(config); err != nil { - panic(fmt.Sprintf("main - configuration error: %e", err)) - } + // prepare restructured config for the current collector + subcfg := GetItemConfig("collectors", config, input) + // register the collector if enabled if subcfg.Collectors.Dnstap.Enable && IsCollectorRouted(config, input.Name) { mapCollectors[input.Name] = collectors.NewDnstap(nil, subcfg, logger, input.Name) } @@ -261,23 +252,23 @@ func main() { if subcfg.Collectors.FileIngestor.Enable && IsCollectorRouted(config, input.Name) { mapCollectors[input.Name] = collectors.NewFileIngestor(nil, subcfg, logger, input.Name) } - if subcfg.Collectors.Tzsp.Enable { + if subcfg.Collectors.Tzsp.Enable && IsCollectorRouted(config, input.Name) { mapCollectors[input.Name] = collectors.NewTzsp(nil, subcfg, logger, input.Name) } } // here the multiplexer logic // connect collectors between loggers - for _, routes := range config.Multiplexer.Routes { + for _, route := range config.Multiplexer.Routes { var logwrks []dnsutils.Worker - for _, dst := range routes.Dst { + for _, dst := range route.Dst { if _, ok := mapLoggers[dst]; ok { logwrks = append(logwrks, mapLoggers[dst]) } else { panic(fmt.Sprintf("main - routing error: logger %v doest not exist", dst)) } } - for _, src := range routes.Src { + for _, src := range route.Src { if _, ok := mapCollectors[src]; ok { mapCollectors[src].SetLoggers(logwrks) } else { @@ -300,7 +291,7 @@ func main() { for { select { case <-sigHUP: - logger.Info("main - reloading config...") + logger.Info("main - SIGHUP received") // read config err := dnsutils.ReloadConfig(configPath, config) @@ -308,8 +299,26 @@ func main() { panic(fmt.Sprintf("main - reload config error: %v", err)) } - // enable the verbose mode ? 
- logger.SetVerbose(config.Global.Trace.Verbose) + // reload logger + InitLogger(logger, config) + + for _, output := range config.Multiplexer.Loggers { + newcfg := GetItemConfig("loggers", config, output) + if _, ok := mapLoggers[output.Name]; ok { + mapLoggers[output.Name].ReloadConfig(newcfg) + } else { + logger.Info("main - reload config logger=%v doest not exist", output.Name) + } + } + + for _, input := range config.Multiplexer.Collectors { + newcfg := GetItemConfig("collectors", config, input) + if _, ok := mapCollectors[input.Name]; ok { + mapCollectors[input.Name].ReloadConfig(newcfg) + } else { + logger.Info("main - reload config collector=%v doest not exist", input.Name) + } + } case <-sigTerm: logger.Info("main - exiting...") diff --git a/dnsutils/config.go b/dnsutils/config.go index 794ca8f6..3a0f5a46 100644 --- a/dnsutils/config.go +++ b/dnsutils/config.go @@ -247,10 +247,10 @@ type Config struct { ListenIp string `yaml:"listen-ip"` ListenPort int `yaml:"listen-port"` ChannelBufferSize int `yaml:"chan-buffer-size"` - } + } `yaml:"tzsp"` } `yaml:"collectors"` - IngoingTransformers ConfigTransformers `yaml:"ingoing-transformers"` + IngoingTransformers ConfigTransformers `yaml:"collectors-transformers"` Loggers struct { Stdout struct { @@ -481,7 +481,7 @@ type Config struct { } `yaml:"falco"` } `yaml:"loggers"` - OutgoingTransformers ConfigTransformers `yaml:"outgoing-transformers"` + OutgoingTransformers ConfigTransformers `yaml:"loggers-transformers"` Multiplexer struct { Collectors []MultiplexInOut `yaml:"collectors"` diff --git a/dnsutils/dns.go b/dnsutils/dns_parser.go similarity index 100% rename from dnsutils/dns.go rename to dnsutils/dns_parser.go diff --git a/dnsutils/dns_test.go b/dnsutils/dns_parser_test.go similarity index 100% rename from dnsutils/dns_test.go rename to dnsutils/dns_parser_test.go diff --git a/dnsutils/edns.go b/dnsutils/edns_parser.go similarity index 100% rename from dnsutils/edns.go rename to dnsutils/edns_parser.go diff --git a/dnsutils/edns_test.go b/dnsutils/edns_parser_test.go similarity index 100% rename from dnsutils/edns_test.go rename to dnsutils/edns_parser_test.go diff --git a/dnsutils/worker.go b/dnsutils/worker.go index ccaf2d0e..136179d5 100644 --- a/dnsutils/worker.go +++ b/dnsutils/worker.go @@ -7,4 +7,5 @@ type Worker interface { Run() Channel() chan DnsMessage ReadConfig() + ReloadConfig(config *Config) } diff --git a/loggers/dnstapclient.go b/loggers/dnstapclient.go index e8e4cfd9..7628b299 100644 --- a/loggers/dnstapclient.go +++ b/loggers/dnstapclient.go @@ -21,6 +21,7 @@ type DnstapSender struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger fs *framestream.Fstrm fsReady bool @@ -43,6 +44,7 @@ func NewDnstapSender(config *dnsutils.Config, logger *logger.Logger, name string transportReconnect: make(chan bool), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -66,6 +68,11 @@ func (o *DnstapSender) ReadConfig() { } } +func (o *DnstapSender) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *DnstapSender) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=dnstap - "+msg, v...) 
} @@ -220,6 +227,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/elasticsearch.go b/loggers/elasticsearch.go index dfa40878..dbad2d7d 100644 --- a/loggers/elasticsearch.go +++ b/loggers/elasticsearch.go @@ -22,6 +22,7 @@ type ElasticSearchClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string server string @@ -40,6 +41,7 @@ func NewElasticSearchClient(config *dnsutils.Config, console *logger.Logger, nam outputChan: make(chan dnsutils.DnsMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize), logger: console, config: config, + configChan: make(chan *dnsutils.Config), name: name, } o.ReadConfig() @@ -62,6 +64,11 @@ func (c *ElasticSearchClient) ReadConfig() { c.bulkUrl = u.String() } +func (o *ElasticSearchClient) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *ElasticSearchClient) Channel() chan dnsutils.DnsMessage { return o.inputChan } @@ -106,6 +113,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/fakelogger.go b/loggers/fakelogger.go index 63c9b1f2..96f3f148 100644 --- a/loggers/fakelogger.go +++ b/loggers/fakelogger.go @@ -25,6 +25,8 @@ func (c *FakeLogger) SetLoggers(loggers []dnsutils.Worker) {} func (o *FakeLogger) ReadConfig() {} +func (o *FakeLogger) ReloadConfig(config *dnsutils.Config) {} + func (o *FakeLogger) Stop() {} func (o *FakeLogger) Channel() chan dnsutils.DnsMessage { diff --git a/loggers/falco.go b/loggers/falco.go index 6f597fa9..955dd12a 100644 --- a/loggers/falco.go +++ b/loggers/falco.go @@ -19,6 +19,7 @@ type FalcoClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string url string @@ -35,6 +36,7 @@ func NewFalcoClient(config *dnsutils.Config, console *logger.Logger, name string outputChan: make(chan dnsutils.DnsMessage, config.Loggers.FalcoClient.ChannelBufferSize), logger: console, config: config, + configChan: make(chan *dnsutils.Config), name: name, } f.ReadConfig() @@ -49,6 +51,11 @@ func (c *FalcoClient) ReadConfig() { c.url = c.config.Loggers.FalcoClient.URL } +func (c *FalcoClient) ReloadConfig(config *dnsutils.Config) { + c.LogInfo("reload configuration!") + c.configChan <- config +} + func (f *FalcoClient) Channel() chan dnsutils.DnsMessage { return f.inputChan } @@ -93,6 +100,14 @@ RUN_LOOP: f.doneRun <- true break RUN_LOOP + case cfg, opened := <-f.configChan: + if !opened { + return + } + f.config = cfg + f.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-f.inputChan: if !opened { f.LogInfo("input channel closed!") diff --git a/loggers/fluentd.go b/loggers/fluentd.go index 63d2dd64..f7dcc25d 100644 --- a/loggers/fluentd.go +++ b/loggers/fluentd.go @@ -20,6 +20,7 @@ type FluentdClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config 
*dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger transportConn net.Conn transportReady chan bool @@ -41,6 +42,7 @@ func NewFluentdClient(config *dnsutils.Config, logger *logger.Logger, name strin transportReconnect: make(chan bool), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -59,6 +61,11 @@ func (o *FluentdClient) ReadConfig() { } } +func (o *FluentdClient) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *FluentdClient) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=fluentd - "+msg, v...) } @@ -201,6 +208,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/influxdb.go b/loggers/influxdb.go index 792b5f99..4db38055 100644 --- a/loggers/influxdb.go +++ b/loggers/influxdb.go @@ -20,6 +20,7 @@ type InfluxDBClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger influxdbConn influxdb2.Client writeAPI api.WriteAPI @@ -38,6 +39,7 @@ func NewInfluxDBClient(config *dnsutils.Config, logger *logger.Logger, name stri outputChan: make(chan dnsutils.DnsMessage, config.Loggers.InfluxDB.ChannelBufferSize), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -56,6 +58,11 @@ func (o *InfluxDBClient) ReadConfig() { } } +func (o *InfluxDBClient) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *InfluxDBClient) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=influxdb - "+msg, v...) } @@ -100,6 +107,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/kafkaproducer.go b/loggers/kafkaproducer.go index 4fb6dc52..8744efb8 100644 --- a/loggers/kafkaproducer.go +++ b/loggers/kafkaproducer.go @@ -25,6 +25,7 @@ type KafkaProducer struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger textFormat []string name string @@ -45,6 +46,7 @@ func NewKafkaProducer(config *dnsutils.Config, logger *logger.Logger, name strin outputChan: make(chan dnsutils.DnsMessage, config.Loggers.KafkaProducer.ChannelBufferSize), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), kafkaReady: make(chan bool), kafkaReconnect: make(chan bool), name: name, @@ -72,6 +74,11 @@ func (o *KafkaProducer) ReadConfig() { } } +func (o *KafkaProducer) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *KafkaProducer) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=kafka - "+msg, v...) 
} @@ -237,6 +244,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/logfile.go b/loggers/logfile.go index ad281194..338126c9 100644 --- a/loggers/logfile.go +++ b/loggers/logfile.go @@ -54,6 +54,7 @@ type LogFile struct { writerPcap *pcapgo.Writer writerDnstap *framestream.Encoder config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger fileFd *os.File fileSize int64 @@ -76,6 +77,7 @@ func NewLogFile(config *dnsutils.Config, logger *logger.Logger, name string) *Lo inputChan: make(chan dnsutils.DnsMessage, config.Loggers.LogFile.ChannelBufferSize), outputChan: make(chan dnsutils.DnsMessage, config.Loggers.LogFile.ChannelBufferSize), config: config, + configChan: make(chan *dnsutils.Config), logger: logger, name: name, } @@ -115,6 +117,11 @@ func (l *LogFile) ReadConfig() { l.LogInfo("running in mode: %s", l.config.Loggers.LogFile.Mode) } +func (o *LogFile) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (l *LogFile) LogInfo(msg string, v ...interface{}) { l.logger.Info("["+l.name+"] logger=file - "+msg, v...) } @@ -467,6 +474,15 @@ RUN_LOOP: subprocessors.Reset() l.doneRun <- true break RUN_LOOP + + case cfg, opened := <-l.configChan: + if !opened { + return + } + l.config = cfg + l.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-l.inputChan: if !opened { l.LogInfo("input channel closed!") diff --git a/loggers/lokiclient.go b/loggers/lokiclient.go index aea42fef..95b22a10 100644 --- a/loggers/lokiclient.go +++ b/loggers/lokiclient.go @@ -80,6 +80,7 @@ type LokiClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger httpclient *http.Client textFormat []string @@ -99,6 +100,7 @@ func NewLokiClient(config *dnsutils.Config, logger *logger.Logger, name string) outputChan: make(chan dnsutils.DnsMessage, config.Loggers.LokiClient.ChannelBufferSize), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), streams: make(map[string]*LokiStream), name: name, } @@ -159,6 +161,11 @@ func (o *LokiClient) ReadConfig() { } } +func (o *LokiClient) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *LokiClient) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=loki - "+msg, v...) 
} @@ -203,6 +210,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/prometheus.go b/loggers/prometheus.go index 269844f2..e9a7382a 100644 --- a/loggers/prometheus.go +++ b/loggers/prometheus.go @@ -169,9 +169,9 @@ type Prometheus struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger promRegistry *prometheus.Registry - //version string sync.Mutex catalogueLabels []string @@ -724,20 +724,18 @@ func CreateSystemCatalogue(prom *Prometheus) ([]string, *PromCounterCatalogueCon func NewPrometheus(config *dnsutils.Config, logger *logger.Logger, name string) *Prometheus { logger.Info("[%s] logger=prometheus - enabled", name) o := &Prometheus{ - doneApi: make(chan bool), - stopProcess: make(chan bool), - doneProcess: make(chan bool), - stopRun: make(chan bool), - doneRun: make(chan bool), - config: config, - inputChan: make(chan dnsutils.DnsMessage, config.Loggers.Prometheus.ChannelBufferSize), - outputChan: make(chan dnsutils.DnsMessage, config.Loggers.Prometheus.ChannelBufferSize), - logger: logger, - //version: version, - + doneApi: make(chan bool), + stopProcess: make(chan bool), + doneProcess: make(chan bool), + stopRun: make(chan bool), + doneRun: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + inputChan: make(chan dnsutils.DnsMessage, config.Loggers.Prometheus.ChannelBufferSize), + outputChan: make(chan dnsutils.DnsMessage, config.Loggers.Prometheus.ChannelBufferSize), + logger: logger, promRegistry: prometheus.NewPedanticRegistry(), - - name: name, + name: name, } // This will create a catalogue of counters indexed by fileds requested by config @@ -1042,6 +1040,11 @@ func (o *Prometheus) ReadConfig() { } } +func (o *Prometheus) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *Prometheus) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=prometheus - "+msg, v...) } @@ -1176,6 +1179,15 @@ RUN_LOOP: subprocessors.Reset() s.doneRun <- true break RUN_LOOP + + case cfg, opened := <-s.configChan: + if !opened { + return + } + s.config = cfg + s.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-s.inputChan: if !opened { s.LogInfo("input channel closed!") diff --git a/loggers/redispub.go b/loggers/redispub.go index 85ac63a8..3907bd01 100644 --- a/loggers/redispub.go +++ b/loggers/redispub.go @@ -26,6 +26,7 @@ type RedisPub struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger textFormat []string name string @@ -49,6 +50,7 @@ func NewRedisPub(config *dnsutils.Config, logger *logger.Logger, name string) *R transportReconnect: make(chan bool), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -74,6 +76,11 @@ func (o *RedisPub) ReadConfig() { } } +func (o *RedisPub) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *RedisPub) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=redispub - "+msg, v...) 
} @@ -253,6 +260,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/restapi.go b/loggers/restapi.go index d0f75850..0825b221 100644 --- a/loggers/restapi.go +++ b/loggers/restapi.go @@ -53,6 +53,7 @@ type RestAPI struct { httpserver net.Listener httpmux *http.ServeMux config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string @@ -79,6 +80,7 @@ func NewRestAPI(config *dnsutils.Config, logger *logger.Logger, name string) *Re stopRun: make(chan bool), doneRun: make(chan bool), config: config, + configChan: make(chan *dnsutils.Config), inputChan: make(chan dnsutils.DnsMessage, config.Loggers.RestAPI.ChannelBufferSize), outputChan: make(chan dnsutils.DnsMessage, config.Loggers.RestAPI.ChannelBufferSize), logger: logger, @@ -117,6 +119,11 @@ func (o *RestAPI) ReadConfig() { } } +func (o *RestAPI) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *RestAPI) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=restapi - "+msg, v...) } @@ -709,6 +716,15 @@ RUN_LOOP: subprocessors.Reset() s.doneRun <- true break RUN_LOOP + + case cfg, opened := <-s.configChan: + if !opened { + return + } + s.config = cfg + s.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-s.inputChan: if !opened { s.LogInfo("input channel closed!") diff --git a/loggers/scalyr.go b/loggers/scalyr.go index b986865f..b849d1e9 100644 --- a/loggers/scalyr.go +++ b/loggers/scalyr.go @@ -34,6 +34,7 @@ type ScalyrClient struct { logger *logger.Logger name string config *dnsutils.Config + configChan chan *dnsutils.Config mode string textFormat []string @@ -62,6 +63,7 @@ func NewScalyrClient(config *dnsutils.Config, console *logger.Logger, name strin logger: console, name: name, config: config, + configChan: make(chan *dnsutils.Config), mode: dnsutils.MODE_TEXT, endpoint: makeEndpoint("app.scalyr.com"), @@ -142,6 +144,11 @@ func (c *ScalyrClient) ReadConfig() { c.httpclient = &http.Client{Transport: tr} } +func (o *ScalyrClient) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *ScalyrClient) Run() { o.LogInfo("running in background...") @@ -164,6 +171,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/statsd.go b/loggers/statsd.go index ee5df223..39522153 100644 --- a/loggers/statsd.go +++ b/loggers/statsd.go @@ -49,6 +49,7 @@ type StatsdClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string @@ -68,6 +69,7 @@ func NewStatsdClient(config *dnsutils.Config, logger *logger.Logger, name string outputChan: make(chan dnsutils.DnsMessage, config.Loggers.Statsd.ChannelBufferSize), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, Stats: StreamStats{Streams: make(map[string]*StatsPerStream)}, } @@ -88,6 +90,11 @@ func (o *StatsdClient) ReadConfig() { } } +func 
(o *StatsdClient) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *StatsdClient) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=statsd - "+msg, v...) } @@ -245,6 +252,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/stdout.go b/loggers/stdout.go index 7745adca..120ce1ab 100644 --- a/loggers/stdout.go +++ b/loggers/stdout.go @@ -37,6 +37,7 @@ type StdOut struct { outputChan chan dnsutils.DnsMessage textFormat []string config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger stdout *log.Logger name string @@ -53,6 +54,7 @@ func NewStdOut(config *dnsutils.Config, console *logger.Logger, name string) *St outputChan: make(chan dnsutils.DnsMessage, config.Loggers.Stdout.ChannelBufferSize), logger: console, config: config, + configChan: make(chan *dnsutils.Config), stdout: log.New(os.Stdout, "", 0), name: name, } @@ -75,6 +77,11 @@ func (c *StdOut) ReadConfig() { } } +func (o *StdOut) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (c *StdOut) LogInfo(msg string, v ...interface{}) { c.logger.Info("["+c.name+"] logger=stdout - "+msg, v...) } @@ -121,6 +128,16 @@ RUN_LOOP: subprocessors.Reset() o.doneRun <- true break RUN_LOOP + + // new config provided? + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/syslog.go b/loggers/syslog.go index 105179fd..075d30e4 100644 --- a/loggers/syslog.go +++ b/loggers/syslog.go @@ -60,6 +60,7 @@ type Syslog struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger severity syslog.Priority facility syslog.Priority @@ -84,6 +85,7 @@ func NewSyslog(config *dnsutils.Config, console *logger.Logger, name string) *Sy transportReconnect: make(chan bool), logger: console, config: config, + configChan: make(chan *dnsutils.Config), name: name, } o.ReadConfig() @@ -121,6 +123,11 @@ func (c *Syslog) ReadConfig() { } } +func (o *Syslog) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *Syslog) Channel() chan dnsutils.DnsMessage { return o.inputChan } @@ -234,6 +241,15 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + // new config provided? 
+ case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/tcpclient.go b/loggers/tcpclient.go index 56ce57f7..16d1bd27 100644 --- a/loggers/tcpclient.go +++ b/loggers/tcpclient.go @@ -22,6 +22,7 @@ type TcpClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger textFormat []string name string @@ -45,6 +46,7 @@ func NewTcpClient(config *dnsutils.Config, logger *logger.Logger, name string) * transportReconnect: make(chan bool), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -69,6 +71,11 @@ func (o *TcpClient) ReadConfig() { } } +func (o *TcpClient) ReloadConfig(config *dnsutils.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + func (o *TcpClient) LogInfo(msg string, v ...interface{}) { o.logger.Info("["+o.name+"] logger=tcpclient - "+msg, v...) } @@ -210,6 +217,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/collectors/dns_processor.go b/processors/dns.go similarity index 95% rename from collectors/dns_processor.go rename to processors/dns.go index c72d3375..68bda37b 100644 --- a/collectors/dns_processor.go +++ b/processors/dns.go @@ -1,4 +1,4 @@ -package collectors +package processors import ( "fmt" @@ -24,6 +24,7 @@ type DnsProcessor struct { recvFrom chan dnsutils.DnsMessage logger *logger.Logger config *dnsutils.Config + ConfigChan chan *dnsutils.Config name string dropped chan string droppedCount map[string]int @@ -39,18 +40,14 @@ func NewDnsProcessor(config *dnsutils.Config, logger *logger.Logger, name string recvFrom: make(chan dnsutils.DnsMessage, size), logger: logger, config: config, + ConfigChan: make(chan *dnsutils.Config), name: name, dropped: make(chan string), droppedCount: map[string]int{}, } - - d.ReadConfig() - return d } -func (d *DnsProcessor) ReadConfig() {} - func (c *DnsProcessor) LogInfo(msg string, v ...interface{}) { c.logger.Info("["+c.name+"] processor=dns - "+msg, v...) 
 }
@@ -125,9 +122,13 @@ func (d *DnsProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggersNam
 RUN_LOOP:
     for {
         select {
+        case cfg := <-d.ConfigChan:
+            d.config = cfg
+            transforms.ReloadConfig(&cfg.IngoingTransformers)
+
         case <-d.stopRun:
             transforms.Reset()
-            close(d.recvFrom)
+            //close(d.recvFrom)
             d.doneRun <- true
             break RUN_LOOP
diff --git a/collectors/dnstap_processor.go b/processors/dnstap.go
similarity index 94%
rename from collectors/dnstap_processor.go
rename to processors/dnstap.go
index c21f1921..70d8956b 100644
--- a/collectors/dnstap_processor.go
+++ b/processors/dnstap.go
@@ -1,4 +1,4 @@
-package collectors
+package processors
 
 import (
     "fmt"
@@ -49,7 +49,7 @@ func GetFakeDnstap(dnsquery []byte) *dnstap.Dnstap {
 }
 
 type DnstapProcessor struct {
-    connId       int
+    ConnId       int
     doneRun      chan bool
     stopRun      chan bool
     doneMonitor  chan bool
@@ -57,6 +57,7 @@ type DnstapProcessor struct {
     recvFrom     chan []byte
     logger       *logger.Logger
     config       *dnsutils.Config
+    ConfigChan   chan *dnsutils.Config
     name         string
     chanSize     int
     dropped      chan string
@@ -67,7 +68,7 @@ func NewDnstapProcessor(connId int, config *dnsutils.Config, logger *logger.Logg
     logger.Info("[%s] processor=dnstap#%d - initialization...", name, connId)
 
     d := DnstapProcessor{
-        connId:       connId,
+        ConnId:       connId,
         doneMonitor:  make(chan bool),
         doneRun:      make(chan bool),
         stopMonitor:  make(chan bool),
@@ -76,36 +77,31 @@ func NewDnstapProcessor(connId int, config *dnsutils.Config, logger *logger.Logg
         chanSize:     size,
         logger:       logger,
         config:       config,
+        ConfigChan:   make(chan *dnsutils.Config),
         name:         name,
         dropped:      make(chan string),
         droppedCount: map[string]int{},
     }
-    d.ReadConfig()
-
     return d
 }
 
-func (d *DnstapProcessor) ReadConfig() {
-    // todo - checking settings
-}
-
 func (c *DnstapProcessor) LogInfo(msg string, v ...interface{}) {
     var log string
-    if c.connId == 0 {
+    if c.ConnId == 0 {
         log = fmt.Sprintf("[%s] processor=dnstap - ", c.name)
     } else {
-        log = fmt.Sprintf("[%s] processor=dnstap#%d - ", c.name, c.connId)
+        log = fmt.Sprintf("[%s] processor=dnstap#%d - ", c.name, c.ConnId)
     }
     c.logger.Info(log+msg, v...)
 }
 
 func (c *DnstapProcessor) LogError(msg string, v ...interface{}) {
     var log string
-    if c.connId == 0 {
+    if c.ConnId == 0 {
         log = fmt.Sprintf("[%s] processor=dnstap - ", c.name)
     } else {
-        log = fmt.Sprintf("[%s] processor=dnstap#%d - ", c.name, c.connId)
+        log = fmt.Sprintf("[%s] processor=dnstap#%d - ", c.name, c.ConnId)
     }
     c.logger.Error(log+msg, v...)
 }
@@ -161,7 +157,7 @@ func (d *DnstapProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggers
     dt := &dnstap.Dnstap{}
 
     // prepare enabled transformers
-    transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, loggersChannel, d.connId)
+    transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, loggersChannel, d.ConnId)
 
     // start goroutine to count dropped messsages
     go d.MonitorLoggers()
@@ -171,9 +167,12 @@ func (d *DnstapProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggers
 RUN_LOOP:
     for {
         select {
+        case cfg := <-d.ConfigChan:
+            d.config = cfg
+            transforms.ReloadConfig(&cfg.IngoingTransformers)
+
         case <-d.stopRun:
             transforms.Reset()
-            //close(d.recvFrom)
             d.doneRun <- true
             break RUN_LOOP
diff --git a/collectors/dnstap_processor_test.go b/processors/dnstap_test.go
similarity index 99%
rename from collectors/dnstap_processor_test.go
rename to processors/dnstap_test.go
index 9f8bd609..30c7a999 100644
--- a/collectors/dnstap_processor_test.go
+++ b/processors/dnstap_test.go
@@ -1,4 +1,4 @@
-package collectors
+package processors
 
 import (
     "bytes"
diff --git a/collectors/powerdns_processor.go b/processors/powerdns.go
similarity index 95%
rename from collectors/powerdns_processor.go
rename to processors/powerdns.go
index 21ffa12a..5ac7e1ac 100644
--- a/collectors/powerdns_processor.go
+++ b/processors/powerdns.go
@@ -1,4 +1,4 @@
-package collectors
+package processors
 
 import (
     "fmt"
@@ -25,7 +25,7 @@ var (
 )
 
 type PdnsProcessor struct {
-    connId       int
+    ConnId       int
     doneRun      chan bool
     stopRun      chan bool
     doneMonitor  chan bool
@@ -33,6 +33,7 @@ type PdnsProcessor struct {
     recvFrom     chan []byte
     logger       *logger.Logger
     config       *dnsutils.Config
+    ConfigChan   chan *dnsutils.Config
     name         string
     chanSize     int
     dropped      chan string
@@ -42,7 +43,7 @@ type PdnsProcessor struct {
 func NewPdnsProcessor(connId int, config *dnsutils.Config, logger *logger.Logger, name string, size int) PdnsProcessor {
     logger.Info("[%s] processor=pdns#%d - initialization...", name, connId)
     d := PdnsProcessor{
-        connId:       connId,
+        ConnId:       connId,
         doneMonitor:  make(chan bool),
         doneRun:      make(chan bool),
         stopMonitor:  make(chan bool),
@@ -51,36 +52,30 @@ func NewPdnsProcessor(connId int, config *dnsutils.Config, logger *logger.Logger
         chanSize:     size,
         logger:       logger,
         config:       config,
+        ConfigChan:   make(chan *dnsutils.Config),
         name:         name,
         dropped:      make(chan string),
         droppedCount: map[string]int{},
     }
-
-    d.ReadConfig()
-
     return d
 }
 
-func (c *PdnsProcessor) ReadConfig() {
-    // nothing to read
-}
-
 func (c *PdnsProcessor) LogInfo(msg string, v ...interface{}) {
     var log string
-    if c.connId == 0 {
+    if c.ConnId == 0 {
         log = fmt.Sprintf("[%s] processor=powerdns - ", c.name)
     } else {
-        log = fmt.Sprintf("[%s] processor=powerdns#%d - ", c.name, c.connId)
+        log = fmt.Sprintf("[%s] processor=powerdns#%d - ", c.name, c.ConnId)
     }
     c.logger.Info(log+msg, v...)
 }
 
 func (c *PdnsProcessor) LogError(msg string, v ...interface{}) {
     var log string
-    if c.connId == 0 {
+    if c.ConnId == 0 {
         log = fmt.Sprintf("[%s] processor=powerdns - ", c.name)
     } else {
-        log = fmt.Sprintf("[%s] processor=powerdns#%d - ", c.name, c.connId)
+        log = fmt.Sprintf("[%s] processor=powerdns#%d - ", c.name, c.ConnId)
     }
     c.logger.Error(log+msg, v...)
 }
@@ -137,7 +132,7 @@ func (d *PdnsProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggersNa
     pbdm := &powerdns_protobuf.PBDNSMessage{}
 
     // prepare enabled transformers
-    transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, loggersChannel, d.connId)
+    transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, loggersChannel, d.ConnId)
 
     // start goroutine to count dropped messsages
     go d.MonitorLoggers()
@@ -147,6 +142,10 @@ func (d *PdnsProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggersNa
 RUN_LOOP:
     for {
         select {
+        case cfg := <-d.ConfigChan:
+            d.config = cfg
+            transforms.ReloadConfig(&cfg.IngoingTransformers)
+
         case <-d.stopRun:
             transforms.Reset()
             //close(d.recvFrom)
diff --git a/collectors/powerdns_processor_test.go b/processors/powerdns_test.go
similarity index 99%
rename from collectors/powerdns_processor_test.go
rename to processors/powerdns_test.go
index 41b205ba..1dee486e 100644
--- a/collectors/powerdns_processor_test.go
+++ b/processors/powerdns_test.go
@@ -1,4 +1,4 @@
-package collectors
+package processors
 
 import (
     "testing"
diff --git a/testsdata/filtering_keep_domains.txt b/testsdata/filtering_keep_domains.txt
index 0b2541a5..0f1a9be3 100644
--- a/testsdata/filtering_keep_domains.txt
+++ b/testsdata/filtering_keep_domains.txt
@@ -1,2 +1,2 @@
 google.fr
-test.github.com
+test.github.com
\ No newline at end of file
diff --git a/transformers/extract.go b/transformers/extract.go
index b4c882f7..7f66c0a7 100644
--- a/transformers/extract.go
+++ b/transformers/extract.go
@@ -31,6 +31,10 @@ func NewExtractSubprocessor(config *dnsutils.ConfigTransformers, logger *logger.
     return s
 }
 
+func (p *ExtractProcessor) ReloadConfig(config *dnsutils.ConfigTransformers) {
+    p.config = config
+}
+
 func (p *ExtractProcessor) InitDnsMessage(dm *dnsutils.DnsMessage) {
     if dm.Extracted == nil {
         dm.Extracted = &dnsutils.TransformExtracted{
diff --git a/transformers/filtering.go b/transformers/filtering.go
index 90d73d66..d9632300 100644
--- a/transformers/filtering.go
+++ b/transformers/filtering.go
@@ -66,17 +66,13 @@ func NewFilteringProcessor(config *dnsutils.ConfigTransformers, logger *logger.L
         logInfo:  logInfo,
         logError: logError,
     }
-
-    d.LoadRcodes()
-    d.LoadDomainsList()
-    d.LoadQueryIpList()
-    d.LoadrDataIpList()
-
-    d.LoadActiveFilters()
-
     return d
 }
 
+func (p *FilteringProcessor) ReloadConfig(config *dnsutils.ConfigTransformers) {
+    p.config = config
+}
+
 func (p *FilteringProcessor) LogInfo(msg string, v ...interface{}) {
     log := fmt.Sprintf("transformer=filtering#%d - ", p.instance)
     p.logInfo(log+msg, v...)
@@ -90,12 +86,17 @@ func (p *FilteringProcessor) LogError(msg string, v ...interface{}) {
     p.logError(log+msg, v...)
 }
 
 func (p *FilteringProcessor) LoadActiveFilters() {
     // TODO: Change to iteration through Filtering to add filters in custom order.
+ // clean the slice + p.activeFilters = p.activeFilters[:0] + if !p.config.Filtering.LogQueries { p.activeFilters = append(p.activeFilters, p.ignoreQueryFilter) + p.LogInfo("drop queries subprocessor is enabled") } if !p.config.Filtering.LogReplies { p.activeFilters = append(p.activeFilters, p.ignoreReplyFilter) + p.LogInfo("drop replies subprocessor is enabled") } if len(p.mapRcodes) > 0 { @@ -135,76 +136,20 @@ func (p *FilteringProcessor) LoadActiveFilters() { p.downsample = p.config.Filtering.Downsample p.downsampleCount = 0 p.activeFilters = append(p.activeFilters, p.downsampleFilter) + p.LogInfo("down sampling subprocessor is enabled") } } func (p *FilteringProcessor) LoadRcodes() { - for _, v := range p.config.Filtering.DropRcodes { - p.mapRcodes[v] = true + // empty + for key := range p.mapRcodes { + delete(p.mapRcodes, key) } -} -func (p *FilteringProcessor) loadQueryIpList(fname string, drop bool) (uint64, error) { - file, err := os.Open(fname) - if err != nil { - return 0, err - } - - scanner := bufio.NewScanner(file) - var read uint64 - var ipsetbuilder netaddr.IPSetBuilder - for scanner.Scan() { - read++ - ipOrPrefix := strings.ToLower(scanner.Text()) - prefix, err := netaddr.ParseIPPrefix(ipOrPrefix) - if err != nil { - ip, err := netaddr.ParseIP(ipOrPrefix) - if err != nil { - p.LogError("%s in in %s is neither an IP address nor a prefix", ipOrPrefix, fname) - continue - } - ipsetbuilder.Add(ip) - continue - } - ipsetbuilder.AddPrefix(prefix) - } - if drop { - p.ipsetDrop, err = ipsetbuilder.IPSet() - } else { - p.ipsetKeep, err = ipsetbuilder.IPSet() - } - - return read, err -} - -func (p *FilteringProcessor) loadKeepRdataIpList(fname string) (uint64, error) { - file, err := os.Open(fname) - if err != nil { - return 0, err - } - - scanner := bufio.NewScanner(file) - var read uint64 - var ipsetbuilder netaddr.IPSetBuilder - for scanner.Scan() { - read++ - ipOrPrefix := strings.ToLower(scanner.Text()) - prefix, err := netaddr.ParseIPPrefix(ipOrPrefix) - if err != nil { - ip, err := netaddr.ParseIP(ipOrPrefix) - if err != nil { - p.LogError("%s in in %s is neither an IP address nor a prefix", ipOrPrefix, fname) - continue - } - ipsetbuilder.Add(ip) - continue - } - ipsetbuilder.AddPrefix(prefix) + // add + for _, v := range p.config.Filtering.DropRcodes { + p.mapRcodes[v] = true } - - p.rDataIpsetKeep, err = ipsetbuilder.IPSet() - - return read, err } func (p *FilteringProcessor) LoadQueryIpList() { @@ -236,6 +181,23 @@ func (p *FilteringProcessor) LoadrDataIpList() { } func (p *FilteringProcessor) LoadDomainsList() { + // before to start, reset all maps + p.dropDomains = false + p.keepDomains = false + + for key := range p.listFqdns { + delete(p.listFqdns, key) + } + for key := range p.listDomainsRegex { + delete(p.listDomainsRegex, key) + } + for key := range p.listKeepFqdns { + delete(p.listKeepFqdns, key) + } + for key := range p.listKeepDomainsRegex { + delete(p.listKeepDomainsRegex, key) + } + if len(p.config.Filtering.DropFqdnFile) > 0 { file, err := os.Open(p.config.Filtering.DropFqdnFile) if err != nil { @@ -304,6 +266,81 @@ func (p *FilteringProcessor) LoadDomainsList() { } } +func (p *FilteringProcessor) loadQueryIpList(fname string, drop bool) (uint64, error) { + var emptyIPSet *netaddr.IPSet + p.ipsetDrop = emptyIPSet + p.ipsetKeep = emptyIPSet + + file, err := os.Open(fname) + if err != nil { + return 0, err + } + + scanner := bufio.NewScanner(file) + var read uint64 + var ipsetbuilder netaddr.IPSetBuilder + for scanner.Scan() { + read++ + ipOrPrefix := 
strings.ToLower(scanner.Text()) + prefix, err := netaddr.ParseIPPrefix(ipOrPrefix) + if err != nil { + ip, err := netaddr.ParseIP(ipOrPrefix) + if err != nil { + p.LogError("%s in in %s is neither an IP address nor a prefix", ipOrPrefix, fname) + continue + } + ipsetbuilder.Add(ip) + continue + } + ipsetbuilder.AddPrefix(prefix) + } + + file.Close() + + if drop { + p.ipsetDrop, err = ipsetbuilder.IPSet() + } else { + p.ipsetKeep, err = ipsetbuilder.IPSet() + } + + return read, err +} + +func (p *FilteringProcessor) loadKeepRdataIpList(fname string) (uint64, error) { + var emptyIPSet *netaddr.IPSet + p.rDataIpsetKeep = emptyIPSet + + file, err := os.Open(fname) + if err != nil { + return 0, err + } + + scanner := bufio.NewScanner(file) + var read uint64 + var ipsetbuilder netaddr.IPSetBuilder + for scanner.Scan() { + read++ + ipOrPrefix := strings.ToLower(scanner.Text()) + prefix, err := netaddr.ParseIPPrefix(ipOrPrefix) + if err != nil { + ip, err := netaddr.ParseIP(ipOrPrefix) + if err != nil { + p.LogError("%s in in %s is neither an IP address nor a prefix", ipOrPrefix, fname) + continue + } + ipsetbuilder.Add(ip) + continue + } + ipsetbuilder.AddPrefix(prefix) + } + + file.Close() + + p.rDataIpsetKeep, err = ipsetbuilder.IPSet() + + return read, err +} + func (p *FilteringProcessor) Run() { for { select { diff --git a/transformers/filtering_test.go b/transformers/filtering_test.go index 0e79e043..632cd9f4 100644 --- a/transformers/filtering_test.go +++ b/transformers/filtering_test.go @@ -15,6 +15,7 @@ const ( func TestFilteringQR(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.LogQueries = false config.Filtering.LogReplies = false @@ -23,6 +24,7 @@ func TestFilteringQR(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() if !filtering.CheckIfDrop(&dm) { @@ -39,6 +41,7 @@ func TestFilteringQR(t *testing.T) { func TestFilteringByRcodeNOERROR(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.DropRcodes = []string{"NOERROR"} log := logger.New(false) @@ -46,6 +49,8 @@ func TestFilteringByRcodeNOERROR(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadRcodes() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() if filtering.CheckIfDrop(&dm) == false { @@ -57,6 +62,7 @@ func TestFilteringByRcodeNOERROR(t *testing.T) { func TestFilteringByRcodeEmpty(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.DropRcodes = []string{} log := logger.New(false) @@ -64,6 +70,8 @@ func TestFilteringByRcodeEmpty(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadRcodes() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() if filtering.CheckIfDrop(&dm) == true { @@ -74,6 +82,7 @@ func TestFilteringByRcodeEmpty(t *testing.T) { func TestFilteringByKeepQueryIp(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.KeepQueryIpFile = "../testsdata/filtering_queryip_keep.txt" log := logger.New(false) @@ -81,6 +90,8 @@ func 
TestFilteringByKeepQueryIp(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadQueryIpList() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() dm.NetworkInfo.QueryIp = "192.168.0.1" @@ -103,6 +114,7 @@ func TestFilteringByKeepQueryIp(t *testing.T) { func TestFilteringByDropQueryIp(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.DropQueryIpFile = "../testsdata/filtering_queryip.txt" log := logger.New(false) @@ -110,6 +122,8 @@ func TestFilteringByDropQueryIp(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadQueryIpList() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() dm.NetworkInfo.QueryIp = "192.168.0.1" @@ -132,6 +146,7 @@ func TestFilteringByDropQueryIp(t *testing.T) { func TestFilteringByKeepRdataIp(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.KeepRdataFile = "../testsdata/filtering_rdataip_keep.txt" log := logger.New(false) @@ -139,6 +154,8 @@ func TestFilteringByKeepRdataIp(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadrDataIpList() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() dm.DNS.DnsRRs.Answers = []dnsutils.DnsAnswer{ @@ -225,6 +242,7 @@ func TestFilteringByKeepRdataIp(t *testing.T) { func TestFilteringByFqdn(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.DropFqdnFile = "../testsdata/filtering_fqdn.txt" log := logger.New(false) @@ -232,6 +250,8 @@ func TestFilteringByFqdn(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadDomainsList() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() dm.DNS.Qname = "www.microsoft.com" @@ -248,6 +268,7 @@ func TestFilteringByFqdn(t *testing.T) { func TestFilteringByDomainRegex(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.DropDomainFile = "../testsdata/filtering_fqdn_regex.txt" log := logger.New(false) @@ -255,6 +276,8 @@ func TestFilteringByDomainRegex(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadDomainsList() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() dm.DNS.Qname = TEST_URL1 @@ -281,10 +304,13 @@ func TestFilteringByKeepDomain(t *testing.T) { outChans := []chan dnsutils.DnsMessage{} // file contains google.fr, test.github.com + config.Filtering.Enable = true config.Filtering.KeepDomainFile = "../testsdata/filtering_keep_domains.txt" // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadDomainsList() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() dm.DNS.Qname = TEST_URL1 @@ -320,10 +346,13 @@ func TestFilteringByKeepDomainRegex(t *testing.T) { test.github.com$ .+.google.com$ */ + config.Filtering.Enable = true config.Filtering.KeepDomainFile = "../testsdata/filtering_keep_domains_regex.txt" // init subproccesor 
filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadDomainsList() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() dm.DNS.Qname = TEST_URL1 @@ -352,6 +381,7 @@ func TestFilteringByKeepDomainRegex(t *testing.T) { func TestFilteringByDownsample(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.Downsample = 2 log := logger.New(false) @@ -359,6 +389,8 @@ func TestFilteringByDownsample(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadActiveFilters() + dm := dnsutils.GetFakeDnsMessage() // filtering.downsampleCount @@ -394,6 +426,7 @@ func TestFilteringByDownsample(t *testing.T) { func TestFilteringMultipleFilters(t *testing.T) { // config config := dnsutils.GetFakeConfigTransformers() + config.Filtering.Enable = true config.Filtering.DropDomainFile = "../testsdata/filtering_fqdn_regex.txt" config.Filtering.DropQueryIpFile = "../testsdata/filtering_queryip.txt" @@ -402,6 +435,9 @@ func TestFilteringMultipleFilters(t *testing.T) { // init subproccesor filtering := NewFilteringProcessor(config, logger.New(false), "test", 0, outChans, log.Info, log.Error) + filtering.LoadQueryIpList() + filtering.LoadDomainsList() + filtering.LoadActiveFilters() dm := dnsutils.GetFakeDnsMessage() dm.DNS.Qname = TEST_URL1 diff --git a/transformers/geoip.go b/transformers/geoip.go index 10a959c4..1e46ea03 100644 --- a/transformers/geoip.go +++ b/transformers/geoip.go @@ -63,6 +63,10 @@ func NewDnsGeoIpProcessor(config *dnsutils.ConfigTransformers, logger *logger.Lo return d } +func (p *GeoIpProcessor) ReloadConfig(config *dnsutils.ConfigTransformers) { + p.config = config +} + func (p *GeoIpProcessor) LogInfo(msg string, v ...interface{}) { log := fmt.Sprintf("transformer=geoip#%d - ", p.instance) p.logInfo(log+msg, v...) @@ -86,6 +90,12 @@ func (p *GeoIpProcessor) InitDnsMessage(dm *dnsutils.DnsMessage) { } func (p *GeoIpProcessor) Open() (err error) { + // before to open, close all files + // because open can be called also on reload + p.enabled = false + p.Close() + + // open files ? if len(p.config.GeoIP.DbCountryFile) > 0 { p.dbCountry, err = maxminddb.Open(p.config.GeoIP.DbCountryFile) if err != nil { diff --git a/transformers/latency.go b/transformers/latency.go index 926a6e68..3ca1e4bc 100644 --- a/transformers/latency.go +++ b/transformers/latency.go @@ -27,6 +27,10 @@ func NewMapQueries(ttl time.Duration, channels []chan dnsutils.DnsMessage) MapQu } } +func (mp *MapQueries) SetTtl(ttl time.Duration) { + mp.ttl = ttl +} + func (mp *MapQueries) Exists(key uint64) (ok bool) { mp.RLock() defer mp.RUnlock() @@ -69,6 +73,10 @@ func NewHashQueries(ttl time.Duration) HashQueries { } } +func (mp *HashQueries) SetTtl(ttl time.Duration) { + mp.ttl = ttl +} + func (mp *HashQueries) Get(key uint64) (value int64, ok bool) { mp.RLock() defer mp.RUnlock() @@ -124,6 +132,13 @@ func NewLatencySubprocessor(config *dnsutils.ConfigTransformers, logger *logger. 
return &s } +func (s *LatencyProcessor) ReloadConfig(config *dnsutils.ConfigTransformers) { + s.config = config + + s.hashQueries.SetTtl(time.Duration(config.Latency.QueriesTimeout) * time.Second) + s.mapQueries.SetTtl(time.Duration(config.Latency.QueriesTimeout) * time.Second) +} + func (s *LatencyProcessor) MeasureLatency(dm *dnsutils.DnsMessage) { queryport, _ := strconv.Atoi(dm.NetworkInfo.QueryPort) if len(dm.NetworkInfo.QueryIp) > 0 && queryport > 0 && !dm.DNS.MalformedPacket { diff --git a/transformers/machinelearning.go b/transformers/machinelearning.go index b8f1d089..dd52b9dc 100644 --- a/transformers/machinelearning.go +++ b/transformers/machinelearning.go @@ -44,6 +44,10 @@ func NewMachineLearningSubprocessor(config *dnsutils.ConfigTransformers, logger return s } +func (p *MlProcessor) ReloadConfig(config *dnsutils.ConfigTransformers) { + p.config = config +} + func (p *MlProcessor) LogInfo(msg string, v ...interface{}) { log := fmt.Sprintf("transformer=ml#%d - ", p.instance) p.logInfo(log+msg, v...) diff --git a/transformers/normalize.go b/transformers/normalize.go index 842d53e4..ea423b9c 100644 --- a/transformers/normalize.go +++ b/transformers/normalize.go @@ -88,10 +88,13 @@ func NewNormalizeSubprocessor( logError: logError, } - s.LoadActiveProcessors() return s } +func (p *NormalizeProcessor) ReloadConfig(config *dnsutils.ConfigTransformers) { + p.config = config +} + func (p *NormalizeProcessor) LogInfo(msg string, v ...interface{}) { log := fmt.Sprintf("transformer=normalize#%d - ", p.instance) p.logInfo(log+msg, v...) @@ -102,6 +105,9 @@ func (p *NormalizeProcessor) LogError(msg string, v ...interface{}) { } func (p *NormalizeProcessor) LoadActiveProcessors() { + // clean the slice + p.activeProcessors = p.activeProcessors[:0] + if p.config.Normalize.QnameLowerCase { p.activeProcessors = append(p.activeProcessors, p.LowercaseQname) p.LogInfo("lowercase subprocessor is enabled") diff --git a/transformers/reducer.go b/transformers/reducer.go index 2ef95ca0..61fab9d6 100644 --- a/transformers/reducer.go +++ b/transformers/reducer.go @@ -39,6 +39,10 @@ func NewMapTraffic(ttl time.Duration, channels []chan dnsutils.DnsMessage, } } +func (mp *MapTraffic) SetTtl(ttl time.Duration) { + mp.ttl = ttl +} + func (mp *MapTraffic) Set(key string, dm *dnsutils.DnsMessage) { mp.Lock() defer mp.Unlock() @@ -124,12 +128,20 @@ func NewReducerSubprocessor( } s.mapTraffic = NewMapTraffic(time.Duration(config.Reducer.WatchInterval)*time.Second, outChannels, logInfo, logError) - s.LoadActiveReducers() - return &s } +func (p *ReducerProcessor) ReloadConfig(config *dnsutils.ConfigTransformers) { + p.config = config + p.mapTraffic.SetTtl(time.Duration(config.Reducer.WatchInterval) * time.Second) + + p.LoadActiveReducers() +} + func (p *ReducerProcessor) LoadActiveReducers() { + // clean the slice + p.activeProcessors = p.activeProcessors[:0] + if p.config.Reducer.RepetitiveTrafficDetector { p.activeProcessors = append(p.activeProcessors, p.RepetitiveTrafficDetector) go p.mapTraffic.Run() diff --git a/transformers/subprocessors.go b/transformers/subprocessors.go index 304fd15a..7934a616 100644 --- a/transformers/subprocessors.go +++ b/transformers/subprocessors.go @@ -55,12 +55,36 @@ func NewTransforms(config *dnsutils.ConfigTransformers, logger *logger.Logger, n return d } +func (p *Transforms) ReloadConfig(config *dnsutils.ConfigTransformers) { + p.config = config + p.NormalizeTransform.ReloadConfig(config) + p.GeoipTransform.ReloadConfig(config) + p.FilteringTransform.ReloadConfig(config) + 
p.UserPrivacyTransform.ReloadConfig(config) + p.LatencyTransform.ReloadConfig(config) + p.SuspiciousTransform.ReloadConfig(config) + p.ReducerTransform.ReloadConfig(config) + p.ExtractProcessor.ReloadConfig(config) + p.MachineLearningTransform.ReloadConfig(config) + + p.Prepare() +} + func (p *Transforms) Prepare() error { + // clean the slice + p.activeTransforms = p.activeTransforms[:0] + + if p.config.Normalize.Enable { + prefixlog := fmt.Sprintf("transformer=normalize#%d ", p.instance) + p.LogInfo(prefixlog + "enabled") + + p.NormalizeTransform.LoadActiveProcessors() + } if p.config.GeoIP.Enable { p.activeTransforms = append(p.activeTransforms, p.geoipTransform) - prefixlog := fmt.Sprintf("transformer=geoip#%d - ", p.instance) - p.LogInfo(prefixlog + "is enabled") + prefixlog := fmt.Sprintf("transformer=geoip#%d ", p.instance) + p.LogInfo(prefixlog + "enabled") if err := p.GeoipTransform.Open(); err != nil { p.LogError(prefixlog+"open error %v", err) @@ -89,8 +113,15 @@ func (p *Transforms) Prepare() error { } if p.config.Filtering.Enable { - prefixlog := fmt.Sprintf("transformer=filtering#%d - ", p.instance) - p.LogInfo(prefixlog + "is enabled") + prefixlog := fmt.Sprintf("transformer=filtering#%d ", p.instance) + p.LogInfo(prefixlog + "enabled") + + p.FilteringTransform.LoadRcodes() + p.FilteringTransform.LoadDomainsList() + p.FilteringTransform.LoadQueryIpList() + p.FilteringTransform.LoadrDataIpList() + + p.FilteringTransform.LoadActiveFilters() } if p.config.Latency.Enable { @@ -115,6 +146,8 @@ func (p *Transforms) Prepare() error { if p.config.Reducer.Enable { prefixlog := fmt.Sprintf("transformer=reducer#%d - ", p.instance) p.LogInfo(prefixlog + "is enabled") + + p.ReducerTransform.LoadActiveReducers() } if p.config.Extract.Enable { @@ -123,7 +156,6 @@ func (p *Transforms) Prepare() error { prefixlog := fmt.Sprintf("transformer=extract#%d - ", p.instance) p.LogInfo(prefixlog + "subprocessor add base64 payload is enabled") } - } if p.config.MachineLearning.Enable { @@ -229,6 +261,11 @@ func (p *Transforms) minimazeQname(dm *dnsutils.DnsMessage) int { return RETURN_SUCCESS } +func (p *Transforms) addBase64Payload(dm *dnsutils.DnsMessage) int { + dm.Extracted.Base64Payload = p.ExtractProcessor.AddBase64Payload(dm) + return RETURN_SUCCESS +} + func (p *Transforms) ProcessMessage(dm *dnsutils.DnsMessage) int { // Begin to normalize p.NormalizeTransform.ProcessDnsMessage(dm) @@ -254,8 +291,3 @@ func (p *Transforms) ProcessMessage(dm *dnsutils.DnsMessage) int { return RETURN_SUCCESS } - -func (p *Transforms) addBase64Payload(dm *dnsutils.DnsMessage) int { - dm.Extracted.Base64Payload = p.ExtractProcessor.AddBase64Payload(dm) - return RETURN_SUCCESS -} diff --git a/transformers/subprocessors_test.go b/transformers/subprocessors_test.go index 688d967d..69bf415f 100644 --- a/transformers/subprocessors_test.go +++ b/transformers/subprocessors_test.go @@ -274,6 +274,7 @@ func TestTransformAndFilter(t *testing.T) { config.UserPrivacy.AnonymizeIP = true // file contains google.fr, test.github.com + config.Filtering.Enable = true config.Filtering.KeepDomainFile = "../testsdata/filtering_keep_domains.txt" TEST_URL1 := "mail.google.com" diff --git a/transformers/suspicious.go b/transformers/suspicious.go index c22715ef..38482361 100644 --- a/transformers/suspicious.go +++ b/transformers/suspicious.go @@ -43,6 +43,15 @@ func NewSuspiciousSubprocessor(config *dnsutils.ConfigTransformers, logger *logg } func (p *SuspiciousTransform) ReadConfig() { + // cleanup maps + for key := range p.CommonQtypes 
{ + delete(p.CommonQtypes, key) + } + for key := range p.whitelistDomainsRegex { + delete(p.whitelistDomainsRegex, key) + } + + // load maps for _, v := range p.config.Suspicious.CommonQtypes { p.CommonQtypes[v] = true } @@ -51,6 +60,12 @@ func (p *SuspiciousTransform) ReadConfig() { } } +func (s *SuspiciousTransform) ReloadConfig(config *dnsutils.ConfigTransformers) { + s.config = config + + s.ReadConfig() +} + func (p *SuspiciousTransform) IsEnabled() bool { return p.config.Suspicious.Enable } diff --git a/transformers/userprivacy.go b/transformers/userprivacy.go index efc4a823..4d0d22ad 100644 --- a/transformers/userprivacy.go +++ b/transformers/userprivacy.go @@ -43,6 +43,10 @@ func NewUserPrivacySubprocessor(config *dnsutils.ConfigTransformers, logger *log return s } +func (s *UserPrivacyProcessor) ReloadConfig(config *dnsutils.ConfigTransformers) { + s.config = config +} + func (s *UserPrivacyProcessor) MinimazeQname(qname string) string { if etpo, err := publicsuffix.EffectiveTLDPlusOne(qname); err == nil { return etpo