diff --git a/.github/workflows/testing-go.yml b/.github/workflows/testing-go.yml index fbb46c10..95226535 100644 --- a/.github/workflows/testing-go.yml +++ b/.github/workflows/testing-go.yml @@ -20,7 +20,7 @@ jobs: matrix: os-version: ['ubuntu-22.04', 'macos-latest' ] go-version: [ '1.20', '1.21' ] - package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib'] + package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib', 'processors'] exclude: - os-version: macos-latest go-version: '1.20' diff --git a/collectors/dnstap.go b/collectors/dnstap.go index e9d7f4fb..827ff2ba 100644 --- a/collectors/dnstap.go +++ b/collectors/dnstap.go @@ -14,6 +14,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" ) @@ -21,19 +22,21 @@ import ( type Dnstap struct { doneRun chan bool doneMonitor chan bool + stopRun chan bool stopMonitor chan bool listen net.Listener conns []net.Conn sockPath string loggers []dnsutils.Worker config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string connMode string connId int droppedCount int dropped chan int - tapProcessors []DnstapProcessor + tapProcessors []processors.DnstapProcessor sync.RWMutex } @@ -42,9 +45,11 @@ func NewDnstap(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logge s := &Dnstap{ doneRun: make(chan bool), doneMonitor: make(chan bool), + stopRun: make(chan bool), stopMonitor: make(chan bool), dropped: make(chan int), config: config, + configChan: make(chan *dnsutils.Config), loggers: loggers, logger: logger, name: name, @@ -86,16 +91,8 @@ func (c *Dnstap) ReadConfig() { } func (c *Dnstap) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - // save the new config - c.config = config - c.ReadConfig() - - // refresh config for all conns - for i := range c.tapProcessors { - c.tapProcessors[i].ReloadConfig(config) - } + c.LogInfo("reload configuration...") + c.configChan <- config } func (c *Dnstap) LogInfo(msg string, v ...interface{}) { @@ -131,7 +128,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) { c.LogConnInfo(connId, "new connection from %s", peer) // start dnstap subprocessor - dnstapProcessor := NewDnstapProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.Dnstap.ChannelBufferSize) + dnstapProcessor := processors.NewDnstapProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.Dnstap.ChannelBufferSize) c.Lock() c.tapProcessors = append(c.tapProcessors, dnstapProcessor) c.Unlock() @@ -190,7 +187,7 @@ func (c *Dnstap) HandleConn(conn net.Conn) { // then removes the current tap processor from the list c.Lock() for i, t := range c.tapProcessors { - if t.connId == connId { + if t.ConnId == connId { c.tapProcessors = append(c.tapProcessors[:i], c.tapProcessors[i+1:]...) } } @@ -238,8 +235,8 @@ func (c *Dnstap) Stop() { // read done channel and block until run is terminated c.LogInfo("stopping run...") + c.stopRun <- true <-c.doneRun - close(c.doneRun) } func (c *Dnstap) Listen() error { @@ -333,32 +330,61 @@ func (c *Dnstap) Run() { // start goroutine to count dropped messsages go c.MonitorCollector() - for { - // Accept() blocks waiting for new connection. - conn, err := c.listen.Accept() - if err != nil { - break + // goroutine to Accept() blocks waiting for new connection. + acceptChan := make(chan net.Conn) + go func() { + for { + conn, err := c.listen.Accept() + if err != nil { + return + } + acceptChan <- conn } + }() - if (c.connMode == "tls" || c.connMode == "tcp") && c.config.Collectors.Dnstap.RcvBufSize > 0 { - before, actual, err := netlib.SetSock_RCVBUF( - conn, - c.config.Collectors.Dnstap.RcvBufSize, - c.config.Collectors.Dnstap.TlsSupport, - ) - if err != nil { - c.logger.Fatal("Unable to set SO_RCVBUF: ", err) +RUN_LOOP: + for { + select { + case <-c.stopRun: + close(acceptChan) + c.doneRun <- true + break RUN_LOOP + + case cfg := <-c.configChan: + + // save the new config + c.config = cfg + c.ReadConfig() + + // refresh config for all conns + for i := range c.tapProcessors { + c.tapProcessors[i].ConfigChan <- cfg + } + + case conn, opened := <-acceptChan: + if !opened { + return } - c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before, - c.config.Collectors.Dnstap.RcvBufSize, actual) + + if (c.connMode == "tls" || c.connMode == "tcp") && c.config.Collectors.Dnstap.RcvBufSize > 0 { + before, actual, err := netlib.SetSock_RCVBUF( + conn, + c.config.Collectors.Dnstap.RcvBufSize, + c.config.Collectors.Dnstap.TlsSupport, + ) + if err != nil { + c.logger.Fatal("Unable to set SO_RCVBUF: ", err) + } + c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before, + c.config.Collectors.Dnstap.RcvBufSize, actual) + } + + c.Lock() + c.conns = append(c.conns, conn) + c.Unlock() + go c.HandleConn(conn) } - c.Lock() - c.conns = append(c.conns, conn) - c.Unlock() - go c.HandleConn(conn) } - c.LogInfo("run terminated") - c.doneRun <- true } diff --git a/collectors/dnstap_proxifier.go b/collectors/dnstap_proxifier.go index 2e8dbc6c..0f07ea04 100644 --- a/collectors/dnstap_proxifier.go +++ b/collectors/dnstap_proxifier.go @@ -14,25 +14,29 @@ import ( ) type DnstapProxifier struct { - done chan bool - listen net.Listener - conns []net.Conn - sockPath string - loggers []dnsutils.Worker - config *dnsutils.Config - logger *logger.Logger - name string - stopping bool + doneRun chan bool + stopRun chan bool + listen net.Listener + conns []net.Conn + sockPath string + loggers []dnsutils.Worker + config *dnsutils.Config + configChan chan *dnsutils.Config + logger *logger.Logger + name string + stopping bool } func NewDnstapProxifier(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *DnstapProxifier { logger.Info("[%s] collector=dnstaprelay - enabled", name) s := &DnstapProxifier{ - done: make(chan bool), - config: config, - loggers: loggers, - logger: logger, - name: name, + doneRun: make(chan bool), + stopRun: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + loggers: loggers, + logger: logger, + name: name, } s.ReadConfig() return s @@ -61,13 +65,8 @@ func (c *DnstapProxifier) ReadConfig() { } func (c *DnstapProxifier) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - // save the new config - c.config = config - - // read again - c.ReadConfig() + c.LogInfo("reload configuration...") + c.configChan <- config } func (c *DnstapProxifier) LogInfo(msg string, v ...interface{}) { @@ -148,8 +147,8 @@ func (c *DnstapProxifier) Stop() { c.listen.Close() // read done channel and block until run is terminated - <-c.done - close(c.done) + c.stopRun <- true + <-c.doneRun } func (c *DnstapProxifier) Listen() error { @@ -211,17 +210,42 @@ func (c *DnstapProxifier) Run() { c.logger.Fatal("collector dnstap listening failed: ", err) } } - for { - // Accept() blocks waiting for new connection. - conn, err := c.listen.Accept() - if err != nil { - break + + // goroutine to Accept() blocks waiting for new connection. + acceptChan := make(chan net.Conn) + go func() { + for { + conn, err := c.listen.Accept() + if err != nil { + return + } + acceptChan <- conn } + }() - c.conns = append(c.conns, conn) - go c.HandleConn(conn) +RUN_LOOP: + for { + select { + case <-c.stopRun: + close(acceptChan) + c.doneRun <- true + break RUN_LOOP + + case cfg := <-c.configChan: + + // save the new config + c.config = cfg + c.ReadConfig() + + case conn, opened := <-acceptChan: + if !opened { + return + } + + c.conns = append(c.conns, conn) + go c.HandleConn(conn) + } } c.LogInfo("run terminated") - c.done <- true } diff --git a/collectors/dnstap_proxifier_test.go b/collectors/dnstap_proxifier_test.go index 7845a902..364cd8f2 100644 --- a/collectors/dnstap_proxifier_test.go +++ b/collectors/dnstap_proxifier_test.go @@ -9,6 +9,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/loggers" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" "google.golang.org/protobuf/proto" @@ -75,13 +76,13 @@ func Test_DnstapProxifier(t *testing.T) { frame := &framestream.Frame{} // get fake dns question - dnsquery, err := GetFakeDns() + dnsquery, err := processors.GetFakeDns() if err != nil { t.Fatalf("dns question pack error") } // get fake dnstap message - dt_query := GetFakeDnstap(dnsquery) + dt_query := processors.GetFakeDnstap(dnsquery) // serialize to bytes data, err := proto.Marshal(dt_query) diff --git a/collectors/dnstap_test.go b/collectors/dnstap_test.go index d1d092e9..dbab6bed 100644 --- a/collectors/dnstap_test.go +++ b/collectors/dnstap_test.go @@ -9,6 +9,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/loggers" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" "google.golang.org/protobuf/proto" @@ -79,13 +80,13 @@ func Test_DnstapCollector(t *testing.T) { frame := &framestream.Frame{} // get fake dns question - dnsquery, err := GetFakeDns() + dnsquery, err := processors.GetFakeDns() if err != nil { t.Fatalf("dns question pack error") } // get fake dnstap message - dt_query := GetFakeDnstap(dnsquery) + dt_query := processors.GetFakeDnstap(dnsquery) // serialize to bytes data, err := proto.Marshal(dt_query) diff --git a/collectors/file_ingestor.go b/collectors/file_ingestor.go index ed51917e..636f00ac 100644 --- a/collectors/file_ingestor.go +++ b/collectors/file_ingestor.go @@ -13,6 +13,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" framestream "github.com/farsightsec/golang-framestream" "github.com/fsnotify/fsnotify" @@ -38,11 +39,12 @@ type FileIngestor struct { exit chan bool loggers []dnsutils.Worker config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger watcher *fsnotify.Watcher watcherTimers map[string]*time.Timer - dnsProcessor DnsProcessor - dnstapProcessor DnstapProcessor + dnsProcessor processors.DnsProcessor + dnstapProcessor processors.DnstapProcessor filterDnsPort int identity string name string @@ -55,6 +57,7 @@ func NewFileIngestor(loggers []dnsutils.Worker, config *dnsutils.Config, logger done: make(chan bool), exit: make(chan bool), config: config, + configChan: make(chan *dnsutils.Config), loggers: loggers, logger: logger, name: name, @@ -94,13 +97,8 @@ func (c *FileIngestor) ReadConfig() { } func (c *FileIngestor) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - // save the new config - c.config = config - - // read again - c.ReadConfig() + c.LogInfo("reload configuration...") + c.configChan <- config } func (c *FileIngestor) LogInfo(msg string, v ...interface{}) { @@ -376,11 +374,11 @@ func (c *FileIngestor) RemoveEvent(filePath string) { func (c *FileIngestor) Run() { c.LogInfo("starting collector...") - c.dnsProcessor = NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.FileIngestor.ChannelBufferSize) + c.dnsProcessor = processors.NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.FileIngestor.ChannelBufferSize) go c.dnsProcessor.Run(c.Loggers()) // start dnstap subprocessor - c.dnstapProcessor = NewDnstapProcessor(0, c.config, c.logger, c.name, c.config.Collectors.FileIngestor.ChannelBufferSize) + c.dnstapProcessor = processors.NewDnstapProcessor(0, c.config, c.logger, c.name, c.config.Collectors.FileIngestor.ChannelBufferSize) go c.dnstapProcessor.Run(c.Loggers()) // read current folder content @@ -426,6 +424,17 @@ func (c *FileIngestor) Run() { go func() { for { select { + // new config provided? + case cfg, opened := <-c.configChan: + if !opened { + return + } + c.config = cfg + c.ReadConfig() + + c.dnsProcessor.ConfigChan <- cfg + c.dnstapProcessor.ConfigChan <- cfg + case event, ok := <-c.watcher.Events: if !ok { // Channel was closed (i.e. Watcher.Close() was called). return diff --git a/collectors/file_tail.go b/collectors/file_tail.go index d2519301..6ac04643 100644 --- a/collectors/file_tail.go +++ b/collectors/file_tail.go @@ -16,22 +16,26 @@ import ( ) type Tail struct { - done chan bool - tailf *tail.Tail - loggers []dnsutils.Worker - config *dnsutils.Config - logger *logger.Logger - name string + doneRun chan bool + stopRun chan bool + tailf *tail.Tail + loggers []dnsutils.Worker + config *dnsutils.Config + configChan chan *dnsutils.Config + logger *logger.Logger + name string } func NewTail(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *Tail { logger.Info("[%s] collector=tail - enabled", name) s := &Tail{ - done: make(chan bool), - config: config, - loggers: loggers, - logger: logger, - name: name, + doneRun: make(chan bool), + stopRun: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + loggers: loggers, + logger: logger, + name: name, } s.ReadConfig() return s @@ -51,18 +55,11 @@ func (c *Tail) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *Tail) ReadConfig() { - //tbc -} +func (c *Tail) ReadConfig() {} func (c *Tail) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - // save the new config - c.config = config - - // read again - c.ReadConfig() + c.LogInfo("reload configuration...") + c.configChan <- config } func (c *Tail) LogInfo(msg string, v ...interface{}) { @@ -85,8 +82,8 @@ func (c *Tail) Stop() { c.tailf.Stop() // read done channel and block until run is terminated - <-c.done - close(c.done) + c.stopRun <- true + <-c.doneRun } func (c *Tail) Follow() error { @@ -124,162 +121,179 @@ func (c *Tail) Run() { dm.DnsTap.Identity = "undefined" } - for line := range c.tailf.Lines { - var matches []string - var re *regexp.Regexp +RUN_LOOP: + for { + select { + // new config provided? + case cfg, opened := <-c.configChan: + if !opened { + return + } + c.config = cfg + c.ReadConfig() - if len(c.config.Collectors.Tail.PatternQuery) > 0 { - re = regexp.MustCompile(c.config.Collectors.Tail.PatternQuery) - matches = re.FindStringSubmatch(line.Text) - dm.DNS.Type = dnsutils.DnsQuery - dm.DnsTap.Operation = dnsutils.DNSTAP_OPERATION_QUERY - } + subprocessors.ReloadConfig(&cfg.IngoingTransformers) + case <-c.stopRun: - if len(c.config.Collectors.Tail.PatternReply) > 0 && len(matches) == 0 { - re = regexp.MustCompile(c.config.Collectors.Tail.PatternReply) - matches = re.FindStringSubmatch(line.Text) - dm.DNS.Type = dnsutils.DnsReply - dm.DnsTap.Operation = dnsutils.DNSTAP_OPERATION_REPLY - } + // cleanup transformers + subprocessors.Reset() - if len(matches) == 0 { - continue - } + c.doneRun <- true + break RUN_LOOP - qrIndex := re.SubexpIndex("qr") - if qrIndex != -1 { - dm.DnsTap.Operation = matches[qrIndex] - } + case line := <-c.tailf.Lines: + var matches []string + var re *regexp.Regexp + + if len(c.config.Collectors.Tail.PatternQuery) > 0 { + re = regexp.MustCompile(c.config.Collectors.Tail.PatternQuery) + matches = re.FindStringSubmatch(line.Text) + dm.DNS.Type = dnsutils.DnsQuery + dm.DnsTap.Operation = dnsutils.DNSTAP_OPERATION_QUERY + } + + if len(c.config.Collectors.Tail.PatternReply) > 0 && len(matches) == 0 { + re = regexp.MustCompile(c.config.Collectors.Tail.PatternReply) + matches = re.FindStringSubmatch(line.Text) + dm.DNS.Type = dnsutils.DnsReply + dm.DnsTap.Operation = dnsutils.DNSTAP_OPERATION_REPLY + } - var t time.Time - timestampIndex := re.SubexpIndex("timestamp") - if timestampIndex != -1 { - t, err = time.Parse(c.config.Collectors.Tail.TimeLayout, matches[timestampIndex]) - if err != nil { + if len(matches) == 0 { continue } - } else { - t = time.Now() - } - dm.DnsTap.TimeSec = int(t.Unix()) - dm.DnsTap.TimeNsec = int(t.UnixNano() - t.Unix()*1e9) - identityIndex := re.SubexpIndex("identity") - if identityIndex != -1 { - dm.DnsTap.Identity = matches[identityIndex] - } + qrIndex := re.SubexpIndex("qr") + if qrIndex != -1 { + dm.DnsTap.Operation = matches[qrIndex] + } - rcodeIndex := re.SubexpIndex("rcode") - if rcodeIndex != -1 { - dm.DNS.Rcode = matches[rcodeIndex] - } + var t time.Time + timestampIndex := re.SubexpIndex("timestamp") + if timestampIndex != -1 { + t, err = time.Parse(c.config.Collectors.Tail.TimeLayout, matches[timestampIndex]) + if err != nil { + continue + } + } else { + t = time.Now() + } + dm.DnsTap.TimeSec = int(t.Unix()) + dm.DnsTap.TimeNsec = int(t.UnixNano() - t.Unix()*1e9) - queryipIndex := re.SubexpIndex("queryip") - if queryipIndex != -1 { - dm.NetworkInfo.QueryIp = matches[queryipIndex] - } + identityIndex := re.SubexpIndex("identity") + if identityIndex != -1 { + dm.DnsTap.Identity = matches[identityIndex] + } - queryportIndex := re.SubexpIndex("queryport") - if queryportIndex != -1 { - dm.NetworkInfo.QueryPort = matches[queryportIndex] - } else { - dm.NetworkInfo.ResponsePort = "0" - } + rcodeIndex := re.SubexpIndex("rcode") + if rcodeIndex != -1 { + dm.DNS.Rcode = matches[rcodeIndex] + } - responseipIndex := re.SubexpIndex("responseip") - if responseipIndex != -1 { - dm.NetworkInfo.ResponseIp = matches[responseipIndex] - } + queryipIndex := re.SubexpIndex("queryip") + if queryipIndex != -1 { + dm.NetworkInfo.QueryIp = matches[queryipIndex] + } - responseportIndex := re.SubexpIndex("responseport") - if responseportIndex != -1 { - dm.NetworkInfo.ResponsePort = matches[responseportIndex] - } else { - dm.NetworkInfo.ResponsePort = "0" - } + queryportIndex := re.SubexpIndex("queryport") + if queryportIndex != -1 { + dm.NetworkInfo.QueryPort = matches[queryportIndex] + } else { + dm.NetworkInfo.ResponsePort = "0" + } - familyIndex := re.SubexpIndex("family") - if familyIndex != -1 { - dm.NetworkInfo.Family = matches[familyIndex] - } else { - dm.NetworkInfo.Family = dnsutils.PROTO_IPV4 - } + responseipIndex := re.SubexpIndex("responseip") + if responseipIndex != -1 { + dm.NetworkInfo.ResponseIp = matches[responseipIndex] + } - protocolIndex := re.SubexpIndex("protocol") - if protocolIndex != -1 { - dm.NetworkInfo.Protocol = matches[protocolIndex] - } else { - dm.NetworkInfo.Protocol = dnsutils.PROTO_UDP - } + responseportIndex := re.SubexpIndex("responseport") + if responseportIndex != -1 { + dm.NetworkInfo.ResponsePort = matches[responseportIndex] + } else { + dm.NetworkInfo.ResponsePort = "0" + } - lengthIndex := re.SubexpIndex("length") - if lengthIndex != -1 { - length, err := strconv.Atoi(matches[lengthIndex]) - if err == nil { - dm.DNS.Length = length + familyIndex := re.SubexpIndex("family") + if familyIndex != -1 { + dm.NetworkInfo.Family = matches[familyIndex] + } else { + dm.NetworkInfo.Family = dnsutils.PROTO_IPV4 } - } - domainIndex := re.SubexpIndex("domain") - if domainIndex != -1 { - dm.DNS.Qname = matches[domainIndex] - } + protocolIndex := re.SubexpIndex("protocol") + if protocolIndex != -1 { + dm.NetworkInfo.Protocol = matches[protocolIndex] + } else { + dm.NetworkInfo.Protocol = dnsutils.PROTO_UDP + } - qtypeIndex := re.SubexpIndex("qtype") - if qtypeIndex != -1 { - dm.DNS.Qtype = matches[qtypeIndex] - } + lengthIndex := re.SubexpIndex("length") + if lengthIndex != -1 { + length, err := strconv.Atoi(matches[lengthIndex]) + if err == nil { + dm.DNS.Length = length + } + } - latencyIndex := re.SubexpIndex("latency") - if latencyIndex != -1 { - dm.DnsTap.LatencySec = matches[latencyIndex] - } + domainIndex := re.SubexpIndex("domain") + if domainIndex != -1 { + dm.DNS.Qname = matches[domainIndex] + } - // compute timestamp - ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec)) - dm.DnsTap.Timestamp = ts.UnixNano() - dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano) - - // fake dns packet - dnspkt := new(dns.Msg) - var dnstype uint16 - dnstype = dns.TypeA - if dm.DNS.Qtype == "AAAA" { - dnstype = dns.TypeAAAA - } - dnspkt.SetQuestion(dm.DNS.Qname, dnstype) + qtypeIndex := re.SubexpIndex("qtype") + if qtypeIndex != -1 { + dm.DNS.Qtype = matches[qtypeIndex] + } - if dm.DNS.Type == dnsutils.DnsReply { - rr, _ := dns.NewRR(fmt.Sprintf("%s %s 0.0.0.0", dm.DNS.Qname, dm.DNS.Qtype)) - if err == nil { - dnspkt.Answer = append(dnspkt.Answer, rr) + latencyIndex := re.SubexpIndex("latency") + if latencyIndex != -1 { + dm.DnsTap.LatencySec = matches[latencyIndex] } - var rcode int - rcode = 0 - if dm.DNS.Rcode == "NXDOMAIN" { - rcode = 3 + + // compute timestamp + ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec)) + dm.DnsTap.Timestamp = ts.UnixNano() + dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano) + + // fake dns packet + dnspkt := new(dns.Msg) + var dnstype uint16 + dnstype = dns.TypeA + if dm.DNS.Qtype == "AAAA" { + dnstype = dns.TypeAAAA + } + dnspkt.SetQuestion(dm.DNS.Qname, dnstype) + + if dm.DNS.Type == dnsutils.DnsReply { + rr, _ := dns.NewRR(fmt.Sprintf("%s %s 0.0.0.0", dm.DNS.Qname, dm.DNS.Qtype)) + if err == nil { + dnspkt.Answer = append(dnspkt.Answer, rr) + } + var rcode int + rcode = 0 + if dm.DNS.Rcode == "NXDOMAIN" { + rcode = 3 + } + dnspkt.Rcode = rcode } - dnspkt.Rcode = rcode - } - dm.DNS.Payload, _ = dnspkt.Pack() - dm.DNS.Length = len(dm.DNS.Payload) + dm.DNS.Payload, _ = dnspkt.Pack() + dm.DNS.Length = len(dm.DNS.Payload) - // apply all enabled transformers - if subprocessors.ProcessMessage(&dm) == transformers.RETURN_DROP { - continue - } + // apply all enabled transformers + if subprocessors.ProcessMessage(&dm) == transformers.RETURN_DROP { + continue + } - // dispatch dns message to connected loggers - chanLoggers := c.Loggers() - for i := range chanLoggers { - chanLoggers[i] <- dm + // dispatch dns message to connected loggers + chanLoggers := c.Loggers() + for i := range chanLoggers { + chanLoggers[i] <- dm + } } } - // cleanup transformers - subprocessors.Reset() - c.LogInfo("run terminated") - c.done <- true } diff --git a/collectors/powerdns.go b/collectors/powerdns.go index 65f9dea7..b5109e18 100644 --- a/collectors/powerdns.go +++ b/collectors/powerdns.go @@ -13,12 +13,14 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" powerdns_protobuf "github.com/dmachard/go-powerdns-protobuf" ) type ProtobufPowerDNS struct { doneRun chan bool + stopRun chan bool doneMonitor chan bool stopMonitor chan bool cleanup chan bool @@ -27,11 +29,12 @@ type ProtobufPowerDNS struct { conns []net.Conn loggers []dnsutils.Worker config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string droppedCount int dropped chan int - pdnsProcessors []*PdnsProcessor + pdnsProcessors []*processors.PdnsProcessor sync.RWMutex } @@ -40,10 +43,12 @@ func NewProtobufPowerDNS(loggers []dnsutils.Worker, config *dnsutils.Config, log s := &ProtobufPowerDNS{ doneRun: make(chan bool), doneMonitor: make(chan bool), + stopRun: make(chan bool), stopMonitor: make(chan bool), cleanup: make(chan bool), dropped: make(chan int), config: config, + configChan: make(chan *dnsutils.Config), loggers: loggers, logger: logger, name: name, @@ -75,16 +80,8 @@ func (c *ProtobufPowerDNS) ReadConfig() { } func (c *ProtobufPowerDNS) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - // save the new config - c.config = config - c.ReadConfig() - - // refresh config for all conns - for i := range c.pdnsProcessors { - c.pdnsProcessors[i].ReloadConfig(config) - } + c.LogInfo("reload configuration...") + c.configChan <- config } func (c *ProtobufPowerDNS) LogInfo(msg string, v ...interface{}) { @@ -120,7 +117,7 @@ func (c *ProtobufPowerDNS) HandleConn(conn net.Conn) { c.LogConnInfo(connId, "new connection from %s", peer) // start protobuf subprocessor - pdnsProc := NewPdnsProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.PowerDNS.ChannelBufferSize) + pdnsProc := processors.NewPdnsProcessor(connId, c.config, c.logger, c.name, c.config.Collectors.PowerDNS.ChannelBufferSize) c.Lock() c.pdnsProcessors = append(c.pdnsProcessors, &pdnsProc) c.Unlock() @@ -131,6 +128,7 @@ func (c *ProtobufPowerDNS) HandleConn(conn net.Conn) { var err error var payload *powerdns_protobuf.ProtoPayload + for { payload, err = pbs.RecvPayload(false) if err != nil { @@ -163,13 +161,14 @@ func (c *ProtobufPowerDNS) HandleConn(conn net.Conn) { default: c.dropped <- 1 } + // } } // here the connection is closed, // then removes the current tap processor from the list c.Lock() for i, t := range c.pdnsProcessors { - if t.connId == connId { + if t.ConnId == connId { c.pdnsProcessors = append(c.pdnsProcessors[:i], c.pdnsProcessors[i+1:]...) } } @@ -221,8 +220,8 @@ func (c *ProtobufPowerDNS) Stop() { // read done channel and block until run is terminated c.LogInfo("stopping run...") + c.stopRun <- true <-c.doneRun - close(c.doneRun) } func (c *ProtobufPowerDNS) Listen() error { @@ -277,8 +276,10 @@ MONITOR_LOOP: bufferFull.Stop() c.doneMonitor <- true break MONITOR_LOOP + case <-c.dropped: c.droppedCount++ + case <-bufferFull.C: if c.droppedCount > 0 { c.LogError("recv buffer is full, %d packet(s) dropped", c.droppedCount) @@ -303,35 +304,63 @@ func (c *ProtobufPowerDNS) Run() { // start goroutine to count dropped messsages go c.MonitorCollector() - for { - // Accept() blocks waiting for new connection. - conn, err := c.listen.Accept() - if err != nil { - break - } - - if c.config.Collectors.Dnstap.RcvBufSize > 0 { - before, actual, err := netlib.SetSock_RCVBUF( - conn, - c.config.Collectors.Dnstap.RcvBufSize, - c.config.Collectors.Dnstap.TlsSupport, - ) + // goroutine to Accept() blocks waiting for new connection. + acceptChan := make(chan net.Conn) + go func() { + for { + conn, err := c.listen.Accept() if err != nil { - c.logger.Fatal("Unable to set SO_RCVBUF: ", err) + return } - c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", - before, - c.config.Collectors.Dnstap.RcvBufSize, - actual) + acceptChan <- conn } + }() - c.Lock() - c.conns = append(c.conns, conn) - c.Unlock() - go c.HandleConn(conn) +RUN_LOOP: + for { + select { + case <-c.stopRun: + close(acceptChan) + c.doneRun <- true + break RUN_LOOP + + case cfg := <-c.configChan: + // save the new config + c.config = cfg + c.ReadConfig() + + // refresh config for all conns + for i := range c.pdnsProcessors { + c.pdnsProcessors[i].ConfigChan <- cfg + } + + case conn, opened := <-acceptChan: + if !opened { + return + } + if c.config.Collectors.Dnstap.RcvBufSize > 0 { + before, actual, err := netlib.SetSock_RCVBUF( + conn, + c.config.Collectors.Dnstap.RcvBufSize, + c.config.Collectors.Dnstap.TlsSupport, + ) + if err != nil { + c.logger.Fatal("Unable to set SO_RCVBUF: ", err) + } + c.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", + before, + c.config.Collectors.Dnstap.RcvBufSize, + actual) + } + + c.Lock() + c.conns = append(c.conns, conn) + c.Unlock() + go c.HandleConn(conn) + + } } c.LogInfo("run terminated") - c.doneRun <- true } diff --git a/collectors/sniffer_afpacket.go b/collectors/sniffer_afpacket.go index 2790c77a..2f553349 100644 --- a/collectors/sniffer_afpacket.go +++ b/collectors/sniffer_afpacket.go @@ -14,6 +14,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -147,28 +148,27 @@ func RemoveBpfFilter(fd int) (err error) { } type AfpacketSniffer struct { - done chan bool - exit chan bool - fd int - port int - device string - identity string - loggers []dnsutils.Worker - config *dnsutils.Config - logger *logger.Logger - dnsProcessor DnsProcessor - name string + done chan bool + exit chan bool + fd int + identity string + loggers []dnsutils.Worker + config *dnsutils.Config + configChan chan *dnsutils.Config + logger *logger.Logger + name string } func NewAfpacketSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *AfpacketSniffer { logger.Info("[%s] collector=afpacket - enabled", name) s := &AfpacketSniffer{ - done: make(chan bool), - exit: make(chan bool), - config: config, - loggers: loggers, - logger: logger, - name: name, + done: make(chan bool), + exit: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + loggers: loggers, + logger: logger, + name: name, } s.ReadConfig() return s @@ -199,18 +199,12 @@ func (c *AfpacketSniffer) Loggers() ([]chan dnsutils.DnsMessage, []string) { } func (c *AfpacketSniffer) ReadConfig() { - c.port = c.config.Collectors.AfpacketLiveCapture.Port c.identity = c.config.GetServerIdentity() - c.device = c.config.Collectors.AfpacketLiveCapture.Device } func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - c.config = config - c.ReadConfig() - - c.dnsProcessor.ReloadConfig(config) + c.LogInfo("reload configuration...") + c.configChan <- config } func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage { @@ -236,8 +230,8 @@ func (c *AfpacketSniffer) Listen() error { } // bind to device ? - if c.device != "" { - iface, err := net.InterfaceByName(c.device) + if c.config.Collectors.AfpacketLiveCapture.Device != "" { + iface, err := net.InterfaceByName(c.config.Collectors.AfpacketLiveCapture.Device) if err != nil { return err } @@ -259,8 +253,7 @@ func (c *AfpacketSniffer) Listen() error { return err } - filter := GetBpfFilter(c.port) - //filter := GetBpfFilter_Ingress(c.port) + filter := GetBpfFilter(c.config.Collectors.AfpacketLiveCapture.Port) err = ApplyBpfFilter(filter, fd) if err != nil { return err @@ -284,8 +277,8 @@ func (c *AfpacketSniffer) Run() { } } - c.dnsProcessor = NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.AfpacketLiveCapture.ChannelBufferSize) - go c.dnsProcessor.Run(c.Loggers()) + dnsProcessor := processors.NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.AfpacketLiveCapture.ChannelBufferSize) + go dnsProcessor.Run(c.Loggers()) dnsChan := make(chan netlib.DnsPacket) udpChan := make(chan gopacket.Packet) @@ -309,30 +302,44 @@ func (c *AfpacketSniffer) Run() { // prepare dns message dm := dnsutils.DnsMessage{} - // for { - for dnsPacket := range dnsChan { - // reset - dm.Init() + for { + select { + // new config provided? + case cfg, opened := <-c.configChan: + if !opened { + return + } + c.config = cfg + c.ReadConfig() - dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String() - dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String() - dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String() - dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String() - dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String() - dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String() + // send the config to the dns processor + dnsProcessor.ConfigChan <- cfg - dm.DNS.Payload = dnsPacket.Payload - dm.DNS.Length = len(dnsPacket.Payload) + // dns message to read ? + case dnsPacket := <-dnsChan: + // reset + dm.Init() - dm.DnsTap.Identity = c.identity + dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String() + dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String() + dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String() + dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String() + dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String() + dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String() - timestamp := dnsPacket.Timestamp.UnixNano() - seconds := timestamp / int64(time.Second) - dm.DnsTap.TimeSec = int(seconds) - dm.DnsTap.TimeNsec = int(timestamp - seconds*int64(time.Second)*int64(time.Nanosecond)) + dm.DNS.Payload = dnsPacket.Payload + dm.DNS.Length = len(dnsPacket.Payload) - // send DNS message to DNS processor - c.dnsProcessor.GetChannel() <- dm + dm.DnsTap.Identity = c.identity + + timestamp := dnsPacket.Timestamp.UnixNano() + seconds := timestamp / int64(time.Second) + dm.DnsTap.TimeSec = int(seconds) + dm.DnsTap.TimeNsec = int(timestamp - seconds*int64(time.Second)*int64(time.Nanosecond)) + + // send DNS message to DNS processor + dnsProcessor.GetChannel() <- dm + } } }() @@ -425,9 +432,10 @@ func (c *AfpacketSniffer) Run() { <-c.exit close(dnsChan) + close(c.configChan) // stop dns processor - c.dnsProcessor.Stop() + dnsProcessor.Stop() c.LogInfo("run terminated") c.done <- true diff --git a/collectors/sniffer_afpacket_darwin.go b/collectors/sniffer_afpacket_darwin.go index c386e2d2..31721386 100644 --- a/collectors/sniffer_afpacket_darwin.go +++ b/collectors/sniffer_afpacket_darwin.go @@ -54,18 +54,9 @@ func (c *AfpacketSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *AfpacketSniffer) ReadConfig() { -} - -func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") +func (c *AfpacketSniffer) ReadConfig() {} - // save the new config - c.config = config - - // read again - c.ReadConfig() -} +func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/sniffer_afpacket_freebsd.go b/collectors/sniffer_afpacket_freebsd.go index b07d3762..4e744bdc 100644 --- a/collectors/sniffer_afpacket_freebsd.go +++ b/collectors/sniffer_afpacket_freebsd.go @@ -54,18 +54,9 @@ func (c *AfpacketSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *AfpacketSniffer) ReadConfig() { -} - -func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") +func (c *AfpacketSniffer) ReadConfig() {} - // save the new config - c.config = config - - // read again - c.ReadConfig() -} +func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/sniffer_afpacket_windows.go b/collectors/sniffer_afpacket_windows.go index b5b4c9a0..e284f3f2 100644 --- a/collectors/sniffer_afpacket_windows.go +++ b/collectors/sniffer_afpacket_windows.go @@ -54,18 +54,9 @@ func (c *AfpacketSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *AfpacketSniffer) ReadConfig() { -} - -func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") +func (c *AfpacketSniffer) ReadConfig() {} - // save the new config - c.config = config - - // read again - c.ReadConfig() -} +func (c *AfpacketSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/sniffer_xdp.go b/collectors/sniffer_xdp.go index e3487425..eb32bad9 100644 --- a/collectors/sniffer_xdp.go +++ b/collectors/sniffer_xdp.go @@ -14,6 +14,7 @@ import ( "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-dnscollector/xdp" "github.com/dmachard/go-logger" "golang.org/x/sys/unix" @@ -39,24 +40,26 @@ func ConvertIp6(ip [4]uint32) net.IP { } type XdpSniffer struct { - done chan bool - exit chan bool - identity string - loggers []dnsutils.Worker - config *dnsutils.Config - logger *logger.Logger - name string + done chan bool + exit chan bool + identity string + loggers []dnsutils.Worker + config *dnsutils.Config + configChan chan *dnsutils.Config + logger *logger.Logger + name string } func NewXdpSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *XdpSniffer { logger.Info("[%s] collector=xdp - enabled", name) s := &XdpSniffer{ - done: make(chan bool), - exit: make(chan bool), - config: config, - loggers: loggers, - logger: logger, - name: name, + done: make(chan bool), + exit: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + loggers: loggers, + logger: logger, + name: name, } s.ReadConfig() return s @@ -91,13 +94,8 @@ func (c *XdpSniffer) ReadConfig() { } func (c *XdpSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - // save the new config - c.config = config - - // read again - c.ReadConfig() + c.LogInfo("reload configuration...") + c.configChan <- config } func (c *XdpSniffer) Channel() chan dnsutils.DnsMessage { @@ -118,7 +116,7 @@ func (c *XdpSniffer) Stop() { func (c *XdpSniffer) Run() { c.LogInfo("starting collector...") - dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.XdpLiveCapture.ChannelBufferSize) + dnsProcessor := processors.NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.XdpLiveCapture.ChannelBufferSize) go dnsProcessor.Run(c.Loggers()) iface, err := net.InterfaceByName(c.config.Collectors.XdpLiveCapture.Device) @@ -153,6 +151,35 @@ func (c *XdpSniffer) Run() { panic(err) } + dnsChan := make(chan dnsutils.DnsMessage) + + // goroutine to read all packets reassembled + go func() { + for { + select { + // new config provided? + case cfg, opened := <-c.configChan: + if !opened { + return + } + c.config = cfg + c.ReadConfig() + + // send the config to the dns processor + dnsProcessor.ConfigChan <- cfg + + //dns message to read ? + case dm := <-dnsChan: + + // update identity with config ? + dm.DnsTap.Identity = c.identity + + dnsProcessor.GetChannel() <- dm + + } + } + }() + go func() { var pkt xdp.BpfPktEvent for { @@ -199,7 +226,6 @@ func (c *XdpSniffer) Run() { dm.DnsTap.TimeSec = int(tsAdjusted.Unix()) dm.DnsTap.TimeNsec = int(tsAdjusted.UnixNano() - tsAdjusted.Unix()*1e9) - dm.DnsTap.Identity = c.identity if pkt.SrcPort == 53 { dm.DnsTap.Operation = dnsutils.DNSTAP_CLIENT_RESPONSE } else { @@ -227,11 +253,13 @@ func (c *XdpSniffer) Run() { dm.DNS.Length = len(dm.DNS.Payload) } - dnsProcessor.GetChannel() <- dm - + dnsChan <- dm } }() + <-c.exit + close(dnsChan) + close(c.configChan) // stop dns processor dnsProcessor.Stop() diff --git a/collectors/sniffer_xdp_windows.go b/collectors/sniffer_xdp_windows.go index e9bb92be..b3b68075 100644 --- a/collectors/sniffer_xdp_windows.go +++ b/collectors/sniffer_xdp_windows.go @@ -54,19 +54,7 @@ func (c *XdpSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *XdpSniffer) ReadConfig() { - c.identity = c.config.GetServerIdentity() -} - -func (c *XdpSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - // save the new config - c.config = config - - // read again - c.ReadConfig() -} +func (c *XdpSniffer) ReadConfig() {} func (c *XdpSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/tzsp.go b/collectors/tzsp.go index ff06de2b..f711d3f4 100644 --- a/collectors/tzsp.go +++ b/collectors/tzsp.go @@ -12,6 +12,7 @@ import ( "syscall" "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -27,8 +28,6 @@ type TzspSniffer struct { logger *logger.Logger name string identity string - port int - ip string } func NewTzsp(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *TzspSniffer { @@ -70,27 +69,17 @@ func (c *TzspSniffer) LogError(msg string, v ...interface{}) { } func (c *TzspSniffer) ReadConfig() { - - c.port = c.config.Collectors.Tzsp.ListenPort - c.ip = c.config.Collectors.Tzsp.ListenIp c.identity = c.config.GetServerIdentity() - // TODO: Implement } func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - // save the new config - c.config = config - - // read again - c.ReadConfig() + // TODO implement reload configuration } func (c *TzspSniffer) Listen() error { c.logger.Info("running in background...") - ServerAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", c.ip, c.port)) + ServerAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", c.config.Collectors.Tzsp.ListenIp, c.config.Collectors.Tzsp.ListenPort)) if err != nil { return err } @@ -137,8 +126,7 @@ func (c *TzspSniffer) Run() { c.logger.Fatal("collector=tzsp listening failed: ", err) } - dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.Tzsp.ChannelBufferSize) - + dnsProcessor := processors.NewDnsProcessor(c.config, c.logger, c.name, c.config.Collectors.Tzsp.ChannelBufferSize) go dnsProcessor.Run(c.Loggers()) go func() { diff --git a/collectors/tzsp_darwin.go b/collectors/tzsp_darwin.go index 645171a2..6c80948f 100644 --- a/collectors/tzsp_darwin.go +++ b/collectors/tzsp_darwin.go @@ -54,18 +54,9 @@ func (c *TzspSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *TzspSniffer) ReadConfig() { -} - -func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") +func (c *TzspSniffer) ReadConfig() {} - // save the new config - c.config = config - - // read again - c.ReadConfig() -} +func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *TzspSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/tzsp_freebsd.go b/collectors/tzsp_freebsd.go index ef84c987..d10bdd01 100644 --- a/collectors/tzsp_freebsd.go +++ b/collectors/tzsp_freebsd.go @@ -54,18 +54,9 @@ func (c *TzspSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *TzspSniffer) ReadConfig() { -} - -func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") +func (c *TzspSniffer) ReadConfig() {} - // save the new config - c.config = config - - // read again - c.ReadConfig() -} +func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *TzspSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/collectors/tzsp_windows.go b/collectors/tzsp_windows.go index 640daee7..0fbd49e5 100644 --- a/collectors/tzsp_windows.go +++ b/collectors/tzsp_windows.go @@ -54,18 +54,9 @@ func (c *TzspSniffer) Loggers() []chan dnsutils.DnsMessage { return channels } -func (c *TzspSniffer) ReadConfig() { -} - -func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") +func (c *TzspSniffer) ReadConfig() {} - // save the new config - c.config = config - - // read again - c.ReadConfig() -} +func (c *TzspSniffer) ReloadConfig(config *dnsutils.Config) {} func (c *TzspSniffer) Channel() chan dnsutils.DnsMessage { return nil diff --git a/config.yml b/config.yml index 7586d02e..acaf3877 100644 --- a/config.yml +++ b/config.yml @@ -70,6 +70,9 @@ multiplexer: dnstap: listen-ip: 0.0.0.0 listen-port: 6000 + transforms: + normalize: + qname-lowercase: false loggers: - name: console @@ -195,12 +198,12 @@ multiplexer: # # Channel buffer size for incoming packets, number of packet before to drop it. # chan-buffer-size: 65535 -# # ztsp (TaZmen Sniffer Protocol) -# ztsp: -# # listen on ip -# listen-ip: 0.0.0.0 -# # listen on port -# listen-port: 10000 +# # tzsp (TaZmen Sniffer Protocol) +# tzsp: +# # listen on ip +# listen-ip: 0.0.0.0 +# # listen on port +# listen-port: 10000 # # Channel buffer size for incoming packets, number of packet before to drop it. # chan-buffer-size: 65535 diff --git a/dnscollector.go b/dnscollector.go index 3afd6d88..56f6ed42 100644 --- a/dnscollector.go +++ b/dnscollector.go @@ -252,7 +252,7 @@ func main() { if subcfg.Collectors.FileIngestor.Enable && IsCollectorRouted(config, input.Name) { mapCollectors[input.Name] = collectors.NewFileIngestor(nil, subcfg, logger, input.Name) } - if subcfg.Collectors.Tzsp.Enable { + if subcfg.Collectors.Tzsp.Enable && IsCollectorRouted(config, input.Name) { mapCollectors[input.Name] = collectors.NewTzsp(nil, subcfg, logger, input.Name) } } diff --git a/dnsutils/config.go b/dnsutils/config.go index 809be9bf..3a0f5a46 100644 --- a/dnsutils/config.go +++ b/dnsutils/config.go @@ -247,7 +247,7 @@ type Config struct { ListenIp string `yaml:"listen-ip"` ListenPort int `yaml:"listen-port"` ChannelBufferSize int `yaml:"chan-buffer-size"` - } + } `yaml:"tzsp"` } `yaml:"collectors"` IngoingTransformers ConfigTransformers `yaml:"collectors-transformers"` diff --git a/dnsutils/dns.go b/dnsutils/dns_parser.go similarity index 100% rename from dnsutils/dns.go rename to dnsutils/dns_parser.go diff --git a/dnsutils/dns_test.go b/dnsutils/dns_parser_test.go similarity index 100% rename from dnsutils/dns_test.go rename to dnsutils/dns_parser_test.go diff --git a/dnsutils/edns.go b/dnsutils/edns_parser.go similarity index 100% rename from dnsutils/edns.go rename to dnsutils/edns_parser.go diff --git a/dnsutils/edns_test.go b/dnsutils/edns_parser_test.go similarity index 100% rename from dnsutils/edns_test.go rename to dnsutils/edns_parser_test.go diff --git a/loggers/dnstapclient.go b/loggers/dnstapclient.go index be307d78..7628b299 100644 --- a/loggers/dnstapclient.go +++ b/loggers/dnstapclient.go @@ -21,6 +21,7 @@ type DnstapSender struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger fs *framestream.Fstrm fsReady bool @@ -43,6 +44,7 @@ func NewDnstapSender(config *dnsutils.Config, logger *logger.Logger, name string transportReconnect: make(chan bool), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -67,13 +69,8 @@ func (o *DnstapSender) ReadConfig() { } func (o *DnstapSender) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *DnstapSender) LogInfo(msg string, v ...interface{}) { @@ -230,6 +227,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/elasticsearch.go b/loggers/elasticsearch.go index 28aeaa8b..dbad2d7d 100644 --- a/loggers/elasticsearch.go +++ b/loggers/elasticsearch.go @@ -22,6 +22,7 @@ type ElasticSearchClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string server string @@ -40,6 +41,7 @@ func NewElasticSearchClient(config *dnsutils.Config, console *logger.Logger, nam outputChan: make(chan dnsutils.DnsMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize), logger: console, config: config, + configChan: make(chan *dnsutils.Config), name: name, } o.ReadConfig() @@ -63,13 +65,8 @@ func (c *ElasticSearchClient) ReadConfig() { } func (o *ElasticSearchClient) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *ElasticSearchClient) Channel() chan dnsutils.DnsMessage { @@ -116,6 +113,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/falco.go b/loggers/falco.go index 3014840b..955dd12a 100644 --- a/loggers/falco.go +++ b/loggers/falco.go @@ -19,6 +19,7 @@ type FalcoClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string url string @@ -35,6 +36,7 @@ func NewFalcoClient(config *dnsutils.Config, console *logger.Logger, name string outputChan: make(chan dnsutils.DnsMessage, config.Loggers.FalcoClient.ChannelBufferSize), logger: console, config: config, + configChan: make(chan *dnsutils.Config), name: name, } f.ReadConfig() @@ -50,13 +52,8 @@ func (c *FalcoClient) ReadConfig() { } func (c *FalcoClient) ReloadConfig(config *dnsutils.Config) { - c.LogInfo("reload config...") - - // save the new config - c.config = config - - // read again - c.ReadConfig() + c.LogInfo("reload configuration!") + c.configChan <- config } func (f *FalcoClient) Channel() chan dnsutils.DnsMessage { @@ -103,6 +100,14 @@ RUN_LOOP: f.doneRun <- true break RUN_LOOP + case cfg, opened := <-f.configChan: + if !opened { + return + } + f.config = cfg + f.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-f.inputChan: if !opened { f.LogInfo("input channel closed!") diff --git a/loggers/fluentd.go b/loggers/fluentd.go index 52bb3113..f7dcc25d 100644 --- a/loggers/fluentd.go +++ b/loggers/fluentd.go @@ -20,6 +20,7 @@ type FluentdClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger transportConn net.Conn transportReady chan bool @@ -41,6 +42,7 @@ func NewFluentdClient(config *dnsutils.Config, logger *logger.Logger, name strin transportReconnect: make(chan bool), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -60,13 +62,8 @@ func (o *FluentdClient) ReadConfig() { } func (o *FluentdClient) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *FluentdClient) LogInfo(msg string, v ...interface{}) { @@ -211,6 +208,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/influxdb.go b/loggers/influxdb.go index ede656bc..4db38055 100644 --- a/loggers/influxdb.go +++ b/loggers/influxdb.go @@ -20,6 +20,7 @@ type InfluxDBClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger influxdbConn influxdb2.Client writeAPI api.WriteAPI @@ -38,6 +39,7 @@ func NewInfluxDBClient(config *dnsutils.Config, logger *logger.Logger, name stri outputChan: make(chan dnsutils.DnsMessage, config.Loggers.InfluxDB.ChannelBufferSize), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -57,13 +59,8 @@ func (o *InfluxDBClient) ReadConfig() { } func (o *InfluxDBClient) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *InfluxDBClient) LogInfo(msg string, v ...interface{}) { @@ -110,6 +107,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/kafkaproducer.go b/loggers/kafkaproducer.go index e56a8c37..8744efb8 100644 --- a/loggers/kafkaproducer.go +++ b/loggers/kafkaproducer.go @@ -25,6 +25,7 @@ type KafkaProducer struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger textFormat []string name string @@ -45,6 +46,7 @@ func NewKafkaProducer(config *dnsutils.Config, logger *logger.Logger, name strin outputChan: make(chan dnsutils.DnsMessage, config.Loggers.KafkaProducer.ChannelBufferSize), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), kafkaReady: make(chan bool), kafkaReconnect: make(chan bool), name: name, @@ -73,13 +75,8 @@ func (o *KafkaProducer) ReadConfig() { } func (o *KafkaProducer) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *KafkaProducer) LogInfo(msg string, v ...interface{}) { @@ -247,6 +244,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/logfile.go b/loggers/logfile.go index aa77e382..338126c9 100644 --- a/loggers/logfile.go +++ b/loggers/logfile.go @@ -54,6 +54,7 @@ type LogFile struct { writerPcap *pcapgo.Writer writerDnstap *framestream.Encoder config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger fileFd *os.File fileSize int64 @@ -76,6 +77,7 @@ func NewLogFile(config *dnsutils.Config, logger *logger.Logger, name string) *Lo inputChan: make(chan dnsutils.DnsMessage, config.Loggers.LogFile.ChannelBufferSize), outputChan: make(chan dnsutils.DnsMessage, config.Loggers.LogFile.ChannelBufferSize), config: config, + configChan: make(chan *dnsutils.Config), logger: logger, name: name, } @@ -116,13 +118,8 @@ func (l *LogFile) ReadConfig() { } func (o *LogFile) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (l *LogFile) LogInfo(msg string, v ...interface{}) { @@ -477,6 +474,15 @@ RUN_LOOP: subprocessors.Reset() l.doneRun <- true break RUN_LOOP + + case cfg, opened := <-l.configChan: + if !opened { + return + } + l.config = cfg + l.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-l.inputChan: if !opened { l.LogInfo("input channel closed!") diff --git a/loggers/lokiclient.go b/loggers/lokiclient.go index 4f0851cc..95b22a10 100644 --- a/loggers/lokiclient.go +++ b/loggers/lokiclient.go @@ -80,6 +80,7 @@ type LokiClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger httpclient *http.Client textFormat []string @@ -99,6 +100,7 @@ func NewLokiClient(config *dnsutils.Config, logger *logger.Logger, name string) outputChan: make(chan dnsutils.DnsMessage, config.Loggers.LokiClient.ChannelBufferSize), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), streams: make(map[string]*LokiStream), name: name, } @@ -160,13 +162,8 @@ func (o *LokiClient) ReadConfig() { } func (o *LokiClient) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *LokiClient) LogInfo(msg string, v ...interface{}) { @@ -213,6 +210,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/prometheus.go b/loggers/prometheus.go index 92c44f57..e9a7382a 100644 --- a/loggers/prometheus.go +++ b/loggers/prometheus.go @@ -169,9 +169,9 @@ type Prometheus struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger promRegistry *prometheus.Registry - //version string sync.Mutex catalogueLabels []string @@ -724,20 +724,18 @@ func CreateSystemCatalogue(prom *Prometheus) ([]string, *PromCounterCatalogueCon func NewPrometheus(config *dnsutils.Config, logger *logger.Logger, name string) *Prometheus { logger.Info("[%s] logger=prometheus - enabled", name) o := &Prometheus{ - doneApi: make(chan bool), - stopProcess: make(chan bool), - doneProcess: make(chan bool), - stopRun: make(chan bool), - doneRun: make(chan bool), - config: config, - inputChan: make(chan dnsutils.DnsMessage, config.Loggers.Prometheus.ChannelBufferSize), - outputChan: make(chan dnsutils.DnsMessage, config.Loggers.Prometheus.ChannelBufferSize), - logger: logger, - //version: version, - + doneApi: make(chan bool), + stopProcess: make(chan bool), + doneProcess: make(chan bool), + stopRun: make(chan bool), + doneRun: make(chan bool), + config: config, + configChan: make(chan *dnsutils.Config), + inputChan: make(chan dnsutils.DnsMessage, config.Loggers.Prometheus.ChannelBufferSize), + outputChan: make(chan dnsutils.DnsMessage, config.Loggers.Prometheus.ChannelBufferSize), + logger: logger, promRegistry: prometheus.NewPedanticRegistry(), - - name: name, + name: name, } // This will create a catalogue of counters indexed by fileds requested by config @@ -1043,13 +1041,8 @@ func (o *Prometheus) ReadConfig() { } func (o *Prometheus) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *Prometheus) LogInfo(msg string, v ...interface{}) { @@ -1186,6 +1179,15 @@ RUN_LOOP: subprocessors.Reset() s.doneRun <- true break RUN_LOOP + + case cfg, opened := <-s.configChan: + if !opened { + return + } + s.config = cfg + s.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-s.inputChan: if !opened { s.LogInfo("input channel closed!") diff --git a/loggers/redispub.go b/loggers/redispub.go index 6d3b06ca..3907bd01 100644 --- a/loggers/redispub.go +++ b/loggers/redispub.go @@ -26,6 +26,7 @@ type RedisPub struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger textFormat []string name string @@ -49,6 +50,7 @@ func NewRedisPub(config *dnsutils.Config, logger *logger.Logger, name string) *R transportReconnect: make(chan bool), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -75,13 +77,8 @@ func (o *RedisPub) ReadConfig() { } func (o *RedisPub) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *RedisPub) LogInfo(msg string, v ...interface{}) { @@ -263,6 +260,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/restapi.go b/loggers/restapi.go index 5e726528..0825b221 100644 --- a/loggers/restapi.go +++ b/loggers/restapi.go @@ -53,6 +53,7 @@ type RestAPI struct { httpserver net.Listener httpmux *http.ServeMux config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string @@ -79,6 +80,7 @@ func NewRestAPI(config *dnsutils.Config, logger *logger.Logger, name string) *Re stopRun: make(chan bool), doneRun: make(chan bool), config: config, + configChan: make(chan *dnsutils.Config), inputChan: make(chan dnsutils.DnsMessage, config.Loggers.RestAPI.ChannelBufferSize), outputChan: make(chan dnsutils.DnsMessage, config.Loggers.RestAPI.ChannelBufferSize), logger: logger, @@ -118,13 +120,8 @@ func (o *RestAPI) ReadConfig() { } func (o *RestAPI) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *RestAPI) LogInfo(msg string, v ...interface{}) { @@ -719,6 +716,15 @@ RUN_LOOP: subprocessors.Reset() s.doneRun <- true break RUN_LOOP + + case cfg, opened := <-s.configChan: + if !opened { + return + } + s.config = cfg + s.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-s.inputChan: if !opened { s.LogInfo("input channel closed!") diff --git a/loggers/scalyr.go b/loggers/scalyr.go index 784743dd..b849d1e9 100644 --- a/loggers/scalyr.go +++ b/loggers/scalyr.go @@ -34,6 +34,7 @@ type ScalyrClient struct { logger *logger.Logger name string config *dnsutils.Config + configChan chan *dnsutils.Config mode string textFormat []string @@ -62,6 +63,7 @@ func NewScalyrClient(config *dnsutils.Config, console *logger.Logger, name strin logger: console, name: name, config: config, + configChan: make(chan *dnsutils.Config), mode: dnsutils.MODE_TEXT, endpoint: makeEndpoint("app.scalyr.com"), @@ -143,13 +145,8 @@ func (c *ScalyrClient) ReadConfig() { } func (o *ScalyrClient) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *ScalyrClient) Run() { @@ -174,6 +171,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/statsd.go b/loggers/statsd.go index e8cfc73d..39522153 100644 --- a/loggers/statsd.go +++ b/loggers/statsd.go @@ -49,6 +49,7 @@ type StatsdClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger name string @@ -68,6 +69,7 @@ func NewStatsdClient(config *dnsutils.Config, logger *logger.Logger, name string outputChan: make(chan dnsutils.DnsMessage, config.Loggers.Statsd.ChannelBufferSize), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, Stats: StreamStats{Streams: make(map[string]*StatsPerStream)}, } @@ -89,13 +91,8 @@ func (o *StatsdClient) ReadConfig() { } func (o *StatsdClient) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *StatsdClient) LogInfo(msg string, v ...interface{}) { @@ -255,6 +252,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/stdout.go b/loggers/stdout.go index 21cd05b2..120ce1ab 100644 --- a/loggers/stdout.go +++ b/loggers/stdout.go @@ -37,6 +37,7 @@ type StdOut struct { outputChan chan dnsutils.DnsMessage textFormat []string config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger stdout *log.Logger name string @@ -53,6 +54,7 @@ func NewStdOut(config *dnsutils.Config, console *logger.Logger, name string) *St outputChan: make(chan dnsutils.DnsMessage, config.Loggers.Stdout.ChannelBufferSize), logger: console, config: config, + configChan: make(chan *dnsutils.Config), stdout: log.New(os.Stdout, "", 0), name: name, } @@ -76,13 +78,8 @@ func (c *StdOut) ReadConfig() { } func (o *StdOut) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (c *StdOut) LogInfo(msg string, v ...interface{}) { @@ -131,6 +128,16 @@ RUN_LOOP: subprocessors.Reset() o.doneRun <- true break RUN_LOOP + + // new config provided? + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/syslog.go b/loggers/syslog.go index 829c9252..075d30e4 100644 --- a/loggers/syslog.go +++ b/loggers/syslog.go @@ -60,6 +60,7 @@ type Syslog struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger severity syslog.Priority facility syslog.Priority @@ -84,6 +85,7 @@ func NewSyslog(config *dnsutils.Config, console *logger.Logger, name string) *Sy transportReconnect: make(chan bool), logger: console, config: config, + configChan: make(chan *dnsutils.Config), name: name, } o.ReadConfig() @@ -122,13 +124,8 @@ func (c *Syslog) ReadConfig() { } func (o *Syslog) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *Syslog) Channel() chan dnsutils.DnsMessage { @@ -244,6 +241,15 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + // new config provided? + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/loggers/tcpclient.go b/loggers/tcpclient.go index 7fac6460..16d1bd27 100644 --- a/loggers/tcpclient.go +++ b/loggers/tcpclient.go @@ -22,6 +22,7 @@ type TcpClient struct { inputChan chan dnsutils.DnsMessage outputChan chan dnsutils.DnsMessage config *dnsutils.Config + configChan chan *dnsutils.Config logger *logger.Logger textFormat []string name string @@ -45,6 +46,7 @@ func NewTcpClient(config *dnsutils.Config, logger *logger.Logger, name string) * transportReconnect: make(chan bool), logger: logger, config: config, + configChan: make(chan *dnsutils.Config), name: name, } @@ -70,13 +72,8 @@ func (o *TcpClient) ReadConfig() { } func (o *TcpClient) ReloadConfig(config *dnsutils.Config) { - o.LogInfo("reload config...") - - // save the new config - o.config = config - - // read again - o.ReadConfig() + o.LogInfo("reload configuration!") + o.configChan <- config } func (o *TcpClient) LogInfo(msg string, v ...interface{}) { @@ -220,6 +217,14 @@ RUN_LOOP: o.doneRun <- true break RUN_LOOP + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + case dm, opened := <-o.inputChan: if !opened { o.LogInfo("input channel closed!") diff --git a/collectors/dns_processor.go b/processors/dns.go similarity index 78% rename from collectors/dns_processor.go rename to processors/dns.go index 4af968ea..68bda37b 100644 --- a/collectors/dns_processor.go +++ b/processors/dns.go @@ -1,4 +1,4 @@ -package collectors +package processors import ( "fmt" @@ -17,49 +17,37 @@ func GetFakeDns() ([]byte, error) { } type DnsProcessor struct { - doneRun chan bool - stopRun chan bool - doneMonitor chan bool - stopMonitor chan bool - recvFrom chan dnsutils.DnsMessage - logger *logger.Logger - config *dnsutils.Config - configTransformers chan *dnsutils.ConfigTransformers - name string - dropped chan string - droppedCount map[string]int + doneRun chan bool + stopRun chan bool + doneMonitor chan bool + stopMonitor chan bool + recvFrom chan dnsutils.DnsMessage + logger *logger.Logger + config *dnsutils.Config + ConfigChan chan *dnsutils.Config + name string + dropped chan string + droppedCount map[string]int } func NewDnsProcessor(config *dnsutils.Config, logger *logger.Logger, name string, size int) DnsProcessor { logger.Info("[%s] processor=dns - initialization...", name) d := DnsProcessor{ - doneMonitor: make(chan bool), - doneRun: make(chan bool), - stopMonitor: make(chan bool), - stopRun: make(chan bool), - recvFrom: make(chan dnsutils.DnsMessage, size), - logger: logger, - config: config, - configTransformers: make(chan *dnsutils.ConfigTransformers), - name: name, - dropped: make(chan string), - droppedCount: map[string]int{}, + doneMonitor: make(chan bool), + doneRun: make(chan bool), + stopMonitor: make(chan bool), + stopRun: make(chan bool), + recvFrom: make(chan dnsutils.DnsMessage, size), + logger: logger, + config: config, + ConfigChan: make(chan *dnsutils.Config), + name: name, + dropped: make(chan string), + droppedCount: map[string]int{}, } - - d.ReadConfig() - return d } -func (d *DnsProcessor) ReadConfig() {} - -func (d *DnsProcessor) ReloadConfig(config *dnsutils.Config) { - d.config = config - d.ReadConfig() - - d.configTransformers <- &config.IngoingTransformers -} - func (c *DnsProcessor) LogInfo(msg string, v ...interface{}) { c.logger.Info("["+c.name+"] processor=dns - "+msg, v...) } @@ -134,12 +122,13 @@ func (d *DnsProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggersNam RUN_LOOP: for { select { - case cfg := <-d.configTransformers: - transforms.ReloadConfig(cfg) + case cfg := <-d.ConfigChan: + d.config = cfg + transforms.ReloadConfig(&cfg.IngoingTransformers) case <-d.stopRun: transforms.Reset() - close(d.recvFrom) + //close(d.recvFrom) d.doneRun <- true break RUN_LOOP diff --git a/collectors/dnstap_processor.go b/processors/dnstap.go similarity index 82% rename from collectors/dnstap_processor.go rename to processors/dnstap.go index 34657516..70d8956b 100644 --- a/collectors/dnstap_processor.go +++ b/processors/dnstap.go @@ -1,4 +1,4 @@ -package collectors +package processors import ( "fmt" @@ -49,71 +49,59 @@ func GetFakeDnstap(dnsquery []byte) *dnstap.Dnstap { } type DnstapProcessor struct { - connId int - doneRun chan bool - stopRun chan bool - doneMonitor chan bool - stopMonitor chan bool - recvFrom chan []byte - logger *logger.Logger - config *dnsutils.Config - configTransformers chan *dnsutils.ConfigTransformers - name string - chanSize int - dropped chan string - droppedCount map[string]int + ConnId int + doneRun chan bool + stopRun chan bool + doneMonitor chan bool + stopMonitor chan bool + recvFrom chan []byte + logger *logger.Logger + config *dnsutils.Config + ConfigChan chan *dnsutils.Config + name string + chanSize int + dropped chan string + droppedCount map[string]int } func NewDnstapProcessor(connId int, config *dnsutils.Config, logger *logger.Logger, name string, size int) DnstapProcessor { logger.Info("[%s] processor=dnstap#%d - initialization...", name, connId) d := DnstapProcessor{ - connId: connId, - doneMonitor: make(chan bool), - doneRun: make(chan bool), - stopMonitor: make(chan bool), - stopRun: make(chan bool), - recvFrom: make(chan []byte, size), - chanSize: size, - logger: logger, - config: config, - configTransformers: make(chan *dnsutils.ConfigTransformers), - name: name, - dropped: make(chan string), - droppedCount: map[string]int{}, + ConnId: connId, + doneMonitor: make(chan bool), + doneRun: make(chan bool), + stopMonitor: make(chan bool), + stopRun: make(chan bool), + recvFrom: make(chan []byte, size), + chanSize: size, + logger: logger, + config: config, + ConfigChan: make(chan *dnsutils.Config), + name: name, + dropped: make(chan string), + droppedCount: map[string]int{}, } - d.ReadConfig() return d } -func (d *DnstapProcessor) ReadConfig() { - // todo - checking settings -} - -func (d *DnstapProcessor) ReloadConfig(config *dnsutils.Config) { - d.config = config - d.ReadConfig() - - d.configTransformers <- &config.IngoingTransformers -} - func (c *DnstapProcessor) LogInfo(msg string, v ...interface{}) { var log string - if c.connId == 0 { + if c.ConnId == 0 { log = fmt.Sprintf("[%s] processor=dnstap - ", c.name) } else { - log = fmt.Sprintf("[%s] processor=dnstap#%d - ", c.name, c.connId) + log = fmt.Sprintf("[%s] processor=dnstap#%d - ", c.name, c.ConnId) } c.logger.Info(log+msg, v...) } func (c *DnstapProcessor) LogError(msg string, v ...interface{}) { var log string - if c.connId == 0 { + if c.ConnId == 0 { log = fmt.Sprintf("[%s] processor=dnstap - ", c.name) } else { - log = fmt.Sprintf("[%s] processor=dnstap#%d - ", c.name, c.connId) + log = fmt.Sprintf("[%s] processor=dnstap#%d - ", c.name, c.ConnId) } c.logger.Error(log+msg, v...) } @@ -169,7 +157,7 @@ func (d *DnstapProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggers dt := &dnstap.Dnstap{} // prepare enabled transformers - transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, loggersChannel, d.connId) + transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, loggersChannel, d.ConnId) // start goroutine to count dropped messsages go d.MonitorLoggers() @@ -179,8 +167,9 @@ func (d *DnstapProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggers RUN_LOOP: for { select { - case cfg := <-d.configTransformers: - transforms.ReloadConfig(cfg) + case cfg := <-d.ConfigChan: + d.config = cfg + transforms.ReloadConfig(&cfg.IngoingTransformers) case <-d.stopRun: transforms.Reset() diff --git a/collectors/dnstap_processor_test.go b/processors/dnstap_test.go similarity index 99% rename from collectors/dnstap_processor_test.go rename to processors/dnstap_test.go index 9f8bd609..30c7a999 100644 --- a/collectors/dnstap_processor_test.go +++ b/processors/dnstap_test.go @@ -1,4 +1,4 @@ -package collectors +package processors import ( "bytes" diff --git a/collectors/powerdns_processor.go b/processors/powerdns.go similarity index 86% rename from collectors/powerdns_processor.go rename to processors/powerdns.go index 40cd9789..5ac7e1ac 100644 --- a/collectors/powerdns_processor.go +++ b/processors/powerdns.go @@ -1,4 +1,4 @@ -package collectors +package processors import ( "fmt" @@ -25,71 +25,57 @@ var ( ) type PdnsProcessor struct { - connId int - doneRun chan bool - stopRun chan bool - doneMonitor chan bool - stopMonitor chan bool - recvFrom chan []byte - logger *logger.Logger - config *dnsutils.Config - configTransformers chan *dnsutils.ConfigTransformers - name string - chanSize int - dropped chan string - droppedCount map[string]int + ConnId int + doneRun chan bool + stopRun chan bool + doneMonitor chan bool + stopMonitor chan bool + recvFrom chan []byte + logger *logger.Logger + config *dnsutils.Config + ConfigChan chan *dnsutils.Config + name string + chanSize int + dropped chan string + droppedCount map[string]int } func NewPdnsProcessor(connId int, config *dnsutils.Config, logger *logger.Logger, name string, size int) PdnsProcessor { logger.Info("[%s] processor=pdns#%d - initialization...", name, connId) d := PdnsProcessor{ - connId: connId, - doneMonitor: make(chan bool), - doneRun: make(chan bool), - stopMonitor: make(chan bool), - stopRun: make(chan bool), - recvFrom: make(chan []byte, size), - chanSize: size, - logger: logger, - config: config, - configTransformers: make(chan *dnsutils.ConfigTransformers), - name: name, - dropped: make(chan string), - droppedCount: map[string]int{}, + ConnId: connId, + doneMonitor: make(chan bool), + doneRun: make(chan bool), + stopMonitor: make(chan bool), + stopRun: make(chan bool), + recvFrom: make(chan []byte, size), + chanSize: size, + logger: logger, + config: config, + ConfigChan: make(chan *dnsutils.Config), + name: name, + dropped: make(chan string), + droppedCount: map[string]int{}, } - - d.ReadConfig() - return d } -func (c *PdnsProcessor) ReadConfig() { - // nothing to read -} - -func (d *PdnsProcessor) ReloadConfig(config *dnsutils.Config) { - d.config = config - d.ReadConfig() - - d.configTransformers <- &config.IngoingTransformers -} - func (c *PdnsProcessor) LogInfo(msg string, v ...interface{}) { var log string - if c.connId == 0 { + if c.ConnId == 0 { log = fmt.Sprintf("[%s] processor=powerdns - ", c.name) } else { - log = fmt.Sprintf("[%s] processor=powerdns#%d - ", c.name, c.connId) + log = fmt.Sprintf("[%s] processor=powerdns#%d - ", c.name, c.ConnId) } c.logger.Info(log+msg, v...) } func (c *PdnsProcessor) LogError(msg string, v ...interface{}) { var log string - if c.connId == 0 { + if c.ConnId == 0 { log = fmt.Sprintf("[%s] processor=powerdns - ", c.name) } else { - log = fmt.Sprintf("[%s] processor=powerdns#%d - ", c.name, c.connId) + log = fmt.Sprintf("[%s] processor=powerdns#%d - ", c.name, c.ConnId) } c.logger.Error(log+msg, v...) } @@ -146,7 +132,7 @@ func (d *PdnsProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggersNa pbdm := &powerdns_protobuf.PBDNSMessage{} // prepare enabled transformers - transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, loggersChannel, d.connId) + transforms := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, loggersChannel, d.ConnId) // start goroutine to count dropped messsages go d.MonitorLoggers() @@ -156,8 +142,9 @@ func (d *PdnsProcessor) Run(loggersChannel []chan dnsutils.DnsMessage, loggersNa RUN_LOOP: for { select { - case cfg := <-d.configTransformers: - transforms.ReloadConfig(cfg) + case cfg := <-d.ConfigChan: + d.config = cfg + transforms.ReloadConfig(&cfg.IngoingTransformers) case <-d.stopRun: transforms.Reset() diff --git a/collectors/powerdns_processor_test.go b/processors/powerdns_test.go similarity index 99% rename from collectors/powerdns_processor_test.go rename to processors/powerdns_test.go index 41b205ba..1dee486e 100644 --- a/collectors/powerdns_processor_test.go +++ b/processors/powerdns_test.go @@ -1,4 +1,4 @@ -package collectors +package processors import ( "testing"