From e16de717459e5a62aa376427dd25d43441b5c582 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 27 Jun 2023 11:57:19 +0200 Subject: [PATCH] Tests for exposing filestream metrics together with log input metrics #35835 (#35886) This commit implements integration tests for filestream metrics. It also moves the "integration test framework" from `x-pack/filebeat` to `libbeat` and implements some improvements on it. --- .../legacy_metrics_integration_test.go | 250 +++++++++++++++ filebeat/magefile.go | 1 + libbeat/tests/integration/framework.go | 284 ++++++++++++++++++ .../tests/integration/framework_test.go | 168 ----------- .../tests/integration/managerV2_test.go | 72 +---- 5 files changed, 551 insertions(+), 224 deletions(-) create mode 100644 filebeat/input/filestream/legacy_metrics_integration_test.go create mode 100644 libbeat/tests/integration/framework.go delete mode 100644 x-pack/filebeat/tests/integration/framework_test.go diff --git a/filebeat/input/filestream/legacy_metrics_integration_test.go b/filebeat/input/filestream/legacy_metrics_integration_test.go new file mode 100644 index 000000000000..6c98a53a19c0 --- /dev/null +++ b/filebeat/input/filestream/legacy_metrics_integration_test.go @@ -0,0 +1,250 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package filestream + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/tests/integration" +) + +var fconfig = ` +filebeat.inputs: + - type: filestream + id: my-filestream-id + enabled: true + close.reader.after_interval: 1s + prospector.scanner.check_interval: 500ms + paths: + - %s/*.filestream + - type: log + id: my-log-input + enabled: true + close_timeout: 1s + scan_frequency: 500ms + paths: + - %s/*.log + +output.console: + codec.json: + pretty: true + +logging: + level: debug + selectors: "*" + +http: + enabled: true +` + +func TestLegacyMetrics(t *testing.T) { + filebeat := integration.NewBeat(t, "filebeat", "../../filebeat.test") + + cfg := fmt.Sprintf(fconfig, filebeat.TempDir(), filebeat.TempDir()) + + filebeat.WriteConfigFile(cfg) + filebeat.Start() + + filebeat.WaitForLogs("Metrics endpoint listening on:", 10*time.Second, "The metric server is not running") + + // After starting Filebeat all counters must be zero + waitForMetrics(t, + LegacyHarvesterMetrics{ + OpenFiles: 0, + Closed: 0, + Running: 0, + Started: 0, + }) + + filestreamLogFile := filepath.Join(filebeat.TempDir(), "01.filestream") + filestreamLog, err := os.Create(filestreamLogFile) + if err != nil { + t.Fatalf("could not create log file '%s': %s", filestreamLogFile, err) + } + + // Write a line in the file harvested by Filestream + fmt.Fprintln(filestreamLog, "first line") + + waitForMetrics(t, + LegacyHarvesterMetrics{ + OpenFiles: 1, + Running: 1, + Started: 1, + Closed: 0, + }, + "Filestream input did not start the harvester") + + // Wait for the harvester to close the file + waitForMetrics(t, + LegacyHarvesterMetrics{ + OpenFiles: 0, + Running: 0, + Started: 1, + Closed: 1, + }, + "Filestream input did not close the harvester") + + // Write a line in the file harvested by the log input + logInputLogFileName := filepath.Join(filebeat.TempDir(), "01.log") + logInputLog, err := os.Create(logInputLogFileName) + if err != nil { + t.Fatalf("could not create log file '%s': %s", logInputLogFileName, err) + } + + fmt.Fprintln(logInputLog, "first line") + + waitForMetrics(t, + LegacyHarvesterMetrics{ + OpenFiles: 1, + Running: 1, + Started: 2, + Closed: 1, + }, + "Log input did not start harvester") + + // Wait for the log input to close the file + waitForMetrics(t, + LegacyHarvesterMetrics{ + OpenFiles: 0, + Running: 0, + Started: 2, + Closed: 2, + }, + "Log input did not close the harvester") + + // Writes one more line to each log file + fmt.Fprintln(logInputLog, "second line") + fmt.Fprintln(filestreamLog, "second line") + + // Both harvesters should be running + waitForMetrics(t, + LegacyHarvesterMetrics{ + OpenFiles: 2, + Running: 2, + Started: 4, + Closed: 2, + }, + "Two harvesters should be running") + + // Wait for both harvesters to close the file + waitForMetrics(t, + LegacyHarvesterMetrics{ + OpenFiles: 0, + Running: 0, + Started: 4, + Closed: 4, + }, + "All harvesters must be closed") +} + +func waitForMetrics(t *testing.T, expect LegacyHarvesterMetrics, msgAndArgs ...any) { + t.Helper() + got := LegacyHarvesterMetrics{} + assert.Eventually(t, func() bool { + got = getHarvesterMetrics(t) + return expect == got + }, 10*time.Second, 100*time.Millisecond, msgAndArgs...) + + if !t.Failed() { + return + } + + if expect.Closed != got.Closed { + t.Logf("expecting 'closed' to be %d, got %d instead", expect.Closed, got.Closed) + } + + if expect.OpenFiles != got.OpenFiles { + t.Logf("expecting 'open_files' to be %d, got %d instead", expect.OpenFiles, got.OpenFiles) + } + + if expect.Running != got.Running { + t.Logf("expecting 'running' to be %d, got %d instead", expect.Running, got.Running) + } + + if expect.Started != got.Started { + t.Logf("expecting 'started' to be %d, got %d instead", expect.Started, got.Started) + } +} + +func compareMetrics(t *testing.T, expect, got LegacyHarvesterMetrics) { + t.Helper() + + if expect.Closed != got.Closed { + t.Errorf("expecting 'closed' to be %d, got %d instead", expect.Closed, got.Closed) + } + + if expect.OpenFiles != got.OpenFiles { + t.Errorf("expecting 'open_files' to be %d, got %d instead", expect.OpenFiles, got.OpenFiles) + } + + if expect.Running != got.Running { + t.Errorf("expecting 'running' to be %d, got %d instead", expect.Running, got.Running) + } + + if expect.Started != got.Started { + t.Errorf("expecting 'started' to be %d, got %d instead", expect.Started, got.Started) + } +} + +type LegacyHarvesterMetrics struct { + Closed int `json:"closed"` + OpenFiles int `json:"open_files"` + Running int `json:"running"` + Started int `json:"started"` +} + +func getHarvesterMetrics(t *testing.T) LegacyHarvesterMetrics { + // The host is ignored because we're connecting via Unix sockets. + resp, err := http.Get("http://localhost:5066/stats") + if err != nil { + t.Fatalf("could not execute HTTP call: %s", err) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("could not read request body: %s", err) + } + + type foo struct { + F struct { + H LegacyHarvesterMetrics `json:"harvester"` + } `json:"filebeat"` + } + + m := struct { + F struct { + H LegacyHarvesterMetrics `json:"harvester"` + } `json:"filebeat"` + }{} + if err := json.Unmarshal(body, &m); err != nil { + t.Fatalf("could not unmarshal request body: %s", err) + } + + return m.F.H +} diff --git a/filebeat/magefile.go b/filebeat/magefile.go index 874d47efae84..d96b44f4c258 100644 --- a/filebeat/magefile.go +++ b/filebeat/magefile.go @@ -205,6 +205,7 @@ func IntegTest() { // GoIntegTest starts the docker containers and executes the Go integration tests. func GoIntegTest(ctx context.Context) error { + mg.Deps(BuildSystemTestBinary) return devtools.GoIntegTestFromHost(ctx, devtools.DefaultGoTestIntegrationFromHostArgs()) } diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go new file mode 100644 index 000000000000..7514a9a5bcb6 --- /dev/null +++ b/libbeat/tests/integration/framework.go @@ -0,0 +1,284 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package integration + +import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "net/url" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type BeatProc struct { + Binary string + Args []string + Cmd *exec.Cmd + t *testing.T + tempDir string + configFile string + beatName string + logFileOffset int64 +} + +// NewBeat createa a new Beat process from the system tests binary. +// It sets some required options like the home path, logging, etc. +// `tempDir` will be used as home and logs directory for the Beat +// `args` will be passed as CLI arguments to the Beat +func NewBeat(t *testing.T, beatName, binary string, args ...string) BeatProc { + tempDir := createTempDir(t) + configFile := filepath.Join(tempDir, beatName+".yml") + p := BeatProc{ + Binary: binary, + Args: append([]string{ + "--systemTest", + "--path.home", tempDir, + "--path.logs", tempDir, + "-E", "logging.to_files=true", + "-E", "logging.files.rotateeverybytes=104857600", // About 100MB + }, args...), + tempDir: tempDir, + beatName: beatName, + configFile: configFile, + t: t, + } + return p +} + +// Start starts the Beat process +// args are extra arguments to be passed to the Beat +func (b *BeatProc) Start(args ...string) { + t := b.t + b.Args = append(b.Args, args...) + fullPath, err := filepath.Abs(b.Binary) + if err != nil { + t.Fatalf("could not get full path from %q, err: %s", b.Binary, err) + } + b.Cmd = exec.Command(fullPath, b.Args...) + + if err := b.Cmd.Start(); err != nil { + t.Fatalf("could not start process: %s", err) + } + t.Cleanup(func() { + pid := b.Cmd.Process.Pid + if err := b.Cmd.Process.Signal(os.Interrupt); err != nil { + t.Fatalf("could not stop process with PID: %d, err: %s", pid, err) + } + }) +} + +// LogContains looks for `s` as a substring of every log line, +// it will open the log file on every call, read it until EOF, +// then close it. +func (b *BeatProc) LogContains(s string) bool { + t := b.t + logFile := b.openLogFile() + _, err := logFile.Seek(b.logFileOffset, os.SEEK_SET) + if err != nil { + t.Fatalf("could not set offset for '%s': %s", logFile.Name(), err) + } + + defer func() { + offset, err := logFile.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("could not read offset for '%s': %s", logFile.Name(), err) + } + b.logFileOffset = offset + if err := logFile.Close(); err != nil { + // That's not quite a test error, but it can impact + // next executions of LogContains, so treat it as an error + t.Errorf("could not close log file: %s", err) + } + }() + + r := bufio.NewReader(logFile) + for { + line, err := r.ReadString('\n') + if err != nil { + if err != io.EOF { + t.Fatalf("error reading log file '%s': %s", logFile.Name(), err) + } + break + } + if strings.Contains(line, s) { + return true + } + } + + return false +} + +// WaitForLogs waits for the specified string s to be present in the logs within +// the given timeout duration and fails the test if s is not found. +// msgAndArgs should be a format string and arguments that will be printed +// if the logs are not found, providing additional context for debugging. +func (b *BeatProc) WaitForLogs(s string, timeout time.Duration, msgAndArgs ...any) { + b.t.Helper() + require.Eventually(b.t, func() bool { + return b.LogContains(s) + }, timeout, 100*time.Millisecond, msgAndArgs...) +} + +// TempDir returns the temporary directory +// used by that Beat, on a successful test, +// the directory is automatically removed. +// On failure, the temporary directory is kept. +func (b *BeatProc) TempDir() string { + return b.tempDir +} + +// WriteConfigFile writes the provided configuration string cfg to a file. +// This file will be used as the configuration file for the Beat. +func (b *BeatProc) WriteConfigFile(cfg string) { + if err := os.WriteFile(b.configFile, []byte(cfg), 0644); err != nil { + b.t.Fatalf("cannot create config file '%s': %s", b.configFile, err) + } + + b.Args = append(b.Args, "-c", b.configFile) +} + +// openLogFile opens the log file for reading and returns it. +// It also registers a cleanup function to close the file +// when the test ends. +func (b *BeatProc) openLogFile() *os.File { + t := b.t + glob := fmt.Sprintf("%s-*.ndjson", filepath.Join(b.tempDir, b.beatName)) + files, err := filepath.Glob(glob) + if err != nil { + t.Fatalf("could not expand log file glob: %s", err) + } + + require.Eventually(t, func() bool { + files, err = filepath.Glob(glob) + if err != nil { + t.Fatalf("could not expand log file glob: %s", err) + } + return len(files) == 1 + }, 5*time.Second, 100*time.Millisecond, + "waiting for log file matching glob '%s' to be created", glob) + + // On a normal operation there must be a single log, if there are more + // than one, then there is an issue and the Beat is logging too much, + // which is enough to stop the test + if len(files) != 1 { + t.Fatalf("there must be only one log file for %s, found: %d", + glob, len(files)) + } + + f, err := os.Open(files[0]) + if err != nil { + t.Fatalf("could not open log file '%s': %s", files[0], err) + } + + return f +} + +// createTempDir creates a temporary directory that will be +// removed after the tests passes. +// +// If the test fails, the temporary directory is not removed. +// +// If the tests are run with -v, the temporary directory will +// be logged. +func createTempDir(t *testing.T) string { + tempDir, err := filepath.Abs(filepath.Join("../../build/integration-tests/", + fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()))) + if err != nil { + t.Fatal(err) + } + + if err := os.MkdirAll(tempDir, 0766); err != nil { + t.Fatalf("cannot create tmp dir: %s, msg: %s", err, err.Error()) + } + t.Logf("Temporary directory: %s", tempDir) + + cleanup := func() { + if !t.Failed() { + if err := os.RemoveAll(tempDir); err != nil { + t.Errorf("could not remove temp dir '%s': %s", tempDir, err) + } + t.Logf("Temporary directory '%s' removed", tempDir) + } + } + t.Cleanup(cleanup) + + return tempDir +} + +// EnsureESIsRunning ensures Elasticsearch is running and is reachable +// using the default test credentials or the corresponding environment +// variables. +func EnsureESIsRunning(t *testing.T) { + t.Helper() + + esHost := os.Getenv("ES_HOST") + if esHost == "" { + esHost = "localhost" + } + + esPort := os.Getenv("ES_PORT") + if esPort == "" { + esPort = "9200" + } + + esURL := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%s", esHost, esPort), + } + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Second)) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, esURL.String(), nil) + if err != nil { + t.Fatalf("cannot create request to ensure ES is running: %s", err) + } + + user := os.Getenv("ES_USER") + if user == "" { + user = "admin" + } + + pass := os.Getenv("ES_PASS") + if pass == "" { + pass = "testing" + } + + req.SetBasicAuth(user, pass) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + // If you're reading this message, you probably forgot to start ES + // run `mage compose:Up` from Filebeat's folder to start all + // containers required for integration tests + t.Fatalf("cannot execute HTTP request to ES: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected HTTP status: %d, expecting 200 - OK", resp.StatusCode) + } +} diff --git a/x-pack/filebeat/tests/integration/framework_test.go b/x-pack/filebeat/tests/integration/framework_test.go deleted file mode 100644 index 1d572c6cc002..000000000000 --- a/x-pack/filebeat/tests/integration/framework_test.go +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package integration - -import ( - "bufio" - "fmt" - "io" - "os" - "os/exec" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -type BeatProc struct { - Binary string - Args []string - Cmd *exec.Cmd - t *testing.T - tempDir string -} - -// NewBeat createa a new Beat process from the system tests binary. -// It sets some required options like the home path, logging, etc. -// `tempDir` will be used as home and logs directory for the Beat -// `args` will be passed as CLI arguments to the Beat -func NewBeat(t *testing.T, binary string, tempDir string, args ...string) BeatProc { - p := BeatProc{ - t: t, - Binary: binary, - Args: append([]string{ - "--systemTest", - "--path.home", tempDir, - "--path.logs", tempDir, - "-E", "logging.to_files=true", - "-E", "logging.files.rotateeverybytes=104857600", // About 100MB - }, args...), - tempDir: tempDir, - } - return p -} - -// Start starts the Beat process -func (b *BeatProc) Start() { - t := b.t - fullPath, err := filepath.Abs(b.Binary) - if err != nil { - t.Fatalf("could not get full path from %q, err: %s", b.Binary, err) - } - b.Cmd = exec.Command(fullPath, b.Args...) - - if err := b.Cmd.Start(); err != nil { - t.Fatalf("could not start process: %s", err) - } - t.Cleanup(func() { - pid := b.Cmd.Process.Pid - if err := b.Cmd.Process.Kill(); err != nil { - t.Fatalf("could not stop process with PID: %d, err: %s", pid, err) - } - }) -} - -// LogContains looks for `s` as a substring of every log line, -// it will open the log file on every call, read it until EOF, -// then close it. -func (b *BeatProc) LogContains(s string) bool { - t := b.t - logFile := b.openLogFile() - defer func() { - if err := logFile.Close(); err != nil { - // That's not quite a test error, but it can impact - // next executions of LogContains, so treat it as an error - t.Errorf("could not close log file: %s", err) - } - }() - - r := bufio.NewReader(logFile) - for { - line, err := r.ReadString('\n') - if err != nil { - if err != io.EOF { - t.Fatalf("error reading log file '%s': %s", logFile.Name(), err) - } - break - } - if strings.Contains(line, s) { - return true - } - } - - return false -} - -// openLogFile opens the log file for reading and returns it. -// It also registers a cleanup function to close the file -// when the test ends. -func (b *BeatProc) openLogFile() *os.File { - t := b.t - glob := fmt.Sprintf("%s-*.ndjson", filepath.Join(b.tempDir, "filebeat")) - files, err := filepath.Glob(glob) - if err != nil { - t.Fatalf("could not expand log file glob: %s", err) - } - - require.Eventually(t, func() bool { - files, err = filepath.Glob(glob) - if err != nil { - t.Fatalf("could not expand log file glob: %s", err) - } - return len(files) == 1 - }, 5*time.Second, 100*time.Millisecond, - "waiting for log file matching glob '%s' to be created", glob) - - // On a normal operation there must be a single log, if there are more - // than one, then there is an issue and the Beat is logging too much, - // which is enough to stop the test - if len(files) != 1 { - t.Fatalf("there must be only one log file for %s, found: %d", - glob, len(files)) - } - - f, err := os.Open(files[0]) - if err != nil { - t.Fatalf("could not open log file '%s': %s", files[0], err) - } - - return f -} - -// createTempDir creates a temporary directory that will be -// removed after the tests passes. -// -// If the test fails, the temporary directory is not removed. -// -// If the tests are run with -v, the temporary directory will -// be logged. -func createTempDir(t *testing.T) string { - tempDir, err := filepath.Abs(filepath.Join("../../build/integration-tests/", - fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()))) - if err != nil { - t.Fatal(err) - } - - if err := os.MkdirAll(tempDir, 0766); err != nil { - t.Fatalf("cannot create tmp dir: %s, msg: %s", err, err.Error()) - } - t.Logf("Temporary directory: %s", tempDir) - - cleanup := func() { - if !t.Failed() { - if err := os.RemoveAll(tempDir); err != nil { - t.Errorf("could not remove temp dir '%s': %s", tempDir, err) - } - t.Logf("Temporary directory '%s' removed", tempDir) - } - } - t.Cleanup(cleanup) - - return tempDir -} diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index e1951b510b62..48f2069220b0 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -7,9 +7,7 @@ package integration import ( - "context" "fmt" - "net/http" "os" "path/filepath" "testing" @@ -18,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/structpb" + "github.com/elastic/beats/v7/libbeat/tests/integration" "github.com/elastic/beats/v7/x-pack/libbeat/management" "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -41,14 +40,15 @@ func TestInputReloadUnderElasticAgent(t *testing.T) { // First things first, ensure ES is running and we can connect to it. // If ES is not running, the test will timeout and the only way to know // what caused it is going through Filebeat's logs. - ensureESIsRunning(t) + integration.EnsureESIsRunning(t) - // We create our own temp dir so the files can be persisted - // in case the test fails. This will help debugging issues - // locally and on CI. - tempDir := createTempDir(t) + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) - logFilePath := filepath.Join(tempDir, "flog.log") + logFilePath := filepath.Join(filebeat.TempDir(), "flog.log") generateLogFile(t, logFilePath) var units = [][]*proto.UnitExpected{ { @@ -200,16 +200,11 @@ func TestInputReloadUnderElasticAgent(t *testing.T) { require.NoError(t, server.Start()) t.Cleanup(server.Stop) - filebeat := NewBeat( - t, - "../../filebeat.test", - tempDir, + filebeat.Start( "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), "-E", "management.enabled=true", ) - filebeat.Start() - // waitDeadlineOr5Mins looks at the test deadline // and returns a reasonable value of waiting for a // condition to be met. The possible values are: @@ -264,11 +259,14 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { // First things first, ensure ES is running and we can connect to it. // If ES is not running, the test will timeout and the only way to know // what caused it is going through Filebeat's logs. - ensureESIsRunning(t) + integration.EnsureESIsRunning(t) + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) - tempDir := createTempDir(t) finalStateReached := false - var units = []*proto.UnitExpected{ { Id: "output-unit-borken", @@ -334,16 +332,11 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { require.NoError(t, server.Start()) - filebeat := NewBeat( - t, - "../../filebeat.test", - tempDir, + filebeat.Start( "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), "-E", "management.enabled=true", ) - filebeat.Start() - require.Eventually(t, func() bool { return finalStateReached }, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy") @@ -402,36 +395,3 @@ func generateLogFile(t *testing.T, fullPath string) { } }() } - -func ensureESIsRunning(t *testing.T) { - t.Helper() - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Second)) - defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:9200", nil) - if err != nil { - t.Fatalf("cannot create request to ensure ES is running: %s", err) - } - - user := os.Getenv("ES_USER") - if user == "" { - user = "admin" - } - - pass := os.Getenv("ES_PASS") - if pass == "" { - pass = "testing" - } - - req.SetBasicAuth(user, pass) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - // If you're reading this message, you probably forgot to start ES - // run `mage compose:Up` from Filebeat's folder to start all - // containers required for integration tests - t.Fatalf("cannot execute HTTP request to ES: %s", err) - } - if resp.StatusCode != http.StatusOK { - t.Errorf("unexpected HTTP status: %d, expecting 200 - OK", resp.StatusCode) - } -}