elasticsearch: limit memory usage (#644)
* limit memory usage
* update config
* update test
* update doc
dmachard authored Mar 14, 2024
1 parent d21c96b commit 55be85b
Showing 7 changed files with 164 additions and 89 deletions.
4 changes: 4 additions & 0 deletions config.yml
@@ -563,6 +563,10 @@ multiplexer:
# chan-buffer-size: 4096
# # Size of batches sent to ES via _bulk in bytes, default to 1MB
# bulk-size: 1048576
# # Maximum size of the bulk channel
# bulk-channel-size: 10
# # Compression for bulk messages: none, gzip
# compression: none
# # Interval in seconds before flushing the buffer
# flush-interval: 10

7 changes: 3 additions & 4 deletions dnscollector.go
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"syscall"

"net/http"
_ "net/http/pprof"

"github.com/dmachard/go-dnscollector/dnsutils"
@@ -55,9 +54,9 @@ func main() {
testFlag := false

// Server for pprof
go func() {
fmt.Println(http.ListenAndServe("localhost:9999", nil))
}()
// go func() {
// fmt.Println(http.ListenAndServe("localhost:9999", nil))
// }()

// no longer use embedded golang flags...
// external libs like tcpassembly can set some unneeded flags too...
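For anyone who still needs the profiler after this change, a guarded variant keeps the endpoint opt-in instead of always listening. A minimal sketch, assuming a hypothetical `DNSCOLLECTOR_PPROF` environment variable (not part of this commit):

```go
package main

import (
	"fmt"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof handlers
	"os"
)

// startPprofIfEnabled binds the pprof server only when the (hypothetical)
// DNSCOLLECTOR_PPROF variable is set, e.g. DNSCOLLECTOR_PPROF=localhost:9999.
func startPprofIfEnabled() {
	if addr := os.Getenv("DNSCOLLECTOR_PPROF"); addr != "" {
		go func() {
			fmt.Println(http.ListenAndServe(addr, nil))
		}()
	}
}

func main() {
	startPprofIfEnabled()
	select {} // block forever; stand-in for the collector's real work
}
```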
9 changes: 5 additions & 4 deletions docs/_integration/elasticsearch/config.yml
@@ -13,12 +13,13 @@ multiplexer:
loggers:
- name: elastic
elasticsearch:
server: "http://127.0.0.1:9200/"
server: "http://192.168.1.220:9200/"
index: "dnscollector"
chan-buffer-size: 4096
bulk-size: 12582912
flush-interval: 5

bulk-size: 5242880
flush-interval: 10
compression: gzip
bulk-channel-size: 10
routes:
- from: [ tap ]
to: [ elastic ]
18 changes: 18 additions & 0 deletions docs/loggers/logger_elasticsearch.md
@@ -11,7 +11,25 @@ Options:
> Define the name of the Elasticsearch index to use.
- `bulk-size` (integer) Bulk size to be used for bulk batches in bytes. Defaults to `1048576` (1MB).
> Set the maximum size of each bulk batch before sending it to Elasticsearch.
- `bulk-channel-size` (integer) Maximum number of bulk messages in the buffer. Defaults to `10`.
> Specifies the maximum number of bulk messages held in the buffer; once it is full, additional bulks are dropped.
- `compression` (string) Compression for bulk messages: `none`, `gzip`. Defaults to `none`.
> Specifies the compression algorithm to use.
- `chan-buffer-size` (integer) Channel buffer size for incoming DNS messages; messages beyond this count are dropped. Defaults to `4096`.
> Adjust the size of the channel buffer. If you encounter the error message `buffer is full, xxx packet(s) dropped`, consider increasing this parameter to prevent message drops.
- `flush-interval` (integer) Interval in seconds before the buffer is flushed. Defaults to `10`.
> Set the maximum time interval before the buffer is flushed. If a bulk batch reaches this interval before reaching its maximum size, it is sent to Elasticsearch.

Configuration example:

```yaml
- name: elastic
elasticsearch:
server: "http://127.0.0.1:9200/"
index: "dnscollector"
chan-buffer-size: 4096
bulk-size: 5242880
flush-interval: 10
compression: gzip
bulk-channel-size: 10
```
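
For context on what `bulk-size` measures: each batch is NDJSON in the Elasticsearch `_bulk` format, an action line followed by one flattened DNS message per document (see `loggers/elasticsearch.go` below). A hypothetical two-document batch, with illustrative field names, looks like:

```
{ "create" : {}}
{"dnstap.identity":"foo","dns.qname":"example.com","dns.qtype":"A"}
{ "create" : {}}
{"dnstap.identity":"foo","dns.qname":"example.net","dns.qtype":"AAAA"}
```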
208 changes: 128 additions & 80 deletions loggers/elasticsearch.go
@@ -2,7 +2,10 @@ package loggers

import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"log"
"path"
"time"

@@ -17,43 +20,37 @@ import (
)

type ElasticSearchClient struct {
stopProcess chan bool
doneProcess chan bool
stopBulkProcess chan bool
doneBulkProcess chan bool
stopRun chan bool
doneRun chan bool
inputChan chan dnsutils.DNSMessage
outputChan chan dnsutils.DNSMessage
sendChan chan []byte
config *pkgconfig.Config
configChan chan *pkgconfig.Config
logger *logger.Logger
name string
server string
index string
bulkURL string
RoutingHandler pkgutils.RoutingHandler
httpClient *http.Client
stopProcess chan bool
doneProcess chan bool
stopRun chan bool
doneRun chan bool
inputChan chan dnsutils.DNSMessage
outputChan chan dnsutils.DNSMessage
config *pkgconfig.Config
configChan chan *pkgconfig.Config
logger *logger.Logger
name string
server string
index string
bulkURL string
RoutingHandler pkgutils.RoutingHandler
httpClient *http.Client
}

func NewElasticSearchClient(config *pkgconfig.Config, console *logger.Logger, name string) *ElasticSearchClient {
console.Info(pkgutils.PrefixLogLogger+"[%s] elasticsearch - enabled", name)
ec := &ElasticSearchClient{
stopProcess: make(chan bool),
doneProcess: make(chan bool),
stopBulkProcess: make(chan bool),
doneBulkProcess: make(chan bool),
stopRun: make(chan bool),
doneRun: make(chan bool),
inputChan: make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
outputChan: make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
sendChan: make(chan []byte, config.Loggers.ElasticSearchClient.ChannelBufferSize),
logger: console,
config: config,
configChan: make(chan *pkgconfig.Config),
name: name,
RoutingHandler: pkgutils.NewRoutingHandler(config, console, name),
stopProcess: make(chan bool),
doneProcess: make(chan bool),
stopRun: make(chan bool),
doneRun: make(chan bool),
inputChan: make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
outputChan: make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
logger: console,
config: config,
configChan: make(chan *pkgconfig.Config),
name: name,
RoutingHandler: pkgutils.NewRoutingHandler(config, console, name),
}
ec.ReadConfig()

@@ -77,6 +74,17 @@ func (ec *ElasticSearchClient) AddDefaultRoute(wrk pkgutils.Worker) {
func (ec *ElasticSearchClient) SetLoggers(loggers []pkgutils.Worker) {}

func (ec *ElasticSearchClient) ReadConfig() {

if ec.config.Loggers.ElasticSearchClient.Compression != pkgconfig.CompressNone {
ec.LogInfo(ec.config.Loggers.ElasticSearchClient.Compression)
switch ec.config.Loggers.ElasticSearchClient.Compression {
case pkgconfig.CompressGzip:
ec.LogInfo("gzip compression is enabled")
default:
log.Fatal(pkgutils.PrefixLogLogger+"["+ec.name+"] elasticsearch - invalid compress mode: ", ec.config.Loggers.ElasticSearchClient.Compression)
}
}

ec.server = ec.config.Loggers.ElasticSearchClient.Server
ec.index = ec.config.Loggers.ElasticSearchClient.Index

@@ -116,14 +124,14 @@ func (ec *ElasticSearchClient) Stop() {
ec.LogInfo("stopping to process...")
ec.stopProcess <- true
<-ec.doneProcess

ec.LogInfo("stopping to bulk process...")
ec.stopBulkProcess <- true
<-ec.doneBulkProcess
}

func (ec *ElasticSearchClient) Run() {
ec.LogInfo("waiting dnsmessage to process...")
defer func() {
ec.LogInfo("run terminated")
ec.doneRun <- true
}()

// prepare next channels
defaultRoutes, defaultNames := ec.RoutingHandler.GetDefaultRoutes()
@@ -136,18 +144,14 @@

// goroutine to process transformed dns messages
go ec.ProcessDM()
go ec.ProcessBulk()

// loop to process incoming messages
RUN_LOOP:
for {
select {
case <-ec.stopRun:
// cleanup transformers
subprocessors.Reset()

ec.doneRun <- true
break RUN_LOOP
return

case cfg, opened := <-ec.configChan:
if !opened {
@@ -177,26 +181,42 @@ RUN_LOOP:
ec.outputChan <- dm
}
}
ec.LogInfo("run terminated")
}

func (ec *ElasticSearchClient) ProcessDM() {
ec.LogInfo("waiting transformed dnsmessage to process...")
defer func() {
ec.LogInfo("processing terminated")
ec.doneProcess <- true
}()

// buffer
buffer := bytes.NewBuffer(nil)
// create a new encoder that writes to the buffer
buffer := bytes.NewBuffer(make([]byte, 0, ec.config.Loggers.ElasticSearchClient.BulkSize))
encoder := json.NewEncoder(buffer)

flushInterval := time.Duration(ec.config.Loggers.ElasticSearchClient.FlushInterval) * time.Second
flushTimer := time.NewTimer(flushInterval)

PROCESS_LOOP:
dataBuffer := make(chan []byte, ec.config.Loggers.ElasticSearchClient.BulkChannelSize)
go func() {
for data := range dataBuffer {
var err error
if ec.config.Loggers.ElasticSearchClient.Compression == pkgconfig.CompressGzip {
err = ec.sendCompressedBulk(data)
} else {
err = ec.sendBulk(data)
}
if err != nil {
ec.LogError("error sending bulk data: %v", err)
}
}
}()

for {
select {
case <-ec.stopProcess:
ec.doneProcess <- true
break PROCESS_LOOP
close(dataBuffer)
return

// incoming dns message to process
case dm, opened := <-ec.outputChan:
@@ -210,21 +230,19 @@ PROCESS_LOOP:
if err != nil {
ec.LogError("flattening DNS message failed: %e", err)
}

buffer.WriteString("{ \"create\" : {}}\n")
// buffer.WriteString("\n")
encoder.Encode(flat)

// Send data and reset buffer
if buffer.Len() >= ec.config.Loggers.ElasticSearchClient.BulkSize {
bufCopy := make([]byte, buffer.Len())
copy(bufCopy, buffer.Bytes())
buffer.Read(bufCopy)
buffer.Reset()

select {
case ec.sendChan <- bufCopy:
case dataBuffer <- bufCopy:
default:
ec.LogError("send buffer is full, bulk dropped")
ec.LogError("Send buffer is full, bulk dropped")
}
}

@@ -234,51 +252,81 @@ PROCESS_LOOP:
// Send data and reset buffer
if buffer.Len() > 0 {
bufCopy := make([]byte, buffer.Len())
copy(bufCopy, buffer.Bytes())
buffer.Read(bufCopy)
buffer.Reset()

select {
case ec.sendChan <- bufCopy:
case dataBuffer <- bufCopy:
default:
ec.LogError("send buffer is full, bulk dropped")
ec.LogError("automatic flush, send buffer is full, bulk dropped")
}
}

// restart timer
flushTimer.Reset(flushInterval)
}
}
ec.LogInfo("processing terminated")
}

func (ec *ElasticSearchClient) ProcessBulk() {
ec.LogInfo("waiting bulk messages to process")
func (ec *ElasticSearchClient) sendBulk(bulk []byte) error {
// Create a new HTTP request
req, err := http.NewRequest("POST", ec.bulkURL, bytes.NewReader(bulk))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")

BULK_PROCESS_LOOP:
for {
select {
case <-ec.stopBulkProcess:
ec.doneBulkProcess <- true
break BULK_PROCESS_LOOP
// Send the request using the HTTP client
resp, err := ec.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

case bulk, opened := <-ec.sendChan:
if !opened {
ec.LogInfo("bulk channel closed!")
return
}
// Check the response status code
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

post, _ := http.NewRequest("POST", ec.bulkURL, bytes.NewReader(bulk))
post.Header.Set("Content-Type", "application/json")
return nil
}

resp, err := ec.httpClient.Do(post)
if err != nil {
ec.LogError(err.Error())
}
if resp != nil {
resp.Body.Close()
}
func (ec *ElasticSearchClient) sendCompressedBulk(bulk []byte) error {
var compressedBulk bytes.Buffer
gzipWriter := gzip.NewWriter(&compressedBulk)

}
// Write the uncompressed data to the gzip writer
_, err := gzipWriter.Write(bulk)
if err != nil {
fmt.Println("gzip", err)
return err
}

// Close the gzip writer to flush any remaining data
err = gzipWriter.Close()
if err != nil {
return err
}

// Create a new HTTP request
req, err := http.NewRequest("POST", ec.bulkURL, &compressedBulk)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip") // Set Content-Encoding header to gzip

// Send the request using the HTTP client
resp, err := ec.httpClient.Do(req)
if err != nil {
return err
}
ec.LogInfo("bulk processing terminated")
defer resp.Body.Close()

// Check the response status code
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

return nil
}
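
Taken together, the memory cap in this commit comes from three bounded pieces: a buffer preallocated at `bulk-size`, a bulk channel capped at `bulk-channel-size`, and a non-blocking send that drops a batch rather than queueing it without limit. A minimal standalone sketch of that pattern, with names simplified from the logger code above (`runBounded` and its parameters are illustrative, not the project's API):

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// runBounded sketches the memory-limiting pattern from the logger above:
// a buffer preallocated at bulkSize, a channel capped at channelSize, and a
// non-blocking send that drops a batch instead of queueing without bound.
func runBounded(input <-chan []byte, bulkSize, channelSize int, flushInterval time.Duration) {
	buffer := bytes.NewBuffer(make([]byte, 0, bulkSize)) // reused between batches
	dataBuffer := make(chan []byte, channelSize)         // hard cap on queued batches

	go func() {
		for batch := range dataBuffer {
			_ = batch // an HTTP POST to the _bulk endpoint would go here
		}
	}()

	flush := func(reason string) {
		if buffer.Len() == 0 {
			return
		}
		bufCopy := make([]byte, buffer.Len())
		buffer.Read(bufCopy)
		buffer.Reset()
		select {
		case dataBuffer <- bufCopy: // queue if there is room
		default: // otherwise drop; memory stays bounded
			fmt.Println(reason, "flush: send buffer is full, bulk dropped")
		}
	}

	flushTimer := time.NewTimer(flushInterval)
	for {
		select {
		case doc, opened := <-input:
			if !opened {
				flush("final")
				close(dataBuffer)
				return
			}
			buffer.Write(doc)
			if buffer.Len() >= bulkSize {
				flush("size")
			}
		case <-flushTimer.C:
			flush("timer")
			flushTimer.Reset(flushInterval)
		}
	}
}

func main() {
	input := make(chan []byte, 4)
	done := make(chan struct{})
	go func() {
		runBounded(input, 1024, 10, time.Second)
		close(done)
	}()
	input <- []byte("{ \"create\" : {}}\n{\"dns.qname\":\"example.com\"}\n")
	close(input)
	<-done
}
```

The drop-on-full `select`/`default` is the key trade-off: under sustained backpressure from Elasticsearch the logger loses batches instead of growing its queue, which is exactly the behavior the new `bulk-channel-size` option makes tunable.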