Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added ElasticSearch API basic authentication #866

Merged
merged 2 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions docs/_integration/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
sudo docker compose up -d
```

- Update the destination URL in the config.yml and run DNScollector from source and generate some DNS logs from your DNS server with DNStap protocol.

```bash
go run . -config docs/_integration/elasticsearch/config.yml
```

- Go to kibana web interface through `http://127.0.0.1:5601`

- Click on `Explore on my own` and `Discover`

- Finally create index pattern `dnscollector` and choose `dnstap.timestamp-rfc3339ns`

- Finally, run DNScollector from source and generate some DNS logs from your DNS server with DNStap protocol.

```bash
go run . -config docs/_integration/elasticsearch/config.yml
```
4 changes: 2 additions & 2 deletions docs/_integration/elasticsearch/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ pipelines:

- name: elastic
elasticsearch:
server: "http://192.168.1.220:9200/"
server: "http://192.168.1.16:9200/"
index: "dnscollector"
bulk-size: 5242880
flush-interval: 10
compression: gzip
compression: none
bulk-channel-size: 10
5 changes: 3 additions & 2 deletions docs/_integration/elasticsearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.2
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3
container_name: elasticsearch
restart: always
environment:
Expand All @@ -14,10 +14,11 @@ services:
- 9200:9200
kibana:
container_name: kibana
image: docker.elastic.co/kibana/kibana:8.12.2
image: docker.elastic.co/kibana/kibana:8.15.3
restart: always
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- telemetry.enabled=false
ports:
- 5601:5601
depends_on:
Expand Down
12 changes: 12 additions & 0 deletions docs/loggers/logger_elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ Options:
> Interval in seconds before the buffer is flushed.
> Set the maximum time interval before the buffer is flushed. If the bulk batches reach this interval before reaching the maximum size, they will be sent to Elasticsearch.

* `basic-auth-enable` (bool)
> Enable basic authentication (login and password) to Elasticsearch.

* `basic-auth-login` (string)
> The login username

* `basic-auth-pwd` (string)
> The password

Defaults:

```yaml
Expand All @@ -45,6 +54,9 @@ Defaults:
flush-interval: 10 # in seconds
compression: none
bulk-channel-size: 10
basic-auth-enable: false
basic-auth-login: ""
basic-auth-pwd: ""
```

> Could you explain the difference between `bulk-size` and `bulk-channel-size`?
Expand Down
3 changes: 3 additions & 0 deletions pkgconfig/loggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ type ConfigLoggers struct {
BulkChannelSize int `yaml:"bulk-channel-size" default:"10"`
FlushInterval int `yaml:"flush-interval" default:"10"`
Compression string `yaml:"compression" default:"none"`
BasicAuthEnabled bool `yaml:"basic-auth-enable" default:"false"`
BasicAuthLogin string `yaml:"basic-auth-login" default:""`
BasicAuthPwd string `yaml:"basic-auth-pwd" default:""`
} `yaml:"elasticsearch"`
ScalyrClient struct {
Enable bool `yaml:"enable" default:"false"`
Expand Down
11 changes: 8 additions & 3 deletions workers/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ func (w *ElasticSearchClient) sendBulk(bulk []byte) error {
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Type", "application/x-ndjson")
if w.GetConfig().Loggers.ElasticSearchClient.BasicAuthEnabled {
req.SetBasicAuth(w.GetConfig().Loggers.ElasticSearchClient.BasicAuthLogin, w.GetConfig().Loggers.ElasticSearchClient.BasicAuthPwd)
}

// Send the request using the HTTP client
resp, err := w.httpClient.Do(req)
Expand All @@ -223,7 +226,6 @@ func (w *ElasticSearchClient) sendCompressedBulk(bulk []byte) error {
// Write the uncompressed data to the gzip writer
_, err := gzipWriter.Write(bulk)
if err != nil {
fmt.Println("gzip", err)
return err
}

Expand All @@ -238,8 +240,11 @@ func (w *ElasticSearchClient) sendCompressedBulk(bulk []byte) error {
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Type", "application/x-ndjson")
req.Header.Set("Content-Encoding", "gzip") // Set Content-Encoding header to gzip
if w.GetConfig().Loggers.ElasticSearchClient.BasicAuthEnabled {
req.SetBasicAuth(w.GetConfig().Loggers.ElasticSearchClient.BasicAuthLogin, w.GetConfig().Loggers.ElasticSearchClient.BasicAuthPwd)
}

// Send the request using the HTTP client
resp, err := w.httpClient.Do(req)
Expand Down
32 changes: 32 additions & 0 deletions workers/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -187,3 +188,34 @@ func Test_ElasticSearchClient_FlushInterval_Exceeded(t *testing.T) {
assert.Equal(t, tc.inputSize, totalDm)
}
}

// TestElasticSearchClient_sendBulk_WithBasicAuth checks that sendBulk attaches
// the configured Basic Auth credentials to the outgoing bulk request.
func TestElasticSearchClient_sendBulk_WithBasicAuth(t *testing.T) {
	// Stand-in for Elasticsearch: it inspects the Authorization header and
	// answers 200 only when the expected credentials are present.
	handler := func(w http.ResponseWriter, r *http.Request) {
		user, pwd, ok := r.BasicAuth()
		assert.True(t, ok, "Basic Auth header is missing in the request")
		assert.Equal(t, "testuser", user, "Incorrect username")
		assert.Equal(t, "testpass", pwd, "Incorrect password")

		if user == "testuser" && pwd == "testpass" {
			w.WriteHeader(http.StatusOK)
		} else {
			w.WriteHeader(http.StatusUnauthorized)
		}
	}
	server := httptest.NewServer(http.HandlerFunc(handler))
	defer server.Close()

	// Build on the default configuration so only the auth-related fields differ.
	cfg := pkgconfig.GetDefaultConfig()
	cfg.Loggers.ElasticSearchClient.Server = server.URL
	cfg.Loggers.ElasticSearchClient.BasicAuthEnabled = true
	cfg.Loggers.ElasticSearchClient.BasicAuthLogin = "testuser"
	cfg.Loggers.ElasticSearchClient.BasicAuthPwd = "testpass"

	esClient := NewElasticSearchClient(cfg, logger.New(false), "test-client")

	// The bulk request must succeed without error when credentials match.
	err := esClient.sendBulk([]byte("test payload"))
	assert.NoError(t, err, "Unexpected error when sending request with Basic Auth")
}