Skip to content

Commit

Permalink
Added ElasticSearch API basic authentication (#866)
Browse files Browse the repository at this point in the history
* Added ElasticSearch API basic authentication
* Update content-type to x-ndjson
* Update docs
  • Loading branch information
dmachard authored Nov 9, 2024
1 parent 201da23 commit e86a45a
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 12 deletions.
11 changes: 6 additions & 5 deletions docs/_integration/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
sudo docker compose up -d
```

- Update the destination URL in the config.yml and run DNScollector from source and generate some DNS logs from your DNS server with DNStap protocol.

```bash
go run . -config docs/_integration/elasticsearch/config.yml
```

- Go to kibana web interface through `http://127.0.0.1:5601`

- Click on `Explore on my own` and `Discover`

- Finally create index pattern `dnscollector` and choose `dnstap.timestamp-rfc33939ns`

- Finally, run DNScollector from source and generate some DNS logs from your DNS server with DNStap protocol.

```bash
go run . -config docs/_integration/elasticsearch/config.yml
```
4 changes: 2 additions & 2 deletions docs/_integration/elasticsearch/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ pipelines:

- name: elastic
elasticsearch:
server: "http://192.168.1.220:9200/"
server: "http://192.168.1.16:9200/"
index: "dnscollector"
bulk-size: 5242880
flush-interval: 10
compression: gzip
compression: none
bulk-channel-size: 10
5 changes: 3 additions & 2 deletions docs/_integration/elasticsearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.2
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3
container_name: elasticsearch
restart: always
environment:
Expand All @@ -14,10 +14,11 @@ services:
- 9200:9200
kibana:
container_name: kibana
image: docker.elastic.co/kibana/kibana:8.12.2
image: docker.elastic.co/kibana/kibana:8.15.3
restart: always
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- telemetry.enabled=false
ports:
- 5601:5601
depends_on:
Expand Down
12 changes: 12 additions & 0 deletions docs/loggers/logger_elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ Options:
> Interval in seconds before to flush the buffer.
> Set the maximum time interval before the buffer is flushed. If the bulk batches reach this interval before reaching the maximum size, they will be sent to Elasticsearch.
* `basic-auth-enable` (bool)
> Enable basic authentication (login+password) to ElasticSearch
* `basic-auth-login` (string)
> The login username
* `basic-auth-pwd` (string)
> The password
Defaults:

```yaml
Expand All @@ -45,6 +54,9 @@ Defaults:
flush-interval: 10 # in seconds
compression: none
bulk-channel-size: 10
basic-auth-enable: false
basic-auth-login: ""
basic-auth-pwd: ""
```
> Could you explain the difference between `bulk-size` and `bulk-channel-size`?
Expand Down
3 changes: 3 additions & 0 deletions pkgconfig/loggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ type ConfigLoggers struct {
BulkChannelSize int `yaml:"bulk-channel-size" default:"10"`
FlushInterval int `yaml:"flush-interval" default:"10"`
Compression string `yaml:"compression" default:"none"`
BasicAuthEnabled bool `yaml:"basic-auth-enable" default:"false"`
BasicAuthLogin string `yaml:"basic-auth-login" default:""`
BasicAuthPwd string `yaml:"basic-auth-pwd" default:""`
} `yaml:"elasticsearch"`
ScalyrClient struct {
Enable bool `yaml:"enable" default:"false"`
Expand Down
11 changes: 8 additions & 3 deletions workers/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ func (w *ElasticSearchClient) sendBulk(bulk []byte) error {
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Type", "application/x-ndjson")
if w.GetConfig().Loggers.ElasticSearchClient.BasicAuthEnabled {
req.SetBasicAuth(w.GetConfig().Loggers.ElasticSearchClient.BasicAuthLogin, w.GetConfig().Loggers.ElasticSearchClient.BasicAuthPwd)
}

// Send the request using the HTTP client
resp, err := w.httpClient.Do(req)
Expand All @@ -223,7 +226,6 @@ func (w *ElasticSearchClient) sendCompressedBulk(bulk []byte) error {
// Write the uncompressed data to the gzip writer
_, err := gzipWriter.Write(bulk)
if err != nil {
fmt.Println("gzip", err)
return err
}

Expand All @@ -238,8 +240,11 @@ func (w *ElasticSearchClient) sendCompressedBulk(bulk []byte) error {
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Type", "application/x-ndjson")
req.Header.Set("Content-Encoding", "gzip") // Set Content-Encoding header to gzip
if w.GetConfig().Loggers.ElasticSearchClient.BasicAuthEnabled {
req.SetBasicAuth(w.GetConfig().Loggers.ElasticSearchClient.BasicAuthLogin, w.GetConfig().Loggers.ElasticSearchClient.BasicAuthPwd)
}

// Send the request using the HTTP client
resp, err := w.httpClient.Do(req)
Expand Down
32 changes: 32 additions & 0 deletions workers/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -187,3 +188,34 @@ func Test_ElasticSearchClient_FlushInterval_Exceeded(t *testing.T) {
assert.Equal(t, tc.inputSize, totalDm)
}
}

func TestElasticSearchClient_sendBulk_WithBasicAuth(t *testing.T) {
// Create a test HTTP server to simulate Elasticsearch
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Verify that the Authorization header is present
username, password, ok := r.BasicAuth()
assert.True(t, ok, "Basic Auth header is missing in the request")
assert.Equal(t, "testuser", username, "Incorrect username")
assert.Equal(t, "testpass", password, "Incorrect password")

if username != "testuser" || password != "testpass" {
w.WriteHeader(http.StatusUnauthorized)
return
}
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

// Initialize the configuration using GetDefaultConfig() for consistency
config := pkgconfig.GetDefaultConfig()
config.Loggers.ElasticSearchClient.Server = server.URL
config.Loggers.ElasticSearchClient.BasicAuthEnabled = true
config.Loggers.ElasticSearchClient.BasicAuthLogin = "testuser"
config.Loggers.ElasticSearchClient.BasicAuthPwd = "testpass"

client := NewElasticSearchClient(config, logger.New(false), "test-client")

// Send a request with a test payload
err := client.sendBulk([]byte("test payload"))
assert.NoError(t, err, "Unexpected error when sending request with Basic Auth")
}

0 comments on commit e86a45a

Please sign in to comment.