From 53cfdf3adcf61ce3a2996d20a9074e38d6fc16ca Mon Sep 17 00:00:00 2001 From: Denis Machard <5562930+dmachard@users.noreply.github.com> Date: Thu, 9 May 2024 12:09:02 +0200 Subject: [PATCH] refacto: new workers package (#702) * new workers package * update the docs * remove processors package --- .github/workflows/testing-go.yml | 11 +- Makefile | 4 +- collectors/dnstap.go | 238 --------------- collectors/dnstap_test.go | 199 ------------ collectors/powerdns.go | 176 ----------- collectors/powerdns_test.go | 27 -- dnsutils/message.go | 3 +- docs/development.md | 283 +++--------------- pkgconfig/constants.go | 1 + pkgconfig/loggers.go | 2 +- pkglinker/multiplexer.go | 55 ++-- pkglinker/pipelines.go | 55 ++-- {loggers => workers}/clickhouse.go | 2 +- {loggers => workers}/clickhouse_test.go | 4 +- {processors => workers}/constants.go | 2 +- {loggers => workers}/devnull.go | 2 +- {collectors => workers}/dnsmessage.go | 2 +- {collectors => workers}/dnsmessage_test.go | 11 +- processors/dns.go => workers/dnsprocessor.go | 2 +- .../dnsprocessor_test.go | 2 +- {collectors => workers}/dnstap_relay.go | 2 +- {collectors => workers}/dnstap_relay_test.go | 5 +- {loggers => workers}/dnstapclient.go | 2 +- {loggers => workers}/dnstapclient_test.go | 2 +- .../dnstap.go => workers/dnstapserver.go | 228 +++++++++++++- .../dnstapserver_test.go | 186 +++++++++++- {loggers => workers}/elasticsearch.go | 2 +- {loggers => workers}/elasticsearch_test.go | 2 +- {loggers => workers}/falco.go | 2 +- {loggers => workers}/falco_test.go | 2 +- {collectors => workers}/file_ingestor.go | 11 +- {collectors => workers}/file_ingestor_test.go | 2 +- {collectors => workers}/file_tail.go | 2 +- {collectors => workers}/file_tail_test.go | 2 +- {loggers => workers}/fluentd.go | 2 +- {loggers => workers}/fluentd_test.go | 4 +- {loggers => workers}/influxdb.go | 2 +- {loggers => workers}/influxdb_test.go | 2 +- {loggers => workers}/kafkaproducer.go | 2 +- {loggers => workers}/kafkaproducer_test.go | 2 +- {loggers => workers}/logfile.go | 6 +- {loggers => workers}/logfile_test.go | 2 +- {loggers => workers}/lokiclient.go | 2 +- {loggers => workers}/lokiclient_test.go | 2 +- {processors => workers}/powerdns.go | 165 +++++++++- {processors => workers}/powerdns_test.go | 19 +- {loggers => workers}/prometheus.go | 2 +- {loggers => workers}/prometheus_test.go | 2 +- {loggers => workers}/redispub.go | 2 +- {loggers => workers}/redispub_test.go | 2 +- {loggers => workers}/restapi.go | 2 +- {loggers => workers}/restapi_test.go | 2 +- {loggers => workers}/scalyr.go | 2 +- {collectors => workers}/sniffer_afpacket.go | 2 +- .../sniffer_afpacket_linux.go | 5 +- .../sniffer_afpacket_test.go | 6 +- {collectors => workers}/sniffer_xdp.go | 5 +- .../sniffer_xdp_windows.go | 2 +- {loggers => workers}/statsd.go | 2 +- {loggers => workers}/statsd_test.go | 2 +- {loggers => workers}/stdout.go | 2 +- {loggers => workers}/stdout_test.go | 15 +- {loggers => workers}/syslog.go | 2 +- {loggers => workers}/syslog_test.go | 2 +- {loggers => workers}/tcpclient.go | 2 +- {loggers => workers}/tcpclient_test.go | 2 +- {collectors => workers}/tzsp.go | 2 +- {collectors => workers}/tzsp_linux.go | 5 +- 68 files changed, 779 insertions(+), 1032 deletions(-) delete mode 100644 collectors/dnstap.go delete mode 100644 collectors/dnstap_test.go delete mode 100644 collectors/powerdns.go delete mode 100644 collectors/powerdns_test.go rename {loggers => workers}/clickhouse.go (99%) rename {loggers => workers}/clickhouse_test.go (96%) rename {processors => workers}/constants.go (92%) rename {loggers => workers}/devnull.go (98%) rename {collectors => workers}/dnsmessage.go (99%) rename {collectors => workers}/dnsmessage_test.go (84%) rename processors/dns.go => workers/dnsprocessor.go (99%) rename processors/dns_test.go => workers/dnsprocessor_test.go (99%) rename {collectors => workers}/dnstap_relay.go (99%) rename {collectors => workers}/dnstap_relay_test.go (95%) rename {loggers => workers}/dnstapclient.go (99%) rename {loggers => workers}/dnstapclient_test.go (99%) rename processors/dnstap.go => workers/dnstapserver.go (64%) rename processors/dnstap_test.go => workers/dnstapserver_test.go (67%) rename {loggers => workers}/elasticsearch.go (99%) rename {loggers => workers}/elasticsearch_test.go (99%) rename {loggers => workers}/falco.go (99%) rename {loggers => workers}/falco_test.go (98%) rename {collectors => workers}/file_ingestor.go (95%) rename {collectors => workers}/file_ingestor_test.go (97%) rename {collectors => workers}/file_tail.go (99%) rename {collectors => workers}/file_tail_test.go (98%) rename {loggers => workers}/fluentd.go (99%) rename {loggers => workers}/fluentd_test.go (98%) rename {loggers => workers}/influxdb.go (99%) rename {loggers => workers}/influxdb_test.go (98%) rename {loggers => workers}/kafkaproducer.go (99%) rename {loggers => workers}/kafkaproducer_test.go (99%) rename {loggers => workers}/logfile.go (99%) rename {loggers => workers}/logfile_test.go (99%) rename {loggers => workers}/lokiclient.go (99%) rename {loggers => workers}/lokiclient_test.go (99%) rename {processors => workers}/powerdns.go (72%) rename {processors => workers}/powerdns_test.go (94%) rename {loggers => workers}/prometheus.go (99%) rename {loggers => workers}/prometheus_test.go (99%) rename {loggers => workers}/redispub.go (99%) rename {loggers => workers}/redispub_test.go (99%) rename {loggers => workers}/restapi.go (99%) rename {loggers => workers}/restapi_test.go (99%) rename {loggers => workers}/scalyr.go (99%) rename {collectors => workers}/sniffer_afpacket.go (97%) rename {collectors => workers}/sniffer_afpacket_linux.go (97%) rename {collectors => workers}/sniffer_afpacket_test.go (89%) rename {collectors => workers}/sniffer_xdp.go (96%) rename {collectors => workers}/sniffer_xdp_windows.go (97%) rename {loggers => workers}/statsd.go (99%) rename {loggers => workers}/statsd_test.go (98%) rename {loggers => workers}/stdout.go (99%) rename {loggers => workers}/stdout_test.go (94%) rename {loggers => workers}/syslog.go (99%) rename {loggers => workers}/syslog_test.go (99%) rename {loggers => workers}/tcpclient.go (99%) rename {loggers => workers}/tcpclient_test.go (99%) rename {collectors => workers}/tzsp.go (97%) rename {collectors => workers}/tzsp_linux.go (96%) diff --git a/.github/workflows/testing-go.yml b/.github/workflows/testing-go.yml index 92901f58..1baa4b3c 100644 --- a/.github/workflows/testing-go.yml +++ b/.github/workflows/testing-go.yml @@ -29,14 +29,9 @@ jobs: - 'pkglinker' - 'pkgutils' - 'dnsutils' - - 'collectors' - - 'loggers' + - 'workers' - 'transformers' - 'netutils' - - 'processors' - # exclude: - # - os-version: macos-latest - # go-version: '1.20' runs-on: ${{ matrix.os-version }} @@ -64,7 +59,7 @@ jobs: sudo go version - name: Test ${{ matrix.package }} - run: sudo go test -timeout 120s ./${{ matrix.package }}/ -race -cover -v + run: sudo go test -timeout 240s ./${{ matrix.package }}/ -race -cover -v int: runs-on: ubuntu-22.04 @@ -148,7 +143,7 @@ jobs: - id: count_tests run: | - data=$(sudo go test -timeout 360s -v ./collectors ./processors ./dnsutils ./netutils ./loggers ./transformers ./pkgconfig ./pkglinker ./pkgutils ././ 2>&1 | grep -c RUN) + data=$(sudo go test -timeout 360s -v ./workers ./dnsutils ./netutils ./transformers ./pkgconfig ./pkglinker ./pkgutils ././ 2>&1 | grep -c RUN) echo "Count of Tests: $data" echo "data=$data" >> $GITHUB_OUTPUT diff --git a/Makefile b/Makefile index 1b7ab5e9..4c15b90e 100644 --- a/Makefile +++ b/Makefile @@ -74,9 +74,7 @@ tests: check-go @go test ./netutils/ -race -cover -v @go test -timeout 90s ./dnsutils/ -race -cover -v @go test -timeout 90s ./transformers/ -race -cover -v - @go test -timeout 90s ./collectors/ -race -cover -v - @go test -timeout 90s ./loggers/ -race -cover -v - @go test -timeout 90s ./processors/ -race -cover -v + @go test -timeout 180s ./workers/ -race -cover -v # Cleans the project using go clean. clean: check-go diff --git a/collectors/dnstap.go b/collectors/dnstap.go deleted file mode 100644 index 2c87620f..00000000 --- a/collectors/dnstap.go +++ /dev/null @@ -1,238 +0,0 @@ -package collectors - -import ( - "bufio" - "encoding/binary" - "errors" - "io" - "net" - "sync" - "sync/atomic" - "time" - - "github.com/dmachard/go-dnscollector/netutils" - "github.com/dmachard/go-dnscollector/pkgconfig" - "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" - "github.com/dmachard/go-framestream" - "github.com/dmachard/go-logger" - "github.com/segmentio/kafka-go/compress" -) - -type Dnstap struct { - *pkgutils.GenericWorker - connCounter uint64 -} - -func NewDnstap(next []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *Dnstap { - s := &Dnstap{GenericWorker: pkgutils.NewGenericWorker(config, logger, name, "dnstap", pkgutils.DefaultBufferSize)} - s.SetDefaultRoutes(next) - s.CheckConfig() - return s -} - -func (w *Dnstap) CheckConfig() { - if !pkgconfig.IsValidTLS(w.GetConfig().Collectors.Dnstap.TLSMinVersion) { - w.LogFatal(pkgutils.PrefixLogCollector + "[" + w.GetName() + "] dnstap - invalid tls min version") - } -} - -func (w *Dnstap) HandleConn(conn net.Conn, connID uint64, forceClose chan bool, wg *sync.WaitGroup) { - // close connection on function exit - defer func() { - w.LogInfo("conn #%d - connection handler terminated", connID) - netutils.Close(conn, w.GetConfig().Collectors.Dnstap.ResetConn) - wg.Done() - }() - - // get peer address - peer := conn.RemoteAddr().String() - peerName := netutils.GetPeerName(peer) - w.LogInfo("new connection #%d from %s (%s)", connID, peer, peerName) - - // start dnstap processor and run it - dnstapProcessor := processors.NewDNSTapProcessor(int(connID), peerName, w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.Dnstap.ChannelBufferSize) - go dnstapProcessor.Run(w.GetDefaultRoutes(), w.GetDroppedRoutes()) - - // init frame stream library - fsReader := bufio.NewReader(conn) - fsWriter := bufio.NewWriter(conn) - fs := framestream.NewFstrm(fsReader, fsWriter, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true) - - // framestream as receiver - if err := fs.InitReceiver(); err != nil { - w.LogError("conn #%d - stream initialization: %s", connID, err) - } else { - w.LogInfo("conn #%d - receiver framestream initialized", connID) - } - - // process incoming frame and send it to dnstap consumer channel - var err error - var frame *framestream.Frame - cleanup := make(chan struct{}) - - // goroutine to close the connection properly - go func() { - defer func() { - dnstapProcessor.Stop() - w.LogInfo("conn #%d - cleanup connection handler terminated", connID) - }() - - for { - select { - case <-forceClose: - w.LogInfo("conn #%d - force to cleanup the connection handler", connID) - netutils.Close(conn, w.GetConfig().Collectors.Dnstap.ResetConn) - return - case <-cleanup: - w.LogInfo("conn #%d - cleanup the connection handler", connID) - return - } - } - }() - - // handle incoming frame - for { - if w.GetConfig().Collectors.Dnstap.Compression == pkgconfig.CompressNone { - frame, err = fs.RecvFrame(false) - } else { - frame, err = fs.RecvCompressedFrame(&compress.GzipCodec, false) - } - if err != nil { - connClosed := false - - var opErr *net.OpError - if errors.As(err, &opErr) { - if errors.Is(opErr, net.ErrClosed) { - connClosed = true - } - } - if errors.Is(err, io.EOF) { - connClosed = true - } - - if connClosed { - w.LogInfo("conn #%d - connection closed with peer %s", connID, peer) - } else { - w.LogError("conn #%d - framestream reader error: %s", connID, err) - } - // exit goroutine - close(cleanup) - break - } - - if frame.IsControl() { - if err := fs.ResetReceiver(frame); err != nil { - if errors.Is(err, io.EOF) { - w.LogInfo("conn #%d - framestream reseted by sender", connID) - } else { - w.LogError("conn #%d - unexpected control framestream: %s", connID, err) - } - - } - - // exit goroutine - close(cleanup) - break - } - - if w.GetConfig().Collectors.Dnstap.Compression == pkgconfig.CompressNone { - // send payload to the channel - select { - case dnstapProcessor.GetChannel() <- frame.Data(): // Successful send to channel - default: - w.ProcessorIsBusy() - } - } else { - // ignore first 4 bytes - data := frame.Data()[4:] - validFrame := true - for len(data) >= 4 { - // get frame size - payloadSize := binary.BigEndian.Uint32(data[:4]) - data = data[4:] - - // enough next data ? - if uint32(len(data)) < payloadSize { - validFrame = false - break - } - // send payload to the channel - select { - case dnstapProcessor.GetChannel() <- data[:payloadSize]: // Successful send to channel - default: - w.ProcessorIsBusy() - } - - // continue for next - data = data[payloadSize:] - } - if !validFrame { - w.LogError("conn #%d - invalid compressed frame received", connID) - continue - } - } - } -} - -func (w *Dnstap) StartCollect() { - w.LogInfo("worker is starting collection") - defer w.CollectDone() - - var connWG sync.WaitGroup - connCleanup := make(chan bool) - cfg := w.GetConfig().Collectors.Dnstap - - // start to listen - listener, err := netutils.StartToListen( - cfg.ListenIP, cfg.ListenPort, cfg.SockPath, - cfg.TLSSupport, pkgconfig.TLSVersion[cfg.TLSMinVersion], - cfg.CertFile, cfg.KeyFile) - if err != nil { - w.LogFatal(pkgutils.PrefixLogCollector+"["+w.GetName()+"] listen error: ", err) - } - w.LogInfo("listening on %s", listener.Addr()) - - // goroutine to Accept() blocks waiting for new connection. - acceptChan := make(chan net.Conn) - netutils.AcceptConnections(listener, acceptChan) - - // main loop - for { - select { - case <-w.OnStop(): - w.LogInfo("stop to listen...") - listener.Close() - - w.LogInfo("closing connected peers...") - close(connCleanup) - connWG.Wait() - return - - // save the new config - case cfg := <-w.NewConfig(): - w.SetConfig(cfg) - w.CheckConfig() - - // new incoming connection - case conn, opened := <-acceptChan: - if !opened { - return - } - - if len(cfg.SockPath) == 0 && cfg.RcvBufSize > 0 { - before, actual, err := netutils.SetSockRCVBUF(conn, cfg.RcvBufSize, cfg.TLSSupport) - if err != nil { - w.LogFatal(pkgutils.PrefixLogCollector+"["+w.GetName()+"] unable to set SO_RCVBUF: ", err) - } - w.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before, cfg.RcvBufSize, actual) - } - - // handle the connection - connWG.Add(1) - connID := atomic.AddUint64(&w.connCounter, 1) - go w.HandleConn(conn, connID, connCleanup, &connWG) - } - - } -} diff --git a/collectors/dnstap_test.go b/collectors/dnstap_test.go deleted file mode 100644 index e44dc649..00000000 --- a/collectors/dnstap_test.go +++ /dev/null @@ -1,199 +0,0 @@ -package collectors - -import ( - "bufio" - "fmt" - "net" - "regexp" - "testing" - "time" - - "github.com/dmachard/go-dnscollector/dnsutils" - "github.com/dmachard/go-dnscollector/netutils" - "github.com/dmachard/go-dnscollector/pkgconfig" - "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" - "github.com/dmachard/go-framestream" - "github.com/dmachard/go-logger" - "github.com/segmentio/kafka-go/compress" - "google.golang.org/protobuf/proto" -) - -func Test_DnstapCollector(t *testing.T) { - testcases := []struct { - name string - mode string - address string - listenPort int - operation string - compression string - }{ - { - name: "tcp_default", - mode: netutils.SocketTCP, - address: ":6000", - listenPort: 0, - operation: "CLIENT_QUERY", - compression: "none", - }, - { - name: "tcp_custom_port", - mode: netutils.SocketTCP, - address: ":7000", - listenPort: 7000, - operation: "CLIENT_QUERY", - compression: "none", - }, - { - name: "unix_default", - mode: netutils.SocketUnix, - address: "/tmp/dnscollector.sock", - listenPort: 0, - operation: "CLIENT_QUERY", - compression: "none", - }, - { - name: "tcp_compress_gzip", - mode: netutils.SocketTCP, - address: ":7000", - listenPort: 7000, - operation: "CLIENT_QUERY", - compression: "gzip", - }, - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - g := pkgutils.NewFakeLogger() - - config := pkgconfig.GetFakeConfig() - if tc.listenPort > 0 { - config.Collectors.Dnstap.ListenPort = tc.listenPort - } - if tc.mode == netutils.SocketUnix { - config.Collectors.Dnstap.SockPath = tc.address - } - config.Collectors.Dnstap.Compression = tc.compression - - // start the collector - c := NewDnstap([]pkgutils.Worker{g}, config, logger.New(false), "test") - go c.StartCollect() - - // wait before to connect - time.Sleep(1 * time.Second) - conn, err := net.Dial(tc.mode, tc.address) - if err != nil { - t.Error("could not connect: ", err) - } - defer conn.Close() - - r := bufio.NewReader(conn) - w := bufio.NewWriter(conn) - fs := framestream.NewFstrm(r, w, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true) - if err := fs.InitSender(); err != nil { - t.Fatalf("framestream init error: %s", err) - } else { - bulkFrame := &framestream.Frame{} - subFrame := &framestream.Frame{} - - // get fake dns question - dnsquery, err := dnsutils.GetFakeDNS() - if err != nil { - t.Fatalf("dns question pack error") - } - - // get fake dnstap message - dtQuery := processors.GetFakeDNSTap(dnsquery) - - // serialize to bytes - data, err := proto.Marshal(dtQuery) - if err != nil { - t.Fatalf("dnstap proto marshal error %s", err) - } - // send query - - if config.Collectors.Dnstap.Compression == pkgconfig.CompressNone { - // send the frame - bulkFrame.Write(data) - if err := fs.SendFrame(bulkFrame); err != nil { - t.Fatalf("send frame error %s", err) - } - } else { - subFrame.Write(data) - bulkFrame.AppendData(subFrame.Data()) - } - - if config.Collectors.Dnstap.Compression != pkgconfig.CompressNone { - bulkFrame.Encode() - if err := fs.SendCompressedFrame(&compress.GzipCodec, bulkFrame); err != nil { - t.Fatalf("send compressed frame error %s", err) - } - } - } - - // waiting message in channel - msg := <-g.GetInputChannel() - if msg.DNSTap.Operation != tc.operation { - t.Errorf("want %s, got %s", tc.operation, msg.DNSTap.Operation) - } - - c.Stop() - }) - } -} - -// Testcase for https://github.com/dmachard/go-dnscollector/issues/461 -// Support Bind9 with dnstap closing. -func Test_DnstapCollector_CloseFrameStream(t *testing.T) { - // redirect stdout output to bytes buffer - logsChan := make(chan logger.LogEntry, 50) - lg := logger.New(true) - lg.SetOutputChannel((logsChan)) - - config := pkgconfig.GetFakeConfig() - config.Collectors.Dnstap.SockPath = "/tmp/dnscollector.sock" - - // start the collector in unix mode - g := pkgutils.NewFakeLogger() - c := NewDnstap([]pkgutils.Worker{g}, config, lg, "test") - go c.StartCollect() - - // simulate dns server connection to collector - time.Sleep(1 * time.Second) - conn, err := net.Dial(netutils.SocketUnix, "/tmp/dnscollector.sock") - if err != nil { - t.Error("could not connect: ", err) - } - defer conn.Close() - - r := bufio.NewReader(conn) - w := bufio.NewWriter(conn) - fs := framestream.NewFstrm(r, w, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true) - if err := fs.InitSender(); err != nil { - t.Fatalf("framestream init error: %s", err) - } - - // checking reset - errClose := fs.ResetSender() - if errClose != nil { - t.Errorf("reset sender error: %s", errClose) - } - - regxp := ".*framestream reseted by sender.*" - pattern := regexp.MustCompile(regxp) - - matchMsg := false - for entry := range logsChan { - fmt.Println(entry) - if pattern.MatchString(entry.Message) { - matchMsg = true - break - } - } - if !matchMsg { - t.Errorf("reset from sender not received") - } - - // cleanup - c.Stop() -} diff --git a/collectors/powerdns.go b/collectors/powerdns.go deleted file mode 100644 index 4ad00877..00000000 --- a/collectors/powerdns.go +++ /dev/null @@ -1,176 +0,0 @@ -package collectors - -import ( - "bufio" - "errors" - "io" - "net" - "sync" - "sync/atomic" - "time" - - "github.com/dmachard/go-dnscollector/netutils" - "github.com/dmachard/go-dnscollector/pkgconfig" - "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" - "github.com/dmachard/go-logger" - powerdns_protobuf "github.com/dmachard/go-powerdns-protobuf" -) - -type ProtobufPowerDNS struct { - *pkgutils.GenericWorker - connCounter uint64 -} - -func NewProtobufPowerDNS(next []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *ProtobufPowerDNS { - w := &ProtobufPowerDNS{GenericWorker: pkgutils.NewGenericWorker(config, logger, name, "powerdns", pkgutils.DefaultBufferSize)} - w.SetDefaultRoutes(next) - w.CheckConfig() - return w -} - -func (w *ProtobufPowerDNS) CheckConfig() { - if !pkgconfig.IsValidTLS(w.GetConfig().Collectors.PowerDNS.TLSMinVersion) { - w.LogFatal(pkgutils.PrefixLogCollector + "[" + w.GetName() + "] invalid tls min version") - } -} - -func (w *ProtobufPowerDNS) HandleConn(conn net.Conn, connID uint64, forceClose chan bool, wg *sync.WaitGroup) { - // close connection on function exit - defer func() { - w.LogInfo("conn #%d - connection handler terminated", connID) - netutils.Close(conn, w.GetConfig().Collectors.Dnstap.ResetConn) - wg.Done() - }() - - // get peer address - peer := conn.RemoteAddr().String() - peerName := netutils.GetPeerName(peer) - w.LogInfo("new connection #%d from %s (%s)", connID, peer, peerName) - - // start protobuf subprocessor - pdnsProcessor := processors.NewPdnsProcessor(int(connID), peerName, w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.PowerDNS.ChannelBufferSize) - go pdnsProcessor.Run(w.GetDefaultRoutes(), w.GetDroppedRoutes()) - - r := bufio.NewReader(conn) - pbs := powerdns_protobuf.NewProtobufStream(r, conn, 5*time.Second) - - var err error - var payload *powerdns_protobuf.ProtoPayload - cleanup := make(chan struct{}) - - // goroutine to close the connection properly - go func() { - defer func() { - pdnsProcessor.Stop() - w.LogInfo("conn #%d - cleanup connection handler terminated", connID) - }() - - for { - select { - case <-forceClose: - w.LogInfo("conn #%d - force to cleanup the connection handler", connID) - netutils.Close(conn, w.GetConfig().Collectors.Dnstap.ResetConn) - return - case <-cleanup: - w.LogInfo("conn #%d - cleanup the connection handler", connID) - return - } - } - }() - - for { - payload, err = pbs.RecvPayload(false) - if err != nil { - connClosed := false - - var opErr *net.OpError - if errors.As(err, &opErr) { - if errors.Is(opErr, net.ErrClosed) { - connClosed = true - } - } - if errors.Is(err, io.EOF) { - connClosed = true - } - - if connClosed { - w.LogInfo("conn #%d - connection closed with peer %s", connID, peer) - } else { - w.LogError("conn #%d - powerdns reader error: %s", connID, err) - } - - // exit goroutine - close(cleanup) - break - } - - // send payload to the channel - select { - case pdnsProcessor.GetChannel() <- payload.Data(): // Successful send - default: - w.ProcessorIsBusy() - } - } -} - -func (w *ProtobufPowerDNS) StartCollect() { - w.LogInfo("worker is starting collection") - defer w.CollectDone() - - var connWG sync.WaitGroup - connCleanup := make(chan bool) - cfg := w.GetConfig().Collectors.PowerDNS - - // start to listen - listener, err := netutils.StartToListen( - cfg.ListenIP, cfg.ListenPort, "", - cfg.TLSSupport, pkgconfig.TLSVersion[cfg.TLSMinVersion], - cfg.CertFile, cfg.KeyFile) - if err != nil { - w.LogFatal(pkgutils.PrefixLogCollector+"["+w.GetName()+"] listening failed: ", err) - } - w.LogInfo("listening on %s", listener.Addr()) - - // goroutine to Accept() blocks waiting for new connection. - acceptChan := make(chan net.Conn) - netutils.AcceptConnections(listener, acceptChan) - - // main loop - for { - select { - case <-w.OnStop(): - w.LogInfo("stop to listen...") - listener.Close() - - w.LogInfo("closing connected peers...") - close(connCleanup) - connWG.Wait() - return - - // save the new config - case cfg := <-w.NewConfig(): - w.SetConfig(cfg) - w.CheckConfig() - - case conn, opened := <-acceptChan: - if !opened { - return - } - - if w.GetConfig().Collectors.Dnstap.RcvBufSize > 0 { - before, actual, err := netutils.SetSockRCVBUF(conn, cfg.RcvBufSize, cfg.TLSSupport) - if err != nil { - w.LogFatal(pkgutils.PrefixLogCollector+"["+w.GetName()+"] unable to set SO_RCVBUF: ", err) - } - w.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before, cfg.RcvBufSize, actual) - } - - // handle the connection - connWG.Add(1) - connID := atomic.AddUint64(&w.connCounter, 1) - go w.HandleConn(conn, connID, connCleanup, &connWG) - - } - } -} diff --git a/collectors/powerdns_test.go b/collectors/powerdns_test.go deleted file mode 100644 index 4478acec..00000000 --- a/collectors/powerdns_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package collectors - -import ( - "net" - "testing" - "time" - - "github.com/dmachard/go-dnscollector/netutils" - "github.com/dmachard/go-dnscollector/pkgconfig" - "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-logger" -) - -func TestPowerDNS_Run(t *testing.T) { - g := pkgutils.NewFakeLogger() - - c := NewProtobufPowerDNS([]pkgutils.Worker{g}, pkgconfig.GetFakeConfig(), logger.New(false), "test") - go c.StartCollect() - - // wait before to connect - time.Sleep(1 * time.Second) - conn, err := net.Dial(netutils.SocketTCP, ":6001") - if err != nil { - t.Error("could not connect to TCP server: ", err) - } - defer conn.Close() -} diff --git a/dnsutils/message.go b/dnsutils/message.go index baea3c5b..efdc060f 100644 --- a/dnsutils/message.go +++ b/dnsutils/message.go @@ -17,6 +17,7 @@ import ( "time" "github.com/dmachard/go-dnscollector/netutils" + "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnstap-protobuf" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -1830,7 +1831,7 @@ func GetFakeDNSMessage() DNSMessage { dm.DNSTap.Identity = "collector" dm.DNSTap.Operation = "CLIENT_QUERY" dm.DNS.Type = DNSQuery - dm.DNS.Qname = "dns.collector" + dm.DNS.Qname = pkgconfig.ProgQname dm.NetworkInfo.QueryIP = "1.2.3.4" dm.NetworkInfo.QueryPort = "1234" dm.NetworkInfo.ResponseIP = "4.3.2.1" diff --git a/docs/development.md b/docs/development.md index 0b4389e1..42fbc1b5 100644 --- a/docs/development.md +++ b/docs/development.md @@ -6,8 +6,7 @@ First, make sure your golang version is `1.20` or higher How to userguides: -- [Add a new collector](#add-collector) -- [Add a new logger](#add-logger) +- [Add a new worker](#add-worker) - [Add a new transform](#add-transformer) ## Build and run from source @@ -149,9 +148,9 @@ func NewTransforms( Finally update the docs `doc/transformers.md` and `README.md` -### Add logger +### Add a worker (collector or logger) -1. Add Configuration `dnsutils/config.go` and `config.yml` +1. Add Configuration in `pkgconfig/logger.go` or `pkgconfig/collectors.go` ```golang Loggers struct { @@ -159,7 +158,6 @@ Loggers struct { Enable bool `yaml:"enable"` } } - ``` ```golang @@ -168,82 +166,70 @@ func (c *Config) SetDefault() { } ``` -2. Create the following file `loggers/mylogger.go` and `loggers/mylogger_test.go` +2. Create the following file `workers/mylogger.go` and `loggers/mylogger_test.go` ```golang -package loggers +package workers import ( - "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-dnscollector/pkgutils" + "github.com/dmachard/go-logger" ) -type MyLogger struct { - done chan bool - channel chan dnsutils.DnsMessage - config *pkgconfig.Config - logger *logger.Logger - exit chan bool - name string -} - -func NewMyLogger(config *pkgconfig.Config, logger *logger.Logger, name string) *MyLogger { - o := &MyLogger{ - done: make(chan bool), - exit: make(chan bool), - channel: make(chan dnsutils.DnsMessage, 512), - logger: logger, - config: config, - name: "mylogger", - } - return o -} - -func (c *MyLogger) GetName() string { return c.name } - -func (c *MyLogger) SetLoggers(loggers []pkgutils.Worker) {} - -func (o *MyLogger) ReadConfig() {} - -func (o *MyLogger) LogInfo(msg string, v ...interface{}) { - o.logger.Info("["+o.name+"] mylogger - "+msg, v...) +type MyWorker struct { + *pkgutils.GenericWorker } -func (o *MyLogger) LogError(msg string, v ...interface{}) { - o.logger.Error("["+o.name+"] mylogger - "+msg, v...) +func NewMyWorker(config *pkgconfig.Config, console *logger.Logger, name string) *MyWorker { + s := &MyWorker{GenericWorker: pkgutils.NewGenericWorker(config, console, name, "worker", DefaultBufferSize)} + s.ReadConfig() + return s } -func (o *MyLogger) Stop() { - o.LogInfo("stopping...") +func (w *DevNull) StartCollect() { + w.LogInfo("worker is starting collection") + defer w.CollectDone() - // exit to close properly - o.exit <- true + // goroutine to process transformed dns messages + go w.StartLogging() - // read done channel and block until run is terminated - <-o.done - close(o.done) -} + // loop to process incoming messages + for { + select { + case <-w.OnStop(): + w.StopLogger() -func (o *MyLogger) GetInputChannel() chan dnsutils.DnsMessage { - return o.channel + case _, opened := <-w.GetInputChannel(): + if !opened { + w.LogInfo("run: input channel closed!") + return + } + } + } } -func (o *MyLogger) Run() { - o.LogInfo("running in background...") - // prepare transforms - listChannel := []chan dnsutils.DnsMessage{} - listChannel = append(listChannel, o.channel) - subprocessors := transformers.NewTransforms(&o.config.OutgoingTransformers, o.logger, o.name, listChannel) +func (w *DevNull) StartLogging() { + w.LogInfo("worker is starting logging") + defer w.LoggingDone() - o.LogInfo("run terminated") + for { + select { + case <-w.OnLoggerStopped(): + return - // cleanup transformers - subprocessors.Reset() + case _, opened := <-w.GetOutputChannel(): + if !opened { + w.LogInfo("process: output channel closed!") + return + } - o.done <- true + } + } } ``` -3. Update the main file `dnscollector.go` +3. Update the main file `pkglinker` in `pipelines.go` ```golang if subcfg.Loggers.MyLogger.Enable && IsLoggerRouted(config, output.Name) { @@ -251,179 +237,4 @@ if subcfg.Loggers.MyLogger.Enable && IsLoggerRouted(config, output.Name) { } ``` -4. Finally update the docs `doc/loggers.md` and `README.md` - -### Add collector - -Add Configuration `dnsutils/config.go` and `config.yml` - -```golang -Collectors struct { - MyCollector struct { - Enable bool `yaml:"enable"` - } `yaml:"tail"` -} -``` - -```golang -func (c *Config) SetDefault() { - c.Collectors.MyCollector.Enable = false -} -``` - -Create the following file `collectors/mycollector.go` and `collectors/mycollector_test.go` - -```golang -package collectors - -import ( - "time" - - "github.com/dmachard/go-dnscollector/dnsutils" - "github.com/dmachard/go-dnscollector/pkgconfig" - "github.com/dmachard/go-logger" -) - -type MyNewCollector struct { - doneRun chan bool - doneMonitor chan bool - stopRun chan bool - stopMonitor chan bool - loggers []pkgutils.Worker - config *pkgconfig.Config - configChan chan *pkgconfig.Config - logger *logger.Logger - name string - droppedCount int - dropped chan int -} - -func NewNewCollector(loggers []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *Dnstap { - logger.Info("[%s] collector=mynewcollector - enabled", name) - s := &MyNewCollector{ - doneRun: make(chan bool), - doneMonitor: make(chan bool), - stopRun: make(chan bool), - stopMonitor: make(chan bool), - dropped: make(chan int), - config: config, - configChan: make(chan *pkgconfig.Config), - loggers: loggers, - logger: logger, - name: name, - } - s.ReadConfig() - return s -} - -func (c *MyNewCollector) GetName() string { return c.name } - -func (c *MyNewCollector) AddDefaultRoute(wrk pkgutils.Worker) { - c.loggers = append(c.loggers, wrk) -} - -func (c *MyNewCollector) SetLoggers(loggers []pkgutils.Worker) { - c.loggers = loggers -} - -func (c *MyNewCollector) Loggers() ([]chan dnsutils.DNSMessage, []string) { - channels := []chan dnsutils.DNSMessage{} - names := []string{} - for _, p := range c.loggers { - channels = append(channels,p.GetInputChannel()) - names = append(names, p.GetName()) - } - return channels, names -} - -func (c *MyNewCollector) ReadConfig() {} - -func (c *MyNewCollector) ReloadConfig(config *pkgconfig.Config) { - c.LogInfo("reload configuration...") - c.configChan <- config -} - -func (c *MyNewCollector) LogInfo(msg string, v ...interface{}) { - c.logger.Info("["+c.name+"] collector=mynewcollector - "+msg, v...) -} - -func (c *MyNewCollector) LogError(msg string, v ...interface{}) { - c.logger.Error("["+c.name+" collector=mynewcollector - "+msg, v...) -} - -func (c *MyNewCollector) GetInputChannel() chan dnsutils.DNSMessage { - return nil -} - -func (c *MyNewCollector) Stop() { - // stop monitor goroutine - c.LogInfo("stopping monitor...") - c.stopMonitor <- true - <-c.doneMonitor - - // read done channel and block until run is terminated - c.LogInfo("stopping run...") - c.stopRun <- true - <-c.doneRun -} - -func (c *MyNewCollector) MonitorCollector() { - watchInterval := 10 * time.Second - bufferFull := time.NewTimer(watchInterval) -MONITOR_LOOP: - for { - select { - case <-c.dropped: - c.droppedCount++ - case <-c.stopMonitor: - close(c.dropped) - bufferFull.Stop() - c.doneMonitor <- true - break MONITOR_LOOP - case <-bufferFull.C: - if c.droppedCount > 0 { - c.LogError("recv buffer is full, %d packet(s) dropped", c.droppedCount) - c.droppedCount = 0 - } - bufferFull.Reset(watchInterval) - } - } - c.LogInfo("monitor terminated") -} - -func (c *DNSMessage) Run() { - c.LogInfo("starting collector...") - - // start goroutine to count dropped messsages - go c.MonitorCollector() - -RUN_LOOP: - for { - select { - case <-c.stopRun: - c.doneRun <- true - break RUN_LOOP - - case cfg := <-c.configChan: - - // save the new config - c.config = cfg - c.ReadConfig() - } - - } - c.LogInfo("run terminated") -} - - -``` - -Update the main file `dnscollector.go` - -```golang -if subcfg.Collectors.MyCollector.Enable && IsCollectorRouted(config, input.Name) { - mapCollectors[input.Name] = collectors.NewMyCollector(nil, subcfg, logger, input.Name) -} -``` - -Finally update the docs `doc/collectors.md` and `README.md` +4. Finally update the docs `doc/loggers.md` or `doc/collectors.md` and `README.md` \ No newline at end of file diff --git a/pkgconfig/constants.go b/pkgconfig/constants.go index 7e8a90f7..29f6d9d5 100644 --- a/pkgconfig/constants.go +++ b/pkgconfig/constants.go @@ -7,6 +7,7 @@ import ( const ( StrUnknown = "UNKNOWN" + ProgQname = "dns.collector" ProgName = "dnscollector" LocalhostIP = "127.0.0.1" AnyIP = "0.0.0.0" diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go index f0572d5a..dc25d4e0 100644 --- a/pkgconfig/loggers.go +++ b/pkgconfig/loggers.go @@ -457,7 +457,7 @@ func (c *ConfigLoggers) SetDefault() { c.Fluentd.CAFile = "" c.Fluentd.CertFile = "" c.Fluentd.KeyFile = "" - c.Fluentd.Tag = "dns.collector" + c.Fluentd.Tag = ProgQname c.Fluentd.BufferSize = 100 c.Fluentd.ChannelBufferSize = 4096 diff --git a/pkglinker/multiplexer.go b/pkglinker/multiplexer.go index c05e58e9..3e78b97c 100644 --- a/pkglinker/multiplexer.go +++ b/pkglinker/multiplexer.go @@ -4,10 +4,9 @@ import ( "fmt" "strings" - "github.com/dmachard/go-dnscollector/collectors" - "github.com/dmachard/go-dnscollector/loggers" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" + "github.com/dmachard/go-dnscollector/workers" "github.com/dmachard/go-logger" "gopkg.in/yaml.v2" ) @@ -101,58 +100,58 @@ func InitMultiplexer(mapLoggers map[string]pkgutils.Worker, mapCollectors map[st // registor the logger if enabled if subcfg.Loggers.DevNull.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewDevNull(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewDevNull(subcfg, logger, output.Name) } if subcfg.Loggers.RestAPI.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewRestAPI(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewRestAPI(subcfg, logger, output.Name) } if subcfg.Loggers.Prometheus.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewPrometheus(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewPrometheus(subcfg, logger, output.Name) } if subcfg.Loggers.Stdout.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewStdOut(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewStdOut(subcfg, logger, output.Name) } if subcfg.Loggers.LogFile.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewLogFile(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewLogFile(subcfg, logger, output.Name) } if subcfg.Loggers.DNSTap.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewDnstapSender(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewDnstapSender(subcfg, logger, output.Name) } if subcfg.Loggers.TCPClient.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewTCPClient(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewTCPClient(subcfg, logger, output.Name) } if subcfg.Loggers.Syslog.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewSyslog(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewSyslog(subcfg, logger, output.Name) } if subcfg.Loggers.Fluentd.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewFluentdClient(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewFluentdClient(subcfg, logger, output.Name) } if subcfg.Loggers.InfluxDB.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewInfluxDBClient(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewInfluxDBClient(subcfg, logger, output.Name) } if subcfg.Loggers.LokiClient.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewLokiClient(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewLokiClient(subcfg, logger, output.Name) } if subcfg.Loggers.Statsd.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewStatsdClient(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewStatsdClient(subcfg, logger, output.Name) } if subcfg.Loggers.ElasticSearchClient.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewElasticSearchClient(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewElasticSearchClient(subcfg, logger, output.Name) } if subcfg.Loggers.ScalyrClient.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewScalyrClient(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewScalyrClient(subcfg, logger, output.Name) } if subcfg.Loggers.RedisPub.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewRedisPub(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewRedisPub(subcfg, logger, output.Name) } if subcfg.Loggers.KafkaProducer.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewKafkaProducer(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewKafkaProducer(subcfg, logger, output.Name) } if subcfg.Loggers.FalcoClient.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewFalcoClient(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewFalcoClient(subcfg, logger, output.Name) } if subcfg.Loggers.ClickhouseClient.Enable && IsLoggerRouted(config, output.Name) { - mapLoggers[output.Name] = loggers.NewClickhouseClient(subcfg, logger, output.Name) + mapLoggers[output.Name] = workers.NewClickhouseClient(subcfg, logger, output.Name) } } @@ -164,28 +163,28 @@ func InitMultiplexer(mapLoggers map[string]pkgutils.Worker, mapCollectors map[st // register the collector if enabled if subcfg.Collectors.Dnstap.Enable && IsCollectorRouted(config, input.Name) { - mapCollectors[input.Name] = collectors.NewDnstap(nil, subcfg, logger, input.Name) + mapCollectors[input.Name] = workers.NewDnstapServer(nil, subcfg, logger, input.Name) } if subcfg.Collectors.DnstapProxifier.Enable && IsCollectorRouted(config, input.Name) { - mapCollectors[input.Name] = collectors.NewDnstapProxifier(nil, subcfg, logger, input.Name) + mapCollectors[input.Name] = workers.NewDnstapProxifier(nil, subcfg, logger, input.Name) } if subcfg.Collectors.AfpacketLiveCapture.Enable && IsCollectorRouted(config, input.Name) { - mapCollectors[input.Name] = collectors.NewAfpacketSniffer(nil, subcfg, logger, input.Name) + mapCollectors[input.Name] = workers.NewAfpacketSniffer(nil, subcfg, logger, input.Name) } if subcfg.Collectors.XdpLiveCapture.Enable && IsCollectorRouted(config, input.Name) { - mapCollectors[input.Name] = collectors.NewXDPSniffer(nil, subcfg, logger, input.Name) + mapCollectors[input.Name] = workers.NewXDPSniffer(nil, subcfg, logger, input.Name) } if subcfg.Collectors.Tail.Enable && IsCollectorRouted(config, input.Name) { - mapCollectors[input.Name] = collectors.NewTail(nil, subcfg, logger, input.Name) + mapCollectors[input.Name] = workers.NewTail(nil, subcfg, logger, input.Name) } if subcfg.Collectors.PowerDNS.Enable && IsCollectorRouted(config, input.Name) { - mapCollectors[input.Name] = collectors.NewProtobufPowerDNS(nil, subcfg, logger, input.Name) + mapCollectors[input.Name] = workers.NewPdnsServer(nil, subcfg, logger, input.Name) } if subcfg.Collectors.FileIngestor.Enable && IsCollectorRouted(config, input.Name) { - mapCollectors[input.Name] = collectors.NewFileIngestor(nil, subcfg, logger, input.Name) + mapCollectors[input.Name] = workers.NewFileIngestor(nil, subcfg, logger, input.Name) } if subcfg.Collectors.Tzsp.Enable && IsCollectorRouted(config, input.Name) { - mapCollectors[input.Name] = collectors.NewTZSP(nil, subcfg, logger, input.Name) + mapCollectors[input.Name] = workers.NewTZSP(nil, subcfg, logger, input.Name) } } diff --git a/pkglinker/pipelines.go b/pkglinker/pipelines.go index e4d8f551..d62a9293 100644 --- a/pkglinker/pipelines.go +++ b/pkglinker/pipelines.go @@ -3,10 +3,9 @@ package pkglinker import ( "fmt" - "github.com/dmachard/go-dnscollector/collectors" - "github.com/dmachard/go-dnscollector/loggers" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" + "github.com/dmachard/go-dnscollector/workers" "github.com/dmachard/go-logger" "github.com/pkg/errors" "gopkg.in/yaml.v2" @@ -125,84 +124,84 @@ func CreateRouting(stanza pkgconfig.ConfigPipelines, mapCollectors map[string]pk func CreateStanza(stanzaName string, config *pkgconfig.Config, mapCollectors map[string]pkgutils.Worker, mapLoggers map[string]pkgutils.Worker, logger *logger.Logger) { // register the logger if enabled if config.Loggers.RestAPI.Enable { - mapLoggers[stanzaName] = loggers.NewRestAPI(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewRestAPI(config, logger, stanzaName) } if config.Loggers.Prometheus.Enable { - mapLoggers[stanzaName] = loggers.NewPrometheus(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewPrometheus(config, logger, stanzaName) } if config.Loggers.Stdout.Enable { - mapLoggers[stanzaName] = loggers.NewStdOut(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewStdOut(config, logger, stanzaName) } if config.Loggers.LogFile.Enable { - mapLoggers[stanzaName] = loggers.NewLogFile(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewLogFile(config, logger, stanzaName) } if config.Loggers.DNSTap.Enable { - mapLoggers[stanzaName] = loggers.NewDnstapSender(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewDnstapSender(config, logger, stanzaName) } if config.Loggers.TCPClient.Enable { - mapLoggers[stanzaName] = loggers.NewTCPClient(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewTCPClient(config, logger, stanzaName) } if config.Loggers.Syslog.Enable { - mapLoggers[stanzaName] = loggers.NewSyslog(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewSyslog(config, logger, stanzaName) } if config.Loggers.Fluentd.Enable { - mapLoggers[stanzaName] = loggers.NewFluentdClient(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewFluentdClient(config, logger, stanzaName) } if config.Loggers.InfluxDB.Enable { - mapLoggers[stanzaName] = loggers.NewInfluxDBClient(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewInfluxDBClient(config, logger, stanzaName) } if config.Loggers.LokiClient.Enable { - mapLoggers[stanzaName] = loggers.NewLokiClient(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewLokiClient(config, logger, stanzaName) } if config.Loggers.Statsd.Enable { - mapLoggers[stanzaName] = loggers.NewStatsdClient(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewStatsdClient(config, logger, stanzaName) } if config.Loggers.ElasticSearchClient.Enable { - mapLoggers[stanzaName] = loggers.NewElasticSearchClient(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewElasticSearchClient(config, logger, stanzaName) } if config.Loggers.ScalyrClient.Enable { - mapLoggers[stanzaName] = loggers.NewScalyrClient(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewScalyrClient(config, logger, stanzaName) } if config.Loggers.RedisPub.Enable { - mapLoggers[stanzaName] = loggers.NewRedisPub(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewRedisPub(config, logger, stanzaName) } if config.Loggers.KafkaProducer.Enable { - mapLoggers[stanzaName] = loggers.NewKafkaProducer(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewKafkaProducer(config, logger, stanzaName) } if config.Loggers.FalcoClient.Enable { - mapLoggers[stanzaName] = loggers.NewFalcoClient(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewFalcoClient(config, logger, stanzaName) } if config.Loggers.ClickhouseClient.Enable { - mapLoggers[stanzaName] = loggers.NewClickhouseClient(config, logger, stanzaName) + mapLoggers[stanzaName] = workers.NewClickhouseClient(config, logger, stanzaName) } // register the collector if enabled if config.Collectors.DNSMessage.Enable { - mapCollectors[stanzaName] = collectors.NewDNSMessage(nil, config, logger, stanzaName) + mapCollectors[stanzaName] = workers.NewDNSMessage(nil, config, logger, stanzaName) } if config.Collectors.Dnstap.Enable { - mapCollectors[stanzaName] = collectors.NewDnstap(nil, config, logger, stanzaName) + mapCollectors[stanzaName] = workers.NewDnstapServer(nil, config, logger, stanzaName) } if config.Collectors.DnstapProxifier.Enable { - mapCollectors[stanzaName] = collectors.NewDnstapProxifier(nil, config, logger, stanzaName) + mapCollectors[stanzaName] = workers.NewDnstapProxifier(nil, config, logger, stanzaName) } if config.Collectors.AfpacketLiveCapture.Enable { - mapCollectors[stanzaName] = collectors.NewAfpacketSniffer(nil, config, logger, stanzaName) + mapCollectors[stanzaName] = workers.NewAfpacketSniffer(nil, config, logger, stanzaName) } if config.Collectors.XdpLiveCapture.Enable { - mapCollectors[stanzaName] = collectors.NewXDPSniffer(nil, config, logger, stanzaName) + mapCollectors[stanzaName] = workers.NewXDPSniffer(nil, config, logger, stanzaName) } if config.Collectors.Tail.Enable { - mapCollectors[stanzaName] = collectors.NewTail(nil, config, logger, stanzaName) + mapCollectors[stanzaName] = workers.NewTail(nil, config, logger, stanzaName) } if config.Collectors.PowerDNS.Enable { - mapCollectors[stanzaName] = collectors.NewProtobufPowerDNS(nil, config, logger, stanzaName) + mapCollectors[stanzaName] = workers.NewPdnsServer(nil, config, logger, stanzaName) } if config.Collectors.FileIngestor.Enable { - mapCollectors[stanzaName] = collectors.NewFileIngestor(nil, config, logger, stanzaName) + mapCollectors[stanzaName] = workers.NewFileIngestor(nil, config, logger, stanzaName) } if config.Collectors.Tzsp.Enable { - mapCollectors[stanzaName] = collectors.NewTZSP(nil, config, logger, stanzaName) + mapCollectors[stanzaName] = workers.NewTZSP(nil, config, logger, stanzaName) } } diff --git a/loggers/clickhouse.go b/workers/clickhouse.go similarity index 99% rename from loggers/clickhouse.go rename to workers/clickhouse.go index 97b728f4..ec6b5a19 100644 --- a/loggers/clickhouse.go +++ b/workers/clickhouse.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "net/http" diff --git a/loggers/clickhouse_test.go b/workers/clickhouse_test.go similarity index 96% rename from loggers/clickhouse_test.go rename to workers/clickhouse_test.go index f62a2b2d..66def44b 100644 --- a/loggers/clickhouse_test.go +++ b/workers/clickhouse_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" @@ -20,7 +20,7 @@ func Test_ClickhouseClient(t *testing.T) { }{ { mode: pkgconfig.ModeJSON, - pattern: "dns.collector", + pattern: pkgconfig.ProgQname, }, } cfg := pkgconfig.GetFakeConfig() diff --git a/processors/constants.go b/workers/constants.go similarity index 92% rename from processors/constants.go rename to workers/constants.go index f8ba88f2..31c3451a 100644 --- a/processors/constants.go +++ b/workers/constants.go @@ -1,4 +1,4 @@ -package processors +package workers const ( ExpectedQname = "dnscollector.dev" diff --git a/loggers/devnull.go b/workers/devnull.go similarity index 98% rename from loggers/devnull.go rename to workers/devnull.go index 80f61c8d..b6ef953b 100644 --- a/loggers/devnull.go +++ b/workers/devnull.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "github.com/dmachard/go-dnscollector/pkgconfig" diff --git a/collectors/dnsmessage.go b/workers/dnsmessage.go similarity index 99% rename from collectors/dnsmessage.go rename to workers/dnsmessage.go index 754cefcc..92d42018 100644 --- a/collectors/dnsmessage.go +++ b/workers/dnsmessage.go @@ -1,4 +1,4 @@ -package collectors +package workers import ( "bufio" diff --git a/collectors/dnsmessage_test.go b/workers/dnsmessage_test.go similarity index 84% rename from collectors/dnsmessage_test.go rename to workers/dnsmessage_test.go index b6462b36..a61d89a6 100644 --- a/collectors/dnsmessage_test.go +++ b/workers/dnsmessage_test.go @@ -1,4 +1,4 @@ -package collectors +package workers import ( "fmt" @@ -9,7 +9,6 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" ) @@ -41,7 +40,7 @@ func Test_DnsMessage_BufferLoggerIsFull(t *testing.T) { for entry := range logsChan { fmt.Println(entry) - pattern := regexp.MustCompile(processors.ExpectedBufferMsg511) + pattern := regexp.MustCompile(ExpectedBufferMsg511) if pattern.MatchString(entry.Message) { break } @@ -49,7 +48,7 @@ func Test_DnsMessage_BufferLoggerIsFull(t *testing.T) { // read dnsmessage from next logger dmOut := <-nxt.GetInputChannel() - if dmOut.DNS.Qname != processors.ExpectedQname2 { + if dmOut.DNS.Qname != ExpectedQname2 { t.Errorf("invalid qname in dns message: %s", dmOut.DNS.Qname) } @@ -63,14 +62,14 @@ func Test_DnsMessage_BufferLoggerIsFull(t *testing.T) { for entry := range logsChan { fmt.Println(entry) - pattern := regexp.MustCompile(processors.ExpectedBufferMsg1023) + pattern := regexp.MustCompile(ExpectedBufferMsg1023) if pattern.MatchString(entry.Message) { break } } // read dnsmessage from next logger dm2 := <-nxt.GetInputChannel() - if dm2.DNS.Qname != processors.ExpectedQname2 { + if dm2.DNS.Qname != ExpectedQname2 { t.Errorf("invalid qname in dns message: %s", dm2.DNS.Qname) } diff --git a/processors/dns.go b/workers/dnsprocessor.go similarity index 99% rename from processors/dns.go rename to workers/dnsprocessor.go index 3f1706a7..817598b9 100644 --- a/processors/dns.go +++ b/workers/dnsprocessor.go @@ -1,4 +1,4 @@ -package processors +package workers import ( "fmt" diff --git a/processors/dns_test.go b/workers/dnsprocessor_test.go similarity index 99% rename from processors/dns_test.go rename to workers/dnsprocessor_test.go index ec213e03..d641a682 100644 --- a/processors/dns_test.go +++ b/workers/dnsprocessor_test.go @@ -1,4 +1,4 @@ -package processors +package workers import ( "bytes" diff --git a/collectors/dnstap_relay.go b/workers/dnstap_relay.go similarity index 99% rename from collectors/dnstap_relay.go rename to workers/dnstap_relay.go index 05baf909..2f35674b 100644 --- a/collectors/dnstap_relay.go +++ b/workers/dnstap_relay.go @@ -1,4 +1,4 @@ -package collectors +package workers import ( "bufio" diff --git a/collectors/dnstap_relay_test.go b/workers/dnstap_relay_test.go similarity index 95% rename from collectors/dnstap_relay_test.go rename to workers/dnstap_relay_test.go index 1baa7035..6b3093e7 100644 --- a/collectors/dnstap_relay_test.go +++ b/workers/dnstap_relay_test.go @@ -1,4 +1,4 @@ -package collectors +package workers import ( "bufio" @@ -10,7 +10,6 @@ import ( "github.com/dmachard/go-dnscollector/netutils" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" "google.golang.org/protobuf/proto" @@ -82,7 +81,7 @@ func Test_DnstapRelay(t *testing.T) { } // get fake dnstap message - dtQuery := processors.GetFakeDNSTap(dnsquery) + dtQuery := GetFakeDNSTap(dnsquery) // serialize to bytes data, err := proto.Marshal(dtQuery) diff --git a/loggers/dnstapclient.go b/workers/dnstapclient.go similarity index 99% rename from loggers/dnstapclient.go rename to workers/dnstapclient.go index 1e316039..07d1ed95 100644 --- a/loggers/dnstapclient.go +++ b/workers/dnstapclient.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/loggers/dnstapclient_test.go b/workers/dnstapclient_test.go similarity index 99% rename from loggers/dnstapclient_test.go rename to workers/dnstapclient_test.go index 32e94622..adcf495b 100644 --- a/loggers/dnstapclient_test.go +++ b/workers/dnstapclient_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/processors/dnstap.go b/workers/dnstapserver.go similarity index 64% rename from processors/dnstap.go rename to workers/dnstapserver.go index 18517cd0..f2d5c63d 100644 --- a/processors/dnstap.go +++ b/workers/dnstapserver.go @@ -1,9 +1,15 @@ -package processors +package workers import ( + "bufio" + "encoding/binary" + "errors" "fmt" + "io" "net" "strconv" + "sync" + "sync/atomic" "time" "github.com/dmachard/go-dnscollector/dnsutils" @@ -12,10 +18,230 @@ import ( "github.com/dmachard/go-dnscollector/pkgutils" "github.com/dmachard/go-dnscollector/transformers" "github.com/dmachard/go-dnstap-protobuf" + "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" + "github.com/segmentio/kafka-go/compress" "google.golang.org/protobuf/proto" ) +type DnstapServer struct { + *pkgutils.GenericWorker + connCounter uint64 +} + +func NewDnstapServer(next []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *DnstapServer { + w := &DnstapServer{GenericWorker: pkgutils.NewGenericWorker(config, logger, name, "dnstap", pkgutils.DefaultBufferSize)} + w.SetDefaultRoutes(next) + w.CheckConfig() + return w +} + +func (w *DnstapServer) CheckConfig() { + if !pkgconfig.IsValidTLS(w.GetConfig().Collectors.Dnstap.TLSMinVersion) { + w.LogFatal(pkgutils.PrefixLogCollector + "[" + w.GetName() + "] dnstap - invalid tls min version") + } +} + +func (w *DnstapServer) HandleConn(conn net.Conn, connID uint64, forceClose chan bool, wg *sync.WaitGroup) { + // close connection on function exit + defer func() { + w.LogInfo("conn #%d - connection handler terminated", connID) + netutils.Close(conn, w.GetConfig().Collectors.Dnstap.ResetConn) + wg.Done() + }() + + // get peer address + peer := conn.RemoteAddr().String() + peerName := netutils.GetPeerName(peer) + w.LogInfo("new connection #%d from %s (%s)", connID, peer, peerName) + + // start dnstap processor and run it + dnstapProcessor := NewDNSTapProcessor(int(connID), peerName, w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.Dnstap.ChannelBufferSize) + go dnstapProcessor.Run(w.GetDefaultRoutes(), w.GetDroppedRoutes()) + + // init frame stream library + fsReader := bufio.NewReader(conn) + fsWriter := bufio.NewWriter(conn) + fs := framestream.NewFstrm(fsReader, fsWriter, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true) + + // framestream as receiver + if err := fs.InitReceiver(); err != nil { + w.LogError("conn #%d - stream initialization: %s", connID, err) + } else { + w.LogInfo("conn #%d - receiver framestream initialized", connID) + } + + // process incoming frame and send it to dnstap consumer channel + var err error + var frame *framestream.Frame + cleanup := make(chan struct{}) + + // goroutine to close the connection properly + go func() { + defer func() { + dnstapProcessor.Stop() + w.LogInfo("conn #%d - cleanup connection handler terminated", connID) + }() + + for { + select { + case <-forceClose: + w.LogInfo("conn #%d - force to cleanup the connection handler", connID) + netutils.Close(conn, w.GetConfig().Collectors.Dnstap.ResetConn) + return + case <-cleanup: + w.LogInfo("conn #%d - cleanup the connection handler", connID) + return + } + } + }() + + // handle incoming frame + for { + if w.GetConfig().Collectors.Dnstap.Compression == pkgconfig.CompressNone { + frame, err = fs.RecvFrame(false) + } else { + frame, err = fs.RecvCompressedFrame(&compress.GzipCodec, false) + } + if err != nil { + connClosed := false + + var opErr *net.OpError + if errors.As(err, &opErr) { + if errors.Is(opErr, net.ErrClosed) { + connClosed = true + } + } + if errors.Is(err, io.EOF) { + connClosed = true + } + + if connClosed { + w.LogInfo("conn #%d - connection closed with peer %s", connID, peer) + } else { + w.LogError("conn #%d - framestream reader error: %s", connID, err) + } + // exit goroutine + close(cleanup) + break + } + + if frame.IsControl() { + if err := fs.ResetReceiver(frame); err != nil { + if errors.Is(err, io.EOF) { + w.LogInfo("conn #%d - framestream reseted by sender", connID) + } else { + w.LogError("conn #%d - unexpected control framestream: %s", connID, err) + } + + } + + // exit goroutine + close(cleanup) + break + } + + if w.GetConfig().Collectors.Dnstap.Compression == pkgconfig.CompressNone { + // send payload to the channel + select { + case dnstapProcessor.GetChannel() <- frame.Data(): // Successful send to channel + default: + w.ProcessorIsBusy() + } + } else { + // ignore first 4 bytes + data := frame.Data()[4:] + validFrame := true + for len(data) >= 4 { + // get frame size + payloadSize := binary.BigEndian.Uint32(data[:4]) + data = data[4:] + + // enough next data ? + if uint32(len(data)) < payloadSize { + validFrame = false + break + } + // send payload to the channel + select { + case dnstapProcessor.GetChannel() <- data[:payloadSize]: // Successful send to channel + default: + w.ProcessorIsBusy() + } + + // continue for next + data = data[payloadSize:] + } + if !validFrame { + w.LogError("conn #%d - invalid compressed frame received", connID) + continue + } + } + } +} + +func (w *DnstapServer) StartCollect() { + w.LogInfo("worker is starting collection") + defer w.CollectDone() + + var connWG sync.WaitGroup + connCleanup := make(chan bool) + cfg := w.GetConfig().Collectors.Dnstap + + // start to listen + listener, err := netutils.StartToListen( + cfg.ListenIP, cfg.ListenPort, cfg.SockPath, + cfg.TLSSupport, pkgconfig.TLSVersion[cfg.TLSMinVersion], + cfg.CertFile, cfg.KeyFile) + if err != nil { + w.LogFatal(pkgutils.PrefixLogCollector+"["+w.GetName()+"] listen error: ", err) + } + w.LogInfo("listening on %s", listener.Addr()) + + // goroutine to Accept() blocks waiting for new connection. + acceptChan := make(chan net.Conn) + netutils.AcceptConnections(listener, acceptChan) + + // main loop + for { + select { + case <-w.OnStop(): + w.LogInfo("stop to listen...") + listener.Close() + + w.LogInfo("closing connected peers...") + close(connCleanup) + connWG.Wait() + return + + // save the new config + case cfg := <-w.NewConfig(): + w.SetConfig(cfg) + w.CheckConfig() + + // new incoming connection + case conn, opened := <-acceptChan: + if !opened { + return + } + + if len(cfg.SockPath) == 0 && cfg.RcvBufSize > 0 { + before, actual, err := netutils.SetSockRCVBUF(conn, cfg.RcvBufSize, cfg.TLSSupport) + if err != nil { + w.LogFatal(pkgutils.PrefixLogCollector+"["+w.GetName()+"] unable to set SO_RCVBUF: ", err) + } + w.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before, cfg.RcvBufSize, actual) + } + + // handle the connection + connWG.Add(1) + connID := atomic.AddUint64(&w.connCounter, 1) + go w.HandleConn(conn, connID, connCleanup, &connWG) + } + + } +} + func GetFakeDNSTap(dnsquery []byte) *dnstap.Dnstap { dtQuery := &dnstap.Dnstap{} diff --git a/processors/dnstap_test.go b/workers/dnstapserver_test.go similarity index 67% rename from processors/dnstap_test.go rename to workers/dnstapserver_test.go index c886b01a..7e284d51 100644 --- a/processors/dnstap_test.go +++ b/workers/dnstapserver_test.go @@ -1,21 +1,205 @@ -package processors +package workers import ( + "bufio" "bytes" "fmt" + "net" "regexp" "testing" "time" "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/netutils" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" "github.com/dmachard/go-dnstap-protobuf" + "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" "github.com/miekg/dns" + "github.com/segmentio/kafka-go/compress" "google.golang.org/protobuf/proto" ) +func Test_DnstapCollector(t *testing.T) { + testcases := []struct { + name string + mode string + address string + listenPort int + operation string + compression string + }{ + { + name: "tcp_default", + mode: netutils.SocketTCP, + address: ":6000", + listenPort: 0, + operation: "CLIENT_QUERY", + compression: "none", + }, + { + name: "tcp_custom_port", + mode: netutils.SocketTCP, + address: ":7000", + listenPort: 7000, + operation: "CLIENT_QUERY", + compression: "none", + }, + { + name: "unix_default", + mode: netutils.SocketUnix, + address: "/tmp/dnscollector.sock", + listenPort: 0, + operation: "CLIENT_QUERY", + compression: "none", + }, + { + name: "tcp_compress_gzip", + mode: netutils.SocketTCP, + address: ":7000", + listenPort: 7000, + operation: "CLIENT_QUERY", + compression: "gzip", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + g := pkgutils.NewFakeLogger() + + config := pkgconfig.GetFakeConfig() + if tc.listenPort > 0 { + config.Collectors.Dnstap.ListenPort = tc.listenPort + } + if tc.mode == netutils.SocketUnix { + config.Collectors.Dnstap.SockPath = tc.address + } + config.Collectors.Dnstap.Compression = tc.compression + + // start the collector + c := NewDnstapServer([]pkgutils.Worker{g}, config, logger.New(false), "test") + go c.StartCollect() + + // wait before to connect + time.Sleep(1 * time.Second) + conn, err := net.Dial(tc.mode, tc.address) + if err != nil { + t.Error("could not connect: ", err) + } + defer conn.Close() + + r := bufio.NewReader(conn) + w := bufio.NewWriter(conn) + fs := framestream.NewFstrm(r, w, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true) + if err := fs.InitSender(); err != nil { + t.Fatalf("framestream init error: %s", err) + } else { + bulkFrame := &framestream.Frame{} + subFrame := &framestream.Frame{} + + // get fake dns question + dnsquery, err := dnsutils.GetFakeDNS() + if err != nil { + t.Fatalf("dns question pack error") + } + + // get fake dnstap message + dtQuery := GetFakeDNSTap(dnsquery) + + // serialize to bytes + data, err := proto.Marshal(dtQuery) + if err != nil { + t.Fatalf("dnstap proto marshal error %s", err) + } + // send query + + if config.Collectors.Dnstap.Compression == pkgconfig.CompressNone { + // send the frame + bulkFrame.Write(data) + if err := fs.SendFrame(bulkFrame); err != nil { + t.Fatalf("send frame error %s", err) + } + } else { + subFrame.Write(data) + bulkFrame.AppendData(subFrame.Data()) + } + + if config.Collectors.Dnstap.Compression != pkgconfig.CompressNone { + bulkFrame.Encode() + if err := fs.SendCompressedFrame(&compress.GzipCodec, bulkFrame); err != nil { + t.Fatalf("send compressed frame error %s", err) + } + } + } + + // waiting message in channel + msg := <-g.GetInputChannel() + if msg.DNSTap.Operation != tc.operation { + t.Errorf("want %s, got %s", tc.operation, msg.DNSTap.Operation) + } + + c.Stop() + }) + } +} + +// Testcase for https://github.com/dmachard/go-dnscollector/issues/461 +// Support Bind9 with dnstap closing. +func Test_DnstapCollector_CloseFrameStream(t *testing.T) { + // redirect stdout output to bytes buffer + logsChan := make(chan logger.LogEntry, 50) + lg := logger.New(true) + lg.SetOutputChannel((logsChan)) + + config := pkgconfig.GetFakeConfig() + config.Collectors.Dnstap.SockPath = "/tmp/dnscollector.sock" + + // start the collector in unix mode + g := pkgutils.NewFakeLogger() + c := NewDnstapServer([]pkgutils.Worker{g}, config, lg, "test") + go c.StartCollect() + + // simulate dns server connection to collector + time.Sleep(1 * time.Second) + conn, err := net.Dial(netutils.SocketUnix, "/tmp/dnscollector.sock") + if err != nil { + t.Error("could not connect: ", err) + } + defer conn.Close() + + r := bufio.NewReader(conn) + w := bufio.NewWriter(conn) + fs := framestream.NewFstrm(r, w, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true) + if err := fs.InitSender(); err != nil { + t.Fatalf("framestream init error: %s", err) + } + + // checking reset + errClose := fs.ResetSender() + if errClose != nil { + t.Errorf("reset sender error: %s", errClose) + } + + regxp := ".*framestream reseted by sender.*" + pattern := regexp.MustCompile(regxp) + + matchMsg := false + for entry := range logsChan { + fmt.Println(entry) + if pattern.MatchString(entry.Message) { + matchMsg = true + break + } + } + if !matchMsg { + t.Errorf("reset from sender not received") + } + + // cleanup + c.Stop() +} + func Test_DnstapProcessor(t *testing.T) { logger := logger.New(true) var o bytes.Buffer diff --git a/loggers/elasticsearch.go b/workers/elasticsearch.go similarity index 99% rename from loggers/elasticsearch.go rename to workers/elasticsearch.go index 9c569cda..fc418609 100644 --- a/loggers/elasticsearch.go +++ b/workers/elasticsearch.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bytes" diff --git a/loggers/elasticsearch_test.go b/workers/elasticsearch_test.go similarity index 99% rename from loggers/elasticsearch_test.go rename to workers/elasticsearch_test.go index 4a317c0f..d52a6e02 100644 --- a/loggers/elasticsearch_test.go +++ b/workers/elasticsearch_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/loggers/falco.go b/workers/falco.go similarity index 99% rename from loggers/falco.go rename to workers/falco.go index f596c396..e3202a63 100644 --- a/loggers/falco.go +++ b/workers/falco.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bytes" diff --git a/loggers/falco_test.go b/workers/falco_test.go similarity index 98% rename from loggers/falco_test.go rename to workers/falco_test.go index cb868fc4..e6e72c83 100644 --- a/loggers/falco_test.go +++ b/workers/falco_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/collectors/file_ingestor.go b/workers/file_ingestor.go similarity index 95% rename from collectors/file_ingestor.go rename to workers/file_ingestor.go index b1195415..fe7e91c9 100644 --- a/collectors/file_ingestor.go +++ b/workers/file_ingestor.go @@ -1,4 +1,4 @@ -package collectors +package workers import ( "errors" @@ -14,7 +14,6 @@ import ( "github.com/dmachard/go-dnscollector/netutils" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" framestream "github.com/farsightsec/golang-framestream" "github.com/fsnotify/fsnotify" @@ -38,8 +37,8 @@ func IsValidMode(mode string) bool { type FileIngestor struct { *pkgutils.GenericWorker watcherTimers map[string]*time.Timer - dnsProcessor processors.DNSProcessor - dnstapProcessor processors.DNSTapProcessor + dnsProcessor DNSProcessor + dnstapProcessor DNSTapProcessor mu sync.Mutex } @@ -311,11 +310,11 @@ func (w *FileIngestor) StartCollect() { w.LogInfo("worker is starting collection") defer w.CollectDone() - w.dnsProcessor = processors.NewDNSProcessor(w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.FileIngestor.ChannelBufferSize) + w.dnsProcessor = NewDNSProcessor(w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.FileIngestor.ChannelBufferSize) go w.dnsProcessor.Run(w.GetDefaultRoutes(), w.GetDroppedRoutes()) // start dnstap subprocessor - w.dnstapProcessor = processors.NewDNSTapProcessor(0, "", w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.FileIngestor.ChannelBufferSize) + w.dnstapProcessor = NewDNSTapProcessor(0, "", w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.FileIngestor.ChannelBufferSize) go w.dnstapProcessor.Run(w.GetDefaultRoutes(), w.GetDroppedRoutes()) // read current folder content diff --git a/collectors/file_ingestor_test.go b/workers/file_ingestor_test.go similarity index 97% rename from collectors/file_ingestor_test.go rename to workers/file_ingestor_test.go index aa7ef10c..744ba9c5 100644 --- a/collectors/file_ingestor_test.go +++ b/workers/file_ingestor_test.go @@ -1,4 +1,4 @@ -package collectors +package workers import ( "testing" diff --git a/collectors/file_tail.go b/workers/file_tail.go similarity index 99% rename from collectors/file_tail.go rename to workers/file_tail.go index 28fe6416..55ee22b8 100644 --- a/collectors/file_tail.go +++ b/workers/file_tail.go @@ -1,4 +1,4 @@ -package collectors +package workers import ( "fmt" diff --git a/collectors/file_tail_test.go b/workers/file_tail_test.go similarity index 98% rename from collectors/file_tail_test.go rename to workers/file_tail_test.go index b184677b..ac5fefb3 100644 --- a/collectors/file_tail_test.go +++ b/workers/file_tail_test.go @@ -1,4 +1,4 @@ -package collectors +package workers import ( "bufio" diff --git a/loggers/fluentd.go b/workers/fluentd.go similarity index 99% rename from loggers/fluentd.go rename to workers/fluentd.go index 0ddb68aa..e74000be 100644 --- a/loggers/fluentd.go +++ b/workers/fluentd.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "crypto/tls" diff --git a/loggers/fluentd_test.go b/workers/fluentd_test.go similarity index 98% rename from loggers/fluentd_test.go rename to workers/fluentd_test.go index 0f42260e..c34f7331 100644 --- a/loggers/fluentd_test.go +++ b/workers/fluentd_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bytes" @@ -96,7 +96,7 @@ func Test_FluentdClient(t *testing.T) { t.Errorf("Decode tag: %v", err) break } - if tag != "dns.collector" { + if tag != pkgconfig.ProgQname { t.Errorf("invalid tag: %s", tag) break } diff --git a/loggers/influxdb.go b/workers/influxdb.go similarity index 99% rename from loggers/influxdb.go rename to workers/influxdb.go index b1c59d25..3be72810 100644 --- a/loggers/influxdb.go +++ b/workers/influxdb.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "time" diff --git a/loggers/influxdb_test.go b/workers/influxdb_test.go similarity index 98% rename from loggers/influxdb_test.go rename to workers/influxdb_test.go index b483becb..94917679 100644 --- a/loggers/influxdb_test.go +++ b/workers/influxdb_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/loggers/kafkaproducer.go b/workers/kafkaproducer.go similarity index 99% rename from loggers/kafkaproducer.go rename to workers/kafkaproducer.go index daca75fd..4d05d8e4 100644 --- a/loggers/kafkaproducer.go +++ b/workers/kafkaproducer.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bytes" diff --git a/loggers/kafkaproducer_test.go b/workers/kafkaproducer_test.go similarity index 99% rename from loggers/kafkaproducer_test.go rename to workers/kafkaproducer_test.go index 29e39ddd..61edf1de 100644 --- a/loggers/kafkaproducer_test.go +++ b/workers/kafkaproducer_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "log" diff --git a/loggers/logfile.go b/workers/logfile.go similarity index 99% rename from loggers/logfile.go rename to workers/logfile.go index 663fdbe2..986d9ef1 100644 --- a/loggers/logfile.go +++ b/workers/logfile.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" @@ -32,7 +32,7 @@ const ( compressSuffix = ".gz" ) -func IsValidMode(mode string) bool { +func IsValid(mode string) bool { switch mode { case pkgconfig.ModeText, @@ -67,7 +67,7 @@ func NewLogFile(config *pkgconfig.Config, logger *logger.Logger, name string) *L } func (w *LogFile) ReadConfig() { - if !IsValidMode(w.GetConfig().Loggers.LogFile.Mode) { + if !IsValid(w.GetConfig().Loggers.LogFile.Mode) { w.LogFatal("["+w.GetName()+"] logger=file - invalid mode: ", w.GetConfig().Loggers.LogFile.Mode) } w.fileDir = filepath.Dir(w.GetConfig().Loggers.LogFile.FilePath) diff --git a/loggers/logfile_test.go b/workers/logfile_test.go similarity index 99% rename from loggers/logfile_test.go rename to workers/logfile_test.go index bdd97e24..9bc80f45 100644 --- a/loggers/logfile_test.go +++ b/workers/logfile_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "fmt" diff --git a/loggers/lokiclient.go b/workers/lokiclient.go similarity index 99% rename from loggers/lokiclient.go rename to workers/lokiclient.go index 5f84ea5c..ccdde497 100644 --- a/loggers/lokiclient.go +++ b/workers/lokiclient.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/loggers/lokiclient_test.go b/workers/lokiclient_test.go similarity index 99% rename from loggers/lokiclient_test.go rename to workers/lokiclient_test.go index dd65d78a..2cb93502 100644 --- a/loggers/lokiclient_test.go +++ b/workers/lokiclient_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/processors/powerdns.go b/workers/powerdns.go similarity index 72% rename from processors/powerdns.go rename to workers/powerdns.go index 20e0c526..c1905576 100644 --- a/processors/powerdns.go +++ b/workers/powerdns.go @@ -1,10 +1,15 @@ -package processors +package workers import ( + "bufio" + "errors" "fmt" + "io" "net" "strconv" "strings" + "sync" + "sync/atomic" "time" "github.com/dmachard/go-dnscollector/dnsutils" @@ -18,6 +23,164 @@ import ( "google.golang.org/protobuf/proto" ) +type PdnsServer struct { + *pkgutils.GenericWorker + connCounter uint64 +} + +func NewPdnsServer(next []pkgutils.Worker, config *pkgconfig.Config, logger *logger.Logger, name string) *PdnsServer { + w := &PdnsServer{GenericWorker: pkgutils.NewGenericWorker(config, logger, name, "powerdns", pkgutils.DefaultBufferSize)} + w.SetDefaultRoutes(next) + w.CheckConfig() + return w +} + +func (w *PdnsServer) CheckConfig() { + if !pkgconfig.IsValidTLS(w.GetConfig().Collectors.PowerDNS.TLSMinVersion) { + w.LogFatal(pkgutils.PrefixLogCollector + "[" + w.GetName() + "] invalid tls min version") + } +} + +func (w *PdnsServer) HandleConn(conn net.Conn, connID uint64, forceClose chan bool, wg *sync.WaitGroup) { + // close connection on function exit + defer func() { + w.LogInfo("conn #%d - connection handler terminated", connID) + netutils.Close(conn, w.GetConfig().Collectors.Dnstap.ResetConn) + wg.Done() + }() + + // get peer address + peer := conn.RemoteAddr().String() + peerName := netutils.GetPeerName(peer) + w.LogInfo("new connection #%d from %s (%s)", connID, peer, peerName) + + // start protobuf subprocessor + pdnsProcessor := NewPdnsProcessor(int(connID), peerName, w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.PowerDNS.ChannelBufferSize) + go pdnsProcessor.Run(w.GetDefaultRoutes(), w.GetDroppedRoutes()) + + r := bufio.NewReader(conn) + pbs := powerdns_protobuf.NewProtobufStream(r, conn, 5*time.Second) + + var err error + var payload *powerdns_protobuf.ProtoPayload + cleanup := make(chan struct{}) + + // goroutine to close the connection properly + go func() { + defer func() { + pdnsProcessor.Stop() + w.LogInfo("conn #%d - cleanup connection handler terminated", connID) + }() + + for { + select { + case <-forceClose: + w.LogInfo("conn #%d - force to cleanup the connection handler", connID) + netutils.Close(conn, w.GetConfig().Collectors.Dnstap.ResetConn) + return + case <-cleanup: + w.LogInfo("conn #%d - cleanup the connection handler", connID) + return + } + } + }() + + for { + payload, err = pbs.RecvPayload(false) + if err != nil { + connClosed := false + + var opErr *net.OpError + if errors.As(err, &opErr) { + if errors.Is(opErr, net.ErrClosed) { + connClosed = true + } + } + if errors.Is(err, io.EOF) { + connClosed = true + } + + if connClosed { + w.LogInfo("conn #%d - connection closed with peer %s", connID, peer) + } else { + w.LogError("conn #%d - powerdns reader error: %s", connID, err) + } + + // exit goroutine + close(cleanup) + break + } + + // send payload to the channel + select { + case pdnsProcessor.GetChannel() <- payload.Data(): // Successful send + default: + w.ProcessorIsBusy() + } + } +} + +func (w *PdnsServer) StartCollect() { + w.LogInfo("worker is starting collection") + defer w.CollectDone() + + var connWG sync.WaitGroup + connCleanup := make(chan bool) + cfg := w.GetConfig().Collectors.PowerDNS + + // start to listen + listener, err := netutils.StartToListen( + cfg.ListenIP, cfg.ListenPort, "", + cfg.TLSSupport, pkgconfig.TLSVersion[cfg.TLSMinVersion], + cfg.CertFile, cfg.KeyFile) + if err != nil { + w.LogFatal(pkgutils.PrefixLogCollector+"["+w.GetName()+"] listening failed: ", err) + } + w.LogInfo("listening on %s", listener.Addr()) + + // goroutine to Accept() blocks waiting for new connection. + acceptChan := make(chan net.Conn) + netutils.AcceptConnections(listener, acceptChan) + + // main loop + for { + select { + case <-w.OnStop(): + w.LogInfo("stop to listen...") + listener.Close() + + w.LogInfo("closing connected peers...") + close(connCleanup) + connWG.Wait() + return + + // save the new config + case cfg := <-w.NewConfig(): + w.SetConfig(cfg) + w.CheckConfig() + + case conn, opened := <-acceptChan: + if !opened { + return + } + + if w.GetConfig().Collectors.Dnstap.RcvBufSize > 0 { + before, actual, err := netutils.SetSockRCVBUF(conn, cfg.RcvBufSize, cfg.TLSSupport) + if err != nil { + w.LogFatal(pkgutils.PrefixLogCollector+"["+w.GetName()+"] unable to set SO_RCVBUF: ", err) + } + w.LogInfo("set SO_RCVBUF option, value before: %d, desired: %d, actual: %d", before, cfg.RcvBufSize, actual) + } + + // handle the connection + connWG.Add(1) + connID := atomic.AddUint64(&w.connCounter, 1) + go w.HandleConn(conn, connID, connCleanup, &connWG) + + } + } +} + var ( ProtobufPowerDNSToDNSTap = map[string]string{ "DNSQueryType": "CLIENT_QUERY", diff --git a/processors/powerdns_test.go b/workers/powerdns_test.go similarity index 94% rename from processors/powerdns_test.go rename to workers/powerdns_test.go index 4bc57091..3ad2a959 100644 --- a/processors/powerdns_test.go +++ b/workers/powerdns_test.go @@ -1,11 +1,13 @@ -package processors +package workers import ( "fmt" + "net" "regexp" "testing" "time" + "github.com/dmachard/go-dnscollector/netutils" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" "github.com/dmachard/go-logger" @@ -14,6 +16,21 @@ import ( "google.golang.org/protobuf/proto" ) +func TestPowerDNS_Run(t *testing.T) { + g := pkgutils.NewFakeLogger() + + c := NewPdnsServer([]pkgutils.Worker{g}, pkgconfig.GetFakeConfig(), logger.New(false), "test") + go c.StartCollect() + + // wait before to connect + time.Sleep(1 * time.Second) + conn, err := net.Dial(netutils.SocketTCP, ":6001") + if err != nil { + t.Error("could not connect to TCP server: ", err) + } + defer conn.Close() +} + func Test_PowerDNSProcessor(t *testing.T) { // init the dnstap consumer consumer := NewPdnsProcessor(0, "peername", pkgconfig.GetFakeConfig(), logger.New(false), "test", 512) diff --git a/loggers/prometheus.go b/workers/prometheus.go similarity index 99% rename from loggers/prometheus.go rename to workers/prometheus.go index 876b63f9..b2ceb62e 100644 --- a/loggers/prometheus.go +++ b/workers/prometheus.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "crypto/tls" diff --git a/loggers/prometheus_test.go b/workers/prometheus_test.go similarity index 99% rename from loggers/prometheus_test.go rename to workers/prometheus_test.go index 49b9750c..c3408f62 100644 --- a/loggers/prometheus_test.go +++ b/workers/prometheus_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "net/http" diff --git a/loggers/redispub.go b/workers/redispub.go similarity index 99% rename from loggers/redispub.go rename to workers/redispub.go index c94043e7..70fcbd53 100644 --- a/loggers/redispub.go +++ b/workers/redispub.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/loggers/redispub_test.go b/workers/redispub_test.go similarity index 99% rename from loggers/redispub_test.go rename to workers/redispub_test.go index 5b03ead4..3178a65a 100644 --- a/loggers/redispub_test.go +++ b/workers/redispub_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/loggers/restapi.go b/workers/restapi.go similarity index 99% rename from loggers/restapi.go rename to workers/restapi.go index aad0f102..1e7f761b 100644 --- a/loggers/restapi.go +++ b/workers/restapi.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "crypto/tls" diff --git a/loggers/restapi_test.go b/workers/restapi_test.go similarity index 99% rename from loggers/restapi_test.go rename to workers/restapi_test.go index 1a472ce4..7b78da72 100644 --- a/loggers/restapi_test.go +++ b/workers/restapi_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "net/http" diff --git a/loggers/scalyr.go b/workers/scalyr.go similarity index 99% rename from loggers/scalyr.go rename to workers/scalyr.go index cc7007f0..d81b15ff 100644 --- a/loggers/scalyr.go +++ b/workers/scalyr.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bytes" diff --git a/collectors/sniffer_afpacket.go b/workers/sniffer_afpacket.go similarity index 97% rename from collectors/sniffer_afpacket.go rename to workers/sniffer_afpacket.go index 0c849de8..ee76571a 100644 --- a/collectors/sniffer_afpacket.go +++ b/workers/sniffer_afpacket.go @@ -1,7 +1,7 @@ //go:build windows || darwin || freebsd // +build windows darwin freebsd -package collectors +package workers import ( "github.com/dmachard/go-dnscollector/pkgconfig" diff --git a/collectors/sniffer_afpacket_linux.go b/workers/sniffer_afpacket_linux.go similarity index 97% rename from collectors/sniffer_afpacket_linux.go rename to workers/sniffer_afpacket_linux.go index 960c4bc0..1d4b06be 100644 --- a/collectors/sniffer_afpacket_linux.go +++ b/workers/sniffer_afpacket_linux.go @@ -1,7 +1,7 @@ //go:build linux // +build linux -package collectors +package workers import ( "context" @@ -16,7 +16,6 @@ import ( "github.com/dmachard/go-dnscollector/netutils" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -87,7 +86,7 @@ func (w *AfpacketSniffer) StartCollect() { } } - dnsProcessor := processors.NewDNSProcessor(w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.AfpacketLiveCapture.ChannelBufferSize) + dnsProcessor := NewDNSProcessor(w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.AfpacketLiveCapture.ChannelBufferSize) go dnsProcessor.Run(w.GetDefaultRoutes(), w.GetDroppedRoutes()) dnsChan := make(chan netutils.DNSPacket) diff --git a/collectors/sniffer_afpacket_test.go b/workers/sniffer_afpacket_test.go similarity index 89% rename from collectors/sniffer_afpacket_test.go rename to workers/sniffer_afpacket_test.go index e994af38..a3fbd923 100644 --- a/collectors/sniffer_afpacket_test.go +++ b/workers/sniffer_afpacket_test.go @@ -1,7 +1,7 @@ //go:build linux // +build linux -package collectors +package workers import ( "log" @@ -23,12 +23,12 @@ func TestAfpacketSnifferRun(t *testing.T) { go c.StartCollect() // send dns query - net.LookupIP("dns.collector") + net.LookupIP(pkgconfig.ProgQname) // waiting message in channel for { msg := <-g.GetInputChannel() - if msg.DNSTap.Operation == dnsutils.DNSTapClientQuery && msg.DNS.Qname == "dns.collector" { + if msg.DNSTap.Operation == dnsutils.DNSTapClientQuery && msg.DNS.Qname == pkgconfig.ProgQname { break } } diff --git a/collectors/sniffer_xdp.go b/workers/sniffer_xdp.go similarity index 96% rename from collectors/sniffer_xdp.go rename to workers/sniffer_xdp.go index 50b372aa..4210da78 100644 --- a/collectors/sniffer_xdp.go +++ b/workers/sniffer_xdp.go @@ -1,7 +1,7 @@ //go:build linux || darwin || freebsd // +build linux darwin freebsd -package collectors +package workers import ( "bytes" @@ -18,7 +18,6 @@ import ( "github.com/dmachard/go-dnscollector/netutils" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-dnscollector/xdp" "github.com/dmachard/go-logger" "golang.org/x/sys/unix" @@ -39,7 +38,7 @@ func (w *XDPSniffer) StartCollect() { defer w.CollectDone() // init dns processor - dnsProcessor := processors.NewDNSProcessor(w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.XdpLiveCapture.ChannelBufferSize) + dnsProcessor := NewDNSProcessor(w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.XdpLiveCapture.ChannelBufferSize) go dnsProcessor.Run(w.GetDefaultRoutes(), w.GetDroppedRoutes()) // get network interface by name diff --git a/collectors/sniffer_xdp_windows.go b/workers/sniffer_xdp_windows.go similarity index 97% rename from collectors/sniffer_xdp_windows.go rename to workers/sniffer_xdp_windows.go index 690b15ee..54f7f377 100644 --- a/collectors/sniffer_xdp_windows.go +++ b/workers/sniffer_xdp_windows.go @@ -1,7 +1,7 @@ //go:build windows // +build windows -package collectors +package workers import ( "github.com/dmachard/go-dnscollector/pkgconfig" diff --git a/loggers/statsd.go b/workers/statsd.go similarity index 99% rename from loggers/statsd.go rename to workers/statsd.go index 2bd4eb51..88e7a233 100644 --- a/loggers/statsd.go +++ b/workers/statsd.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/loggers/statsd_test.go b/workers/statsd_test.go similarity index 98% rename from loggers/statsd_test.go rename to workers/statsd_test.go index 8d7c4b6d..867cf1be 100644 --- a/loggers/statsd_test.go +++ b/workers/statsd_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "net" diff --git a/loggers/stdout.go b/workers/stdout.go similarity index 99% rename from loggers/stdout.go rename to workers/stdout.go index 89d371ea..d3136ebc 100644 --- a/loggers/stdout.go +++ b/workers/stdout.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bytes" diff --git a/loggers/stdout_test.go b/workers/stdout_test.go similarity index 94% rename from loggers/stdout_test.go rename to workers/stdout_test.go index 0fbb5790..9d2a2184 100644 --- a/loggers/stdout_test.go +++ b/workers/stdout_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bytes" @@ -10,7 +10,6 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" "github.com/google/gopacket/pcapgo" ) @@ -30,14 +29,14 @@ func Test_StdoutTextMode(t *testing.T) { name: "default_delimiter", delimiter: cfg.Global.TextFormatDelimiter, boundary: cfg.Global.TextFormatBoundary, - qname: "dns.collector", + qname: pkgconfig.ProgQname, expected: "- collector CLIENT_QUERY NOERROR 1.2.3.4 1234 - - 0b dns.collector A -\n", }, { name: "custom_delimiter", delimiter: ";", boundary: cfg.Global.TextFormatBoundary, - qname: "dns.collector", + qname: pkgconfig.ProgQname, expected: "-;collector;CLIENT_QUERY;NOERROR;1.2.3.4;1234;-;-;0b;dns.collector;A;-\n", }, { @@ -241,7 +240,7 @@ func Test_StdoutBufferLoggerIsFull(t *testing.T) { for entry := range logsChan { fmt.Println(entry) - pattern := regexp.MustCompile(processors.ExpectedBufferMsg511) + pattern := regexp.MustCompile(ExpectedBufferMsg511) if pattern.MatchString(entry.Message) { break } @@ -249,7 +248,7 @@ func Test_StdoutBufferLoggerIsFull(t *testing.T) { // read dns message from dnstap consumer dmOut := <-nxt.GetInputChannel() - if dmOut.DNS.Qname != processors.ExpectedQname2 { + if dmOut.DNS.Qname != ExpectedQname2 { t.Errorf("invalid qname in dns message: %s", dmOut.DNS.Qname) } @@ -262,7 +261,7 @@ func Test_StdoutBufferLoggerIsFull(t *testing.T) { time.Sleep(12 * time.Second) for entry := range logsChan { fmt.Println(entry) - pattern := regexp.MustCompile(processors.ExpectedBufferMsg1023) + pattern := regexp.MustCompile(ExpectedBufferMsg1023) if pattern.MatchString(entry.Message) { break } @@ -270,7 +269,7 @@ func Test_StdoutBufferLoggerIsFull(t *testing.T) { // read dns message from dnstap consumer dmOut2 := <-nxt.GetInputChannel() - if dmOut2.DNS.Qname != processors.ExpectedQname2 { + if dmOut2.DNS.Qname != ExpectedQname2 { t.Errorf("invalid qname in second dns message: %s", dmOut2.DNS.Qname) } diff --git a/loggers/syslog.go b/workers/syslog.go similarity index 99% rename from loggers/syslog.go rename to workers/syslog.go index bf33f0b3..d79b3b56 100644 --- a/loggers/syslog.go +++ b/workers/syslog.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bytes" diff --git a/loggers/syslog_test.go b/workers/syslog_test.go similarity index 99% rename from loggers/syslog_test.go rename to workers/syslog_test.go index 65e61aa2..66d33a8d 100644 --- a/loggers/syslog_test.go +++ b/workers/syslog_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/loggers/tcpclient.go b/workers/tcpclient.go similarity index 99% rename from loggers/tcpclient.go rename to workers/tcpclient.go index 7290e6b9..68c194a4 100644 --- a/loggers/tcpclient.go +++ b/workers/tcpclient.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/loggers/tcpclient_test.go b/workers/tcpclient_test.go similarity index 99% rename from loggers/tcpclient_test.go rename to workers/tcpclient_test.go index f985f156..1b658668 100644 --- a/loggers/tcpclient_test.go +++ b/workers/tcpclient_test.go @@ -1,4 +1,4 @@ -package loggers +package workers import ( "bufio" diff --git a/collectors/tzsp.go b/workers/tzsp.go similarity index 97% rename from collectors/tzsp.go rename to workers/tzsp.go index b96a88ce..29e4d957 100644 --- a/collectors/tzsp.go +++ b/workers/tzsp.go @@ -1,7 +1,7 @@ //go:build windows || freebsd || darwin // +build windows freebsd darwin -package collectors +package workers import ( "github.com/dmachard/go-dnscollector/pkgconfig" diff --git a/collectors/tzsp_linux.go b/workers/tzsp_linux.go similarity index 96% rename from collectors/tzsp_linux.go rename to workers/tzsp_linux.go index 978ae77d..abf9606d 100644 --- a/collectors/tzsp_linux.go +++ b/workers/tzsp_linux.go @@ -4,7 +4,7 @@ // Written by Noel Kuntze // Updating by Denis Machard -package collectors +package workers import ( "context" @@ -19,7 +19,6 @@ import ( "github.com/dmachard/go-dnscollector/netutils" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" - "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-logger" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -84,7 +83,7 @@ func (w *TZSPSniffer) StartCollect() { } // init dns processor - dnsProcessor := processors.NewDNSProcessor(w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.Tzsp.ChannelBufferSize) + dnsProcessor := NewDNSProcessor(w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.Tzsp.ChannelBufferSize) go dnsProcessor.Run(w.GetDefaultRoutes(), w.GetDroppedRoutes()) ctx, cancel := context.WithCancel(context.Background())