diff --git a/config.yml b/config.yml
index 63d60be4..25a6bf41 100644
--- a/config.yml
+++ b/config.yml
@@ -563,6 +563,10 @@ multiplexer:
 #       chan-buffer-size: 4096
 #       # Size of batches sent to ES via _bulk in bytes, default to 1MB
 #       bulk-size: 1048576
+#       # Maximum size of the bulk channel
+#       bulk-channel-size: 10
+#       # Compression for bulk messages: none, gzip
+#       compression: none
 #       # interval in seconds before to flush the buffer
 #       flush-interval: 10
diff --git a/dnscollector.go b/dnscollector.go
index 7b3ad3fe..f2a6a628 100644
--- a/dnscollector.go
+++ b/dnscollector.go
@@ -7,7 +7,6 @@ import (
 	"strings"
 	"syscall"
 
-	"net/http"
 	_ "net/http/pprof"
 
 	"github.com/dmachard/go-dnscollector/dnsutils"
@@ -55,9 +54,9 @@ func main() {
 	testFlag := false
 
 	// Server for pprof
-	go func() {
-		fmt.Println(http.ListenAndServe("localhost:9999", nil))
-	}()
+	// go func() {
+	// 	fmt.Println(http.ListenAndServe("localhost:9999", nil))
+	// }()
 
 	// no more use embedded golang flags...
 	// external lib like tcpassembly can set some uneeded flags too...
diff --git a/docs/_integration/elasticsearch/config.yml b/docs/_integration/elasticsearch/config.yml
index 257b2b70..82f58e2e 100644
--- a/docs/_integration/elasticsearch/config.yml
+++ b/docs/_integration/elasticsearch/config.yml
@@ -13,12 +13,13 @@ multiplexer:
   loggers:
     - name: elastic
       elasticsearch:
-        server: "http://127.0.0.1:9200/"
+        server: "http://192.168.1.220:9200/"
         index: "dnscollector"
         chan-buffer-size: 4096
-        bulk-size: 12582912
-        flush-interval: 5
-
+        bulk-size: 5242880
+        flush-interval: 10
+        compression: gzip
+        bulk-channel-size: 10
   routes:
     - from: [ tap ]
       to: [ elastic ]
\ No newline at end of file
diff --git a/docs/loggers/logger_elasticsearch.md b/docs/loggers/logger_elasticsearch.md
index 2cfcf891..00839109 100644
--- a/docs/loggers/logger_elasticsearch.md
+++ b/docs/loggers/logger_elasticsearch.md
@@ -11,7 +11,25 @@ Options:
 > Define the name of the Elasticsearch index to use.
 - `bulk-size` (integer) Bulk size to be used for bulk batches in bytes. Default to `1048576` (1MB).
 > Set the maximum size of each bulk batch before sending it to Elasticsearch.
+- `bulk-channel-size` (integer) Maximum number of bulk messages in buffer. Default to `10`.
+> Specifies the maximum number of bulk messages held in the buffer before new ones are dropped.
+- `compression` (string) Compression for bulk messages: `none`, `gzip`. Default to `none`.
+> Specifies the compression algorithm to use.
 - `chan-buffer-size` (integer) channel buffer size used on incoming dns message, number of messages before to drop it. Default to `4096`.
 > Adjust the size of the channel buffer. If you encounter the error message buffer is full, xxx packet(s) dropped, consider increasing this parameter to prevent message drops.
 - `flush-interval` (integer) interval in seconds before to flush the buffer. Default to `10`.
 > Set the maximum time interval before the buffer is flushed. If the bulk batches reach this interval before reaching the maximum size, they will be sent to Elasticsearch.
+
+Configuration example:
+
+```yaml
+- name: elastic
+  elasticsearch:
+    server: "http://127.0.0.1:9200/"
+    index: "dnscollector"
+    chan-buffer-size: 4096
+    bulk-size: 5242880
+    flush-interval: 10
+    compression: gzip
+    bulk-channel-size: 10
+```
diff --git a/loggers/elasticsearch.go b/loggers/elasticsearch.go
index bf84139c..aa2717a2 100644
--- a/loggers/elasticsearch.go
+++ b/loggers/elasticsearch.go
@@ -2,7 +2,10 @@ package loggers
 
 import (
 	"bytes"
+	"compress/gzip"
 	"encoding/json"
+	"fmt"
+	"log"
 	"path"
 	"time"
 
@@ -17,43 +20,37 @@ import (
 )
 
 type ElasticSearchClient struct {
-	stopProcess     chan bool
-	doneProcess     chan bool
-	stopBulkProcess chan bool
-	doneBulkProcess chan bool
-	stopRun         chan bool
-	doneRun         chan bool
-	inputChan       chan dnsutils.DNSMessage
-	outputChan      chan dnsutils.DNSMessage
-	sendChan        chan []byte
-	config          *pkgconfig.Config
-	configChan      chan *pkgconfig.Config
-	logger          *logger.Logger
-	name            string
-	server          string
-	index           string
-	bulkURL         string
-	RoutingHandler  pkgutils.RoutingHandler
-	httpClient      *http.Client
+	stopProcess    chan bool
+	doneProcess    chan bool
+	stopRun        chan bool
+	doneRun        chan bool
+	inputChan      chan dnsutils.DNSMessage
+	outputChan     chan dnsutils.DNSMessage
+	config         *pkgconfig.Config
+	configChan     chan *pkgconfig.Config
+	logger         *logger.Logger
+	name           string
+	server         string
+	index          string
+	bulkURL        string
+	RoutingHandler pkgutils.RoutingHandler
+	httpClient     *http.Client
 }
 
 func NewElasticSearchClient(config *pkgconfig.Config, console *logger.Logger, name string) *ElasticSearchClient {
 	console.Info(pkgutils.PrefixLogLogger+"[%s] elasticsearch - enabled", name)
 	ec := &ElasticSearchClient{
-		stopProcess:     make(chan bool),
-		doneProcess:     make(chan bool),
-		stopBulkProcess: make(chan bool),
-		doneBulkProcess: make(chan bool),
-		stopRun:         make(chan bool),
-		doneRun:         make(chan bool),
-		inputChan:       make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
-		outputChan:      make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
-		sendChan:        make(chan []byte, config.Loggers.ElasticSearchClient.ChannelBufferSize),
-		logger:          console,
-		config:          config,
-		configChan:      make(chan *pkgconfig.Config),
-		name:            name,
-		RoutingHandler:  pkgutils.NewRoutingHandler(config, console, name),
+		stopProcess:    make(chan bool),
+		doneProcess:    make(chan bool),
+		stopRun:        make(chan bool),
+		doneRun:        make(chan bool),
+		inputChan:      make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
+		outputChan:     make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
+		logger:         console,
+		config:         config,
+		configChan:     make(chan *pkgconfig.Config),
+		name:           name,
+		RoutingHandler: pkgutils.NewRoutingHandler(config, console, name),
 	}
 
 	ec.ReadConfig()
@@ -77,6 +74,17 @@ func (ec *ElasticSearchClient) AddDefaultRoute(wrk pkgutils.Worker) {
 func (ec *ElasticSearchClient) SetLoggers(loggers []pkgutils.Worker) {}
 
 func (ec *ElasticSearchClient) ReadConfig() {
+
+	if ec.config.Loggers.ElasticSearchClient.Compression != pkgconfig.CompressNone {
+		ec.LogInfo(ec.config.Loggers.ElasticSearchClient.Compression)
+		switch ec.config.Loggers.ElasticSearchClient.Compression {
+		case pkgconfig.CompressGzip:
+			ec.LogInfo("gzip compression is enabled")
+		default:
+			log.Fatal(pkgutils.PrefixLogLogger+"["+ec.name+"] elasticsearch - invalid compress mode: ", ec.config.Loggers.ElasticSearchClient.Compression)
+		}
+	}
+
 	ec.server = ec.config.Loggers.ElasticSearchClient.Server
 	ec.index = ec.config.Loggers.ElasticSearchClient.Index
@@ -116,14 +124,14 @@ func (ec *ElasticSearchClient) Stop() {
 	ec.LogInfo("stopping to process...")
 	ec.stopProcess <- true
 	<-ec.doneProcess
-
-	ec.LogInfo("stopping to bulk process...")
-	ec.stopBulkProcess <- true
-	<-ec.doneBulkProcess
 }
 
 func (ec *ElasticSearchClient) Run() {
 	ec.LogInfo("waiting dnsmessage to process...")
+	defer func() {
+		ec.LogInfo("run terminated")
+		ec.doneRun <- true
+	}()
 
 	// prepare next channels
 	defaultRoutes, defaultNames := ec.RoutingHandler.GetDefaultRoutes()
@@ -136,18 +144,14 @@ func (ec *ElasticSearchClient) Run() {
 
 	// goroutine to process transformed dns messages
 	go ec.ProcessDM()
-	go ec.ProcessBulk()
 
 	// loop to process incoming messages
-RUN_LOOP:
 	for {
 		select {
 		case <-ec.stopRun:
 			// cleanup transformers
 			subprocessors.Reset()
-
-			ec.doneRun <- true
-			break RUN_LOOP
+			return
 
 		case cfg, opened := <-ec.configChan:
 			if !opened {
@@ -177,26 +181,42 @@ RUN_LOOP:
 			ec.outputChan <- dm
 		}
 	}
-	ec.LogInfo("run terminated")
 }
 
 func (ec *ElasticSearchClient) ProcessDM() {
 	ec.LogInfo("waiting transformed dnsmessage to process...")
+	defer func() {
+		ec.LogInfo("processing terminated")
+		ec.doneProcess <- true
+	}()
 
-	// buffer
-	buffer := bytes.NewBuffer(nil)
 	// create a new encoder that writes to the buffer
+	buffer := bytes.NewBuffer(make([]byte, 0, ec.config.Loggers.ElasticSearchClient.BulkSize))
 	encoder := json.NewEncoder(buffer)
 
 	flushInterval := time.Duration(ec.config.Loggers.ElasticSearchClient.FlushInterval) * time.Second
 	flushTimer := time.NewTimer(flushInterval)
 
-PROCESS_LOOP:
+	dataBuffer := make(chan []byte, ec.config.Loggers.ElasticSearchClient.BulkChannelSize)
+	go func() {
+		for data := range dataBuffer {
+			var err error
+			if ec.config.Loggers.ElasticSearchClient.Compression == pkgconfig.CompressGzip {
+				err = ec.sendCompressedBulk(data)
+			} else {
+				err = ec.sendBulk(data)
+			}
+			if err != nil {
+				ec.LogError("error sending bulk data: %v", err)
+			}
+		}
+	}()
+
 	for {
 		select {
 		case <-ec.stopProcess:
-			ec.doneProcess <- true
-			break PROCESS_LOOP
+			close(dataBuffer)
+			return
 
 		// incoming dns message to process
 		case dm, opened := <-ec.outputChan:
@@ -210,21 +230,19 @@ PROCESS_LOOP:
 			if err != nil {
 				ec.LogError("flattening DNS message failed: %e", err)
 			}
-
 			buffer.WriteString("{ \"create\" : {}}\n")
-			// buffer.WriteString("\n")
 			encoder.Encode(flat)
 
 			// Send data and reset buffer
 			if buffer.Len() >= ec.config.Loggers.ElasticSearchClient.BulkSize {
 				bufCopy := make([]byte, buffer.Len())
-				copy(bufCopy, buffer.Bytes())
+				buffer.Read(bufCopy)
 				buffer.Reset()
 
 				select {
-				case ec.sendChan <- bufCopy:
+				case dataBuffer <- bufCopy:
 				default:
 					ec.LogError("send buffer is full, bulk dropped")
 				}
 			}
 
@@ -234,13 +252,13 @@ PROCESS_LOOP:
 			// Send data and reset buffer
 			if buffer.Len() > 0 {
 				bufCopy := make([]byte, buffer.Len())
-				copy(bufCopy, buffer.Bytes())
+				buffer.Read(bufCopy)
 				buffer.Reset()
 
 				select {
-				case ec.sendChan <- bufCopy:
+				case dataBuffer <- bufCopy:
 				default:
-					ec.LogError("send buffer is full, bulk dropped")
+					ec.LogError("automatic flush, send buffer is full, bulk dropped")
 				}
 			}
 
			// restart timer
 			flushTimer.Reset(flushInterval)
 		}
 	}
@@ -248,37 +266,66 @@ PROCESS_LOOP:
 			flushTimer.Reset(flushInterval)
 		}
 	}
-	ec.LogInfo("processing terminated")
 }
 
-func (ec *ElasticSearchClient) ProcessBulk() {
-	ec.LogInfo("waiting bulk messages to process")
+func (ec *ElasticSearchClient) sendBulk(bulk []byte) error {
+	// Create a new HTTP request
+	req, err := http.NewRequest("POST", ec.bulkURL, bytes.NewReader(bulk))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
 
-BULK_PROCESS_LOOP:
-	for {
-		select {
-		case <-ec.stopBulkProcess:
-			ec.doneBulkProcess <- true
-			break BULK_PROCESS_LOOP
+	// Send the request using the HTTP client
+	resp, err := ec.httpClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
 
-		case bulk, opened := <-ec.sendChan:
-			if !opened {
-				ec.LogInfo("bulk channel closed!")
-				return
-			}
+	// Check the response status code
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
 
-			post, _ := http.NewRequest("POST", ec.bulkURL, bytes.NewReader(bulk))
-			post.Header.Set("Content-Type", "application/json")
+	return nil
+}
 
-			resp, err := ec.httpClient.Do(post)
-			if err != nil {
-				ec.LogError(err.Error())
-			}
-			if resp != nil {
-				resp.Body.Close()
-			}
+func (ec *ElasticSearchClient) sendCompressedBulk(bulk []byte) error {
+	var compressedBulk bytes.Buffer
+	gzipWriter := gzip.NewWriter(&compressedBulk)
 
-		}
+	// Write the uncompressed data to the gzip writer
+	_, err := gzipWriter.Write(bulk)
+	if err != nil {
+		return err
+	}
+
+	// Close the gzip writer to flush any remaining data
+	err = gzipWriter.Close()
+	if err != nil {
+		return err
+	}
+
+	// Create a new HTTP request
+	req, err := http.NewRequest("POST", ec.bulkURL, &compressedBulk)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Content-Encoding", "gzip") // Set Content-Encoding header to gzip
 
+	// Send the request using the HTTP client
+	resp, err := ec.httpClient.Do(req)
+	if err != nil {
+		return err
 	}
-	ec.LogInfo("bulk processing terminated")
+	defer resp.Body.Close()
+
+	// Check the response status code
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
+
+	return nil
 }
diff --git a/loggers/elasticsearch_test.go b/loggers/elasticsearch_test.go
index 96eb37fe..16957ed6 100644
--- a/loggers/elasticsearch_test.go
+++ b/loggers/elasticsearch_test.go
@@ -42,6 +42,7 @@ func Test_ElasticSearchClient_BulkSize_Exceeded(t *testing.T) {
 			conf.Loggers.ElasticSearchClient.Index = "indexname"
 			conf.Loggers.ElasticSearchClient.Server = "http://127.0.0.1:59200/"
 			conf.Loggers.ElasticSearchClient.BulkSize = tc.bulkSize
+			conf.Loggers.ElasticSearchClient.BulkChannelSize = 50
 
 			g := NewElasticSearchClient(conf, logger.New(false), "test")
 			go g.Run()
diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go
index 5a8c3231..bcc29eee 100644
--- a/pkgconfig/loggers.go
+++ b/pkgconfig/loggers.go
@@ -227,7 +227,9 @@ type ConfigLoggers struct {
 		Server            string `yaml:"server"`
 		ChannelBufferSize int    `yaml:"chan-buffer-size"`
 		BulkSize          int    `yaml:"bulk-size"`
+		BulkChannelSize   int    `yaml:"bulk-channel-size"`
 		FlushInterval     int    `yaml:"flush-interval"`
+		Compression       string `yaml:"compression"`
 	} `yaml:"elasticsearch"`
 	ScalyrClient struct {
 		Enable bool `yaml:"enable"`
@@ -500,8 +502,10 @@ func (c *ConfigLoggers) SetDefault() {
 	c.ElasticSearchClient.Server = "http://127.0.0.1:9200/"
 	c.ElasticSearchClient.Index = "dnscollector"
 	c.ElasticSearchClient.ChannelBufferSize = 4096
-	c.ElasticSearchClient.BulkSize = 1048576
+	c.ElasticSearchClient.BulkSize = 5242880
 	c.ElasticSearchClient.FlushInterval = 10
+	c.ElasticSearchClient.BulkChannelSize = 10
+	c.ElasticSearchClient.Compression = CompressNone
 
 	c.RedisPub.Enable = false
 	c.RedisPub.RemoteAddress = LocalhostIP