From f8f62be8673230ded61efd764d1f39ad50b736a0 Mon Sep 17 00:00:00 2001 From: dmachard <5562930+dmachard@users.noreply.github.com> Date: Fri, 4 Mar 2022 13:59:41 +0100 Subject: [PATCH] alpha support for powerdns protobuf remote logger #39 --- collectors/powerdns.go | 139 ++++++++++++++++++++++++++++++++++++++ config.yml | 11 ++- dnscollector.go | 3 + dnsutils/config.go | 9 +++ go.mod | 1 + go.sum | 2 + subprocessors/powerdns.go | 119 ++++++++++++++++++++++++++++++++ 7 files changed, 283 insertions(+), 1 deletion(-) create mode 100644 collectors/powerdns.go create mode 100644 subprocessors/powerdns.go diff --git a/collectors/powerdns.go b/collectors/powerdns.go new file mode 100644 index 00000000..4bf2d821 --- /dev/null +++ b/collectors/powerdns.go @@ -0,0 +1,139 @@ +package collectors + +import ( + "bufio" + "net" + "strconv" + "time" + + "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/subprocessors" + "github.com/dmachard/go-logger" + powerdns_protobuf "github.com/dmachard/go-powerdns-protobuf" +) + +type ProtobufPowerDNS struct { + done chan bool + listen net.Listener + conns []net.Conn + loggers []dnsutils.Worker + config *dnsutils.Config + logger *logger.Logger +} + +func NewProtobufPowerDNS(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger) *ProtobufPowerDNS { + logger.Info("collector PowerDNS protobuf - enabled") + s := &ProtobufPowerDNS{ + done: make(chan bool), + config: config, + loggers: loggers, + logger: logger, + } + s.ReadConfig() + return s +} + +func (c *ProtobufPowerDNS) Loggers() []chan dnsutils.DnsMessage { + channels := []chan dnsutils.DnsMessage{} + for _, p := range c.loggers { + channels = append(channels, p.Channel()) + } + return channels +} + +func (c *ProtobufPowerDNS) ReadConfig() { +} + +func (o *ProtobufPowerDNS) LogInfo(msg string, v ...interface{}) { + o.logger.Info("collector powerdns protobuf - "+msg, v...) +} + +func (o *ProtobufPowerDNS) LogError(msg string, v ...interface{}) { + o.logger.Error("collector powerdns protobuf - "+msg, v...) +} + +func (c *ProtobufPowerDNS) HandleConn(conn net.Conn) { + // close connection on function exit + defer conn.Close() + + // get peer address + peer := conn.RemoteAddr().String() + c.LogInfo("%s - new connection\n", peer) + + // start protobuf subprocessor + pdns_subprocessor := subprocessors.NewPdnsProcessor(c.config, c.logger) + go pdns_subprocessor.Run(c.Loggers()) + + r := bufio.NewReader(conn) + pbs := powerdns_protobuf.NewProtobufStream(r, conn, 5*time.Second) + + // process incoming protobuf payload + if err := pbs.ProcessStream(pdns_subprocessor.GetChannel()); err != nil { + c.LogError("transport error: %s", err) + } + + c.LogInfo("%s - connection closed\n", peer) +} + +func (c *ProtobufPowerDNS) Channel() chan dnsutils.DnsMessage { + return nil +} + +func (c *ProtobufPowerDNS) Stop() { + c.LogInfo("stopping...") + + // closing properly current connections if exists + for _, conn := range c.conns { + peer := conn.RemoteAddr().String() + c.LogInfo("%s - closing connection...", peer) + conn.Close() + } + // Finally close the listener to unblock accept + c.LogInfo("stop listening...") + c.listen.Close() + + // read done channel and block until run is terminated + <-c.done + close(c.done) +} + +func (c *ProtobufPowerDNS) Listen() error { + c.LogInfo("running in background...") + + var err error + var listener net.Listener + addrlisten := c.config.Collectors.PowerDNS.ListenIP + ":" + strconv.Itoa(c.config.Collectors.PowerDNS.ListenPort) + + listener, err = net.Listen("tcp", addrlisten) + + // something is wrong ? + if err != nil { + return err + } + c.LogInfo("is listening on %s", listener.Addr()) + c.listen = listener + return nil +} + +func (c *ProtobufPowerDNS) Run() { + c.LogInfo("starting collector...") + if c.listen == nil { + if err := c.Listen(); err != nil { + c.logger.Fatal("collector dnstap listening failed: ", err) + } + } + for { + // Accept() blocks waiting for new connection. + conn, err := c.listen.Accept() + if err != nil { + break + } + + c.conns = append(c.conns, conn) + go c.HandleConn(conn) + + } + + c.LogInfo("run terminated") + c.done <- true +} diff --git a/config.yml b/config.yml index 55b86939..7e11e8eb 100644 --- a/config.yml +++ b/config.yml @@ -16,7 +16,7 @@ collectors: # dnstap standard dnstap: # to enable, set the enable to true - enable: true + enable: false # listen on ip listen-ip: 0.0.0.0 # listening on port @@ -62,6 +62,15 @@ collectors: (?P[^ ]*) (?P[^ ]*) (?P[^ ]*) (?P[^ ]*) (?P[^ ]*)b (?P[^ ]*) (?P[^ ]*) (?P[^ ]*)$" + # protobuf powerdns + powerdns: + # to enable, set the enable to true + enable: true + # listen on ip + listen-ip: 0.0.0.0 + # listening on port + listen-port: 6001 + # settings for subprocessors subprocessors: # this option can be useful to reduce the size of your dns logs diff --git a/dnscollector.go b/dnscollector.go index 592aa421..0296e979 100644 --- a/dnscollector.go +++ b/dnscollector.go @@ -123,6 +123,9 @@ func main() { if config.Collectors.Tail.Enable { collwrks = append(collwrks, collectors.NewTail(logwrks, config, logger)) } + if config.Collectors.PowerDNS.Enable { + collwrks = append(collwrks, collectors.NewProtobufPowerDNS(logwrks, config, logger)) + } // Handle Ctrl-C sigTerm := make(chan os.Signal, 1) diff --git a/dnsutils/config.go b/dnsutils/config.go index 257b373f..936bea6e 100644 --- a/dnsutils/config.go +++ b/dnsutils/config.go @@ -49,6 +49,11 @@ type Config struct { CaptureDnsQueries bool `yaml:"capture-dns-queries"` CaptureDnsReplies bool `yaml:"capture-dns-replies"` } `yaml:"dns-sniffer"` + PowerDNS struct { + Enable bool `yaml:"enable"` + ListenIP string `yaml:"listen-ip"` + ListenPort int `yaml:"listen-port"` + } `yaml:"powerdns"` } `yaml:"collectors"` Subprocessors struct { @@ -251,6 +256,10 @@ func (c *Config) SetDefault() { c.Collectors.DnsSniffer.CaptureDnsQueries = true c.Collectors.DnsSniffer.CaptureDnsReplies = true + c.Collectors.PowerDNS.Enable = true + c.Collectors.PowerDNS.ListenIP = "0.0.0.0" + c.Collectors.PowerDNS.ListenPort = 6000 + // Subprocessors c.Subprocessors.QuietText.Dnstap = false c.Subprocessors.QuietText.Dns = false diff --git a/go.mod b/go.mod index 62ff48d6..124de29e 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( ) require ( + github.com/dmachard/go-powerdns-protobuf v0.0.2 // indirect go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37 // indirect ) diff --git a/go.sum b/go.sum index 0fa2e1ba..f35e6d9f 100644 --- a/go.sum +++ b/go.sum @@ -234,6 +234,8 @@ github.com/dmachard/go-framestream v0.1.0 h1:SwBZSamv1S3T4zHeUZFMNSo3u09WKyyOOwZ github.com/dmachard/go-framestream v0.1.0/go.mod h1:NTkjiWoRSjLVyx/PxDjZoTCb6bE40DHJ8fe29xOHBcg= github.com/dmachard/go-logger v0.1.0 h1:E4kmxIZx+/hrx6Iusa7ygbcSbP+jQnYoeLKhqsjUOxY= github.com/dmachard/go-logger v0.1.0/go.mod h1:vg6cMQBmx+SgH45XsqEyqScXp9eJhS6yuvvJZOgBbvU= +github.com/dmachard/go-powerdns-protobuf v0.0.2 h1:POJPDn5gWG0nFjCsKo4RYIHFTrCVK3+UH+y0B6hL3XM= +github.com/dmachard/go-powerdns-protobuf v0.0.2/go.mod h1:ARzDrAPrRJO0RRKQAJpmrUtCChorBnioluLrJRExjzc= github.com/dmachard/go-topmap v0.4.0 h1:u8MJuaLwlMmuVu3UvJcid+sS3vuxgj3y728BQFLiYXA= github.com/dmachard/go-topmap v0.4.0/go.mod h1:EpVNlcJO8kJjXzTgstKBXOQumsmSUz5bTxSxTE4DRrk= github.com/docker/distribution v2.7.0+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= diff --git a/subprocessors/powerdns.go b/subprocessors/powerdns.go new file mode 100644 index 00000000..0527979b --- /dev/null +++ b/subprocessors/powerdns.go @@ -0,0 +1,119 @@ +package subprocessors + +import ( + "net" + "strconv" + "strings" + "time" + + "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-logger" + powerdns_protobuf "github.com/dmachard/go-powerdns-protobuf" + "google.golang.org/protobuf/proto" +) + +type PdnsProcessor struct { + done chan bool + recvFrom chan []byte + logger *logger.Logger + config *dnsutils.Config +} + +func NewPdnsProcessor(config *dnsutils.Config, logger *logger.Logger) PdnsProcessor { + logger.Info("powerdns processor - initialization...") + d := PdnsProcessor{ + done: make(chan bool), + recvFrom: make(chan []byte, 512), + logger: logger, + config: config, + } + + d.ReadConfig() + + return d +} + +func (c *PdnsProcessor) ReadConfig() { + c.logger.Info("processor powerdns parser - config") +} + +func (c *PdnsProcessor) LogInfo(msg string, v ...interface{}) { + c.logger.Info("processor powerdns parser - "+msg, v...) +} + +func (c *PdnsProcessor) LogError(msg string, v ...interface{}) { + c.logger.Error("procesor powerdns parser - "+msg, v...) +} + +func (d *PdnsProcessor) GetChannel() chan []byte { + return d.recvFrom +} + +func (d *PdnsProcessor) Stop() { + close(d.recvFrom) + + // read done channel and block until run is terminated + <-d.done + close(d.done) +} + +func (d *PdnsProcessor) Run(sendTo []chan dnsutils.DnsMessage) { + + pbdm := &powerdns_protobuf.PBDNSMessage{} + + // read incoming dns message + d.LogInfo("running... waiting incoming dns message") + for data := range d.recvFrom { + err := proto.Unmarshal(data, pbdm) + if err != nil { + continue + } + + dm := dnsutils.DnsMessage{} + dm.Init() + + dm.DnsTap.Operation = pbdm.GetType().String() + + dm.NetworkInfo.Family = pbdm.GetSocketFamily().String() + dm.NetworkInfo.Protocol = pbdm.GetSocketProtocol().String() + + dm.NetworkInfo.QueryIp = net.IP(pbdm.From).String() + dm.NetworkInfo.QueryPort = strconv.FormatUint(uint64(pbdm.GetFromPort()), 10) + dm.NetworkInfo.ResponseIp = net.IP(pbdm.To).String() + dm.NetworkInfo.ResponsePort = strconv.FormatUint(uint64(pbdm.GetToPort()), 10) + + dm.DNS.Id = int(pbdm.GetId()) + dm.DNS.Length = int(pbdm.GetInBytes()) + dm.DnsTap.TimeSec = int(pbdm.GetTimeSec()) + dm.DnsTap.TimeNsec = int(pbdm.GetTimeUsec()) + + if int(pbdm.Type.Number())%2 == 1 { + dm.DNS.Type = dnsutils.DnsQuery + } else { + dm.DNS.Type = dnsutils.DnsReply + } + + dm.DNS.Rcode = dnsutils.RcodeToString(int(pbdm.Response.GetRcode())) + + // compute timestamp + dm.DnsTap.Timestamp = float64(dm.DnsTap.TimeSec) + float64(dm.DnsTap.TimeNsec)/1e9 + ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec)) + dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano) + + if d.config.Subprocessors.QnameLowerCase { + dm.DNS.Qname = strings.ToLower(pbdm.Question.GetQName()) + } else { + dm.DNS.Qname = pbdm.Question.GetQName() + } + + dm.DNS.Qtype = dnsutils.RdatatypeToString(int(pbdm.Question.GetQType())) + + // dispatch dns message to all generators + for i := range sendTo { + sendTo[i] <- dm + } + } + + // dnstap channel closed + d.done <- true +}