diff --git a/docs/_integration/elasticsearch/README.md b/docs/_integration/elasticsearch/README.md index bdaa1bc3..4bad8339 100644 --- a/docs/_integration/elasticsearch/README.md +++ b/docs/_integration/elasticsearch/README.md @@ -11,14 +11,15 @@ sudo docker compose up -d ``` +- Update the destination URL in the config.yml and run DNScollector from source and generate some DNS logs from your DNS server with DNStap protocol. + + ```bash + go run . -config docs/_integration/elasticsearch/config.yml + ``` + - Go to kibana web interface through `http://127.0.0.1:5601` - Click on `Explore on my own` and `Discover` - Finally create index pattern `dnscollector` and choose `dnstap.timestamp-rfc33939ns` -- Finally, run DNScollector from source and generate some DNS logs from your DNS server with DNStap protocol. - - ```bash - go run . -config docs/_integration/elasticsearch/config.yml - ``` diff --git a/docs/_integration/elasticsearch/config.yml b/docs/_integration/elasticsearch/config.yml index 36b6474f..b5b644f7 100644 --- a/docs/_integration/elasticsearch/config.yml +++ b/docs/_integration/elasticsearch/config.yml @@ -14,9 +14,9 @@ pipelines: - name: elastic elasticsearch: - server: "http://192.168.1.220:9200/" + server: "http://192.168.1.16:9200/" index: "dnscollector" bulk-size: 5242880 flush-interval: 10 - compression: gzip + compression: none bulk-channel-size: 10 \ No newline at end of file diff --git a/docs/_integration/elasticsearch/docker-compose.yml b/docs/_integration/elasticsearch/docker-compose.yml index 7320b135..9a8b23b6 100644 --- a/docs/_integration/elasticsearch/docker-compose.yml +++ b/docs/_integration/elasticsearch/docker-compose.yml @@ -1,7 +1,7 @@ services: elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:8.12.2 + image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3 container_name: elasticsearch restart: always environment: @@ -14,10 +14,11 @@ services: - 9200:9200 kibana: container_name: kibana - image: docker.elastic.co/kibana/kibana:8.12.2 + image: docker.elastic.co/kibana/kibana:8.15.3 restart: always environment: - ELASTICSEARCH_HOSTS=http://elasticsearch:9200 + - telemetry.enabled=false ports: - 5601:5601 depends_on: diff --git a/docs/loggers/logger_elasticsearch.md b/docs/loggers/logger_elasticsearch.md index fd33bed2..dce55201 100644 --- a/docs/loggers/logger_elasticsearch.md +++ b/docs/loggers/logger_elasticsearch.md @@ -33,6 +33,15 @@ Options: > Interval in seconds before to flush the buffer. > Set the maximum time interval before the buffer is flushed. If the bulk batches reach this interval before reaching the maximum size, they will be sent to Elasticsearch. +* `basic-auth-enable` (bool) + > Enable basic authentication (login+password) to ElasticSearch + +* `basic-auth-login` (string) + > The login username + +* `basic-auth-pwd` (string) + > The password + Defaults: ```yaml @@ -45,6 +54,9 @@ Defaults: flush-interval: 10 # in seconds compression: none bulk-channel-size: 10 + basic-auth-enable: false + basic-auth-login: "" + basic-auth-pwd: "" ``` > Could you explain the difference between `bulk-size` and `bulk-channel-size`? diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go index 18495bbb..60a147be 100644 --- a/pkgconfig/loggers.go +++ b/pkgconfig/loggers.go @@ -231,6 +231,9 @@ type ConfigLoggers struct { BulkChannelSize int `yaml:"bulk-channel-size" default:"10"` FlushInterval int `yaml:"flush-interval" default:"10"` Compression string `yaml:"compression" default:"none"` + BasicAuthEnabled bool `yaml:"basic-auth-enable" default:"false"` + BasicAuthLogin string `yaml:"basic-auth-login" default:""` + BasicAuthPwd string `yaml:"basic-auth-pwd" default:""` } `yaml:"elasticsearch"` ScalyrClient struct { Enable bool `yaml:"enable" default:"false"` diff --git a/workers/elasticsearch.go b/workers/elasticsearch.go index 209c81bb..e5e0b737 100644 --- a/workers/elasticsearch.go +++ b/workers/elasticsearch.go @@ -199,7 +199,10 @@ func (w *ElasticSearchClient) sendBulk(bulk []byte) error { if err != nil { return err } - req.Header.Set("Content-Type", "application/json") + req.Header.Set("Content-Type", "application/x-ndjson") + if w.GetConfig().Loggers.ElasticSearchClient.BasicAuthEnabled { + req.SetBasicAuth(w.GetConfig().Loggers.ElasticSearchClient.BasicAuthLogin, w.GetConfig().Loggers.ElasticSearchClient.BasicAuthPwd) + } // Send the request using the HTTP client resp, err := w.httpClient.Do(req) @@ -223,7 +226,6 @@ func (w *ElasticSearchClient) sendCompressedBulk(bulk []byte) error { // Write the uncompressed data to the gzip writer _, err := gzipWriter.Write(bulk) if err != nil { - fmt.Println("gzip", err) return err } @@ -238,8 +240,11 @@ func (w *ElasticSearchClient) sendCompressedBulk(bulk []byte) error { if err != nil { return err } - req.Header.Set("Content-Type", "application/json") + req.Header.Set("Content-Type", "application/x-ndjson") req.Header.Set("Content-Encoding", "gzip") // Set Content-Encoding header to gzip + if w.GetConfig().Loggers.ElasticSearchClient.BasicAuthEnabled { + req.SetBasicAuth(w.GetConfig().Loggers.ElasticSearchClient.BasicAuthLogin, w.GetConfig().Loggers.ElasticSearchClient.BasicAuthPwd) + } // Send the request using the HTTP client resp, err := w.httpClient.Do(req) diff --git a/workers/elasticsearch_test.go b/workers/elasticsearch_test.go index 3bc20ed7..adfd043a 100644 --- a/workers/elasticsearch_test.go +++ b/workers/elasticsearch_test.go @@ -6,6 +6,7 @@ import ( "io" "net" "net/http" + "net/http/httptest" "strings" "testing" "time" @@ -187,3 +188,34 @@ func Test_ElasticSearchClient_FlushInterval_Exceeded(t *testing.T) { assert.Equal(t, tc.inputSize, totalDm) } } + +func TestElasticSearchClient_sendBulk_WithBasicAuth(t *testing.T) { + // Create a test HTTP server to simulate Elasticsearch + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify that the Authorization header is present + username, password, ok := r.BasicAuth() + assert.True(t, ok, "Basic Auth header is missing in the request") + assert.Equal(t, "testuser", username, "Incorrect username") + assert.Equal(t, "testpass", password, "Incorrect password") + + if username != "testuser" || password != "testpass" { + w.WriteHeader(http.StatusUnauthorized) + return + } + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + // Initialize the configuration using GetDefaultConfig() for consistency + config := pkgconfig.GetDefaultConfig() + config.Loggers.ElasticSearchClient.Server = server.URL + config.Loggers.ElasticSearchClient.BasicAuthEnabled = true + config.Loggers.ElasticSearchClient.BasicAuthLogin = "testuser" + config.Loggers.ElasticSearchClient.BasicAuthPwd = "testpass" + + client := NewElasticSearchClient(config, logger.New(false), "test-client") + + // Send a request with a test payload + err := client.sendBulk([]byte("test payload")) + assert.NoError(t, err, "Unexpected error when sending request with Basic Auth") +}