From 5248acff14810bf4f930e09ad0e5d0616b81105e Mon Sep 17 00:00:00 2001
From: dmachard <5562930+dmachard@users.noreply.github.com>
Date: Sun, 10 Mar 2024 14:41:14 +0100
Subject: [PATCH] update config

---
 config.yml                                 |  4 ++
 docs/_integration/elasticsearch/config.yml |  5 +-
 docs/loggers/logger_elasticsearch.md       |  6 +-
 loggers/elasticsearch.go                   | 64 ++++++++++++++++------
 pkgconfig/loggers.go                       |  2 +
 5 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/config.yml b/config.yml
index 63d60be4..227817bd 100644
--- a/config.yml
+++ b/config.yml
@@ -563,6 +563,10 @@ multiplexer:
 #         chan-buffer-size: 4096
 #         # Size of batches sent to ES via _bulk in bytes, default to 1MB
 #         bulk-size: 1048576
+#         # Maximum number of bulk messages in buffer, default to 10
+#         bulk-channel-size: 10
+#         # Compression for bulk messages: none or gzip, default to none
+#         compression: none
 #         # interval in seconds before to flush the buffer
 #         flush-interval: 10
 
diff --git a/docs/_integration/elasticsearch/config.yml b/docs/_integration/elasticsearch/config.yml
index 257b2b70..2baeeaaf 100644
--- a/docs/_integration/elasticsearch/config.yml
+++ b/docs/_integration/elasticsearch/config.yml
@@ -16,9 +16,10 @@ multiplexer:
         server: "http://127.0.0.1:9200/"
         index: "dnscollector"
         chan-buffer-size: 4096
-        bulk-size: 12582912
+        bulk-size: 5242880
         flush-interval: 5
-
+        compression: gzip
+        bulk-channel-size: 10
   routes:
     - from: [ tap ]
       to: [ elastic ]
\ No newline at end of file
diff --git a/docs/loggers/logger_elasticsearch.md b/docs/loggers/logger_elasticsearch.md
index 841e0b25..37b94637 100644
--- a/docs/loggers/logger_elasticsearch.md
+++ b/docs/loggers/logger_elasticsearch.md
@@ -11,8 +11,10 @@ Options:
 > Define the name of the Elasticsearch index to use.
 - `bulk-size` (integer) Bulk size to be used for bulk batches in bytes. Default to `1048576` (1MB).
 > Set the maximum size of each bulk batch before sending it to Elasticsearch.
-- `bulk-channel-size` (integer) TODO
-> TODO
+- `bulk-channel-size` (integer) Maximum number of bulk messages in the buffer. Default to `10`.
+> Specifies the maximum number of bulk messages held in the buffer before new ones are dropped.
+- `compression` (string) Compression for bulk messages: `none`, `gzip`. Default to `none`.
+> Specifies the compression algorithm to use.
 - `chan-buffer-size` (integer) channel buffer size used on incoming dns message, number of messages before to drop it. Default to `4096`.
 > Adjust the size of the channel buffer. If you encounter the error message buffer is full, xxx packet(s) dropped, consider increasing this parameter to prevent message drops.
 - `flush-interval` (integer) interval in seconds before to flush the buffer. Default to `10`.
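As a usage sketch (not part of the patch), the options documented above combine into a logger block like the one below. The logger name, server URL, and index follow the integration example in this patch; the numeric values are the documented defaults, and compression is set to gzip to exercise the new code path:

multiplexer:
  loggers:
    - name: elastic
      elasticsearch:
        server: "http://127.0.0.1:9200/"
        index: "dnscollector"
        chan-buffer-size: 4096
        bulk-size: 1048576      # bytes per _bulk payload
        bulk-channel-size: 10   # pending bulk payloads before dropping
        compression: gzip       # none or gzip
        flush-interval: 10      # seconds
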
diff --git a/loggers/elasticsearch.go b/loggers/elasticsearch.go
index a85287f1..aa2717a2 100644
--- a/loggers/elasticsearch.go
+++ b/loggers/elasticsearch.go
@@ -5,6 +5,7 @@ import (
 	"compress/gzip"
 	"encoding/json"
 	"fmt"
+	"log"
 	"path"
 	"time"
 
@@ -73,6 +74,17 @@ func (ec *ElasticSearchClient) AddDefaultRoute(wrk pkgutils.Worker) {
 
 func (ec *ElasticSearchClient) SetLoggers(loggers []pkgutils.Worker) {}
 
 func (ec *ElasticSearchClient) ReadConfig() {
+
+	if ec.config.Loggers.ElasticSearchClient.Compression != pkgconfig.CompressNone {
+		ec.LogInfo(ec.config.Loggers.ElasticSearchClient.Compression)
+		switch ec.config.Loggers.ElasticSearchClient.Compression {
+		case pkgconfig.CompressGzip:
+			ec.LogInfo("gzip compression is enabled")
+		default:
+			log.Fatal(pkgutils.PrefixLogLogger+"["+ec.name+"] elasticsearch - invalid compress mode: ", ec.config.Loggers.ElasticSearchClient.Compression)
+		}
+	}
+
 	ec.server = ec.config.Loggers.ElasticSearchClient.Server
 	ec.index = ec.config.Loggers.ElasticSearchClient.Index
 
@@ -112,10 +124,6 @@ func (ec *ElasticSearchClient) Stop() {
 	ec.LogInfo("stopping to process...")
 	ec.stopProcess <- true
 	<-ec.doneProcess
-
-	// ec.LogInfo("stopping to bulk process...")
-	// ec.stopBulkProcess <- true
-	// <-ec.doneBulkProcess
 }
 
 func (ec *ElasticSearchClient) Run() {
@@ -192,7 +200,12 @@ func (ec *ElasticSearchClient) ProcessDM() {
 	dataBuffer := make(chan []byte, ec.config.Loggers.ElasticSearchClient.BulkChannelSize)
 	go func() {
 		for data := range dataBuffer {
-			err := ec.sendBulkData(data)
+			var err error
+			if ec.config.Loggers.ElasticSearchClient.Compression == pkgconfig.CompressGzip {
+				err = ec.sendCompressedBulk(data)
+			} else {
+				err = ec.sendBulk(data)
+			}
 			if err != nil {
 				ec.LogError("error sending bulk data: %v", err)
 			}
@@ -226,8 +239,6 @@ func (ec *ElasticSearchClient) ProcessDM() {
 				buffer.Read(bufCopy)
 				buffer.Reset()
 
-				// go ec.sendBulkData(bufCopy)
-
 				select {
 				case dataBuffer <- bufCopy:
 				default:
@@ -244,8 +255,6 @@ func (ec *ElasticSearchClient) ProcessDM() {
 				buffer.Read(bufCopy)
 				buffer.Reset()
 
-				// go ec.sendBulkData(bufCopy)
-
 				select {
 				case dataBuffer <- bufCopy:
 				default:
@@ -259,12 +268,35 @@ func (ec *ElasticSearchClient) ProcessDM() {
 		}
 	}
 }
-func (ec *ElasticSearchClient) sendBulkData(data []byte) error {
-	var compressedData bytes.Buffer
-	gzipWriter := gzip.NewWriter(&compressedData)
+
+func (ec *ElasticSearchClient) sendBulk(bulk []byte) error {
+	// Create a new HTTP request
+	req, err := http.NewRequest("POST", ec.bulkURL, bytes.NewReader(bulk))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	// Send the request using the HTTP client
+	resp, err := ec.httpClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// Check the response status code
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
+
+	return nil
+}
+
+func (ec *ElasticSearchClient) sendCompressedBulk(bulk []byte) error {
+	var compressedBulk bytes.Buffer
+	gzipWriter := gzip.NewWriter(&compressedBulk)
 
 	// Write the uncompressed data to the gzip writer
-	_, err := gzipWriter.Write(data)
+	_, err := gzipWriter.Write(bulk)
 	if err != nil {
 		fmt.Println("gzip", err)
 		return err
@@ -273,14 +305,12 @@ func (ec *ElasticSearchClient) sendBulkData(data []byte) error {
 	// Close the gzip writer to flush any remaining data
 	err = gzipWriter.Close()
 	if err != nil {
-		fmt.Println("gzip", err)
 		return err
 	}
 
 	// Create a new HTTP request
-	req, err := http.NewRequest("POST", ec.bulkURL, &compressedData)
+	req, err := http.NewRequest("POST", ec.bulkURL, &compressedBulk)
 	if err != nil {
-		fmt.Println("post", err)
 		return err
 	}
 	req.Header.Set("Content-Type", "application/json")
 
@@ -289,14 +319,12 @@ func (ec *ElasticSearchClient) sendBulkData(data []byte) error {
 
 	// Send the request using the HTTP client
 	resp, err := ec.httpClient.Do(req)
 	if err != nil {
-		fmt.Println("do", err)
 		return err
 	}
 	defer resp.Body.Close()
 
 	// Check the response status code
 	if resp.StatusCode != http.StatusOK {
-		fmt.Println("unexpected status code:", resp.StatusCode)
 		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
 	}
 
diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go
index 3942ee66..8c6bdac2 100644
--- a/pkgconfig/loggers.go
+++ b/pkgconfig/loggers.go
@@ -229,6 +229,7 @@ type ConfigLoggers struct {
 		BulkSize        int    `yaml:"bulk-size"`
 		BulkChannelSize int    `yaml:"bulk-channel-size"`
 		FlushInterval   int    `yaml:"flush-interval"`
+		Compression     string `yaml:"compression"`
 	} `yaml:"elasticsearch"`
 	ScalyrClient struct {
 		Enable bool `yaml:"enable"`
@@ -504,6 +505,7 @@ func (c *ConfigLoggers) SetDefault() {
 	c.ElasticSearchClient.BulkSize = 1048576
 	c.ElasticSearchClient.FlushInterval = 10
 	c.ElasticSearchClient.BulkChannelSize = 10
+	c.ElasticSearchClient.Compression = CompressNone
 
 	c.RedisPub.Enable = false
 	c.RedisPub.RemoteAddress = LocalhostIP
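For illustration only, the standalone Go sketch below mirrors what sendCompressedBulk does in this patch: gzip the NDJSON bulk payload, then POST it to the _bulk endpoint. The function name sendGzipBulk, the bulkURL parameter, and the localhost endpoint are hypothetical, and the Content-Encoding header is an addition not present in the patch itself, though Elasticsearch normally needs it to recognize a gzip-compressed request body:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"net/http"
)

// sendGzipBulk compresses an NDJSON bulk payload with gzip and POSTs it.
func sendGzipBulk(client *http.Client, bulkURL string, bulk []byte) error {
	var buf bytes.Buffer
	gz := gzip.NewWriter(&buf)
	if _, err := gz.Write(bulk); err != nil {
		return err
	}
	// Close flushes the remaining compressed bytes into buf.
	if err := gz.Close(); err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, bulkURL, &buf)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	// Declare the compressed body (addition: not set by the patch above).
	req.Header.Set("Content-Encoding", "gzip")

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	return nil
}

func main() {
	// Minimal two-line bulk body; the trailing newline is required by _bulk.
	bulk := []byte(`{"create":{}}` + "\n" + `{"dns":"example"}` + "\n")
	if err := sendGzipBulk(http.DefaultClient, "http://127.0.0.1:9200/dnscollector/_bulk", bulk); err != nil {
		fmt.Println("bulk error:", err)
	}
}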