
Commit

update config
dmachard committed Mar 10, 2024
1 parent 1171ea2 commit 5248acf
Showing 5 changed files with 59 additions and 22 deletions.
4 changes: 4 additions & 0 deletions config.yml
@@ -563,6 +563,10 @@ multiplexer:
 # chan-buffer-size: 4096
 # # Size of batches sent to ES via _bulk in bytes, default to 1MB
 # bulk-size: 1048576
+# # Maximum number of bulk messages kept in the buffer, default to 10
+# bulk-channel-size: 10
+# # Compression for bulk messages (none or gzip), default to none
+# compression: none
 # # interval in seconds before flushing the buffer
 # flush-interval: 10

5 changes: 3 additions & 2 deletions docs/_integration/elasticsearch/config.yml
@@ -16,9 +16,10 @@ multiplexer:
       server: "http://127.0.0.1:9200/"
       index: "dnscollector"
       chan-buffer-size: 4096
-      bulk-size: 12582912
+      bulk-size: 5242880
       flush-interval: 5
-
+      compression: gzip
+      bulk-channel-size: 10
   routes:
     - from: [ tap ]
       to: [ elastic ]
6 changes: 4 additions & 2 deletions docs/loggers/logger_elasticsearch.md
@@ -11,8 +11,10 @@ Options:
 > Define the name of the Elasticsearch index to use.
 - `bulk-size` (integer) Bulk size to be used for bulk batches in bytes. Default to `1048576` (1MB).
 > Set the maximum size of each bulk batch before sending it to Elasticsearch.
-- `bulk-channel-size` (integer) TODO
-> TODO
+- `bulk-channel-size` (integer) Maximum number of bulk messages in the buffer. Default to `10`.
+> Specifies the maximum number of bulk messages held in the buffer before new ones are dropped.
+- `compression` (string) Compression for bulk messages: `none`, `gzip`. Default to `none`.
+> Specifies the compression algorithm to use.
 - `chan-buffer-size` (integer) channel buffer size used for incoming DNS messages; number of messages held before dropping. Default to `4096`.
 > Adjust the size of the channel buffer. If you encounter the error message `buffer is full, xxx packet(s) dropped`, consider increasing this parameter to prevent message drops.
 - `flush-interval` (integer) interval in seconds before flushing the buffer. Default to `10`.
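
Taken together, a minimal logger block might look like the following sketch, modeled on the integration example updated in this commit (the `tap` collector name and the route are illustrative):

```yaml
multiplexer:
  loggers:
    - name: elastic
      elasticsearch:
        server: "http://127.0.0.1:9200/"
        index: "dnscollector"
        chan-buffer-size: 4096
        bulk-size: 5242880      # 5MB per _bulk batch
        bulk-channel-size: 10   # at most 10 bulk payloads queued for sending
        compression: gzip       # gzip-encode the _bulk payload
        flush-interval: 5       # flush the buffer every 5 seconds
  routes:
    - from: [ tap ]
      to: [ elastic ]
```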
64 changes: 46 additions & 18 deletions loggers/elasticsearch.go
@@ -5,6 +5,7 @@ import (
 	"compress/gzip"
 	"encoding/json"
 	"fmt"
+	"log"
 	"path"
 	"time"

@@ -73,6 +74,17 @@ func (ec *ElasticSearchClient) AddDefaultRoute(wrk pkgutils.Worker) {
 func (ec *ElasticSearchClient) SetLoggers(loggers []pkgutils.Worker) {}
 
 func (ec *ElasticSearchClient) ReadConfig() {
+
+	if ec.config.Loggers.ElasticSearchClient.Compression != pkgconfig.CompressNone {
+		ec.LogInfo(ec.config.Loggers.ElasticSearchClient.Compression)
+		switch ec.config.Loggers.ElasticSearchClient.Compression {
+		case pkgconfig.CompressGzip:
+			ec.LogInfo("gzip compression is enabled")
+		default:
+			log.Fatal(pkgutils.PrefixLogLogger+"["+ec.name+"] elasticsearch - invalid compress mode: ", ec.config.Loggers.ElasticSearchClient.Compression)
+		}
+	}
+
 	ec.server = ec.config.Loggers.ElasticSearchClient.Server
 	ec.index = ec.config.Loggers.ElasticSearchClient.Index
@@ -112,10 +124,6 @@ func (ec *ElasticSearchClient) Stop() {
 	ec.LogInfo("stopping to process...")
 	ec.stopProcess <- true
 	<-ec.doneProcess
-
-	// ec.LogInfo("stopping to bulk process...")
-	// ec.stopBulkProcess <- true
-	// <-ec.doneBulkProcess
 }
 
 func (ec *ElasticSearchClient) Run() {
@@ -192,7 +200,12 @@ func (ec *ElasticSearchClient) ProcessDM() {
 	dataBuffer := make(chan []byte, ec.config.Loggers.ElasticSearchClient.BulkChannelSize)
 	go func() {
 		for data := range dataBuffer {
-			err := ec.sendBulkData(data)
+			var err error
+			if ec.config.Loggers.ElasticSearchClient.Compression == pkgconfig.CompressGzip {
+				err = ec.sendCompressedBulk(data)
+			} else {
+				err = ec.sendBulk(data)
+			}
 			if err != nil {
 				ec.LogError("error sending bulk data: %v", err)
 			}
@@ -226,8 +239,6 @@ func (ec *ElasticSearchClient) ProcessDM() {
 			buffer.Read(bufCopy)
 			buffer.Reset()
 
-			// go ec.sendBulkData(bufCopy)
-
 			select {
 			case dataBuffer <- bufCopy:
 			default:
@@ -244,8 +255,6 @@
 			buffer.Read(bufCopy)
 			buffer.Reset()
 
-			// go ec.sendBulkData(bufCopy)
-
 			select {
 			case dataBuffer <- bufCopy:
 			default:
@@ -259,12 +268,35 @@
 	}
 }
 
-func (ec *ElasticSearchClient) sendBulkData(data []byte) error {
-	var compressedData bytes.Buffer
-	gzipWriter := gzip.NewWriter(&compressedData)
+func (ec *ElasticSearchClient) sendBulk(bulk []byte) error {
+	// Create a new HTTP request
+	req, err := http.NewRequest("POST", ec.bulkURL, bytes.NewReader(bulk))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	// Send the request using the HTTP client
+	resp, err := ec.httpClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// Check the response status code
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
+
+	return nil
+}
+
+func (ec *ElasticSearchClient) sendCompressedBulk(bulk []byte) error {
+	var compressedBulk bytes.Buffer
+	gzipWriter := gzip.NewWriter(&compressedBulk)
 
 	// Write the uncompressed data to the gzip writer
-	_, err := gzipWriter.Write(data)
+	_, err := gzipWriter.Write(bulk)
 	if err != nil {
 		fmt.Println("gzip", err)
 		return err
@@ -273,14 +305,12 @@ func (ec *ElasticSearchClient) sendBulkData(data []byte) error {
 	// Close the gzip writer to flush any remaining data
 	err = gzipWriter.Close()
 	if err != nil {
-		fmt.Println("gzip", err)
 		return err
 	}
 
 	// Create a new HTTP request
-	req, err := http.NewRequest("POST", ec.bulkURL, &compressedData)
+	req, err := http.NewRequest("POST", ec.bulkURL, &compressedBulk)
 	if err != nil {
-		fmt.Println("post", err)
 		return err
 	}
 	req.Header.Set("Content-Type", "application/json")
@@ -289,14 +319,12 @@ func (ec *ElasticSearchClient) sendBulkData(data []byte) error {
 	// Send the request using the HTTP client
 	resp, err := ec.httpClient.Do(req)
 	if err != nil {
-		fmt.Println("do", err)
 		return err
 	}
 	defer resp.Body.Close()
 
 	// Check the response status code
 	if resp.StatusCode != http.StatusOK {
-		fmt.Println("unexpected status code:", resp.StatusCode)
 		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
 	}
 
2 changes: 2 additions & 0 deletions pkgconfig/loggers.go
@@ -229,6 +229,7 @@ type ConfigLoggers struct {
 		BulkSize        int    `yaml:"bulk-size"`
 		BulkChannelSize int    `yaml:"bulk-channel-size"`
 		FlushInterval   int    `yaml:"flush-interval"`
+		Compression     string `yaml:"compression"`
 	} `yaml:"elasticsearch"`
 	ScalyrClient struct {
 		Enable bool `yaml:"enable"`
@@ -504,6 +505,7 @@ func (c *ConfigLoggers) SetDefault() {
 	c.ElasticSearchClient.BulkSize = 1048576
 	c.ElasticSearchClient.FlushInterval = 10
 	c.ElasticSearchClient.BulkChannelSize = 10
+	c.ElasticSearchClient.Compression = CompressNone
 
 	c.RedisPub.Enable = false
 	c.RedisPub.RemoteAddress = LocalhostIP
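
For reference, the defaults set above correspond to this YAML (a sketch; any key omitted from the logger block falls back to these values):

```yaml
elasticsearch:
  bulk-size: 1048576     # 1MB per _bulk batch
  flush-interval: 10     # seconds
  bulk-channel-size: 10
  compression: none
```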

