-
-
Notifications
You must be signed in to change notification settings - Fork 50
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
alpha support for powerdns protobuf remote logger #39
- Loading branch information
Showing
7 changed files
with
283 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
package collectors | ||
|
||
import ( | ||
"bufio" | ||
"net" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/dmachard/go-dnscollector/dnsutils" | ||
"github.com/dmachard/go-dnscollector/subprocessors" | ||
"github.com/dmachard/go-logger" | ||
powerdns_protobuf "github.com/dmachard/go-powerdns-protobuf" | ||
) | ||
|
||
type ProtobufPowerDNS struct { | ||
done chan bool | ||
listen net.Listener | ||
conns []net.Conn | ||
loggers []dnsutils.Worker | ||
config *dnsutils.Config | ||
logger *logger.Logger | ||
} | ||
|
||
func NewProtobufPowerDNS(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger) *ProtobufPowerDNS { | ||
logger.Info("collector PowerDNS protobuf - enabled") | ||
s := &ProtobufPowerDNS{ | ||
done: make(chan bool), | ||
config: config, | ||
loggers: loggers, | ||
logger: logger, | ||
} | ||
s.ReadConfig() | ||
return s | ||
} | ||
|
||
func (c *ProtobufPowerDNS) Loggers() []chan dnsutils.DnsMessage { | ||
channels := []chan dnsutils.DnsMessage{} | ||
for _, p := range c.loggers { | ||
channels = append(channels, p.Channel()) | ||
} | ||
return channels | ||
} | ||
|
||
func (c *ProtobufPowerDNS) ReadConfig() { | ||
} | ||
|
||
func (o *ProtobufPowerDNS) LogInfo(msg string, v ...interface{}) { | ||
o.logger.Info("collector powerdns protobuf - "+msg, v...) | ||
} | ||
|
||
func (o *ProtobufPowerDNS) LogError(msg string, v ...interface{}) { | ||
o.logger.Error("collector powerdns protobuf - "+msg, v...) | ||
} | ||
|
||
func (c *ProtobufPowerDNS) HandleConn(conn net.Conn) { | ||
// close connection on function exit | ||
defer conn.Close() | ||
|
||
// get peer address | ||
peer := conn.RemoteAddr().String() | ||
c.LogInfo("%s - new connection\n", peer) | ||
|
||
// start protobuf subprocessor | ||
pdns_subprocessor := subprocessors.NewPdnsProcessor(c.config, c.logger) | ||
go pdns_subprocessor.Run(c.Loggers()) | ||
|
||
r := bufio.NewReader(conn) | ||
pbs := powerdns_protobuf.NewProtobufStream(r, conn, 5*time.Second) | ||
|
||
// process incoming protobuf payload | ||
if err := pbs.ProcessStream(pdns_subprocessor.GetChannel()); err != nil { | ||
c.LogError("transport error: %s", err) | ||
} | ||
|
||
c.LogInfo("%s - connection closed\n", peer) | ||
} | ||
|
||
func (c *ProtobufPowerDNS) Channel() chan dnsutils.DnsMessage { | ||
return nil | ||
} | ||
|
||
func (c *ProtobufPowerDNS) Stop() { | ||
c.LogInfo("stopping...") | ||
|
||
// closing properly current connections if exists | ||
for _, conn := range c.conns { | ||
peer := conn.RemoteAddr().String() | ||
c.LogInfo("%s - closing connection...", peer) | ||
conn.Close() | ||
} | ||
// Finally close the listener to unblock accept | ||
c.LogInfo("stop listening...") | ||
c.listen.Close() | ||
|
||
// read done channel and block until run is terminated | ||
<-c.done | ||
close(c.done) | ||
} | ||
|
||
func (c *ProtobufPowerDNS) Listen() error { | ||
c.LogInfo("running in background...") | ||
|
||
var err error | ||
var listener net.Listener | ||
addrlisten := c.config.Collectors.PowerDNS.ListenIP + ":" + strconv.Itoa(c.config.Collectors.PowerDNS.ListenPort) | ||
|
||
listener, err = net.Listen("tcp", addrlisten) | ||
|
||
// something is wrong ? | ||
if err != nil { | ||
return err | ||
} | ||
c.LogInfo("is listening on %s", listener.Addr()) | ||
c.listen = listener | ||
return nil | ||
} | ||
|
||
func (c *ProtobufPowerDNS) Run() { | ||
c.LogInfo("starting collector...") | ||
if c.listen == nil { | ||
if err := c.Listen(); err != nil { | ||
c.logger.Fatal("collector dnstap listening failed: ", err) | ||
} | ||
} | ||
for { | ||
// Accept() blocks waiting for new connection. | ||
conn, err := c.listen.Accept() | ||
if err != nil { | ||
break | ||
} | ||
|
||
c.conns = append(c.conns, conn) | ||
go c.HandleConn(conn) | ||
|
||
} | ||
|
||
c.LogInfo("run terminated") | ||
c.done <- true | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
package subprocessors | ||
|
||
import ( | ||
"net" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/dmachard/go-dnscollector/dnsutils" | ||
"github.com/dmachard/go-logger" | ||
powerdns_protobuf "github.com/dmachard/go-powerdns-protobuf" | ||
"google.golang.org/protobuf/proto" | ||
) | ||
|
||
type PdnsProcessor struct { | ||
done chan bool | ||
recvFrom chan []byte | ||
logger *logger.Logger | ||
config *dnsutils.Config | ||
} | ||
|
||
func NewPdnsProcessor(config *dnsutils.Config, logger *logger.Logger) PdnsProcessor { | ||
logger.Info("powerdns processor - initialization...") | ||
d := PdnsProcessor{ | ||
done: make(chan bool), | ||
recvFrom: make(chan []byte, 512), | ||
logger: logger, | ||
config: config, | ||
} | ||
|
||
d.ReadConfig() | ||
|
||
return d | ||
} | ||
|
||
func (c *PdnsProcessor) ReadConfig() { | ||
c.logger.Info("processor powerdns parser - config") | ||
} | ||
|
||
func (c *PdnsProcessor) LogInfo(msg string, v ...interface{}) { | ||
c.logger.Info("processor powerdns parser - "+msg, v...) | ||
} | ||
|
||
func (c *PdnsProcessor) LogError(msg string, v ...interface{}) { | ||
c.logger.Error("procesor powerdns parser - "+msg, v...) | ||
} | ||
|
||
func (d *PdnsProcessor) GetChannel() chan []byte { | ||
return d.recvFrom | ||
} | ||
|
||
func (d *PdnsProcessor) Stop() { | ||
close(d.recvFrom) | ||
|
||
// read done channel and block until run is terminated | ||
<-d.done | ||
close(d.done) | ||
} | ||
|
||
func (d *PdnsProcessor) Run(sendTo []chan dnsutils.DnsMessage) { | ||
|
||
pbdm := &powerdns_protobuf.PBDNSMessage{} | ||
|
||
// read incoming dns message | ||
d.LogInfo("running... waiting incoming dns message") | ||
for data := range d.recvFrom { | ||
err := proto.Unmarshal(data, pbdm) | ||
if err != nil { | ||
continue | ||
} | ||
|
||
dm := dnsutils.DnsMessage{} | ||
dm.Init() | ||
|
||
dm.DnsTap.Operation = pbdm.GetType().String() | ||
|
||
dm.NetworkInfo.Family = pbdm.GetSocketFamily().String() | ||
dm.NetworkInfo.Protocol = pbdm.GetSocketProtocol().String() | ||
|
||
dm.NetworkInfo.QueryIp = net.IP(pbdm.From).String() | ||
dm.NetworkInfo.QueryPort = strconv.FormatUint(uint64(pbdm.GetFromPort()), 10) | ||
dm.NetworkInfo.ResponseIp = net.IP(pbdm.To).String() | ||
dm.NetworkInfo.ResponsePort = strconv.FormatUint(uint64(pbdm.GetToPort()), 10) | ||
|
||
dm.DNS.Id = int(pbdm.GetId()) | ||
dm.DNS.Length = int(pbdm.GetInBytes()) | ||
dm.DnsTap.TimeSec = int(pbdm.GetTimeSec()) | ||
dm.DnsTap.TimeNsec = int(pbdm.GetTimeUsec()) | ||
|
||
if int(pbdm.Type.Number())%2 == 1 { | ||
dm.DNS.Type = dnsutils.DnsQuery | ||
} else { | ||
dm.DNS.Type = dnsutils.DnsReply | ||
} | ||
|
||
dm.DNS.Rcode = dnsutils.RcodeToString(int(pbdm.Response.GetRcode())) | ||
|
||
// compute timestamp | ||
dm.DnsTap.Timestamp = float64(dm.DnsTap.TimeSec) + float64(dm.DnsTap.TimeNsec)/1e9 | ||
ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec)) | ||
dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano) | ||
|
||
if d.config.Subprocessors.QnameLowerCase { | ||
dm.DNS.Qname = strings.ToLower(pbdm.Question.GetQName()) | ||
} else { | ||
dm.DNS.Qname = pbdm.Question.GetQName() | ||
} | ||
|
||
dm.DNS.Qtype = dnsutils.RdatatypeToString(int(pbdm.Question.GetQType())) | ||
|
||
// dispatch dns message to all generators | ||
for i := range sendTo { | ||
sendTo[i] <- dm | ||
} | ||
} | ||
|
||
// dnstap channel closed | ||
d.done <- true | ||
} |