diff --git a/NOTICE.txt b/NOTICE.txt index 0b48738..483776b 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1,5 +1,5 @@ Elastic Agent Shipper -Copyright 2022-2022 Elasticsearch BV +Copyright 2022-2023 Elasticsearch BV This product includes software developed by The Apache Software Foundation (http://www.apache.org/). @@ -852,6 +852,29 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearc limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/elastic/go-lumber +Version: v0.1.1 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lumber@v0.1.1/LICENSE: + +Copyright (c) 2012–2016 Elasticsearch + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-ucfg Version: v0.8.6 @@ -21739,29 +21762,6 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lookslike@v0 limitations under the License. --------------------------------------------------------------------------------- -Dependency : github.com/elastic/go-lumber -Version: v0.1.0 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lumber@v0.1.0/LICENSE: - -Copyright (c) 2012–2016 Elasticsearch - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - - -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-perf Version: v0.0.0-20191212140718-9c656876f595 diff --git a/controller/runner.go b/controller/runner.go index 9515604..5968340 100644 --- a/controller/runner.go +++ b/controller/runner.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/elastic-agent-shipper/output" "github.com/elastic/elastic-agent-shipper/output/elasticsearch" "github.com/elastic/elastic-agent-shipper/output/kafka" + "github.com/elastic/elastic-agent-shipper/output/logstash" "github.com/elastic/elastic-agent-shipper/queue" "github.com/elastic/elastic-agent-shipper/server" @@ -216,5 +217,8 @@ func outputFromConfig(config output.Config, queue *queue.Queue) (Output, error) if config.Console != nil && config.Console.Enabled { return output.NewConsole(queue), nil } + if config.Logstash != nil && config.Logstash.Enabled { + return logstash.NewLogstash(config.Logstash, queue), nil + } return nil, errors.New("no active output configuration") } diff --git a/elastic-agent-shipper.yml b/elastic-agent-shipper.yml index 5cb3454..72bb989 100644 --- a/elastic-agent-shipper.yml +++ b/elastic-agent-shipper.yml @@ -140,3 +140,7 @@ output: enabled: false hosts: ["localhost:9092", "localhost:9093"] topic: '%{[metricset][name]}' + + logstash: + enabled: false + hosts: ["localhost:5044"] diff --git a/go.mod b/go.mod index f841409..cd4317d 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/elastic/elastic-agent-client/v7 v7.0.1 github.com/elastic/elastic-agent-shipper-client v0.4.0 github.com/elastic/go-elasticsearch/v8 v8.2.0 + github.com/elastic/go-lumber v0.1.1 github.com/elastic/go-ucfg v0.8.6 github.com/gofrs/uuid v4.2.0+incompatible github.com/magefile/mage v1.14.0 diff --git a/go.sum b/go.sum index b521a22..11d3d3a 100644 --- a/go.sum +++ b/go.sum @@ -519,6 +519,8 @@ github.com/elastic/go-licenser v0.4.1 h1:1xDURsc8pL5zYT9R29425J3vkHdt4RT5TNEMeRN github.com/elastic/go-licenser v0.4.1/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU= github.com/elastic/go-lookslike v0.3.0/go.mod h1:AhH+rdJux5RlVjs+6ej4jkvYyoNRkj2crxmqeHlj3hA= github.com/elastic/go-lumber v0.1.0/go.mod h1:8YvjMIRYypWuPvpxx7WoijBYdbB7XIh/9FqSYQZTtxQ= +github.com/elastic/go-lumber v0.1.1 h1:aae5rSBnwBvdB0aShJ7AbOYPyvP1/wS/JIOC1A4D1DM= +github.com/elastic/go-lumber v0.1.1/go.mod h1:DMVoFv7YM71enE9X5vWJWWv7wvQNtzXh7bPeKukDccY= github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595/go.mod h1:s09U1b4P1ZxnKx2OsqY7KlHdCesqZWIhyq0Gs/QC/Us= github.com/elastic/go-seccomp-bpf v1.2.0/go.mod h1:l+89Vy5BzjVcaX8USZRMOwmwwDScE+vxCFzzvQwN7T8= github.com/elastic/go-structform v0.0.9/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= @@ -1056,6 +1058,7 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= diff --git a/integration/console_test.go b/integration/console_test.go new file mode 100644 index 0000000..7e15aa6 --- /dev/null +++ b/integration/console_test.go @@ -0,0 +1,214 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. +//go:build integration + +package integration + +import ( + "testing" + + "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" + "github.com/stretchr/testify/require" +) + +func TestServerStarts(t *testing.T) { + config := ` +server: + strict_mode: false + port: 50052 + tls: false +logging: + level: debug + selectors: ["*"] + to_stderr: true +output: + console: + enabled: true +` + env := NewTestingEnvironment(t, config) + t.Cleanup(func() { env.Stop() }) + + found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") + + if !found { + env.Fatalf("Test executable failed to start.") + } +} + +func TestServerFailsToStart(t *testing.T) { + config := ` +server: + strict_mode: false + port: 50052 + tls: not_boolean +logging: + level: debug + selectors: ["*"] + to_stderr: true +output: + console: + enabled: true +` + + env := NewTestingEnvironment(t, config) + t.Cleanup(func() { env.Stop() }) + + found := env.WaitUntil("stderr", "error unpacking shipper config") + + if !found { + env.Fatalf("didn't error on bad config") + } +} + +func TestPublishMessage(t *testing.T) { + config := ` +server: + strict_mode: false + port: 50052 + tls: false +logging: + level: debug + selectors: ["*"] + to_stderr: true +output: + console: + enabled: true +` + + env := NewTestingEnvironment(t, config) + t.Cleanup(func() { env.Stop() }) + + found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") + + if !found { + env.Fatalf("Test executable failed to start") + } + + found = env.Contains("stderr", "Initializing disk queue at path") + if found { + env.Fatalf("Memory queue configured but disk queue started") + } + + client := env.NewClient("localhost:50052") + unique := "UniqueStringToLookForInOutput" + events, err := createEvents([]string{unique}) + require.NoErrorf(t, err, "error creating events: %s\n", err) + + _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ + Events: events, + }) + require.NoErrorf(t, err, "Error publishing event: %s", err) + + found = env.WaitUntil("stdout", unique) + + if !found { + env.Fatalf("Event wasn't published") + } +} + +func TestPublishDiskQueue(t *testing.T) { + queue_path := t.TempDir() + + config := ` +server: + strict_mode: false + port: 50052 + tls: false +logging: + level: debug + selectors: ["*"] + to_stderr: true +output: + console: + enabled: true +queue: + disk: + path: ` + queue_path + ` + max_size: 10G +` + + env := NewTestingEnvironment(t, config) + t.Cleanup(func() { env.Stop() }) + + found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") + + if !found { + env.Fatalf("Test executable failed to start") + } + + found = env.Contains("stderr", "Initializing disk queue at path") + if !found { + env.Fatalf("Disk queue configured but not started") + } + + client := env.NewClient("localhost:50052") + unique := "UniqueStringToLookForInOutput" + events, err := createEvents([]string{unique}) + require.NoErrorf(t, err, "error creating events: %s\n", err) + + _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ + Events: events, + }) + require.NoErrorf(t, err, "Error publishing event: %s", err) + + found = env.WaitUntil("stdout", unique) + + if !found { + env.Fatalf("Event wasn't published") + } +} + +func TestPublishCompressEncryptedDiskQueue(t *testing.T) { + queue_path := t.TempDir() + + config := ` +server: + strict_mode: false + port: 50052 + tls: false +logging: + level: debug + selectors: ["*"] + to_stderr: true +output: + console: + enabled: true +queue: + disk: + path: ` + queue_path + ` + max_size: 10G + use_compression: true + encryption_password: secret +` + + env := NewTestingEnvironment(t, config) + t.Cleanup(func() { env.Stop() }) + + found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") + + if !found { + env.Fatalf("Test executable failed to start") + } + + found = env.Contains("stderr", "Initializing disk queue at path") + if !found { + env.Fatalf("Disk queue configured but not started") + } + + client := env.NewClient("localhost:50052") + unique := "UniqueStringToLookForInOutput" + events, err := createEvents([]string{unique}) + require.NoErrorf(t, err, "error creating events: %s\n", err) + + _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ + Events: events, + }) + require.NoErrorf(t, err, "error publishing event: %s", err) + + found = env.WaitUntil("stdout", unique) + + if !found { + env.Fatalf("Event wasn't published") + } +} diff --git a/integration/integration_test.go b/integration/integration_test.go index 3f70148..a8098a2 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -183,207 +183,6 @@ func (e *TestingEnvironment) Fatalf(format string, args ...interface{}) { e.t.FailNow() } -func TestServerStarts(t *testing.T) { - config := ` -server: - strict_mode: false - port: 50052 - tls: false -logging: - level: debug - selectors: ["*"] - to_stderr: true -output: - console: - enabled: true -` - env := NewTestingEnvironment(t, config) - t.Cleanup(func() { env.Stop() }) - - found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") - - if !found { - env.Fatalf("Test executable failed to start.") - } -} - -func TestServerFailsToStart(t *testing.T) { - config := ` -server: - strict_mode: false - port: 50052 - tls: not_boolean -logging: - level: debug - selectors: ["*"] - to_stderr: true -output: - console: - enabled: true -` - - env := NewTestingEnvironment(t, config) - t.Cleanup(func() { env.Stop() }) - - found := env.WaitUntil("stderr", "error unpacking shipper config") - - if !found { - env.Fatalf("didn't error on bad config") - } -} - -func TestPublishMessage(t *testing.T) { - config := ` -server: - strict_mode: false - port: 50052 - tls: false -logging: - level: debug - selectors: ["*"] - to_stderr: true -output: - console: - enabled: true -` - - env := NewTestingEnvironment(t, config) - t.Cleanup(func() { env.Stop() }) - - found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") - - if !found { - env.Fatalf("Test executable failed to start") - } - - found = env.Contains("stderr", "Initializing disk queue at path") - if found { - env.Fatalf("Memory queue configured but disk queue started") - } - - client := env.NewClient("localhost:50052") - unique := "UniqueStringToLookForInOutput" - events, err := createEvents([]string{unique}) - require.NoErrorf(t, err, "error creating events: %s\n", err) - - _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ - Events: events, - }) - require.NoErrorf(t, err, "Error publishing event: %s", err) - - found = env.WaitUntil("stdout", unique) - - if !found { - env.Fatalf("Event wasn't published") - } -} - -func TestPublishDiskQueue(t *testing.T) { - queue_path := t.TempDir() - - config := ` -server: - strict_mode: false - port: 50052 - tls: false -logging: - level: debug - selectors: ["*"] - to_stderr: true -output: - console: - enabled: true -queue: - disk: - path: ` + queue_path + ` - max_size: 10G -` - - env := NewTestingEnvironment(t, config) - t.Cleanup(func() { env.Stop() }) - - found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") - - if !found { - env.Fatalf("Test executable failed to start") - } - - found = env.Contains("stderr", "Initializing disk queue at path") - if !found { - env.Fatalf("Disk queue configured but not started") - } - - client := env.NewClient("localhost:50052") - unique := "UniqueStringToLookForInOutput" - events, err := createEvents([]string{unique}) - require.NoErrorf(t, err, "error creating events: %s\n", err) - - _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ - Events: events, - }) - require.NoErrorf(t, err, "Error publishing event: %s", err) - - found = env.WaitUntil("stdout", unique) - - if !found { - env.Fatalf("Event wasn't published") - } -} - -func TestPublishCompressEncryptedDiskQueue(t *testing.T) { - queue_path := t.TempDir() - - config := ` -server: - strict_mode: false - port: 50052 - tls: false -logging: - level: debug - selectors: ["*"] - to_stderr: true -output: - console: - enabled: true -queue: - disk: - path: ` + queue_path + ` - max_size: 10G - use_compression: true - encryption_password: secret -` - - env := NewTestingEnvironment(t, config) - t.Cleanup(func() { env.Stop() }) - - found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") - - if !found { - env.Fatalf("Test executable failed to start") - } - - found = env.Contains("stderr", "Initializing disk queue at path") - if !found { - env.Fatalf("Disk queue configured but not started") - } - - client := env.NewClient("localhost:50052") - unique := "UniqueStringToLookForInOutput" - events, err := createEvents([]string{unique}) - require.NoErrorf(t, err, "error creating events: %s\n", err) - - _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ - Events: events, - }) - require.NoErrorf(t, err, "error publishing event: %s", err) - - found = env.WaitUntil("stdout", unique) - - if !found { - env.Fatalf("Event wasn't published") - } -} - func createEvents(values []string) ([]*messages.Event, error) { events := make([]*messages.Event, len(values)) diff --git a/integration/logstash_test.go b/integration/logstash_test.go new file mode 100644 index 0000000..d9eb3ce --- /dev/null +++ b/integration/logstash_test.go @@ -0,0 +1,282 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. +//go:build integration + +package integration + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" + "github.com/stretchr/testify/require" +) + +type logstashEventStats struct { + Events struct { + In int + Out int + } +} + +func TestLogstashOutputServerStarts(t *testing.T) { + config := ` +server: + strict_mode: false + port: 50052 + tls: false +logging: + level: debug + selectors: ["*"] + to_stderr: true +output: + logstash: + enabled: true + timeout: 1s + hosts: ['127.0.0.1:5044'] +` + env := NewTestingEnvironment(t, config) + t.Cleanup(func() { env.Stop() }) + + found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") + + if !found { + env.Fatalf("Test executable failed to start.") + } +} + +func TestLogstashOutputPublishMessage(t *testing.T) { + config := ` +server: + strict_mode: false + port: 50052 + tls: false +logging: + level: debug + selectors: ["*"] + to_stderr: true +output: + logstash: + enabled: true + timeout: 1s + hosts: ['127.0.0.1:5044'] +` + + env := NewTestingEnvironment(t, config) + t.Cleanup(func() { env.Stop() }) + + found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") + + if !found { + env.Fatalf("Test executable failed to start") + } + + found = env.Contains("stderr", "Initializing disk queue at path") + if found { + env.Fatalf("Memory queue configured but disk queue started") + } + + client := env.NewClient("localhost:50052") + unique := "UniqueStringToLookForInOutput" + events, err := createEvents([]string{unique}) + require.NoErrorf(t, err, "error creating events: %s\n", err) + + preStats, err := getEventStats() + require.NoErrorf(t, err, "Error getting Logstash stats before publish: %s", err) + + _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ + Events: events, + }) + require.NoErrorf(t, err, "Error publishing event: %s", err) + + found = env.WaitUntil("stderr", "finished publishing a batch") + if !found { + env.Fatalf("Event wasn't published") + } + found = env.WaitUntil("stderr", "ackloop: return") + if !found { + env.Fatalf("Event wasn't acked") + } + postStats, err := getEventStats() + require.NoErrorf(t, err, "Error getting Logstash stats after publish: %s", err) + require.Equalf(t, postStats.Events.In, preStats.Events.In+1, "post.in was %d, pre.in was %d", postStats.Events.In, preStats.Events.In) + require.Equalf(t, postStats.Events.Out, preStats.Events.Out+1, "post.out was %d, pre.out was %d", postStats.Events.Out, preStats.Events.Out) +} + +func TestLogstashOutputFailOneOutput(t *testing.T) { + config := ` +server: + strict_mode: false + port: 50052 + tls: false +logging: + level: debug + selectors: ["*"] + to_stderr: true +output: + logstash: + enabled: true + timeout: 1s + hosts: ['127.0.0.2:5044', '127.0.0.1:5044'] +` + + env := NewTestingEnvironment(t, config) + t.Cleanup(func() { env.Stop() }) + + found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") + + if !found { + env.Fatalf("Test executable failed to start") + } + + found = env.Contains("stderr", "Initializing disk queue at path") + if found { + env.Fatalf("Memory queue configured but disk queue started") + } + + client := env.NewClient("localhost:50052") + unique := "UniqueStringToLookForInOutput" + events, err := createEvents([]string{unique}) + require.NoErrorf(t, err, "error creating events: %s\n", err) + + preStats, err := getEventStats() + require.NoErrorf(t, err, "Error getting Logstash stats before publish: %s", err) + + _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ + Events: events, + }) + require.NoErrorf(t, err, "Error publishing event: %s", err) + + found = env.WaitUntil("stderr", "finished publishing a batch") + if !found { + env.Fatalf("Event wasn't published") + } + + found = env.WaitUntil("stderr", "error connecting to 127.0.0.2:5044") + if !found { + env.Fatalf("first host didn't fail") + } + + found = env.WaitUntil("stderr", "ackloop: return") + if !found { + env.Fatalf("Event wasn't acked") + } + postStats, err := getEventStats() + require.NoErrorf(t, err, "Error getting Logstash stats after publish: %s", err) + require.Equalf(t, postStats.Events.In, preStats.Events.In+1, "post.in was %d, pre.in was %d", postStats.Events.In, preStats.Events.In) + require.Equalf(t, postStats.Events.Out, preStats.Events.Out+1, "post.out was %d, pre.out was %d", postStats.Events.Out, preStats.Events.Out) +} + +func TestLogstashOutputReconnect(t *testing.T) { + config := ` +server: + strict_mode: false + port: 50052 + tls: false +logging: + level: debug + selectors: ["*"] + to_stderr: true +output: + logstash: + enabled: true + timeout: 1s + ttl: 1s + hosts: ['127.0.0.1:5044'] +` + + env := NewTestingEnvironment(t, config) + t.Cleanup(func() { env.Stop() }) + + found := env.WaitUntil("stderr", "gRPC server is ready and is listening on") + + if !found { + env.Fatalf("Test executable failed to start") + } + + found = env.Contains("stderr", "Initializing disk queue at path") + if found { + env.Fatalf("Memory queue configured but disk queue started") + } + + client := env.NewClient("localhost:50052") + unique := "UniqueStringToLookForInOutput" + events, err := createEvents([]string{unique}) + require.NoErrorf(t, err, "error creating events: %s\n", err) + + preStats, err := getEventStats() + require.NoErrorf(t, err, "Error getting Logstash stats before publish: %s", err) + + _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ + Events: events, + }) + require.NoErrorf(t, err, "Error publishing event: %s", err) + + found = env.WaitUntil("stderr", "finished publishing a batch") + if !found { + env.Fatalf("Event wasn't published") + } + + found = env.WaitUntil("stderr", "ackloop: return") + if !found { + env.Fatalf("Event wasn't acked") + } + postStats, err := getEventStats() + require.NoErrorf(t, err, "Error getting Logstash stats after publish: %s", err) + require.Equalf(t, postStats.Events.In, preStats.Events.In+1, "post.in was %d, pre.in was %d", postStats.Events.In, preStats.Events.In) + require.Equalf(t, postStats.Events.Out, preStats.Events.Out+1, "post.out was %d, pre.out was %d", postStats.Events.Out, preStats.Events.Out) + + time.Sleep(1 * time.Second) + + unique = "UniqueStringToLookForInOutput2" + events, err = createEvents([]string{unique}) + require.NoErrorf(t, err, "error creating events: %s\n", err) + preStats, err = getEventStats() + require.NoErrorf(t, err, "Error getting Logstash stats before publish: %s", err) + + _, err = client.PublishEvents(env.ctx, &messages.PublishRequest{ + Events: events, + }) + require.NoErrorf(t, err, "Error publishing event: %s", err) + + found = env.WaitUntil("stderr", "finished publishing a batch") + if !found { + env.Fatalf("Event wasn't published") + } + + found = env.WaitUntil("stderr", "ttl expired, reconnecting to logstash host") + if !found { + env.Fatalf("TTL expire didn't happen") + } + + found = env.WaitUntil("stderr", "ackloop: return") + if !found { + env.Fatalf("Event wasn't acked") + } + postStats, err = getEventStats() + require.NoErrorf(t, err, "Error getting Logstash stats after publish: %s", err) + require.Equalf(t, postStats.Events.In, preStats.Events.In+1, "post.in was %d, pre.in was %d", postStats.Events.In, preStats.Events.In) + require.Equalf(t, postStats.Events.Out, preStats.Events.Out+1, "post.out was %d, pre.out was %d", postStats.Events.Out, preStats.Events.Out) +} + +func getEventStats() (logstashEventStats, error) { + events := logstashEventStats{} + res, err := http.Get("http://localhost:9600/_node/stats/events") + if err != nil { + return events, fmt.Errorf("error getting logstash event stats: %w", err) + } + resBody, err := ioutil.ReadAll(res.Body) + if err != nil { + return events, fmt.Errorf("could not read logstash event stats response: %w", err) + } + err = json.Unmarshal(resBody, &events) + if err != nil { + return events, fmt.Errorf("cound not unmarshal logstash event stats response: %w", err) + } + return events, nil +} diff --git a/output/logstash/client.go b/output/logstash/client.go new file mode 100644 index 0000000..afc13b1 --- /dev/null +++ b/output/logstash/client.go @@ -0,0 +1,79 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package logstash + +import ( + "fmt" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" +) + +const ( + defaultPort = 5044 +) + +type LogstashClient struct { + config *Config + logger *logp.Logger + hosts []*syncClient + current int +} + +func NewLogstashClient(config *Config, logger *logp.Logger) (*LogstashClient, error) { + out := &LogstashClient{ + config: config, + logger: logger, + } + for _, host := range config.Hosts { + syncClient, err := newSyncClient(host, config) + if err != nil { + logger.Errorf("error creating client for logstash host %s, skipping: %w", host, err) + continue + } + out.hosts = append(out.hosts, syncClient) + } + if len(out.hosts) == 0 { + return nil, fmt.Errorf("no logstash hosts configured") + } + return out, nil +} + +func (lc *LogstashClient) Publish(events []*messages.Event) (uint64, error) { + lsEvents := make([]interface{}, len(events)) + for i := range events { + lsEvents[i] = events[i] + } + for i, j := 0, lc.current; i < len(lc.hosts); i, j = i+1, (j+1)%len(lc.hosts) { + lc.current = j + sc := lc.hosts[j] + if sc.ticker != nil { + select { + case <-sc.ticker.C: + lc.logger.Debugf("ttl expired, reconnecting to logstash host: %s", sc.conn.Host()) + if err := sc.Reconnect(); err != nil { + lc.logger.Errorf("error reconnecting to logstash host: %w", err) + continue + } + default: + } + } + if !sc.conn.IsConnected() { + lc.logger.Debugf("%s wasn't connected trying to connect", sc.conn.Host()) + if err := sc.conn.Connect(); err != nil { + lc.logger.Errorf("error connecting to %s: %w", sc.conn.Host(), err) + continue + } + } + lc.logger.Debugf("sending %d events to %s", len(lsEvents), sc.conn.Host()) + n, err := sc.client.Send(lsEvents) + if err != nil { + lc.logger.Errorf("error sending to %s: %s", sc.conn.Host(), err) + continue + } + return uint64(n), err + } + return 0, fmt.Errorf("couldn't publish to any host") +} diff --git a/output/logstash/client_test.go b/output/logstash/client_test.go new file mode 100644 index 0000000..a5de1a1 --- /dev/null +++ b/output/logstash/client_test.go @@ -0,0 +1,48 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package logstash + +import ( + "testing" + + "github.com/elastic/elastic-agent-libs/logp" + + "github.com/stretchr/testify/assert" +) + +func TestNewLogstashClient(t *testing.T) { + tests := map[string]struct { + config *Config + errString string + }{ + "one host": { + config: &Config{ + Hosts: []string{"127.0.0.1:5044"}, + }, + errString: "", + }, + "two host": { + config: &Config{ + Hosts: []string{"127.0.0.1:5044", "127.0.0.2:5044"}, + }, + errString: "", + }, + "no hosts": { + config: &Config{}, + errString: "no logstash hosts configured", + }, + } + logger := logp.NewLogger("logstash-output") + for name, tc := range tests { + client, err := NewLogstashClient(tc.config, logger) + if tc.errString == "" { + assert.NoErrorf(t, err, "%s: no error expected got: %s", name, err) + assert.Equalf(t, len(client.hosts), len(tc.config.Hosts), "%s: expected %d hosts, got %d hosts", name, len(tc.config.Hosts), len(client.hosts)) + } else { + assert.ErrorContainsf(t, err, tc.errString, "%s: expected error: %s, got error: %s", name, tc.errString, err) + } + + } +} diff --git a/output/logstash/config.go b/output/logstash/config.go new file mode 100644 index 0000000..58354aa --- /dev/null +++ b/output/logstash/config.go @@ -0,0 +1,49 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package logstash + +import ( + "time" + + "github.com/elastic/elastic-agent-libs/config" + + "github.com/elastic/elastic-agent-libs/transport" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" +) + +type Config struct { + Enabled bool `config:"enabled"` + Hosts []string `config:"hosts"` + TLS *tlscommon.Config `config:"ssl"` + // LoadBalance bool `config:"loadbalance"` TODO + // BulkMaxSize int `config:"bulk_max_size"` TODO + // SlowStart bool `config:"slow_start"` TODO + Timeout time.Duration `config:"timeout"` + TTL time.Duration `config:"ttl" validate:"min=0"` + // Pipelining int `config:"pipelining" validate:"min=0"` TODO support async client + CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` + MaxRetries int `config:"max_retries" validate:"min=-1"` + Proxy transport.ProxyConfig `config:",inline"` + // Backoff Backoff `config:"backoff"` TODO +} + +func DefaultConfig() Config { + return Config{ + CompressionLevel: 3, + Timeout: 30 * time.Second, + MaxRetries: 3, + TTL: 0 * time.Second, + } +} + +func readConfig(cfg *config.C) (*Config, error) { + c := DefaultConfig() + + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + + return &c, nil +} diff --git a/output/logstash/config_test.go b/output/logstash/config_test.go new file mode 100644 index 0000000..78825c5 --- /dev/null +++ b/output/logstash/config_test.go @@ -0,0 +1,43 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package logstash + +import ( + "testing" + "time" + + "github.com/elastic/elastic-agent-libs/config" + + "github.com/stretchr/testify/assert" +) + +func TestConfig(t *testing.T) { + for name, test := range map[string]struct { + config *config.C + expectedConfig *Config + err bool + }{ + "default config": { + config: config.MustNewConfigFrom([]byte(`{ }`)), + expectedConfig: &Config{ + CompressionLevel: 3, + Timeout: 30 * time.Second, + MaxRetries: 3, + TTL: 0 * time.Second, + }, + }, + } { + t.Run(name, func(t *testing.T) { + cfg, err := readConfig(test.config) + if test.err { + assert.Error(t, err) + assert.Nil(t, cfg) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expectedConfig, cfg) + } + }) + } +} diff --git a/output/logstash/encode.go b/output/logstash/encode.go new file mode 100644 index 0000000..6437b5a --- /dev/null +++ b/output/logstash/encode.go @@ -0,0 +1,16 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package logstash + +import ( + "encoding/json" +) + +// makeLogstashEventEncoder TODO, evaluate if we need codec support +func makeLogstashEventEncoder() func(interface{}) ([]byte, error) { + return func(event interface{}) (d []byte, err error) { + return json.Marshal(event) + } +} diff --git a/output/logstash/output.go b/output/logstash/output.go new file mode 100644 index 0000000..e422c45 --- /dev/null +++ b/output/logstash/output.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package logstash + +import ( + "fmt" + "sync" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-shipper/queue" +) + +type LogstashOutput struct { + logger *logp.Logger + config *Config + queue *queue.Queue + wg sync.WaitGroup + client *LogstashClient +} + +func NewLogstash(config *Config, queue *queue.Queue) *LogstashOutput { + out := &LogstashOutput{ + logger: logp.NewLogger("logstash-output"), + config: config, + queue: queue, + } + return out +} + +func (ls *LogstashOutput) Start() error { + client, err := NewLogstashClient(ls.config, ls.logger) + if err != nil { + return fmt.Errorf("error creating logstash client: %w", err) + } + ls.client = client + ls.wg.Add(1) + go func() { + defer ls.wg.Done() + for { + batch, err := ls.queue.Get(1000) + // Once an output receives a batch, it is responsible for + // it until all events have been either successfully sent or + // discarded after failure. + if err != nil { + // queue.Get can only fail if the queue was closed, + // time for the output to shut down. + break + } + + // Add this batch to the shutdown wait group and release it + // in the batch's completion callback + ls.wg.Add(1) + batch.CompletionCallback = ls.wg.Done + + events := batch.Events() + remaining := uint64(len(events)) + completed, err := ls.client.Publish(events) + if err != nil { + ls.logger.Errorf("failed to send %d out of %d events: %w", remaining-completed, remaining, err) + batch.Done(remaining) + continue + } + if completed != remaining { + ls.logger.Errorf("Didn't send all events but no error") + batch.Done(remaining) + continue + } + batch.Done(remaining) + } + }() + return nil +} + +func (ls *LogstashOutput) Wait() { + ls.wg.Wait() +} diff --git a/output/logstash/sync.go b/output/logstash/sync.go new file mode 100644 index 0000000..249d88e --- /dev/null +++ b/output/logstash/sync.go @@ -0,0 +1,62 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package logstash + +import ( + "fmt" + "time" + + "github.com/elastic/elastic-agent-libs/transport" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" + v2 "github.com/elastic/go-lumber/client/v2" +) + +type syncClient struct { + conn *transport.Client + client *v2.SyncClient + ttl time.Duration + ticker *time.Ticker +} + +func newSyncClient(host string, config *Config) (*syncClient, error) { + c := &syncClient{ + ttl: config.TTL, + } + tls, err := tlscommon.LoadTLSConfig(config.TLS) + if err != nil { + return nil, fmt.Errorf("error loading tls configuration: %w", err) + } + tp := transport.Config{ + Timeout: config.Timeout, + Proxy: &config.Proxy, + TLS: tls, + } + if c.ttl > 0 { + c.ticker = time.NewTicker(c.ttl) + } + enc := makeLogstashEventEncoder() + conn, err := transport.NewClient(tp, "tcp", host, defaultPort) + if err != nil { + return nil, fmt.Errorf("error creating connection to logstash host: %w", err) + } + c.conn = conn + client, err := v2.NewSyncClientWithConn(conn, + v2.JSONEncoder(enc), + v2.Timeout(config.Timeout), + v2.CompressionLevel(config.CompressionLevel), + ) + if err != nil { + return nil, fmt.Errorf("error creating logstash client, skipping: %w", err) + } + c.client = client + return c, nil +} + +func (c *syncClient) Reconnect() error { + if err := c.conn.Close(); err != nil { + return fmt.Errorf("error closing connection to logstash host %s: %w", c.conn.Host(), err) + } + return c.conn.Connect() +} diff --git a/output/output.go b/output/output.go index 0c0764c..30b20d2 100644 --- a/output/output.go +++ b/output/output.go @@ -7,6 +7,7 @@ package output import ( "github.com/elastic/elastic-agent-shipper/output/elasticsearch" "github.com/elastic/elastic-agent-shipper/output/kafka" + "github.com/elastic/elastic-agent-shipper/output/logstash" ) type ConsoleConfig struct { @@ -17,11 +18,15 @@ type Config struct { Console *ConsoleConfig `config:"console"` Elasticsearch *elasticsearch.Config `config:"elasticsearch"` Kafka *kafka.Config `config:"kafka"` + Logstash *logstash.Config `config:"logstash"` } func DefaultConfig() Config { defaultKafka := kafka.DefaultConfig() + defaultLogstash := logstash.DefaultConfig() + return Config{ - Kafka: &defaultKafka, + Kafka: &defaultKafka, + Logstash: &defaultLogstash, } } diff --git a/testing/environments/environments.go b/testing/environments/environments.go index 556c94b..b2c21a1 100644 --- a/testing/environments/environments.go +++ b/testing/environments/environments.go @@ -36,6 +36,10 @@ var ( name: "Kafka", path: "testing/environments/kafka/docker-compose.yml", } + logstashImage = testImage{ + name: "Logstash", + path: "testing/environments/logstash/docker-compose.yml", + } ) // DefaultElasticsearch returns a configuration for an Elasticsearch server @@ -57,6 +61,19 @@ func DefaultKafka() TestImageConfig { return testImageConfig{image: kafkaImage} } +// DefaultLogstash returns a configuration for a Logstash server with default +// settings listening on port 5044. +func DefaultLogstash() TestImageConfig { + basePath := "docker.elastic.co/logstash/logstash" + version := tools.DefaultBeatVersion + "-SNAPSHOT" + return testImageConfig{ + image: logstashImage, + environment: map[string]string{ + "LOGSTASH_IMAGE_REF": fmt.Sprintf("%v:%v", basePath, version), + }, + } +} + // Up calls docker-compose up on the given container configurations and returns the // resulting standard output and its result. func Up(configs []TestImageConfig) ([]byte, error) { diff --git a/testing/environments/logstash/docker-compose.yml b/testing/environments/logstash/docker-compose.yml new file mode 100644 index 0000000..fb42364 --- /dev/null +++ b/testing/environments/logstash/docker-compose.yml @@ -0,0 +1,13 @@ +version: '2.3' +services: + logstash: + image: "${LOGSTASH_IMAGE_REF}" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"] + retries: 600 + interval: 1s + volumes: + - ./pipeline:/usr/share/logstash/pipeline:ro + ports: + - 5044:5044 + - 9600:9600 diff --git a/testing/environments/logstash/pipeline/default.conf b/testing/environments/logstash/pipeline/default.conf new file mode 100644 index 0000000..6b7f11c --- /dev/null +++ b/testing/environments/logstash/pipeline/default.conf @@ -0,0 +1,10 @@ +input { + beats { + port => 5044 + ssl => false + } +} + +output { + stdout { codec => rubydebug { metadata => true } } +}