elasticsearch: release buffer properly to avoid memory leaks (#630)
* release buffer properly
* integration - add docker compose for elastic and some docs
* enable log in test
* change default value
* make linter happy
* optimize code
* update docs
dmachard authored Mar 4, 2024
1 parent 30790c2 commit 6371fa0
Showing 16 changed files with 444 additions and 106 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -17,6 +17,8 @@
# Python
__pycache__/

+# ignore some specifics files
go-dnscollector
bin/
-include/
+include/
+docs/_integration/elasticsearch/data/
9 changes: 9 additions & 0 deletions README.md
@@ -108,6 +108,15 @@ INFO: 2023/12/24 14:43:29.043730 main - config OK!

The [`_examples`](./docs/_examples) folder from documentation contains a number of [various configurations](./docs/examples.md) to get you started with the DNS-collector in different ways.

The [`_integration`](./docs/_integration) folder contains DNS-collector `configuration` files and `docker compose` examples for popular tools:

- [Elasticsearch](./docs/_integration/elasticsearch/README.md)

## Performance

Tuning may be necessary to deal with large traffic loads.
Please refer to the [performance tuning](./docs/performance.md) guide if needed.

## Contributing

See the [development guide](./docs/development.md) for more information on how to build it yourself.
2 changes: 1 addition & 1 deletion collectors/dnstap.go
@@ -104,7 +104,7 @@ func (c *Dnstap) LogInfo(msg string, v ...interface{}) {
}

func (c *Dnstap) LogError(msg string, v ...interface{}) {
-	c.logger.Error(pkgutils.PrefixLogCollector+"["+c.name+" dnstap - "+msg, v...)
+	c.logger.Error(pkgutils.PrefixLogCollector+"["+c.name+"] dnstap - "+msg, v...)
}

func (c *Dnstap) HandleConn(conn net.Conn) {
13 changes: 7 additions & 6 deletions config.yml
@@ -551,18 +551,19 @@ multiplexer:
# # interval in second before to flush the buffer
# flush-interval: 30

-# # elasticsearch backend, basic support
+# # ElasticSearch logger
+# # DNSMessage are exported with flat-json format
# elasticsearch:
# # remote server url
# server: "http://127.0.0.1:9200/"
# # Elasticsearch index for ingestion
-# index: "indexname"
+# index: "dnscollector"
# # Channel buffer size for incoming packets, number of packet before to drop it.
-# chan-buffer-size: 65535
-# # Size of batches sent to ES via _bulk
-# bulk-size: 100
+# chan-buffer-size: 4096
+# # Size of batches sent to ES via _bulk in bytes, default to 1MB
+# bulk-size: 1048576
# # interval in seconds before to flush the buffer
-# flush-interval: 30
+# flush-interval: 10

# # resend captured dns traffic to a remote fluentd server or to unix socket
# fluentd:
8 changes: 8 additions & 0 deletions dnscollector.go
@@ -7,6 +7,9 @@ import (
"strings"
"syscall"

"net/http"
_ "net/http/pprof"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-dnscollector/pkglinker"
@@ -51,6 +54,11 @@ func main() {
	configPath := "./config.yml"
	testFlag := false

	// Server for pprof
	go func() {
		fmt.Println(http.ListenAndServe("localhost:9999", nil))
	}()

	// no more use embedded golang flags...
	// external lib like tcpassembly can set some uneeded flags too...
	for i := 0; i < len(args); i++ {
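The snippet above starts a pprof server unconditionally on `localhost:9999`; once the collector runs, profiles can be pulled with `go tool pprof http://localhost:9999/debug/pprof/heap`, which is how a leak like the one fixed here shows up as ever-growing heap allocations. A minimal sketch of a guarded variant, assuming a hypothetical `DNSCOLLECTOR_PPROF` environment variable (not part of this commit):

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on http.DefaultServeMux
	"os"
	"time"
)

// maybeStartPprof starts the profiling endpoint only when an address is
// supplied, e.g. DNSCOLLECTOR_PPROF=localhost:9999 (hypothetical variable).
func maybeStartPprof() {
	addr := os.Getenv("DNSCOLLECTOR_PPROF")
	if addr == "" {
		return
	}
	go func() {
		log.Println(http.ListenAndServe(addr, nil))
	}()
}

func main() {
	maybeStartPprof()
	time.Sleep(time.Minute) // stand-in for the collector's real work
}
```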
17 changes: 17 additions & 0 deletions dnsutils/dns_parser_test.go
@@ -13,6 +13,23 @@ const (
	TestQName = "dnstapcollector.test."
)

// Benchmark

func BenchmarkDnsParseLabels(b *testing.B) {
	payload := []byte{0x12, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, 0x69,
		0x74, 0x79, 0x2d, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x06,
		0x75, 0x62, 0x75, 0x6e, 0x74, 0x75, 0x03, 0x63, 0x6f, 0x6d, 0x00,
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, _, err := ParseLabels(0, payload)
		if err != nil {
			b.Fatalf("could not parse labels: %v\n", err)
		}
	}
}

// Regular tests
func TestRcodeValid(t *testing.T) {
	rcode := RcodeToString(0)
	if rcode != "NOERROR" {
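For readers unfamiliar with the wire format being benchmarked: a DNS name is a sequence of length-prefixed labels terminated by a zero byte, and the payload above spells `connectivity-check.ubuntu.com`. A self-contained sketch of that decoding, independent of the project's `ParseLabels` and ignoring compression pointers:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseLabels walks length-prefixed DNS labels: each label is one length
// byte followed by that many bytes, and a zero byte ends the name.
func parseLabels(payload []byte) (string, error) {
	var labels []string
	i := 0
	for {
		if i >= len(payload) {
			return "", errors.New("truncated name")
		}
		length := int(payload[i])
		if length == 0 {
			return strings.Join(labels, "."), nil
		}
		if i+1+length > len(payload) {
			return "", errors.New("label overruns payload")
		}
		labels = append(labels, string(payload[i+1:i+1+length]))
		i += 1 + length
	}
}

func main() {
	payload := []byte{
		0x12, 'c', 'o', 'n', 'n', 'e', 'c', 't', 'i', 'v', 'i', 't', 'y', '-', 'c', 'h', 'e', 'c', 'k',
		0x06, 'u', 'b', 'u', 'n', 't', 'u',
		0x03, 'c', 'o', 'm',
		0x00,
	}
	name, err := parseLabels(payload)
	fmt.Println(name, err) // connectivity-check.ubuntu.com <nil>
}
```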
20 changes: 20 additions & 0 deletions docs/_integration/elasticsearch/README.md
@@ -0,0 +1,20 @@

# DNS-collector with Elastic and Kibana

- Copy the `./docs/_integration/elasticsearch` folder and start the Docker stack:

```bash
sudo docker compose up -d
```

- Go to the Kibana web interface at `http://127.0.0.1:5601`

- Click on `Explore on my own`, then open `Discover`

- Finally, create the index pattern `dnscollector` and choose `dnstap.timestamp-rfc3339ns` as the time field

- Run DNS-collector from source:

```bash
go run . -config docs/_integration/elasticsearch/config.yml
```
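To check that documents are actually reaching the index, Elasticsearch's `_count` API is a quick probe; a small sketch assuming the defaults from this walkthrough (server on `127.0.0.1:9200`, index `dnscollector`):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// Queries the document count of the dnscollector index; a non-zero count
// confirms the logger is shipping data.
func main() {
	resp, err := http.Get("http://127.0.0.1:9200/dnscollector/_count")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // e.g. {"count":1234,...}
}
```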
24 changes: 24 additions & 0 deletions docs/_integration/elasticsearch/config.yml
@@ -0,0 +1,24 @@

global:
  trace:
    verbose: true

multiplexer:
  collectors:
    - name: tap
      dnstap:
        listen-ip: 0.0.0.0
        listen-port: 6000
        chan-buffer-size: 4096
  loggers:
    - name: elastic
      elasticsearch:
        server: "http://127.0.0.1:9200/"
        index: "dnscollector"
        chan-buffer-size: 4096
        bulk-size: 12582912
        flush-interval: 5
  routes:
    - from: [ tap ]
      to: [ elastic ]
24 changes: 24 additions & 0 deletions docs/_integration/elasticsearch/docker-compose.yml
@@ -0,0 +1,24 @@
version: "3.8"
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.2
container_name: elasticsearch
restart: always
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- xpack.security.enrollment.enabled=false
volumes:
- ./data:/usr/share/elasticsearch/data
ports:
- 9200:9200
kibana:
container_name: kibana
image: docker.elastic.co/kibana/kibana:8.12.2
restart: always
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- 5601:5601
depends_on:
- elasticsearch
24 changes: 10 additions & 14 deletions docs/loggers/logger_elasticsearch.md
@@ -5,17 +5,13 @@ ElasticSearch client to remote ElasticSearch server

Options:

-- `server`: (string) Elasticsearch server url
-- `index`: (string) Elasticsearch index
-- `bulk-size`: (integer) Bulk size to be used for bulk batches
-- `chan-buffer-size`: (integer) channel buffer size used on incoming dns message, number of messages before to drop it
-- `flush-interval`: (integer) interval in seconds before to flush the buffer
-
-```yaml
-elasticsearch:
-  server: "http://127.0.0.1:9200"
-  index: "indexname"
-  bulk-size: 100
-  chan-buffer-size: 65535
-  flush-interval: 10
-```
+- `server` (string) Elasticsearch server URL. Defaults to `http://127.0.0.1:9200`.
+  > Specify the URL of your Elasticsearch server.
+- `index` (string) Elasticsearch index. Defaults to `dnscollector`.
+  > Define the name of the Elasticsearch index to use.
+- `bulk-size` (integer) Maximum size of a bulk batch, in bytes. Defaults to `1048576` (1 MB).
+  > Set the maximum size of each bulk batch before sending it to Elasticsearch.
+- `chan-buffer-size` (integer) Channel buffer size for incoming DNS messages; the number of messages held before dropping. Defaults to `4096`.
+  > Adjust the size of the channel buffer. If you encounter the error message `buffer is full, xxx packet(s) dropped`, consider increasing this parameter to prevent message drops.
+- `flush-interval` (integer) Interval in seconds before the buffer is flushed. Defaults to `10`.
+  > Set the maximum time before the buffer is flushed: a bulk batch that has not reached `bulk-size` by then is sent to Elasticsearch anyway.
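Together, `bulk-size` and `flush-interval` define a size-or-time flush policy: documents accumulate in a batch buffer that is shipped to the `_bulk` endpoint when it reaches `bulk-size` bytes, or when `flush-interval` elapses, whichever comes first. A minimal sketch of that policy (illustrative names, not the project's implementation; error handling kept short):

```go
package main

import (
	"bytes"
	"log"
	"net/http"
	"time"
)

func main() {
	docs := make(chan []byte, 4096) // stands in for the logger's input channel
	go func() { docs <- []byte(`{"dnstap.operation":"CLIENT_QUERY"}`) }()

	const bulkSize = 1048576 // bytes, the default above
	flush := time.NewTicker(10 * time.Second)
	var buf bytes.Buffer

	send := func() {
		if buf.Len() == 0 {
			return
		}
		// _bulk expects NDJSON: one action line, then one document line
		resp, err := http.Post("http://127.0.0.1:9200/dnscollector/_bulk",
			"application/x-ndjson", bytes.NewReader(buf.Bytes()))
		if err != nil {
			log.Println(err)
		} else {
			resp.Body.Close()
		}
		buf.Reset() // empty the batch buffer for reuse
	}

	for {
		select {
		case doc := <-docs:
			buf.WriteString("{\"create\":{}}\n")
			buf.Write(doc)
			buf.WriteByte('\n')
			if buf.Len() >= bulkSize { // size-triggered flush
				send()
			}
		case <-flush.C: // time-triggered flush
			send()
		}
	}
}
```

The `buf.Reset()` after every send is the important part: it empties the batch buffer instead of letting it grow without bound, the kind of leak this commit's title points at.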
34 changes: 34 additions & 0 deletions docs/performance.md
@@ -0,0 +1,34 @@
# Performance tuning

All loggers and collectors are based on buffered channels.
The size of these buffers can be configured with `chan-buffer-size`.
If you encounter the following error message in your logs, it indicates that you need to increase the `chan-buffer-size`:

```bash
logger[elastic] buffer is full, 7855 packet(s) dropped
```

## CPU usage

The conversion of DNS logs to JSON, text, or PCAP can incur CPU costs. Here's a list ordered by ns/op.

```bash
./dnsutils$ go test -bench=.
goos: linux
goarch: amd64
pkg: github.com/dmachard/go-dnscollector/dnsutils
cpu: Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz
BenchmarkDnsMessage_ToTextFormat-4 2600718 460.7 ns/op
BenchmarkDnsMessage_ToPacketLayer-4 1171467 969.5 ns/op
BenchmarkDnsMessage_ToDNSTap-4 993242 1130 ns/op
BenchmarkDnsMessage_ToExtendedDNSTap-4 618400 1951 ns/op
BenchmarkDnsMessage_ToJSON-4 190939 6584 ns/op
BenchmarkDnsMessage_ToFlatJSON-4 19868 55533 ns/op
```

## Memory usage

The main sources of memory usage in DNS-collector are:

- Buffered channels (see the sketch below)
- Prometheus logger with LRU cache
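As a rough illustration of the first point, a buffered channel pre-allocates one slot per element, so a single channel sized at the old `chan-buffer-size` default of 65535 can hold tens of megabytes. A sketch with a stand-in struct (the real `dnsutils.DNSMessage` is larger):

```go
package main

import (
	"fmt"
	"runtime"
)

// fakeMessage is a stand-in; the real dnsutils.DNSMessage is larger.
type fakeMessage struct {
	payload [512]byte
}

func main() {
	var before, after runtime.MemStats
	runtime.GC()
	runtime.ReadMemStats(&before)

	// a buffered channel pre-allocates one slot per element
	ch := make(chan fakeMessage, 65535)

	runtime.GC()
	runtime.ReadMemStats(&after)
	fmt.Printf("~%d KiB held by one channel buffer\n",
		(after.HeapAlloc-before.HeapAlloc)/1024)
	runtime.KeepAlive(ch)
}
```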
131 changes: 131 additions & 0 deletions loggers/devnull.go
@@ -0,0 +1,131 @@
package loggers

import (
	"github.com/dmachard/go-dnscollector/dnsutils"
	"github.com/dmachard/go-dnscollector/pkgconfig"
	"github.com/dmachard/go-dnscollector/pkgutils"
	"github.com/dmachard/go-logger"
)

type DevNull struct {
	stopProcess    chan bool
	doneProcess    chan bool
	stopRun        chan bool
	doneRun        chan bool
	inputChan      chan dnsutils.DNSMessage
	outputChan     chan dnsutils.DNSMessage
	config         *pkgconfig.Config
	configChan     chan *pkgconfig.Config
	logger         *logger.Logger
	name           string
	RoutingHandler pkgutils.RoutingHandler
}

func NewDevNull(config *pkgconfig.Config, console *logger.Logger, name string) *DevNull {
	console.Info(pkgutils.PrefixLogLogger+"[%s] devnull - enabled", name)
	so := &DevNull{
		stopProcess:    make(chan bool),
		doneProcess:    make(chan bool),
		stopRun:        make(chan bool),
		doneRun:        make(chan bool),
		inputChan:      make(chan dnsutils.DNSMessage, config.Loggers.Stdout.ChannelBufferSize),
		outputChan:     make(chan dnsutils.DNSMessage, config.Loggers.Stdout.ChannelBufferSize),
		logger:         console,
		config:         config,
		configChan:     make(chan *pkgconfig.Config),
		name:           name,
		RoutingHandler: pkgutils.NewRoutingHandler(config, console, name),
	}
	return so
}

func (so *DevNull) GetName() string { return so.name }

func (so *DevNull) AddDroppedRoute(wrk pkgutils.Worker) {
	so.RoutingHandler.AddDroppedRoute(wrk)
}

func (so *DevNull) AddDefaultRoute(wrk pkgutils.Worker) {
	so.RoutingHandler.AddDefaultRoute(wrk)
}

func (so *DevNull) SetLoggers(loggers []pkgutils.Worker) {}

func (so *DevNull) ReadConfig() {}

func (so *DevNull) ReloadConfig(config *pkgconfig.Config) {
	so.LogInfo("reload configuration!")
	so.configChan <- config
}

func (so *DevNull) LogInfo(msg string, v ...interface{}) {
	so.logger.Info(pkgutils.PrefixLogLogger+"["+so.name+"] devnull - "+msg, v...)
}

func (so *DevNull) LogError(msg string, v ...interface{}) {
	so.logger.Error(pkgutils.PrefixLogLogger+"["+so.name+"] devnull - "+msg, v...)
}

func (so *DevNull) GetInputChannel() chan dnsutils.DNSMessage {
	return so.inputChan
}

func (so *DevNull) Stop() {
	so.LogInfo("stopping logger...")
	so.RoutingHandler.Stop()

	so.LogInfo("stopping to run...")
	so.stopRun <- true
	<-so.doneRun

	so.LogInfo("stopping to process...")
	so.stopProcess <- true
	<-so.doneProcess
}

func (so *DevNull) Run() {
	so.LogInfo("running in background...")

	// goroutine to process transformed dns messages
	go so.Process()

	// loop to process incoming messages
RUN_LOOP:
	for {
		select {
		case <-so.stopRun:
			so.doneRun <- true
			break RUN_LOOP

		case _, opened := <-so.inputChan:
			if !opened {
				so.LogInfo("run: input channel closed!")
				return
			}

			// send to output channel
			// so.outputChan <- dm
		}
	}
	so.LogInfo("run terminated")
}

func (so *DevNull) Process() {
	so.LogInfo("ready to process")
PROCESS_LOOP:
	for {
		select {
		case <-so.stopProcess:
			so.doneProcess <- true
			break PROCESS_LOOP

		case _, opened := <-so.outputChan:
			if !opened {
				so.LogInfo("process: output channel closed!")
				return
			}

		}
	}
	so.LogInfo("processing terminated")
}
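The new devnull logger also illustrates the two-stage shutdown handshake used by the loggers above: `Stop()` sends on a `stop` channel and then blocks on the matching `done` channel, so it returns only once each loop has really exited. A condensed sketch of that handshake with a single worker loop:

```go
package main

import "fmt"

func main() {
	stop := make(chan bool)
	done := make(chan bool)
	work := make(chan int, 8)

	go func() {
		for {
			select {
			case <-stop:
				done <- true // confirm the loop is exiting
				return
			case v := <-work:
				_ = v // drop the message, devnull-style
			}
		}
	}()

	work <- 42
	stop <- true // request shutdown...
	<-done       // ...and wait until it has actually happened
	fmt.Println("worker stopped cleanly")
}
```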