diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 40d189797b3..5389ca6551b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -15,6 +15,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] platform, and when viewed from a metadata API standpoint, it is impossible to differentiate it from OpenStack. If you know that your deployments run on Huawei Cloud exclusively, and you wish to have `cloud.provider` value as `huawei`, you can achieve this by overwriting the value using an `add_fields` processor. {pull}35184[35184] + - In managed mode, Beats running under Elastic Agent will report the package +version of Elastic Agent as their own version. This includes all additional +fields added to events containing the Beats version. {pull}37553[37553] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index eea974cedd1..573e544bb2e 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12494,11 +12494,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-a -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-client/v7 -Version: v7.6.0 +Version: v7.8.0 Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.6.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.8.0/LICENSE.txt: ELASTIC LICENSE AGREEMENT @@ -25546,11 +25546,11 @@ Contents of probable licence file $GOMODCACHE/google.golang.org/grpc@v1.58.3/LIC -------------------------------------------------------------------------------- Dependency : google.golang.org/protobuf -Version: v1.31.0 +Version: v1.32.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/google.golang.org/protobuf@v1.31.0/LICENSE: +Contents of probable licence file $GOMODCACHE/google.golang.org/protobuf@v1.32.0/LICENSE: Copyright (c) 2018 The Go Authors. All rights reserved. diff --git a/docs/devguide/testing.asciidoc b/docs/devguide/testing.asciidoc index 49d2366c920..9488fe47dce 100644 --- a/docs/devguide/testing.asciidoc +++ b/docs/devguide/testing.asciidoc @@ -50,11 +50,11 @@ In Metricbeat, run the command from within a module like this: `go test --tags i A note about tags: the `--data` flag is a custom flag added by Metricbeat and Packetbeat frameworks. It will not be present in case tags do not match, as the relevant code will not be run and silently skipped (without the tag the test file is ignored by Go compiler so the framework doesn't load). This may happen if there are different tags in the build tags of the metricset under test (i.e. the GCP billing metricset requires the `billing` tag too). -==== Running Python Tests +==== Running System (integration) Tests (Python and Go) -Python system tests are defined in the `tests/system` directory. They require a testing binary to be available and the python environment to be set up. +The system tests are defined in the `tests/system` (for legacy Python test) and on `tests/integration` (for Go tests) directory. They require a testing binary to be available and the python environment to be set up. -To create the testing binary run `mage buildSystemTestBinary`. This will create the test binary in the beat directory. To setup the testing environment run `mage pythonVirtualEnv` which will create a virtual environment with all test dependencies and print its location. To activate it, the instructions depend on your operating system. See the https://packaging.python.org/en/latest/guides/installing-using-pip-and-virtual-environments/#activating-a-virtual-environment[virtualenv documentation]. +To create the testing binary run `mage buildSystemTestBinary`. This will create the test binary in the beat directory. To set up the Python testing environment run `mage pythonVirtualEnv` which will create a virtual environment with all test dependencies and print its location. To activate it, the instructions depend on your operating system. See the https://packaging.python.org/en/latest/guides/installing-using-pip-and-virtual-environments/#activating-a-virtual-environment[virtualenv documentation]. To run the system and integration tests use the `mage pythonIntegTest` target, which will start the required services using https://docs.docker.com/compose/[docker-compose] and run all integration tests. Similar to Go integration tests, the individual steps can be done manually to allow selecting which tests should be run: @@ -62,12 +62,16 @@ To run the system and integration tests use the `mage pythonIntegTest` target, w ---- # Create and activate the system test virtual environment (assumes a Unix system). source $(mage pythonVirtualEnv)/bin/activate + # Pull and build the containers. Only needs to be done once unless you change the containers. mage docker:composeBuild + # Bring up all containers, wait until they are healthy, and put them in the background. mage docker:composeUp + # Run all system and integration tests. INTEGRATION_TESTS=1 pytest ./tests/system + # Stop all started containers. mage docker:composeDown ---- diff --git a/go.mod b/go.mod index 1c29491fbba..ee391fb43d2 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,7 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/eapache/go-resiliency v1.2.0 github.com/eclipse/paho.mqtt.golang v1.3.5 - github.com/elastic/elastic-agent-client/v7 v7.6.0 + github.com/elastic/elastic-agent-client/v7 v7.8.0 github.com/elastic/go-concert v0.2.0 github.com/elastic/go-libaudit/v2 v2.5.0 github.com/elastic/go-licenser v0.4.1 @@ -164,7 +164,7 @@ require ( google.golang.org/api v0.128.0 google.golang.org/genproto v0.0.0-20230920204549-e6e6cdab5c13 // indirect google.golang.org/grpc v1.58.3 - google.golang.org/protobuf v1.31.0 + google.golang.org/protobuf v1.32.0 gopkg.in/inf.v0 v0.9.1 gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect diff --git a/go.sum b/go.sum index 039364a70a4..52051e7f4cf 100644 --- a/go.sum +++ b/go.sum @@ -663,8 +663,8 @@ github.com/elastic/ebpfevents v0.3.2 h1:UJ8kW5jw2TpUR5MEMaZ1O62sK9JQ+5xTlj+YpQC6 github.com/elastic/ebpfevents v0.3.2/go.mod h1:o21z5xup/9dK8u0Hg9bZRflSqqj1Zu5h2dg2hSTcUPQ= github.com/elastic/elastic-agent-autodiscover v0.6.7 h1:+KVjltN0rPsBrU8b156gV4lOTBgG/vt0efFCFARrf3g= github.com/elastic/elastic-agent-autodiscover v0.6.7/go.mod h1:hFeFqneS2r4jD0/QzGkrNk0YVdN0JGh7lCWdsH7zcI4= -github.com/elastic/elastic-agent-client/v7 v7.6.0 h1:FEn6FjzynW4TIQo5G096Tr7xYK/P5LY9cSS6wRbXZTc= -github.com/elastic/elastic-agent-client/v7 v7.6.0/go.mod h1:GlUKrbVd/O1CRAZonpBeN3J0RlVqP6VGcrBjFWca+aM= +github.com/elastic/elastic-agent-client/v7 v7.8.0 h1:GHFzDJIWpdgI0qDk5EcqbQJGvwTsl2E2vQK3/xe+MYQ= +github.com/elastic/elastic-agent-client/v7 v7.8.0/go.mod h1:ihtjqJzYiIltlRhNruaSSc0ogxIhqPD5hOMKq16cI1s= github.com/elastic/elastic-agent-libs v0.7.5 h1:4UMqB3BREvhwecYTs/L23oQp1hs/XUkcunPlmTZn5yg= github.com/elastic/elastic-agent-libs v0.7.5/go.mod h1:pGMj5myawdqu+xE+WKvM5FQzKQ/MonikkWOzoFTJxaU= github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 h1:sb+25XJn/JcC9/VL8HX4r4QXSUq4uTNzGS2kxOE7u1U= @@ -2656,8 +2656,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 7a70b1a55ba..4b7470b1dbd 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -838,10 +838,25 @@ func (b *Beat) configure(settings Settings) error { } // initialize config manager - b.Manager, err = management.NewManager(b.Config.Management, reload.RegisterV2) + m, err := management.NewManager(b.Config.Management, reload.RegisterV2) if err != nil { return err } + b.Manager = m + + if b.Manager.AgentInfo().Version != "" { + // During the manager initialization the client to connect to the agent is + // also initialized. That makes the beat to read information sent by the + // agent, which includes the AgentInfo with the agent's package version. + // Components running under agent should report the agent's package version + // as their own version. + // In order to do so b.Info.Version needs to be set to the version the agent + // sent. As this Beat instance is initialized much before the package + // version is received, it's overridden here. So far it's early enough for + // the whole beat to report the right version. + b.Info.Version = b.Manager.AgentInfo().Version + version.SetPackageVersion(b.Info.Version) + } if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil { return err @@ -1521,13 +1536,13 @@ func (bc *beatConfig) Validate() error { if bc.Pipeline.Queue.IsSet() && outputPC.Queue.IsSet() { return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed") } - //elastic-agent doesn't support disk queue yet + // elastic-agent doesn't support disk queue yet if bc.Management.Enabled() && outputPC.Queue.Config().Enabled() && outputPC.Queue.Name() == diskqueue.QueueType { return fmt.Errorf("disk queue is not supported when management is enabled") } } - //elastic-agent doesn't support disk queue yet + // elastic-agent doesn't support disk queue yet if bc.Management.Enabled() && bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType { return fmt.Errorf("disk queue is not supported when management is enabled") } diff --git a/libbeat/management/management.go b/libbeat/management/management.go index 88faa48f540..177642b3398 100644 --- a/libbeat/management/management.go +++ b/libbeat/management/management.go @@ -82,9 +82,12 @@ type Manager interface { // // Calls to 'CheckRawConfig()' or 'SetPayload()' will be ignored after calling stop. // - // Note: Stop will not call 'UnregisterAction()' automaticallty. + // Note: Stop will not call 'UnregisterAction()' automatically. Stop() + // AgentInfo returns the information of the agent to which the manager is connected. + AgentInfo() client.AgentInfo + // SetStopCallback accepts a function that need to be called when the manager want to shutdown the // beats. This is needed when you want your beats to be gracefully shutdown remotely by the Elastic Agent // when a policy doesn't need to run this beat. @@ -190,6 +193,7 @@ func (n *fallbackManager) Stop() { // but that does not mean the Beat is being managed externally, // hence it will always return false. func (n *fallbackManager) Enabled() bool { return false } +func (n *fallbackManager) AgentInfo() client.AgentInfo { return client.AgentInfo{} } func (n *fallbackManager) Start() error { return nil } func (n *fallbackManager) CheckRawConfig(cfg *config.C) error { return nil } func (n *fallbackManager) RegisterAction(action client.Action) {} diff --git a/libbeat/tests/integration/cmd_keystore_test.go b/libbeat/tests/integration/cmd_keystore_test.go index eb4b697cafa..efb9b91a1c9 100644 --- a/libbeat/tests/integration/cmd_keystore_test.go +++ b/libbeat/tests/integration/cmd_keystore_test.go @@ -100,19 +100,23 @@ func TestKeystoreRemoveMultipleExistingKeys(t *testing.T) { mockbeat.Stop() mockbeat.Start("keystore", "add", "key1", "--stdin") - fmt.Fprintf(os.Stdin, "pass1") + + fmt.Fprintf(mockbeat.stdin, "pass1") + require.NoError(t, mockbeat.stdin.Close(), "could not close mockbeat stdin") procState, err := mockbeat.Process.Wait() require.NoError(t, err) require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") mockbeat.Start("keystore", "add", "key2", "--stdin") - fmt.Fprintf(os.Stdin, "pass2") + fmt.Fprintf(mockbeat.stdin, "pass2") + require.NoError(t, mockbeat.stdin.Close(), "could not close mockbeat stdin") procState, err = mockbeat.Process.Wait() require.NoError(t, err) require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") mockbeat.Start("keystore", "add", "key3", "--stdin") - fmt.Fprintf(os.Stdin, "pass3") + fmt.Fprintf(mockbeat.stdin, "pass3") + require.NoError(t, mockbeat.stdin.Close(), "could not close mockbeat stdin") procState, err = mockbeat.Process.Wait() require.NoError(t, err) require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") @@ -138,19 +142,22 @@ func TestKeystoreList(t *testing.T) { mockbeat.Stop() mockbeat.Start("keystore", "add", "key1", "--stdin") - fmt.Fprintf(os.Stdin, "pass1") + fmt.Fprintf(mockbeat.stdin, "pass1") + require.NoError(t, mockbeat.stdin.Close(), "could not close mockbeat stdin") procState, err := mockbeat.Process.Wait() require.NoError(t, err) require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") mockbeat.Start("keystore", "add", "key2", "--stdin") - fmt.Fprintf(os.Stdin, "pass2") + fmt.Fprintf(mockbeat.stdin, "pass2") + require.NoError(t, mockbeat.stdin.Close(), "could not close mockbeat stdin") procState, err = mockbeat.Process.Wait() require.NoError(t, err) require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") mockbeat.Start("keystore", "add", "key3", "--stdin") - fmt.Fprintf(os.Stdin, "pass3") + fmt.Fprintf(mockbeat.stdin, "pass3") + require.NoError(t, mockbeat.stdin.Close(), "could not close mockbeat stdin") procState, err = mockbeat.Process.Wait() require.NoError(t, err) require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") @@ -186,7 +193,8 @@ func TestKeystoreAddSecretFromStdin(t *testing.T) { require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") mockbeat.Start("keystore", "add", "key1", "--stdin") - fmt.Fprintf(os.Stdin, "pass1") + fmt.Fprintf(mockbeat.stdin, "pass1") + require.NoError(t, mockbeat.stdin.Close(), "could not close mockbeat stdin") procState, err = mockbeat.Process.Wait() require.NoError(t, err) require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") @@ -202,13 +210,15 @@ func TestKeystoreUpdateForce(t *testing.T) { require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") mockbeat.Start("keystore", "add", "key1", "--stdin") - fmt.Fprintf(os.Stdin, "pass1") + fmt.Fprintf(mockbeat.stdin, "pass1") + require.NoError(t, mockbeat.stdin.Close(), "could not close mockbeat stdin") procState, err = mockbeat.Process.Wait() require.NoError(t, err) require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") mockbeat.Start("keystore", "add", "key1", "--force", "--stdin") - fmt.Fprintf(os.Stdin, "pass2") + fmt.Fprintf(mockbeat.stdin, "pass2") + require.NoError(t, mockbeat.stdin.Close(), "could not close mockbeat stdin") procState, err = mockbeat.Process.Wait() require.NoError(t, err) require.Equal(t, 0, procState.ExitCode(), "incorrect exit code") diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 046c578d7cd..9657fbaeaff 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -30,6 +30,7 @@ import ( "net/http" "net/url" "os" + "os/exec" "path/filepath" "regexp" "strings" @@ -55,6 +56,7 @@ type BeatProc struct { logFileOffset int64 t *testing.T tempDir string + stdin io.WriteCloser stdout *os.File stderr *os.File Process *os.Process @@ -90,7 +92,7 @@ type Total struct { Value int `json:"value"` } -// NewBeat createa a new Beat process from the system tests binary. +// NewBeat creates a new Beat process from the system tests binary. // It sets some required options like the home path, logging, etc. // `tempDir` will be used as home and logs directory for the Beat // `args` will be passed as CLI arguments to the Beat @@ -98,10 +100,12 @@ func NewBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc { require.FileExistsf(t, binary, "beat binary must exists") tempDir := createTempDir(t) configFile := filepath.Join(tempDir, beatName+".yml") + stdoutFile, err := os.Create(filepath.Join(tempDir, "stdout")) require.NoError(t, err, "error creating stdout file") stderrFile, err := os.Create(filepath.Join(tempDir, "stderr")) require.NoError(t, err, "error creating stderr file") + p := BeatProc{ Binary: binary, baseArgs: append([]string{ @@ -213,15 +217,27 @@ func (b *BeatProc) Start(args ...string) { func (b *BeatProc) startBeat() { b.cmdMutex.Lock() defer b.cmdMutex.Unlock() + _, _ = b.stdout.Seek(0, 0) _ = b.stdout.Truncate(0) _, _ = b.stderr.Seek(0, 0) _ = b.stderr.Truncate(0) - var procAttr os.ProcAttr - procAttr.Files = []*os.File{os.Stdin, b.stdout, b.stderr} - process, err := os.StartProcess(b.fullPath, b.Args, &procAttr) + + cmd := exec.Cmd{ + Path: b.fullPath, + Args: b.Args, + Stdout: b.stdout, + Stderr: b.stderr, + } + + var err error + b.stdin, err = cmd.StdinPipe() + require.NoError(b.t, err, "could not get cmd StdinPipe") + + err = cmd.Start() require.NoError(b.t, err, "error starting beat process") - b.Process = process + + b.Process = cmd.Process } // waitBeatToExit blocks until the Beat exits, it returns @@ -515,6 +531,10 @@ func (b *BeatProc) LoadMeta() (Meta, error) { return m, nil } +func (b *BeatProc) Stdin() io.WriteCloser { + return b.stdin +} + func GetESURL(t *testing.T, scheme string) url.URL { t.Helper() diff --git a/libbeat/tests/integration/mockserver.go b/libbeat/tests/integration/mockserver.go index 0a396cb7839..763467819fa 100644 --- a/libbeat/tests/integration/mockserver.go +++ b/libbeat/tests/integration/mockserver.go @@ -38,18 +38,18 @@ type unitKey struct { } // NewMockServer creates a GRPC server to mock the Elastic-Agent. -// On the first check in call it will send the first element of `unit` +// On the first check-in call it will send the first element of `unit` // as the expected unit, on successive calls, if the Beat has reached // that state, it will move on to sending the next state. // It will also validate the features. // // if `observedCallback` is not nil, it will be called on every -// check in receiving the `proto.CheckinObserved` sent by the +// check-in receiving the `proto.CheckinObserved` sent by the // Beat and index from `units` that was last sent to the Beat. // // If `delay` is not zero, when the Beat state matches the last // sent units, the server will wait for `delay` before sending the -// the next state. This will block the check in call from the Beat. +// next state. This will block the check-in call from the Beat. func NewMockServer( units [][]*proto.UnitExpected, featuresIdxs []uint64, @@ -58,7 +58,7 @@ func NewMockServer( delay time.Duration, ) *mock.StubServerV2 { i := 0 - agentInfo := &proto.CheckinAgentInfo{ + agentInfo := &proto.AgentInfo{ Id: "elastic-agent-id", Version: version.GetDefaultVersion(), Snapshot: true, diff --git a/libbeat/version/helper.go b/libbeat/version/helper.go index 5ed206d8a6c..92b2ed2cb4c 100644 --- a/libbeat/version/helper.go +++ b/libbeat/version/helper.go @@ -17,23 +17,36 @@ package version -import "time" +import ( + "sync/atomic" + "time" +) + +var ( + packageVersion atomic.Value + buildTime = "unknown" + commit = "unknown" + qualifier = "" +) -// GetDefaultVersion returns the current libbeat version. -// This method is in a separate file as the version.go file is auto generated +// GetDefaultVersion returns the current version. +// If running in stand-alone mode, it's the libbeat version. If running in +// managed mode, a.k.a under the agent, it's the package version set using +// SetPackageVersion. If SetPackageVersion haven't been called, it reports the +// libbeat version +// +// This method is in a separate file as the version.go file is auto-generated. func GetDefaultVersion() string { + if v, ok := packageVersion.Load().(string); ok && v != "" { + return v + } + if qualifier == "" { return defaultBeatVersion } return defaultBeatVersion + "-" + qualifier } -var ( - buildTime = "unknown" - commit = "unknown" - qualifier = "" -) - // BuildTime exposes the compile-time build time information. // It will represent the zero time instant if parsing fails. func BuildTime() time.Time { @@ -48,3 +61,10 @@ func BuildTime() time.Time { func Commit() string { return commit } + +// SetPackageVersion sets the package version, overriding the defaultBeatVersion. +func SetPackageVersion(version string) { + // Currently, the Elastic Agent does not perform any validation on the + // package version, therefore, no validation is done here either. + packageVersion.Store(version) +} diff --git a/testing/certutil/certutil.go b/testing/certutil/certutil.go new file mode 100644 index 00000000000..422bf4969d4 --- /dev/null +++ b/testing/certutil/certutil.go @@ -0,0 +1,186 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package certutil + +import ( + "bytes" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "math/big" + "time" +) + +// TODO: move it to a more generic place. Probably elastic-agent-client. +// Moving it to the agent-client would allow to have a mock.StubServerV2 with +// TLS out of the box. With that, we could also remove the +// `management.insecure_grpc_url_for_testing` flag from the beats. +// This can also be expanded to save the certificates and keys to disk, making +// an tool for us to generate certificates whenever we need. + +// NewRootCA generates a new x509 Certificate and returns: +// - the private key +// - the certificate +// - the certificate in PEM format as a byte slice. +// +// If any error occurs during the generation process, a non-nil error is returned. +func NewRootCA() (*ecdsa.PrivateKey, *x509.Certificate, []byte, error) { + rootKey, err := ecdsa.GenerateKey(elliptic.P384(), rand.Reader) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not create private key: %w", err) + } + + notBefore := time.Now() + notAfter := notBefore.Add(3 * time.Hour) + + rootTemplate := x509.Certificate{ + DNSNames: []string{"localhost"}, + SerialNumber: big.NewInt(1653), + Subject: pkix.Name{ + Organization: []string{"Gallifrey"}, + CommonName: "localhost", + }, + NotBefore: notBefore, + NotAfter: notAfter, + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + ExtKeyUsage: []x509.ExtKeyUsage{ + x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + IsCA: true, + } + + rootCertRawBytes, err := x509.CreateCertificate( + rand.Reader, &rootTemplate, &rootTemplate, &rootKey.PublicKey, rootKey) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not create CA: %w", err) + } + + rootPrivKeyDER, err := x509.MarshalECPrivateKey(rootKey) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not marshal private key: %w", err) + } + + // PEM private key + var rootPrivBytesOut []byte + rootPrivateKeyBuff := bytes.NewBuffer(rootPrivBytesOut) + err = pem.Encode(rootPrivateKeyBuff, &pem.Block{ + Type: "EC PRIVATE KEY", Bytes: rootPrivKeyDER}) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not pem.Encode private key: %w", err) + } + + // PEM certificate + var rootCertBytesOut []byte + rootCertPemBuff := bytes.NewBuffer(rootCertBytesOut) + err = pem.Encode(rootCertPemBuff, &pem.Block{ + Type: "CERTIFICATE", Bytes: rootCertRawBytes}) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not pem.Encode certificate: %w", err) + } + + // tls.Certificate + rootTLSCert, err := tls.X509KeyPair( + rootCertPemBuff.Bytes(), rootPrivateKeyBuff.Bytes()) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not create key pair: %w", err) + } + + rootCACert, err := x509.ParseCertificate(rootTLSCert.Certificate[0]) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not parse certificate: %w", err) + } + + return rootKey, rootCACert, rootCertPemBuff.Bytes(), nil +} + +// GenerateChildCert generates a x509 Certificate as a child of caCert and +// returns the following: +// - the certificate in PEM format as a byte slice +// - the private key in PEM format as a byte slice +// - the certificate and private key as a tls.Certificate +// +// If any error occurs during the generation process, a non-nil error is returned. +func GenerateChildCert(name string, caPrivKey *ecdsa.PrivateKey, caCert *x509.Certificate) ( + []byte, []byte, *tls.Certificate, error) { + + notBefore := time.Now() + notAfter := notBefore.Add(3 * time.Hour) + + certTemplate := &x509.Certificate{ + DNSNames: []string{name}, + SerialNumber: big.NewInt(1658), + Subject: pkix.Name{ + Organization: []string{"Gallifrey"}, + CommonName: name, + }, + NotBefore: notBefore, + NotAfter: notAfter, + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{ + x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + } + + privateKey, err := ecdsa.GenerateKey(elliptic.P384(), rand.Reader) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not create private key: %w", err) + } + + certRawBytes, err := x509.CreateCertificate( + rand.Reader, certTemplate, caCert, &privateKey.PublicKey, caPrivKey) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not create CA: %w", err) + } + + privateKeyDER, err := x509.MarshalECPrivateKey(privateKey) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not marshal private key: %w", err) + } + + // PEM private key + var privBytesOut []byte + privateKeyBuff := bytes.NewBuffer(privBytesOut) + err = pem.Encode(privateKeyBuff, &pem.Block{ + Type: "EC PRIVATE KEY", Bytes: privateKeyDER}) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not pem.Encode private key: %w", err) + } + privateKeyPemBytes := privateKeyBuff.Bytes() + + // PEM certificate + var certBytesOut []byte + certBuff := bytes.NewBuffer(certBytesOut) + err = pem.Encode(certBuff, &pem.Block{ + Type: "CERTIFICATE", Bytes: certRawBytes}) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not pem.Encode certificate: %w", err) + } + certPemBytes := certBuff.Bytes() + + // TLS Certificate + tlsCert, err := tls.X509KeyPair(certPemBytes, privateKeyPemBytes) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not create key pair: %w", err) + } + + return privateKeyPemBytes, certPemBytes, &tlsCert, nil +} diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index 3332d549fa2..b541b8d5409 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -7,21 +7,51 @@ package integration import ( + "bufio" + "crypto/tls" + "crypto/x509" + "encoding/json" "fmt" + "io" + "math" "os" "path/filepath" + "strings" "sync/atomic" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + protobuf "google.golang.org/protobuf/proto" "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/beats/v7/libbeat/version" + "github.com/elastic/beats/v7/testing/certutil" "github.com/elastic/beats/v7/x-pack/libbeat/management" "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) +// Event is the common part of a beats event, the beats and Elastic Agent +// metadata. +type Event struct { + Metadata struct { + Version string `json:"version"` + } `json:"@metadata"` + ElasticAgent struct { + Snapshot bool `json:"snapshot"` + Version string `json:"version"` + Id string `json:"id"` + } `json:"elastic_agent"` + Agent struct { + Version string `json:"version"` + Id string `json:"id"` + } `json:"agent"` +} + // TestInputReloadUnderElasticAgent will start a Filebeat and cause the input // reload issue described on https://github.com/elastic/beats/issues/33653. // In short, a new input for a file needs to be started while there are still @@ -500,6 +530,208 @@ func TestRecoverFromInvalidOutputConfiguration(t *testing.T) { } } +func TestAgentPackageVersionOnStartUpInfo(t *testing.T) { + wantVersion := "8.13.0+build20131123" + + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + logFilePath := filepath.Join(filebeat.TempDir(), "logs-to-ingest.log") + generateLogFile(t, logFilePath) + + eventsDir := filepath.Join(filebeat.TempDir(), "ingested-events") + logLevel := proto.UnitLogLevel_INFO + units := []*proto.UnitExpected{ + { + Id: "output-file-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: logLevel, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "file", + Name: "events-to-file", + Source: integration.RequireNewStruct(t, + map[string]interface{}{ + "name": "events-to-file", + "type": "file", + "path": eventsDir, + }), + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: logLevel, + Config: &proto.UnitExpectedConfig{ + Id: "filestream-monitoring-agent", + Type: "filestream", + Name: "filestream-monitoring-agent", + Streams: []*proto.Stream{ + { + Id: "log-input-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "enabled": true, + "type": "log", + "paths": []interface{}{logFilePath}, + }), + }, + }, + }, + }, + } + wantAgentInfo := proto.AgentInfo{ + Id: "agent-id", + Version: wantVersion, + Snapshot: true, + } + + observedCh := make(chan *proto.CheckinObserved, 5) + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + observedCh <- observed + return &proto.CheckinExpected{ + AgentInfo: &wantAgentInfo, + Units: units, + } + }, + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + } + + rootKey, rootCACert, rootCertPem, err := certutil.NewRootCA() + require.NoError(t, err, "could not generate root CA") + + rootCertPool := x509.NewCertPool() + ok := rootCertPool.AppendCertsFromPEM(rootCertPem) + require.Truef(t, ok, "could not append certs from PEM to cert pool") + + beatPrivKeyPem, beatCertPem, beatTLSCert, err := + certutil.GenerateChildCert("localhost", rootKey, rootCACert) + require.NoError(t, err, "could not generate child TLS certificate") + + getCert := func(info *tls.ClientHelloInfo) (*tls.Certificate, error) { + // it's one of the child certificates. As there is only one, return it + return beatTLSCert, nil + } + + creds := credentials.NewTLS(&tls.Config{ + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: rootCertPool, + GetCertificate: getCert, + MinVersion: tls.VersionTLS12, + }) + err = server.Start(grpc.Creds(creds)) + require.NoError(t, err, "failed starting GRPC server") + t.Cleanup(server.Stop) + + filebeat.Start("-E", "management.enabled=true") + + startUpInfo := &proto.StartUpInfo{ + Addr: fmt.Sprintf("localhost:%d", server.Port), + ServerName: "localhost", + Token: "token", + CaCert: rootCertPem, + PeerCert: beatCertPem, + PeerKey: beatPrivKeyPem, + Services: []proto.ConnInfoServices{proto.ConnInfoServices_CheckinV2}, + AgentInfo: &wantAgentInfo, + } + writeStartUpInfo(t, filebeat.Stdin(), startUpInfo) + // for some reason the pipe needs to be closed for filebeat to read it. + require.NoError(t, filebeat.Stdin().Close(), "failed closing stdin pipe") + + // get 1st observed + observed := <-observedCh + // drain observedCh so server won't block + go func() { + for { + <-observedCh + } + }() + + msg := strings.Builder{} + require.Eventuallyf(t, func() bool { + msg.Reset() + + _, err = os.Stat(eventsDir) + if err != nil { + fmt.Fprintf(&msg, "could not verify output directory exists: %v", + err) + return false + } + + entries, err := os.ReadDir(eventsDir) + if err != nil { + fmt.Fprintf(&msg, "failed checking output directory for files: %v", + err) + return false + } + + if len(entries) == 0 { + fmt.Fprintf(&msg, "no file found on %s", eventsDir) + return false + } + + for _, e := range entries { + if e.IsDir() { + continue + } + + i, err := e.Info() + if err != nil { + fmt.Fprintf(&msg, "could not read info of %q", e.Name()) + return false + } + if i.Size() == 0 { + fmt.Fprintf(&msg, "file %q was created, but it's still empty", + e.Name()) + return false + } + + // read one line to make sure it isn't a 1/2 written JSON + eventsFile := filepath.Join(eventsDir, e.Name()) + f, err := os.Open(eventsFile) + if err != nil { + fmt.Fprintf(&msg, "could not open file %q", eventsFile) + return false + } + + scanner := bufio.NewScanner(f) + if scanner.Scan() { + var ev Event + err := json.Unmarshal(scanner.Bytes(), &ev) + if err != nil { + fmt.Fprintf(&msg, "failed to read event from file: %v", err) + return false + } + return true + } + } + + return true + }, 30*time.Second, time.Second, "no event was produced: %s", &msg) + + assert.Equal(t, version.Commit(), observed.VersionInfo.BuildHash) + + evs := getEventsFromFileOutput[Event](t, eventsDir, 100) + for _, got := range evs { + assert.Equal(t, wantVersion, got.Metadata.Version) + + assert.Equal(t, wantAgentInfo.Id, got.ElasticAgent.Id) + assert.Equal(t, wantAgentInfo.Version, got.ElasticAgent.Version) + assert.Equal(t, wantAgentInfo.Snapshot, got.ElasticAgent.Snapshot) + + assert.Equal(t, wantAgentInfo.Id, got.Agent.Id) + assert.Equal(t, wantVersion, got.Agent.Version) + } +} + // generateLogFile generates a log file by appending the current // time to it every second. func generateLogFile(t *testing.T, fullPath string) { @@ -543,3 +775,52 @@ func generateLogFile(t *testing.T, fullPath string) { } }() } + +// getEventsFromFileOutput reads all events from all the files on dir. If n > 0, +// then it reads up to n events. It considers all files are ndjson, and it skips +// any directory within dir. +func getEventsFromFileOutput[E any](t *testing.T, dir string, n int) []E { + t.Helper() + + if n < 1 { + n = math.MaxInt + } + + var events []E + entries, err := os.ReadDir(dir) + require.NoError(t, err, "could not read events directory") + for _, e := range entries { + if e.IsDir() { + continue + } + f, err := os.Open(filepath.Join(dir, e.Name())) + require.NoErrorf(t, err, "could not open file %q", e.Name()) + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + var ev E + err := json.Unmarshal(scanner.Bytes(), &ev) + require.NoError(t, err, "failed to read event") + events = append(events, ev) + + if len(events) >= n { + return events + } + } + } + + return events +} + +func writeStartUpInfo(t *testing.T, w io.Writer, info *proto.StartUpInfo) { + t.Helper() + if len(info.Services) == 0 { + info.Services = []proto.ConnInfoServices{proto.ConnInfoServices_CheckinV2} + } + + infoBytes, err := protobuf.Marshal(info) + require.NoError(t, err, "failed to marshal connection information") + + _, err = w.Write(infoBytes) + require.NoError(t, err, "failed to write connection information") +} diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 235325c0cbf..71b14152c65 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -23,16 +23,15 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/features" + lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - - "github.com/elastic/beats/v7/libbeat/common/reload" - lbmanagement "github.com/elastic/beats/v7/libbeat/management" - "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/version" ) // diagnosticHandler is a wrapper type that's a bit of a hack, the compiler won't let us send the raw unit struct, @@ -161,6 +160,13 @@ func NewV2AgentManager(config *conf.C, registry *reload.Registry) (lbmanagement. } } + versionInfo := client.VersionInfo{ + Name: "beat-v2-client", + BuildHash: version.Commit(), + Meta: map[string]string{ + "commit": version.Commit(), + "build_time": version.BuildTime().String(), + }} var agentClient client.V2 var err error if c.InsecureGRPCURLForTesting != "" && c.Enabled { @@ -168,20 +174,11 @@ func NewV2AgentManager(config *conf.C, registry *reload.Registry) (lbmanagement. logger.Info("Using INSECURE GRPC connection, this should be only used for testing!") agentClient = client.NewV2(c.InsecureGRPCURLForTesting, "", // Insecure connection for test, no token needed - client.VersionInfo{ - Name: "beat-v2-client-for-testing", - Version: version.GetDefaultVersion(), - }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + versionInfo, + client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) } else { // Normal Elastic-Agent-Client initialisation - agentClient, _, err = client.NewV2FromReader(os.Stdin, client.VersionInfo{ - Name: "beat-v2-client", - Version: version.GetDefaultVersion(), - Meta: map[string]string{ - "commit": version.Commit(), - "build_time": version.BuildTime().String(), - }, - }) + agentClient, _, err = client.NewV2FromReader(os.Stdin, versionInfo) if err != nil { return nil, fmt.Errorf("error reading control config from agent: %w", err) } @@ -231,6 +228,14 @@ func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agen // Beats central management interface implementation // ================================ +func (cm *BeatV2Manager) AgentInfo() client.AgentInfo { + if cm.client.AgentInfo() == nil { + return client.AgentInfo{} + } + + return *cm.client.AgentInfo() +} + // RegisterDiagnosticHook will register a diagnostic callback function when elastic-agent asks for a diagnostics dump func (cm *BeatV2Manager) RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook) { cm.client.RegisterDiagnosticHook(name, description, filename, contentType, hook) diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index ea67fdd89f4..66ca7f17966 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -204,8 +204,7 @@ func TestManagerV2(t *testing.T) { defer srv.Stop() client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{ - Name: "program", - Version: "v1.0.0", + Name: "program", Meta: map[string]string{ "key": "value", }, diff --git a/x-pack/libbeat/management/tests/mock_server.go b/x-pack/libbeat/management/tests/mock_server.go index 8671b124233..a90ae633885 100644 --- a/x-pack/libbeat/management/tests/mock_server.go +++ b/x-pack/libbeat/management/tests/mock_server.go @@ -31,7 +31,7 @@ func NewMockServer(t *testing.T, canStop func(string) bool, inputConfig *proto.U unitOutID := mock.NewID() token := mock.NewID() - //var gotConfig bool + // var gotConfig bool var mut sync.Mutex @@ -98,8 +98,7 @@ func NewMockServer(t *testing.T, canStop func(string) bool, inputConfig *proto.U require.NoError(t, err) client := client.NewV2(fmt.Sprintf(":%d", srv.Port), token, client.VersionInfo{ - Name: "program", - Version: "v1.0.0", + Name: "program", Meta: map[string]string{ "key": "value", }, @@ -111,7 +110,7 @@ func NewMockServer(t *testing.T, canStop func(string) bool, inputConfig *proto.U // helper to wrap the CheckinExpected config we need with every refresh of the mock server func sendUnitsWithState(state proto.State, input, output *proto.UnitExpectedConfig, inId, outId string, stateIndex uint64) *proto.CheckinExpected { return &proto.CheckinExpected{ - AgentInfo: &proto.CheckinAgentInfo{ + AgentInfo: &proto.AgentInfo{ Id: "test-agent", Version: "8.4.0", Snapshot: true,