diff --git a/README.md b/README.md index ffa6b6fc..65a0ea1f 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@

Go Report Go version -Go tests +Go tests Go bench -Go lines +Go lines

@@ -22,7 +22,7 @@ > - DNS parser with [Extension Mechanisms for DNS (EDNS)](https://github.com/dmachard/go-dns-collector/blob/main/docs/dnsparser.md) support > - IPv4/v6 defragmentation and TCP reassembly > - Nanoseconds in timestamps -> - [Extended DNStap](https://github.com/dmachard/go-dns-collector/blob/main/docs/extended_dnstap.md) +> - [Extended](https://github.com/dmachard/go-dns-collector/blob/main/docs/extended_dnstap.md) DNStap with TLS encryption, compression, and more metadata capabilities *NOTE: The code before version 1.x is considered beta quality and is subject to breaking changes.* @@ -67,7 +67,7 @@ - [`Scalyr`](docs/loggers/logger_scalyr.md) - [`Redis`](docs/loggers/logger_redis.md) publisher - [`Kafka`](docs/loggers/logger_kafka.md) producer - - [`Clickhouse`](doc/logger_clickhouse.md) *not yet production ready* + - [`Clickhouse`](docs/loggers/logger_clickhouse.md) *(not yet production ready)* - *Send to security tools* - [`Falco`](docs/loggers/logger_falco.md) diff --git a/collectors/dnstap.go b/collectors/dnstap.go index 965b5d6d..ff28c694 100644 --- a/collectors/dnstap.go +++ b/collectors/dnstap.go @@ -3,6 +3,7 @@ package collectors import ( "bufio" "crypto/tls" + "encoding/binary" "errors" "io" "net" @@ -18,6 +19,7 @@ import ( "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" + "github.com/segmentio/kafka-go/compress" ) type Dnstap struct { @@ -154,7 +156,11 @@ func (c *Dnstap) HandleConn(conn net.Conn) { var err error var frame *framestream.Frame for { - frame, err = fs.RecvFrame(false) + if c.config.Collectors.Dnstap.Compression == pkgconfig.CompressNone { + frame, err = fs.RecvFrame(false) + } else { + frame, err = fs.RecvCompressedFrame(&compress.GzipCodec, false) + } if err != nil { connClosed := false @@ -193,11 +199,41 @@ func (c *Dnstap) HandleConn(conn net.Conn) { break } - // send payload to the channel - select { - case dnstapProcessor.GetChannel() <- frame.Data(): // Successful send to channel - default: - c.droppedProcessor <- 1 + if c.config.Collectors.Dnstap.Compression == pkgconfig.CompressNone { + // send payload to the channel + select { + case dnstapProcessor.GetChannel() <- frame.Data(): // Successful send to channel + default: + c.droppedProcessor <- 1 + } + } else { + // ignore first 4 bytes + data := frame.Data()[4:] + validFrame := true + for len(data) >= 4 { + // get frame size + payloadSize := binary.BigEndian.Uint32(data[:4]) + data = data[4:] + + // enough next data ? + if uint32(len(data)) < payloadSize { + validFrame = false + break + } + // send payload to the channel + select { + case dnstapProcessor.GetChannel() <- data[:payloadSize]: // Successful send to channel + default: + c.droppedProcessor <- 1 + } + + // continue for next + data = data[payloadSize:] + } + if !validFrame { + c.LogError("conn #%d - invalid compressed frame received", connID) + break // ignore the invalid frame + } } } @@ -207,6 +243,12 @@ func (c *Dnstap) HandleConn(conn net.Conn) { return } + // to avoid lock if the Stop function is already called + if c.stopCalled { + c.LogInfo("conn #%d - connection handler exited", connID) + return + } + // here the connection is closed, // then removes the current tap processor from the list c.Lock() diff --git a/collectors/dnstap_test.go b/collectors/dnstap_test.go index 7bb3986a..7b282e72 100644 --- a/collectors/dnstap_test.go +++ b/collectors/dnstap_test.go @@ -15,37 +15,50 @@ import ( "github.com/dmachard/go-dnscollector/processors" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" + "github.com/segmentio/kafka-go/compress" "google.golang.org/protobuf/proto" ) func Test_DnstapCollector(t *testing.T) { testcases := []struct { - name string - mode string - address string - listenPort int - operation string + name string + mode string + address string + listenPort int + operation string + compression string }{ { - name: "tcp_default", - mode: netlib.SocketTCP, - address: ":6000", - listenPort: 0, - operation: "CLIENT_QUERY", + name: "tcp_default", + mode: netlib.SocketTCP, + address: ":6000", + listenPort: 0, + operation: "CLIENT_QUERY", + compression: "none", }, { - name: "tcp_custom_port", - mode: netlib.SocketTCP, - address: ":7000", - listenPort: 7000, - operation: "CLIENT_QUERY", + name: "tcp_custom_port", + mode: netlib.SocketTCP, + address: ":7000", + listenPort: 7000, + operation: "CLIENT_QUERY", + compression: "none", }, { - name: "unix_default", - mode: netlib.SocketUnix, - address: "/tmp/dnscollector.sock", - listenPort: 0, - operation: "CLIENT_QUERY", + name: "unix_default", + mode: netlib.SocketUnix, + address: "/tmp/dnscollector.sock", + listenPort: 0, + operation: "CLIENT_QUERY", + compression: "none", + }, + { + name: "tcp_compress_gzip", + mode: netlib.SocketTCP, + address: ":7000", + listenPort: 7000, + operation: "CLIENT_QUERY", + compression: "gzip", }, } @@ -60,6 +73,7 @@ func Test_DnstapCollector(t *testing.T) { if tc.mode == netlib.SocketUnix { config.Collectors.Dnstap.SockPath = tc.address } + config.Collectors.Dnstap.Compression = tc.compression c := NewDnstap([]pkgutils.Worker{g}, config, logger.New(false), "test") if err := c.Listen(); err != nil { @@ -80,7 +94,8 @@ func Test_DnstapCollector(t *testing.T) { if err := fs.InitSender(); err != nil { t.Fatalf("framestream init error: %s", err) } else { - frame := &framestream.Frame{} + bulkFrame := &framestream.Frame{} + subFrame := &framestream.Frame{} // get fake dns question dnsquery, err := processors.GetFakeDNS() @@ -98,9 +113,23 @@ func Test_DnstapCollector(t *testing.T) { } // send query - frame.Write(data) - if err := fs.SendFrame(frame); err != nil { - t.Fatalf("send frame error %s", err) + + if config.Collectors.Dnstap.Compression == pkgconfig.CompressNone { + // send the frame + bulkFrame.Write(data) + if err := fs.SendFrame(bulkFrame); err != nil { + t.Fatalf("send frame error %s", err) + } + } else { + subFrame.Write(data) + bulkFrame.AppendData(subFrame.Data()) + } + + if config.Collectors.Dnstap.Compression != pkgconfig.CompressNone { + bulkFrame.Encode() + if err := fs.SendCompressedFrame(&compress.GzipCodec, bulkFrame); err != nil { + t.Fatalf("send compressed frame error %s", err) + } } } diff --git a/config.yml b/config.yml index 25a6bf41..b761e2c7 100644 --- a/config.yml +++ b/config.yml @@ -85,6 +85,13 @@ multiplexer: - name: console stdout: mode: text + - name: dnstapclient + dnstapclient: + transport: tcp + remote-address: 192.168.1.210 + remote-port: 6002 + flush-interval: 30 + compression: gzip routes: - from: [ tap ] diff --git a/docs/collectors/collector_dnstap.md b/docs/collectors/collector_dnstap.md index 7351b57d..e0da5bcc 100644 --- a/docs/collectors/collector_dnstap.md +++ b/docs/collectors/collector_dnstap.md @@ -33,6 +33,8 @@ Options: - `extended-support` (bool) decode the extended extra field sent by DNScollector. > If this setting is enabled, DNScollector will expect receiving the specific [protobuf structure](./../../dnsutils/extended_dnstap.proto) in the extra field, which must be sent by another DNS collector. > This field will contain additional metadata generated by various transformations such as filtering, ATags, and others. +- `compression` (string) Compression for DNStap messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`. + > Specifies the compression algorithm to use. Defaults: @@ -51,6 +53,7 @@ Defaults: chan-buffer-size: 65535 disable-dnsparser: true extended-support: false + compression: none ``` ## DNS tap Proxifier diff --git a/docs/extended_dnstap.md b/docs/extended_dnstap.md index 43090fbc..c7528f34 100644 --- a/docs/extended_dnstap.md +++ b/docs/extended_dnstap.md @@ -1,9 +1,33 @@ -# DNS-collector - Extended DNStap +# DNS-collector - Enhanced DNStap + +The DNScollector adds enhancements to the DNStap protocol with compression, TLS and extended metadata support. +These features can be only used between two `DNS-collector` instance. + + +## Compression + +DNSTAP messages are highly compressible. They can be sent in reasonably large blocks, which enables significant compression for transmission over long-haul network links. While DNSTAP does not natively support compression, it seems not unreasonable that `DNS-collector` could have a configurable compression flag that would mark a stream as being compressed with one of the different models of compression that are supported in other areas of the code currently. This would allow a much more efficient transmission of DNSTAP-based messages through various components. + +The following codec are supported: + +- gzip +- lz4 +- snappy +- std + +> ref: https://github.com/dmachard/go-dnscollector/issues/490 + +## Extended feature + +DNSTAP message can be extended by incorporating additional metadata added through transformations, such as filtering, geo, ATags. -If this feature is enabled, DNScollector will extend the DNStap protocol by incorporating additional metadata added through transformations, such as filtering, geo, ATags. These metadata are encoded in the extra field with the following [protobuf structure](./../../dnsutils/extended_dnstap.proto). -This feature can be only used between two `DNS-collector` instance. +## TLS encryption + +DNSTAP messages contains sensitive data. `DNS-collector` have a configurable flag to enable TLS encryption. + +## Settings How to enable it on the collector side ? @@ -11,6 +35,8 @@ How to enable it on the collector side ? - name: dnstap_collector dnstap: extended-support: true + compression: gzip + tls-support: true ``` How to enable it on the sender side ? @@ -19,4 +45,6 @@ How to enable it on the sender side ? - name: dnstap_sender dnstapclient: extended-support: true + compression: gzip + transport: tcp+tls ``` diff --git a/docs/loggers.md b/docs/loggers.md index b046e1c2..4eda86b4 100644 --- a/docs/loggers.md +++ b/docs/loggers.md @@ -18,3 +18,4 @@ | [Redis](loggers/logger_redis.md) | Redis pub logger | | [Kafka](loggers/logger_kafka.md) | Kafka DNS producer | | [Falco](loggers/logger_falco.md) | Falco plugin logger | +| [Clickhouse](loggers/logger_clickhouse.md) | ClickHouse logger | diff --git a/docs/loggers/logger_dnstap.md b/docs/loggers/logger_dnstap.md index f739c28c..30c54c76 100644 --- a/docs/loggers/logger_dnstap.md +++ b/docs/loggers/logger_dnstap.md @@ -24,6 +24,8 @@ Options: * `buffer-size`: (integer) how many DNS messages will be buffered before being sent * `chan-buffer-size`: (integer) channel buffer size used on incoming dns message, number of messages before to drop it. * `extended-support`: (boolen) Extend the DNStap message by incorporating additional transformations, such as filtering and ATags, into the extra field. +* `compression` (string) Compression for DNStap messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`. Default to `none`. + > Specifies the compression algorithm to use. Defaults: @@ -46,4 +48,5 @@ Defaults: buffer-size: 100 chan-buffer-size: 65535 extended-support: false + compression: none ``` diff --git a/loggers/dnstapclient.go b/loggers/dnstapclient.go index be4d4a10..0163a0d4 100644 --- a/loggers/dnstapclient.go +++ b/loggers/dnstapclient.go @@ -14,6 +14,7 @@ import ( "github.com/dmachard/go-dnscollector/transformers" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" + "github.com/segmentio/kafka-go/compress" ) type DnstapSender struct { @@ -209,7 +210,8 @@ func (ds *DnstapSender) FlushBuffer(buf *[]dnsutils.DNSMessage) { var data []byte var err error - frame := &framestream.Frame{} + bulkFrame := &framestream.Frame{} + subFrame := &framestream.Frame{} for _, dm := range *buf { // update identity ? @@ -224,13 +226,27 @@ func (ds *DnstapSender) FlushBuffer(buf *[]dnsutils.DNSMessage) { continue } - // send the frame - frame.Write(data) - if err := ds.fs.SendFrame(frame); err != nil { - ds.LogError("send frame error %s", err) + if ds.config.Loggers.DNSTap.Compression == pkgconfig.CompressNone { + // send the frame + bulkFrame.Write(data) + if err := ds.fs.SendFrame(bulkFrame); err != nil { + ds.LogError("send frame error %s", err) + ds.fsReady = false + <-ds.transportReconnect + break + } + } else { + subFrame.Write(data) + bulkFrame.AppendData(subFrame.Data()) + } + } + + if ds.config.Loggers.DNSTap.Compression != pkgconfig.CompressNone { + bulkFrame.Encode() + if err := ds.fs.SendCompressedFrame(&compress.GzipCodec, bulkFrame); err != nil { + ds.LogError("send bulk frame error %s", err) ds.fsReady = false <-ds.transportReconnect - break } } diff --git a/loggers/dnstapclient_test.go b/loggers/dnstapclient_test.go index f67b1623..400988e5 100644 --- a/loggers/dnstapclient_test.go +++ b/loggers/dnstapclient_test.go @@ -2,6 +2,7 @@ package loggers import ( "bufio" + "encoding/binary" "net" "testing" "time" @@ -11,31 +12,45 @@ import ( "github.com/dmachard/go-dnstap-protobuf" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" + "github.com/segmentio/kafka-go/compress" "google.golang.org/protobuf/proto" ) func Test_DnstapClient(t *testing.T) { testcases := []struct { - transport string - address string + name string + transport string + address string + compression string }{ { - transport: "tcp", - address: ":6000", + name: "dnstap_tcp", + transport: "tcp", + address: ":6000", + compression: "none", }, { - transport: "unix", - address: "/tmp/test.sock", + name: "dnstap_unix", + transport: "unix", + address: "/tmp/test.sock", + compression: "none", + }, + { + name: "dnstap_tcp_gzip_compress", + transport: "tcp", + address: ":6000", + compression: "gzip", }, } for _, tc := range testcases { - t.Run(tc.transport, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { // init logger cfg := pkgconfig.GetFakeConfig() cfg.Loggers.DNSTap.FlushInterval = 1 cfg.Loggers.DNSTap.BufferSize = 0 + cfg.Loggers.DNSTap.Compression = tc.compression if tc.transport == "unix" { cfg.Loggers.DNSTap.SockPath = tc.address } @@ -73,15 +88,47 @@ func Test_DnstapClient(t *testing.T) { g.GetInputChannel() <- dm // receive frame on server side ?, timeout 5s - fs, err := fsSvr.RecvFrame(true) + var fs *framestream.Frame + if tc.compression == "gzip" { + fs, err = fsSvr.RecvCompressedFrame(&compress.GzipCodec, true) + } else { + fs, err = fsSvr.RecvFrame(true) + } if err != nil { t.Errorf("error to receive frame: %s", err) } // decode the dnstap message in server side dt := &dnstap.Dnstap{} - if err := proto.Unmarshal(fs.Data(), dt); err != nil { - t.Errorf("error to decode dnstap") + if cfg.Loggers.DNSTap.Compression == pkgconfig.CompressNone { + if err := proto.Unmarshal(fs.Data(), dt); err != nil { + t.Errorf("error to decode dnstap") + } + } else { + // ignore first 4 bytes + data := fs.Data()[4:] + validFrame := true + for len(data) >= 4 { + // get frame size + payloadSize := binary.BigEndian.Uint32(data[:4]) + data = data[4:] + + // enough next data ? + if uint32(len(data)) < payloadSize { + validFrame = false + break + } + + if err := proto.Unmarshal(data[:payloadSize], dt); err != nil { + t.Errorf("error to decode dnstap from compressed frame") + } + + // continue for next + data = data[payloadSize:] + } + if !validFrame { + t.Errorf("invalid compressed frame") + } } }) } diff --git a/pkgconfig/collectors.go b/pkgconfig/collectors.go index 243b2425..13141c50 100644 --- a/pkgconfig/collectors.go +++ b/pkgconfig/collectors.go @@ -32,6 +32,7 @@ type ConfigCollectors struct { ChannelBufferSize int `yaml:"chan-buffer-size"` DisableDNSParser bool `yaml:"disable-dnsparser"` ExtendedSupport bool `yaml:"extended-support"` + Compression string `yaml:"compression"` } `yaml:"dnstap"` DnstapProxifier struct { Enable bool `yaml:"enable"` @@ -107,6 +108,7 @@ func (c *ConfigCollectors) SetDefault() { c.Dnstap.ChannelBufferSize = 65535 c.Dnstap.DisableDNSParser = false c.Dnstap.ExtendedSupport = false + c.Dnstap.Compression = CompressNone c.DnstapProxifier.Enable = false c.DnstapProxifier.ListenIP = AnyIP diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go index 1255f6d3..d94d2f86 100644 --- a/pkgconfig/loggers.go +++ b/pkgconfig/loggers.go @@ -104,6 +104,7 @@ type ConfigLoggers struct { BufferSize int `yaml:"buffer-size"` ChannelBufferSize int `yaml:"chan-buffer-size"` ExtendedSupport bool `yaml:"extended-support"` + Compression string `yaml:"compression"` } `yaml:"dnstapclient"` TCPClient struct { Enable bool `yaml:"enable"` @@ -334,6 +335,7 @@ func (c *ConfigLoggers) SetDefault() { c.DNSTap.OverwriteIdentity = false c.DNSTap.BufferSize = 100 c.DNSTap.ChannelBufferSize = 65535 + c.DNSTap.Compression = CompressNone c.DNSTap.ExtendedSupport = false c.LogFile.Enable = false