Skip to content
This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

POC for logstash output #210

Merged
merged 3 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Elastic Agent Shipper
Copyright 2022-2022 Elasticsearch BV
Copyright 2022-2023 Elasticsearch BV

This product includes software developed by The Apache Software
Foundation (http://www.apache.org/).
Expand Down Expand Up @@ -852,6 +852,29 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearc
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-lumber
Version: v0.1.1
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LICENSE:

Copyright (c) 2012–2016 Elasticsearch <http://www.elastic.co>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-ucfg
Version: v0.8.6
Expand Down Expand Up @@ -21739,29 +21762,6 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lookslike@v0
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-lumber
Version: v0.1.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LICENSE:

Copyright (c) 2012–2016 Elasticsearch <http://www.elastic.co>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-perf
Version: v0.0.0-20191212140718-9c656876f595
Expand Down
4 changes: 4 additions & 0 deletions controller/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/elastic-agent-shipper/output"
"github.com/elastic/elastic-agent-shipper/output/elasticsearch"
"github.com/elastic/elastic-agent-shipper/output/kafka"
"github.com/elastic/elastic-agent-shipper/output/logstash"
"github.com/elastic/elastic-agent-shipper/queue"
"github.com/elastic/elastic-agent-shipper/server"

Expand Down Expand Up @@ -216,5 +217,8 @@ func outputFromConfig(config output.Config, queue *queue.Queue) (Output, error)
if config.Console != nil && config.Console.Enabled {
return output.NewConsole(queue), nil
}
if config.Logstash != nil && config.Logstash.Enabled {
return logstash.NewLogstash(config.Logstash, queue), nil
}
return nil, errors.New("no active output configuration")
}
4 changes: 4 additions & 0 deletions elastic-agent-shipper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,7 @@ output:
enabled: false
hosts: ["localhost:9092", "localhost:9093"]
topic: '%{[metricset][name]}'

logstash:
enabled: false
hosts: ["localhost:5044"]
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/elastic/elastic-agent-client/v7 v7.0.1
github.com/elastic/elastic-agent-shipper-client v0.4.0
github.com/elastic/go-elasticsearch/v8 v8.2.0
github.com/elastic/go-lumber v0.1.1
github.com/elastic/go-ucfg v0.8.6
github.com/gofrs/uuid v4.2.0+incompatible
github.com/magefile/mage v1.14.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ github.com/elastic/go-licenser v0.4.1 h1:1xDURsc8pL5zYT9R29425J3vkHdt4RT5TNEMeRN
github.com/elastic/go-licenser v0.4.1/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
github.com/elastic/go-lookslike v0.3.0/go.mod h1:AhH+rdJux5RlVjs+6ej4jkvYyoNRkj2crxmqeHlj3hA=
github.com/elastic/go-lumber v0.1.0/go.mod h1:8YvjMIRYypWuPvpxx7WoijBYdbB7XIh/9FqSYQZTtxQ=
github.com/elastic/go-lumber v0.1.1 h1:aae5rSBnwBvdB0aShJ7AbOYPyvP1/wS/JIOC1A4D1DM=
github.com/elastic/go-lumber v0.1.1/go.mod h1:DMVoFv7YM71enE9X5vWJWWv7wvQNtzXh7bPeKukDccY=
github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595/go.mod h1:s09U1b4P1ZxnKx2OsqY7KlHdCesqZWIhyq0Gs/QC/Us=
github.com/elastic/go-seccomp-bpf v1.2.0/go.mod h1:l+89Vy5BzjVcaX8USZRMOwmwwDScE+vxCFzzvQwN7T8=
github.com/elastic/go-structform v0.0.9/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4=
Expand Down Expand Up @@ -1056,6 +1058,7 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
Expand Down
214 changes: 214 additions & 0 deletions integration/console_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
//go:build integration

package integration

import (
"testing"

"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
"github.com/stretchr/testify/require"
)

func TestServerStarts(t *testing.T) {
config := `
server:
strict_mode: false
port: 50052
tls: false
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
`
env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "gRPC server is ready and is listening on")

if !found {
env.Fatalf("Test executable failed to start.")
}
}

func TestServerFailsToStart(t *testing.T) {
config := `
server:
strict_mode: false
port: 50052
tls: not_boolean
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
`

env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "error unpacking shipper config")

if !found {
env.Fatalf("didn't error on bad config")
}
}

func TestPublishMessage(t *testing.T) {
config := `
server:
strict_mode: false
port: 50052
tls: false
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
`

env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "gRPC server is ready and is listening on")

if !found {
env.Fatalf("Test executable failed to start")
}

found = env.Contains("stderr", "Initializing disk queue at path")
if found {
env.Fatalf("Memory queue configured but disk queue started")
}

client := env.NewClient("localhost:50052")
unique := "UniqueStringToLookForInOutput"
events, err := createEvents([]string{unique})
require.NoErrorf(t, err, "error creating events: %s\n", err)

_, err = client.PublishEvents(env.ctx, &messages.PublishRequest{
Events: events,
})
require.NoErrorf(t, err, "Error publishing event: %s", err)

found = env.WaitUntil("stdout", unique)

if !found {
env.Fatalf("Event wasn't published")
}
}

func TestPublishDiskQueue(t *testing.T) {
queue_path := t.TempDir()

config := `
server:
strict_mode: false
port: 50052
tls: false
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
queue:
disk:
path: ` + queue_path + `
max_size: 10G
`

env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "gRPC server is ready and is listening on")

if !found {
env.Fatalf("Test executable failed to start")
}

found = env.Contains("stderr", "Initializing disk queue at path")
if !found {
env.Fatalf("Disk queue configured but not started")
}

client := env.NewClient("localhost:50052")
unique := "UniqueStringToLookForInOutput"
events, err := createEvents([]string{unique})
require.NoErrorf(t, err, "error creating events: %s\n", err)

_, err = client.PublishEvents(env.ctx, &messages.PublishRequest{
Events: events,
})
require.NoErrorf(t, err, "Error publishing event: %s", err)

found = env.WaitUntil("stdout", unique)

if !found {
env.Fatalf("Event wasn't published")
}
}

func TestPublishCompressEncryptedDiskQueue(t *testing.T) {
queue_path := t.TempDir()

config := `
server:
strict_mode: false
port: 50052
tls: false
logging:
level: debug
selectors: ["*"]
to_stderr: true
output:
console:
enabled: true
queue:
disk:
path: ` + queue_path + `
max_size: 10G
use_compression: true
encryption_password: secret
`

env := NewTestingEnvironment(t, config)
t.Cleanup(func() { env.Stop() })

found := env.WaitUntil("stderr", "gRPC server is ready and is listening on")

if !found {
env.Fatalf("Test executable failed to start")
}

found = env.Contains("stderr", "Initializing disk queue at path")
if !found {
env.Fatalf("Disk queue configured but not started")
}

client := env.NewClient("localhost:50052")
unique := "UniqueStringToLookForInOutput"
events, err := createEvents([]string{unique})
require.NoErrorf(t, err, "error creating events: %s\n", err)

_, err = client.PublishEvents(env.ctx, &messages.PublishRequest{
Events: events,
})
require.NoErrorf(t, err, "error publishing event: %s", err)

found = env.WaitUntil("stdout", unique)

if !found {
env.Fatalf("Event wasn't published")
}
}
Loading