diff --git a/.gitignore b/.gitignore index b0c8ebec457..3ffafb6f0a5 100644 --- a/.gitignore +++ b/.gitignore @@ -58,3 +58,4 @@ fleet.yml.lock fleet.yml.old pkg/component/fake/component/component pkg/component/fake/shipper/shipper +internal/pkg/agent/install/testblocking/testblocking diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index c277382ae93..64df27c9ae9 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/capabilities" "github.com/elastic/elastic-agent/internal/pkg/composable" "github.com/elastic/elastic-agent/internal/pkg/config" + "github.com/elastic/elastic-agent/internal/pkg/otel" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" @@ -46,6 +47,7 @@ func New( testingMode bool, fleetInitTimeout time.Duration, disableMonitoring bool, + runAsOtel bool, modifiers ...component.PlatformModifier, ) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) { @@ -144,6 +146,9 @@ func New( log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period) configMgr = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader) } + } else if runAsOtel { + // ignoring configuration in elastic-agent.yml + configMgr = otel.NewOtelModeConfigManager() } else { isManaged = true var store storage.Store diff --git a/internal/pkg/agent/application/application_test.go b/internal/pkg/agent/application/application_test.go index cf67f19c6cf..37744a54a3e 100644 --- a/internal/pkg/agent/application/application_test.go +++ b/internal/pkg/agent/application/application_test.go @@ -63,6 +63,7 @@ func TestLimitsLog(t *testing.T) { true, // testingMode time.Millisecond, // fleetInitTimeout true, // disable monitoring + false, // not otel mode ) require.NoError(t, err) diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 927a79808df..e878e75bb98 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -12,6 +12,7 @@ import ( "os/signal" "path/filepath" "strings" + "sync" "syscall" "time" @@ -19,6 +20,7 @@ import ( apmtransport "go.elastic.co/apm/transport" "gopkg.in/yaml.v2" + "github.com/hashicorp/go-multierror" "github.com/spf13/cobra" "github.com/elastic/elastic-agent-libs/api" @@ -61,6 +63,7 @@ const ( ) type cfgOverrider func(cfg *configuration.Configuration) +type awaiters []<-chan struct{} func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command { cmd := &cobra.Command{ @@ -136,17 +139,38 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration go service.ProcessWindowsControlEvents(stopBeat) // detect otel - if runAsOtel := otel.IsOtelConfig(ctx, paths.ConfigFile()); runAsOtel { - return otel.Run(ctx, cancel, stop, testingMode) + runAsOtel := otel.IsOtelConfig(ctx, paths.ConfigFile()) + var awaiters awaiters + var resErr error + if runAsOtel { + var otelStartWg sync.WaitGroup + otelAwaiter := make(chan struct{}) + awaiters = append(awaiters, otelAwaiter) + + otelStartWg.Add(1) + go func() { + otelStartWg.Done() + if err := otel.Run(ctx, stop); err != nil { + resErr = multierror.Append(resErr, err) + } + + // close awaiter handled in run loop + close(otelAwaiter) + }() + + // wait for otel to start + otelStartWg.Wait() } - // not otel continue as usual - return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, modifiers...) + if err := runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, runAsOtel, awaiters, modifiers...); err != nil { + resErr = multierror.Append(resErr, err) + } + return resErr } -func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error { - cfg, err := loadConfig(ctx, override) +func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, runAsOtel bool, awaiters awaiters, modifiers ...component.PlatformModifier) error { + cfg, err := loadConfig(ctx, override, runAsOtel) if err != nil { return err } @@ -259,7 +283,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf l.Info("APM instrumentation disabled") } - coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...) + coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), runAsOtel, modifiers...) if err != nil { return err } @@ -366,6 +390,9 @@ LOOP: } cancel() err = <-appErr + for _, a := range awaiters { + <-a // wait for awaiter to be done + } if logShutdown { l.Info("Shutting down completed.") @@ -376,7 +403,11 @@ LOOP: return err } -func loadConfig(ctx context.Context, override cfgOverrider) (*configuration.Configuration, error) { +func loadConfig(ctx context.Context, override cfgOverrider, runAsOtel bool) (*configuration.Configuration, error) { + if runAsOtel { + return configuration.DefaultConfiguration(), nil + } + pathConfigFile := paths.ConfigFile() rawConfig, err := config.LoadFile(pathConfigFile) if err != nil { @@ -527,7 +558,7 @@ func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configurati errors.M("path", enrollPath))) } logger.Info("Successfully performed delayed enrollment of this Elastic Agent.") - return loadConfig(ctx, override) + return loadConfig(ctx, override, false) } func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) (*apm.Tracer, error) { diff --git a/internal/pkg/otel/config_manager.go b/internal/pkg/otel/config_manager.go new file mode 100644 index 00000000000..a9a90cab627 --- /dev/null +++ b/internal/pkg/otel/config_manager.go @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package otel + +import ( + "context" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" +) + +// OtelModeConfigManager serves as a config manager for OTel use cases +// In this case agent should ignore all configuration coming from elastic-agent.yml file +// or other sources. +type OtelModeConfigManager struct { + ch chan coordinator.ConfigChange + errCh chan error +} + +// NewOtelModeConfigManager creates new OtelModeConfigManager ignoring +// configuration coming from other sources. +func NewOtelModeConfigManager() *OtelModeConfigManager { + return &OtelModeConfigManager{ + ch: make(chan coordinator.ConfigChange), + errCh: make(chan error), + } +} + +func (t *OtelModeConfigManager) Run(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() +} + +func (t *OtelModeConfigManager) Errors() <-chan error { + return t.errCh +} + +// ActionErrors returns the error channel for actions. +// Returns nil channel. +func (t *OtelModeConfigManager) ActionErrors() <-chan error { + return nil +} + +func (t *OtelModeConfigManager) Watch() <-chan coordinator.ConfigChange { + return t.ch +} diff --git a/internal/pkg/otel/run.go b/internal/pkg/otel/run.go index ae2f4249823..833121221d0 100644 --- a/internal/pkg/otel/run.go +++ b/internal/pkg/otel/run.go @@ -43,7 +43,7 @@ func IsOtelConfig(ctx context.Context, pathConfigFile string) bool { return false } -func Run(ctx context.Context, cancel context.CancelFunc, stop chan bool, testingMode bool) error { +func Run(ctx context.Context, stop chan bool) error { fmt.Fprintln(os.Stdout, "Starting in otel mode") settings, err := newSettings(paths.ConfigFile(), release.Version()) if err != nil { diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index 644d84d3168..7a1b45ee49a 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -445,7 +445,7 @@ func RunProcess(t *testing.T, // when `Run` is called. // // if shouldWatchState is set to false, communicating state does not happen. -func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, states ...State) error { +func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, enableTestingMode bool, states ...State) error { if _, deadlineSet := ctx.Deadline(); !deadlineSet { f.t.Fatal("Context passed to Fixture.Run() has no deadline set.") } @@ -491,7 +491,10 @@ func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, stat stdOut := newLogWatcher(logProxy) stdErr := newLogWatcher(logProxy) - args := []string{"run", "-e", "--disable-encrypted-store", "--testing-mode"} + args := []string{"run", "-e", "--disable-encrypted-store"} + if enableTestingMode { + args = append(args, "--testing-mode") + } args = append(args, f.additionalArgs...) @@ -601,7 +604,7 @@ func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, stat // The `elastic-agent.yml` generated by `Fixture.Configure` is ignored // when `Run` is called. func (f *Fixture) Run(ctx context.Context, states ...State) error { - return f.RunWithClient(ctx, true, states...) + return f.RunWithClient(ctx, true, true, states...) } // Exec provides a way of performing subcommand on the prepared Elastic Agent binary. diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index c3b22ef3723..6aa06c9031e 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -124,7 +124,7 @@ func TestOtelFileProcessing(t *testing.T) { fixtureWg.Add(1) go func() { defer fixtureWg.Done() - err = fixture.RunWithClient(ctx, false) + err = fixture.RunWithClient(ctx, false, false) }() var content []byte @@ -132,6 +132,23 @@ func TestOtelFileProcessing(t *testing.T) { `"stringValue":"syslog"`, // syslog is being processed `"stringValue":"system.log"`, // system.log is being processed }) + + // check `elastic-agent status` returns successfully + require.Eventuallyf(t, func() bool { + // This will return errors until it connects to the agent, + // they're mostly noise because until the agent starts running + // we will get connection errors. If the test fails + // the agent logs will be present in the error message + // which should help to explain why the agent was not + // healthy. + err := fixture.IsHealthy(ctx) + return err == nil + }, + 2*time.Minute, time.Second, + "Elastic-Agent did not report healthy. Agent status error: \"%v\"", + err, + ) + require.Eventually(t, func() bool { // verify file exists @@ -240,7 +257,7 @@ func TestOtelAPMIngestion(t *testing.T) { var fixtureWg sync.WaitGroup fixtureWg.Add(1) go func() { - fixture.RunWithClient(ctx, false) + fixture.RunWithClient(ctx, false, false) fixtureWg.Done() }()