elasticsearch: limit memory usage #644

Merged · 5 commits · Mar 14, 2024
config.yml — 4 additions, 0 deletions
@@ -563,6 +563,10 @@ multiplexer:
 # chan-buffer-size: 4096
 # # Size of batches sent to ES via _bulk in bytes, default to 1MB
 # bulk-size: 1048576
+# # Maximum size of the bulk channel
+# bulk-channel-size: 10
+# # Compression for bulk messages: none, gzip
+# compression: none
 # # interval in seconds before to flush the buffer
 # flush-interval: 10

dnscollector.go — 3 additions, 4 deletions
@@ -7,7 +7,6 @@ import (
"strings"
"syscall"

"net/http"
_ "net/http/pprof"

"github.com/dmachard/go-dnscollector/dnsutils"
@@ -55,9 +54,9 @@ func main() {
 	testFlag := false
 
 	// Server for pprof
-	go func() {
-		fmt.Println(http.ListenAndServe("localhost:9999", nil))
-	}()
+	// go func() {
+	// 	fmt.Println(http.ListenAndServe("localhost:9999", nil))
+	// }()
 
 	// no more use embedded golang flags...
 	// external lib like tcpassembly can set some uneeded flags too...
docs/_integration/elasticsearch/config.yml — 5 additions, 4 deletions
@@ -13,12 +13,13 @@ multiplexer:
   loggers:
     - name: elastic
       elasticsearch:
-        server: "http://127.0.0.1:9200/"
+        server: "http://192.168.1.220:9200/"
         index: "dnscollector"
         chan-buffer-size: 4096
-        bulk-size: 12582912
-        flush-interval: 5
-
+        bulk-size: 5242880
+        flush-interval: 10
+        compression: gzip
+        bulk-channel-size: 10
   routes:
     - from: [ tap ]
       to: [ elastic ]
docs/loggers/logger_elasticsearch.md — 18 additions, 0 deletions
@@ -11,7 +11,25 @@ Options:
 > Define the name of the Elasticsearch index to use.
 - `bulk-size` (integer) Bulk size to be used for bulk batches in bytes. Default to `1048576` (1MB).
 > Set the maximum size of each bulk batch before sending it to Elasticsearch.
+- `bulk-channel-size` (integer) Maximum number of bulk messages in buffer. Default to `10`.
+> Specifies the maximum number of bulk messages held in the buffer; once it is full, additional bulks are dropped.
+- `compression` (string) Compression for bulk messages: `none`, `gzip`. Default to `none`.
+> Specifies the compression algorithm to use.
 - `chan-buffer-size` (integer) channel buffer size used on incoming dns message, number of messages before to drop it. Default to `4096`.
 > Adjust the size of the channel buffer. If you encounter the error message buffer is full, xxx packet(s) dropped, consider increasing this parameter to prevent message drops.
 - `flush-interval` (integer) interval in seconds before to flush the buffer. Default to `10`.
 > Set the maximum time interval before the buffer is flushed. If the bulk batches reach this interval before reaching the maximum size, they will be sent to Elasticsearch.

+Configuration example:
+
+```yaml
+- name: elastic
+  elasticsearch:
+    server: "http://127.0.0.1:9200/"
+    index: "dnscollector"
+    chan-buffer-size: 4096
+    bulk-size: 5242880
+    flush-interval: 10
+    compression: gzip
+    bulk-channel-size: 10
+```
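
The `bulk-channel-size` behaviour documented above (drop when the buffer is full) comes down to a non-blocking send on a bounded channel. A minimal sketch of that pattern, separate from the PR code and with illustrative names:

```go
package main

import "fmt"

// Minimal sketch (separate from the PR code) of the drop-on-full behaviour
// that bulk-channel-size configures: a bounded channel plus a non-blocking
// send, so a slow Elasticsearch never stalls the processing loop.
func main() {
	const bulkChannelSize = 2 // plays the role of bulk-channel-size
	dataBuffer := make(chan []byte, bulkChannelSize)

	for i := 0; i < 4; i++ {
		bulk := []byte(fmt.Sprintf("bulk-%d", i))
		select {
		case dataBuffer <- bulk: // room left: enqueue for the sender goroutine
			fmt.Println("queued:", string(bulk))
		default: // buffer full: drop rather than block
			fmt.Println("send buffer is full, bulk dropped:", string(bulk))
		}
	}
}
```

The `select`/`default` pair is what makes the send non-blocking: a slow or unreachable Elasticsearch costs dropped bulks rather than unbounded memory growth or a stalled pipeline, which is the point of this PR.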
loggers/elasticsearch.go — 128 additions, 80 deletions
@@ -2,7 +2,10 @@ package loggers

 import (
 	"bytes"
+	"compress/gzip"
 	"encoding/json"
+	"fmt"
+	"log"
 	"path"
 	"time"

@@ -17,43 +20,37 @@ import (
 )
 
 type ElasticSearchClient struct {
-	stopProcess     chan bool
-	doneProcess     chan bool
-	stopBulkProcess chan bool
-	doneBulkProcess chan bool
-	stopRun         chan bool
-	doneRun         chan bool
-	inputChan       chan dnsutils.DNSMessage
-	outputChan      chan dnsutils.DNSMessage
-	sendChan        chan []byte
-	config          *pkgconfig.Config
-	configChan      chan *pkgconfig.Config
-	logger          *logger.Logger
-	name            string
-	server          string
-	index           string
-	bulkURL         string
-	RoutingHandler  pkgutils.RoutingHandler
-	httpClient      *http.Client
+	stopProcess    chan bool
+	doneProcess    chan bool
+	stopRun        chan bool
+	doneRun        chan bool
+	inputChan      chan dnsutils.DNSMessage
+	outputChan     chan dnsutils.DNSMessage
+	config         *pkgconfig.Config
+	configChan     chan *pkgconfig.Config
+	logger         *logger.Logger
+	name           string
+	server         string
+	index          string
+	bulkURL        string
+	RoutingHandler pkgutils.RoutingHandler
+	httpClient     *http.Client
 }
 
 func NewElasticSearchClient(config *pkgconfig.Config, console *logger.Logger, name string) *ElasticSearchClient {
 	console.Info(pkgutils.PrefixLogLogger+"[%s] elasticsearch - enabled", name)
 	ec := &ElasticSearchClient{
-		stopProcess:     make(chan bool),
-		doneProcess:     make(chan bool),
-		stopBulkProcess: make(chan bool),
-		doneBulkProcess: make(chan bool),
-		stopRun:         make(chan bool),
-		doneRun:         make(chan bool),
-		inputChan:       make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
-		outputChan:      make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
-		sendChan:        make(chan []byte, config.Loggers.ElasticSearchClient.ChannelBufferSize),
-		logger:          console,
-		config:          config,
-		configChan:      make(chan *pkgconfig.Config),
-		name:            name,
-		RoutingHandler:  pkgutils.NewRoutingHandler(config, console, name),
+		stopProcess:    make(chan bool),
+		doneProcess:    make(chan bool),
+		stopRun:        make(chan bool),
+		doneRun:        make(chan bool),
+		inputChan:      make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
+		outputChan:     make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
+		logger:         console,
+		config:         config,
+		configChan:     make(chan *pkgconfig.Config),
+		name:           name,
+		RoutingHandler: pkgutils.NewRoutingHandler(config, console, name),
 	}
 	ec.ReadConfig()

@@ -77,6 +74,17 @@ func (ec *ElasticSearchClient) AddDefaultRoute(wrk pkgutils.Worker) {
 func (ec *ElasticSearchClient) SetLoggers(loggers []pkgutils.Worker) {}
 
 func (ec *ElasticSearchClient) ReadConfig() {
+
+	if ec.config.Loggers.ElasticSearchClient.Compression != pkgconfig.CompressNone {
+		ec.LogInfo(ec.config.Loggers.ElasticSearchClient.Compression)
+		switch ec.config.Loggers.ElasticSearchClient.Compression {
+		case pkgconfig.CompressGzip:
+			ec.LogInfo("gzip compression is enabled")
+		default:
+			log.Fatal(pkgutils.PrefixLogLogger+"["+ec.name+"] elasticsearch - invalid compress mode: ", ec.config.Loggers.ElasticSearchClient.Compression)
+		}
+	}
+
 	ec.server = ec.config.Loggers.ElasticSearchClient.Server
 	ec.index = ec.config.Loggers.ElasticSearchClient.Index

@@ -116,14 +124,14 @@ func (ec *ElasticSearchClient) Stop() {
ec.LogInfo("stopping to process...")
ec.stopProcess <- true
<-ec.doneProcess

ec.LogInfo("stopping to bulk process...")
ec.stopBulkProcess <- true
<-ec.doneBulkProcess
}

func (ec *ElasticSearchClient) Run() {
ec.LogInfo("waiting dnsmessage to process...")
defer func() {
ec.LogInfo("run terminated")
ec.doneRun <- true
}()

// prepare next channels
defaultRoutes, defaultNames := ec.RoutingHandler.GetDefaultRoutes()
@@ -136,18 +144,14 @@

 	// goroutine to process transformed dns messages
 	go ec.ProcessDM()
-	go ec.ProcessBulk()
 
 	// loop to process incoming messages
-RUN_LOOP:
 	for {
 		select {
 		case <-ec.stopRun:
 			// cleanup transformers
 			subprocessors.Reset()
-
-			ec.doneRun <- true
-			break RUN_LOOP
+			return
 
 		case cfg, opened := <-ec.configChan:
 			if !opened {
@@ -177,26 +181,42 @@
 			ec.outputChan <- dm
 		}
 	}
-	ec.LogInfo("run terminated")
 }
 
 func (ec *ElasticSearchClient) ProcessDM() {
 	ec.LogInfo("waiting transformed dnsmessage to process...")
+	defer func() {
+		ec.LogInfo("processing terminated")
+		ec.doneProcess <- true
+	}()
 
-	// buffer
-	buffer := bytes.NewBuffer(nil)
+	// create a new encoder that writes to the buffer
+	buffer := bytes.NewBuffer(make([]byte, 0, ec.config.Loggers.ElasticSearchClient.BulkSize))
 	encoder := json.NewEncoder(buffer)
 
 	flushInterval := time.Duration(ec.config.Loggers.ElasticSearchClient.FlushInterval) * time.Second
 	flushTimer := time.NewTimer(flushInterval)
 
-PROCESS_LOOP:
+	dataBuffer := make(chan []byte, ec.config.Loggers.ElasticSearchClient.BulkChannelSize)
+	go func() {
+		for data := range dataBuffer {
+			var err error
+			if ec.config.Loggers.ElasticSearchClient.Compression == pkgconfig.CompressGzip {
+				err = ec.sendCompressedBulk(data)
+			} else {
+				err = ec.sendBulk(data)
+			}
+			if err != nil {
+				ec.LogError("error sending bulk data: %v", err)
+			}
+		}
+	}()
 
 	for {
 		select {
 		case <-ec.stopProcess:
-			ec.doneProcess <- true
-			break PROCESS_LOOP
+			close(dataBuffer)
+			return
 
 		// incoming dns message to process
 		case dm, opened := <-ec.outputChan:
@@ -210,21 +230,19 @@
 			if err != nil {
 				ec.LogError("flattening DNS message failed: %e", err)
 			}
-
 			buffer.WriteString("{ \"create\" : {}}\n")
-			// buffer.WriteString("\n")
 			encoder.Encode(flat)
 
 			// Send data and reset buffer
 			if buffer.Len() >= ec.config.Loggers.ElasticSearchClient.BulkSize {
 				bufCopy := make([]byte, buffer.Len())
-				copy(bufCopy, buffer.Bytes())
+				buffer.Read(bufCopy)
 				buffer.Reset()
 
 				select {
-				case ec.sendChan <- bufCopy:
+				case dataBuffer <- bufCopy:
 				default:
-					ec.LogError("send buffer is full, bulk dropped")
+					ec.LogError("Send buffer is full, bulk dropped")
 				}
 			}
 
@@ -234,51 +252,81 @@
 			// Send data and reset buffer
 			if buffer.Len() > 0 {
 				bufCopy := make([]byte, buffer.Len())
-				copy(bufCopy, buffer.Bytes())
+				buffer.Read(bufCopy)
 				buffer.Reset()
 
 				select {
-				case ec.sendChan <- bufCopy:
+				case dataBuffer <- bufCopy:
 				default:
-					ec.LogError("send buffer is full, bulk dropped")
+					ec.LogError("automatic flush, send buffer is full, bulk dropped")
 				}
 			}
 
 			// restart timer
 			flushTimer.Reset(flushInterval)
 		}
 	}
-	ec.LogInfo("processing terminated")
 }
 
-func (ec *ElasticSearchClient) ProcessBulk() {
-	ec.LogInfo("waiting bulk messages to process")
+func (ec *ElasticSearchClient) sendBulk(bulk []byte) error {
+	// Create a new HTTP request
+	req, err := http.NewRequest("POST", ec.bulkURL, bytes.NewReader(bulk))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
 
-BULK_PROCESS_LOOP:
-	for {
-		select {
-		case <-ec.stopBulkProcess:
-			ec.doneBulkProcess <- true
-			break BULK_PROCESS_LOOP
+	// Send the request using the HTTP client
+	resp, err := ec.httpClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
 
-		case bulk, opened := <-ec.sendChan:
-			if !opened {
-				ec.LogInfo("bulk channel closed!")
-				return
-			}
+	// Check the response status code
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
 
-			post, _ := http.NewRequest("POST", ec.bulkURL, bytes.NewReader(bulk))
-			post.Header.Set("Content-Type", "application/json")
+	return nil
+}
 
-			resp, err := ec.httpClient.Do(post)
-			if err != nil {
-				ec.LogError(err.Error())
-			}
-			if resp != nil {
-				resp.Body.Close()
-			}
+func (ec *ElasticSearchClient) sendCompressedBulk(bulk []byte) error {
+	var compressedBulk bytes.Buffer
+	gzipWriter := gzip.NewWriter(&compressedBulk)
 
-		}
+	// Write the uncompressed data to the gzip writer
+	_, err := gzipWriter.Write(bulk)
+	if err != nil {
+		fmt.Println("gzip", err)
+		return err
+	}
 
+	// Close the gzip writer to flush any remaining data
+	err = gzipWriter.Close()
+	if err != nil {
+		return err
+	}
+
+	// Create a new HTTP request
+	req, err := http.NewRequest("POST", ec.bulkURL, &compressedBulk)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Content-Encoding", "gzip") // Set Content-Encoding header to gzip
+
+	// Send the request using the HTTP client
+	resp, err := ec.httpClient.Do(req)
+	if err != nil {
+		return err
+	}
-	ec.LogInfo("bulk processing terminated")
+	defer resp.Body.Close()
+
+	// Check the response status code
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
+
+	return nil
 }
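
To sanity-check the compressed path end to end, here is a self-contained sketch assuming only the Go standard library: an `httptest` server stands in for the Elasticsearch `_bulk` endpoint, and the client compresses an NDJSON payload the same way `sendCompressedBulk` does. The server, URL path, and payload are illustrative, not part of this PR.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Stand-in for the Elasticsearch _bulk endpoint: it inflates the body
	// when Content-Encoding: gzip is set, then prints what it received.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var body io.Reader = r.Body
		if r.Header.Get("Content-Encoding") == "gzip" {
			zr, err := gzip.NewReader(r.Body)
			if err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			defer zr.Close()
			body = zr
		}
		payload, _ := io.ReadAll(body)
		fmt.Printf("server received:\n%s", payload)
	}))
	defer srv.Close()

	// Compress an NDJSON bulk payload the same way sendCompressedBulk does.
	bulk := []byte("{ \"create\" : {}}\n{\"dns\":\"example\"}\n")
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	if _, err := zw.Write(bulk); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil { // flush the gzip stream
		panic(err)
	}

	req, err := http.NewRequest("POST", srv.URL+"/dnscollector/_bulk", &compressed)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Content-Encoding", "gzip")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```

Against a real cluster, if compressed requests are rejected, check the server-side `http.compression` setting, since Elasticsearch only accepts gzip-encoded request bodies when it is enabled.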