Skip to content

Commit

Permalink
add compression support on DNSTap client and server (#538)
Browse files Browse the repository at this point in the history
* Support compression
* Update README.md
  • Loading branch information
dmachard authored Mar 23, 2024
1 parent c931a14 commit 146435a
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 53 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<p align="center">
<img src="https://goreportcard.com/badge/github.com/dmachard/go-dns-collector" alt="Go Report"/>
<img src="https://img.shields.io/badge/go%20version-min%201.20-green" alt="Go version"/>
<img src="https://img.shields.io/badge/go%20tests-429-green" alt="Go tests"/>
<img src="https://img.shields.io/badge/go%20tests-435-green" alt="Go tests"/>
<img src="https://img.shields.io/badge/go%20bench-19-green" alt="Go bench"/>
<img src="https://img.shields.io/badge/go%20lines-38661-green" alt="Go lines"/>
<img src="https://img.shields.io/badge/go%20lines-39765-green" alt="Go lines"/>
</p>

<p align="center">
Expand All @@ -22,7 +22,7 @@
> - DNS parser with [Extension Mechanisms for DNS (EDNS)](https://github.com/dmachard/go-dns-collector/blob/main/docs/dnsparser.md) support
> - IPv4/v6 defragmentation and TCP reassembly
> - Nanoseconds in timestamps
> - [Extended DNStap](https://github.com/dmachard/go-dns-collector/blob/main/docs/extended_dnstap.md)
> - [Extended](https://github.com/dmachard/go-dns-collector/blob/main/docs/extended_dnstap.md) DNStap with TLS encryption, compression, and more metadata capabilities
*NOTE: The code before version 1.x is considered beta quality and is subject to breaking changes.*

Expand Down Expand Up @@ -67,7 +67,7 @@
- [`Scalyr`](docs/loggers/logger_scalyr.md)
- [`Redis`](docs/loggers/logger_redis.md) publisher
- [`Kafka`](docs/loggers/logger_kafka.md) producer
- [`Clickhouse`](doc/logger_clickhouse.md) *not yet production ready*
- [`Clickhouse`](docs/loggers/logger_clickhouse.md) *(not yet production ready)*
- *Send to security tools*
- [`Falco`](docs/loggers/logger_falco.md)

Expand Down
54 changes: 48 additions & 6 deletions collectors/dnstap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package collectors
import (
"bufio"
"crypto/tls"
"encoding/binary"
"errors"
"io"
"net"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
"github.com/segmentio/kafka-go/compress"
)

type Dnstap struct {
Expand Down Expand Up @@ -154,7 +156,11 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
var err error
var frame *framestream.Frame
for {
frame, err = fs.RecvFrame(false)
if c.config.Collectors.Dnstap.Compression == pkgconfig.CompressNone {
frame, err = fs.RecvFrame(false)
} else {
frame, err = fs.RecvCompressedFrame(&compress.GzipCodec, false)
}
if err != nil {
connClosed := false

Expand Down Expand Up @@ -193,11 +199,41 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
break
}

// send payload to the channel
select {
case dnstapProcessor.GetChannel() <- frame.Data(): // Successful send to channel
default:
c.droppedProcessor <- 1
if c.config.Collectors.Dnstap.Compression == pkgconfig.CompressNone {
// send payload to the channel
select {
case dnstapProcessor.GetChannel() <- frame.Data(): // Successful send to channel
default:
c.droppedProcessor <- 1
}
} else {
// ignore first 4 bytes
data := frame.Data()[4:]
validFrame := true
for len(data) >= 4 {
// get frame size
payloadSize := binary.BigEndian.Uint32(data[:4])
data = data[4:]

// enough next data ?
if uint32(len(data)) < payloadSize {
validFrame = false
break
}
// send payload to the channel
select {
case dnstapProcessor.GetChannel() <- data[:payloadSize]: // Successful send to channel
default:
c.droppedProcessor <- 1
}

// continue for next
data = data[payloadSize:]
}
if !validFrame {
c.LogError("conn #%d - invalid compressed frame received", connID)
break // ignore the invalid frame
}
}
}

Expand All @@ -207,6 +243,12 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
return
}

// to avoid lock if the Stop function is already called
if c.stopCalled {
c.LogInfo("conn #%d - connection handler exited", connID)
return
}

// here the connection is closed,
// then removes the current tap processor from the list
c.Lock()
Expand Down
77 changes: 53 additions & 24 deletions collectors/dnstap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,50 @@ import (
"github.com/dmachard/go-dnscollector/processors"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
"github.com/segmentio/kafka-go/compress"
"google.golang.org/protobuf/proto"
)

func Test_DnstapCollector(t *testing.T) {
testcases := []struct {
name string
mode string
address string
listenPort int
operation string
name string
mode string
address string
listenPort int
operation string
compression string
}{
{
name: "tcp_default",
mode: netlib.SocketTCP,
address: ":6000",
listenPort: 0,
operation: "CLIENT_QUERY",
name: "tcp_default",
mode: netlib.SocketTCP,
address: ":6000",
listenPort: 0,
operation: "CLIENT_QUERY",
compression: "none",
},
{
name: "tcp_custom_port",
mode: netlib.SocketTCP,
address: ":7000",
listenPort: 7000,
operation: "CLIENT_QUERY",
name: "tcp_custom_port",
mode: netlib.SocketTCP,
address: ":7000",
listenPort: 7000,
operation: "CLIENT_QUERY",
compression: "none",
},
{
name: "unix_default",
mode: netlib.SocketUnix,
address: "/tmp/dnscollector.sock",
listenPort: 0,
operation: "CLIENT_QUERY",
name: "unix_default",
mode: netlib.SocketUnix,
address: "/tmp/dnscollector.sock",
listenPort: 0,
operation: "CLIENT_QUERY",
compression: "none",
},
{
name: "tcp_compress_gzip",
mode: netlib.SocketTCP,
address: ":7000",
listenPort: 7000,
operation: "CLIENT_QUERY",
compression: "gzip",
},
}

Expand All @@ -60,6 +73,7 @@ func Test_DnstapCollector(t *testing.T) {
if tc.mode == netlib.SocketUnix {
config.Collectors.Dnstap.SockPath = tc.address
}
config.Collectors.Dnstap.Compression = tc.compression

c := NewDnstap([]pkgutils.Worker{g}, config, logger.New(false), "test")
if err := c.Listen(); err != nil {
Expand All @@ -80,7 +94,8 @@ func Test_DnstapCollector(t *testing.T) {
if err := fs.InitSender(); err != nil {
t.Fatalf("framestream init error: %s", err)
} else {
frame := &framestream.Frame{}
bulkFrame := &framestream.Frame{}
subFrame := &framestream.Frame{}

// get fake dns question
dnsquery, err := processors.GetFakeDNS()
Expand All @@ -98,9 +113,23 @@ func Test_DnstapCollector(t *testing.T) {
}

// send query
frame.Write(data)
if err := fs.SendFrame(frame); err != nil {
t.Fatalf("send frame error %s", err)

if config.Collectors.Dnstap.Compression == pkgconfig.CompressNone {
// send the frame
bulkFrame.Write(data)
if err := fs.SendFrame(bulkFrame); err != nil {
t.Fatalf("send frame error %s", err)
}
} else {
subFrame.Write(data)
bulkFrame.AppendData(subFrame.Data())
}

if config.Collectors.Dnstap.Compression != pkgconfig.CompressNone {
bulkFrame.Encode()
if err := fs.SendCompressedFrame(&compress.GzipCodec, bulkFrame); err != nil {
t.Fatalf("send compressed frame error %s", err)
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ multiplexer:
- name: console
stdout:
mode: text
- name: dnstapclient
dnstapclient:
transport: tcp
remote-address: 192.168.1.210
remote-port: 6002
flush-interval: 30
compression: gzip

routes:
- from: [ tap ]
Expand Down
3 changes: 3 additions & 0 deletions docs/collectors/collector_dnstap.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ Options:
- `extended-support` (bool) decode the extended extra field sent by DNScollector.
> If this setting is enabled, DNScollector will expect receiving the specific [protobuf structure](./../../dnsutils/extended_dnstap.proto) in the extra field, which must be sent by another DNS collector.
> This field will contain additional metadata generated by various transformations such as filtering, ATags, and others.
- `compression` (string) Compression for DNStap messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`.
> Specifies the compression algorithm to use.
Defaults:

Expand All @@ -51,6 +53,7 @@ Defaults:
chan-buffer-size: 65535
disable-dnsparser: true
extended-support: false
compression: none
```
## DNS tap Proxifier
Expand Down
34 changes: 31 additions & 3 deletions docs/extended_dnstap.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,42 @@
# DNS-collector - Extended DNStap
# DNS-collector - Enhanced DNStap

The DNScollector adds enhancements to the DNStap protocol with compression, TLS and extended metadata support.
These features can be only used between two `DNS-collector` instance.


## Compression

DNSTAP messages are highly compressible. They can be sent in reasonably large blocks, which enables significant compression for transmission over long-haul network links. While DNSTAP does not natively support compression, it seems not unreasonable that `DNS-collector` could have a configurable compression flag that would mark a stream as being compressed with one of the different models of compression that are supported in other areas of the code currently. This would allow a much more efficient transmission of DNSTAP-based messages through various components.

The following codec are supported:

- gzip
- lz4
- snappy
- std

> ref: https://github.com/dmachard/go-dnscollector/issues/490
## Extended feature

DNSTAP message can be extended by incorporating additional metadata added through transformations, such as filtering, geo, ATags.

If this feature is enabled, DNScollector will extend the DNStap protocol by incorporating additional metadata added through transformations, such as filtering, geo, ATags.
These metadata are encoded in the extra field with the following [protobuf structure](./../../dnsutils/extended_dnstap.proto).

This feature can be only used between two `DNS-collector` instance.
## TLS encryption

DNSTAP messages contains sensitive data. `DNS-collector` have a configurable flag to enable TLS encryption.

## Settings

How to enable it on the collector side ?

```yaml
- name: dnstap_collector
dnstap:
extended-support: true
compression: gzip
tls-support: true
```
How to enable it on the sender side ?
Expand All @@ -19,4 +45,6 @@ How to enable it on the sender side ?
- name: dnstap_sender
dnstapclient:
extended-support: true
compression: gzip
transport: tcp+tls
```
1 change: 1 addition & 0 deletions docs/loggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
| [Redis](loggers/logger_redis.md) | Redis pub logger |
| [Kafka](loggers/logger_kafka.md) | Kafka DNS producer |
| [Falco](loggers/logger_falco.md) | Falco plugin logger |
| [Clickhouse](loggers/logger_clickhouse.md) | ClickHouse logger |
3 changes: 3 additions & 0 deletions docs/loggers/logger_dnstap.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Options:
* `buffer-size`: (integer) how many DNS messages will be buffered before being sent
* `chan-buffer-size`: (integer) channel buffer size used on incoming dns message, number of messages before to drop it.
* `extended-support`: (boolen) Extend the DNStap message by incorporating additional transformations, such as filtering and ATags, into the extra field.
* `compression` (string) Compression for DNStap messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`. Default to `none`.
> Specifies the compression algorithm to use.
Defaults:

Expand All @@ -46,4 +48,5 @@ Defaults:
buffer-size: 100
chan-buffer-size: 65535
extended-support: false
compression: none
```
28 changes: 22 additions & 6 deletions loggers/dnstapclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/dmachard/go-dnscollector/transformers"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
"github.com/segmentio/kafka-go/compress"
)

type DnstapSender struct {
Expand Down Expand Up @@ -209,7 +210,8 @@ func (ds *DnstapSender) FlushBuffer(buf *[]dnsutils.DNSMessage) {

var data []byte
var err error
frame := &framestream.Frame{}
bulkFrame := &framestream.Frame{}
subFrame := &framestream.Frame{}

for _, dm := range *buf {
// update identity ?
Expand All @@ -224,13 +226,27 @@ func (ds *DnstapSender) FlushBuffer(buf *[]dnsutils.DNSMessage) {
continue
}

// send the frame
frame.Write(data)
if err := ds.fs.SendFrame(frame); err != nil {
ds.LogError("send frame error %s", err)
if ds.config.Loggers.DNSTap.Compression == pkgconfig.CompressNone {
// send the frame
bulkFrame.Write(data)
if err := ds.fs.SendFrame(bulkFrame); err != nil {
ds.LogError("send frame error %s", err)
ds.fsReady = false
<-ds.transportReconnect
break
}
} else {
subFrame.Write(data)
bulkFrame.AppendData(subFrame.Data())
}
}

if ds.config.Loggers.DNSTap.Compression != pkgconfig.CompressNone {
bulkFrame.Encode()
if err := ds.fs.SendCompressedFrame(&compress.GzipCodec, bulkFrame); err != nil {
ds.LogError("send bulk frame error %s", err)
ds.fsReady = false
<-ds.transportReconnect
break
}
}

Expand Down
Loading

0 comments on commit 146435a

Please sign in to comment.