Skip to content

Commit

Permalink
Status subcommand reporting agent status for otel mode - Phase 2 (#4047)
Browse files Browse the repository at this point in the history
* Status subcommand reporting agent status for otel mode

* changed error handling

* changed error handling

* check status works and returns healthy in otel e2e test

* lint
  • Loading branch information
michalpristas authored Jan 22, 2024
1 parent 91c0a94 commit c588a73
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 15 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ fleet.yml.lock
fleet.yml.old
pkg/component/fake/component/component
pkg/component/fake/shipper/shipper
internal/pkg/agent/install/testblocking/testblocking
5 changes: 5 additions & 0 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/composable"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/otel"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
Expand All @@ -46,6 +47,7 @@ func New(
testingMode bool,
fleetInitTimeout time.Duration,
disableMonitoring bool,
runAsOtel bool,
modifiers ...component.PlatformModifier,
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {

Expand Down Expand Up @@ -144,6 +146,9 @@ func New(
log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period)
configMgr = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader)
}
} else if runAsOtel {
// ignoring configuration in elastic-agent.yml
configMgr = otel.NewOtelModeConfigManager()
} else {
isManaged = true
var store storage.Store
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestLimitsLog(t *testing.T) {
true, // testingMode
time.Millisecond, // fleetInitTimeout
true, // disable monitoring
false, // not otel mode
)
require.NoError(t, err)

Expand Down
49 changes: 40 additions & 9 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"go.elastic.co/apm"
apmtransport "go.elastic.co/apm/transport"
"gopkg.in/yaml.v2"

"github.com/hashicorp/go-multierror"
"github.com/spf13/cobra"

"github.com/elastic/elastic-agent-libs/api"
Expand Down Expand Up @@ -61,6 +63,7 @@ const (
)

type cfgOverrider func(cfg *configuration.Configuration)
type awaiters []<-chan struct{}

func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -136,17 +139,38 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration
go service.ProcessWindowsControlEvents(stopBeat)

// detect otel
if runAsOtel := otel.IsOtelConfig(ctx, paths.ConfigFile()); runAsOtel {
return otel.Run(ctx, cancel, stop, testingMode)
runAsOtel := otel.IsOtelConfig(ctx, paths.ConfigFile())
var awaiters awaiters
var resErr error
if runAsOtel {
var otelStartWg sync.WaitGroup
otelAwaiter := make(chan struct{})
awaiters = append(awaiters, otelAwaiter)

otelStartWg.Add(1)
go func() {
otelStartWg.Done()
if err := otel.Run(ctx, stop); err != nil {
resErr = multierror.Append(resErr, err)
}

// close awaiter handled in run loop
close(otelAwaiter)
}()

// wait for otel to start
otelStartWg.Wait()
}

// not otel continue as usual
return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, modifiers...)
if err := runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, runAsOtel, awaiters, modifiers...); err != nil {
resErr = multierror.Append(resErr, err)
}

return resErr
}

func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
cfg, err := loadConfig(ctx, override)
func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, runAsOtel bool, awaiters awaiters, modifiers ...component.PlatformModifier) error {
cfg, err := loadConfig(ctx, override, runAsOtel)
if err != nil {
return err
}
Expand Down Expand Up @@ -259,7 +283,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
l.Info("APM instrumentation disabled")
}

coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), runAsOtel, modifiers...)
if err != nil {
return err
}
Expand Down Expand Up @@ -366,6 +390,9 @@ LOOP:
}
cancel()
err = <-appErr
for _, a := range awaiters {
<-a // wait for awaiter to be done
}

if logShutdown {
l.Info("Shutting down completed.")
Expand All @@ -376,7 +403,11 @@ LOOP:
return err
}

func loadConfig(ctx context.Context, override cfgOverrider) (*configuration.Configuration, error) {
func loadConfig(ctx context.Context, override cfgOverrider, runAsOtel bool) (*configuration.Configuration, error) {
if runAsOtel {
return configuration.DefaultConfiguration(), nil
}

pathConfigFile := paths.ConfigFile()
rawConfig, err := config.LoadFile(pathConfigFile)
if err != nil {
Expand Down Expand Up @@ -527,7 +558,7 @@ func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configurati
errors.M("path", enrollPath)))
}
logger.Info("Successfully performed delayed enrollment of this Elastic Agent.")
return loadConfig(ctx, override)
return loadConfig(ctx, override, false)
}

func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) (*apm.Tracer, error) {
Expand Down
47 changes: 47 additions & 0 deletions internal/pkg/otel/config_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package otel

import (
"context"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
)

// OtelModeConfigManager serves as a config manager for OTel use cases
// In this case agent should ignore all configuration coming from elastic-agent.yml file
// or other sources.
type OtelModeConfigManager struct {
ch chan coordinator.ConfigChange
errCh chan error
}

// NewOtelModeConfigManager creates new OtelModeConfigManager ignoring
// configuration coming from other sources.
func NewOtelModeConfigManager() *OtelModeConfigManager {
return &OtelModeConfigManager{
ch: make(chan coordinator.ConfigChange),
errCh: make(chan error),
}
}

func (t *OtelModeConfigManager) Run(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}

func (t *OtelModeConfigManager) Errors() <-chan error {
return t.errCh
}

// ActionErrors returns the error channel for actions.
// Returns nil channel.
func (t *OtelModeConfigManager) ActionErrors() <-chan error {
return nil
}

func (t *OtelModeConfigManager) Watch() <-chan coordinator.ConfigChange {
return t.ch
}
2 changes: 1 addition & 1 deletion internal/pkg/otel/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func IsOtelConfig(ctx context.Context, pathConfigFile string) bool {
return false
}

func Run(ctx context.Context, cancel context.CancelFunc, stop chan bool, testingMode bool) error {
func Run(ctx context.Context, stop chan bool) error {
fmt.Fprintln(os.Stdout, "Starting in otel mode")
settings, err := newSettings(paths.ConfigFile(), release.Version())
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func RunProcess(t *testing.T,
// when `Run` is called.
//
// if shouldWatchState is set to false, communicating state does not happen.
func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, states ...State) error {
func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, enableTestingMode bool, states ...State) error {
if _, deadlineSet := ctx.Deadline(); !deadlineSet {
f.t.Fatal("Context passed to Fixture.Run() has no deadline set.")
}
Expand Down Expand Up @@ -491,7 +491,10 @@ func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, stat
stdOut := newLogWatcher(logProxy)
stdErr := newLogWatcher(logProxy)

args := []string{"run", "-e", "--disable-encrypted-store", "--testing-mode"}
args := []string{"run", "-e", "--disable-encrypted-store"}
if enableTestingMode {
args = append(args, "--testing-mode")
}

args = append(args, f.additionalArgs...)

Expand Down Expand Up @@ -601,7 +604,7 @@ func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, stat
// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
// when `Run` is called.
func (f *Fixture) Run(ctx context.Context, states ...State) error {
return f.RunWithClient(ctx, true, states...)
return f.RunWithClient(ctx, true, true, states...)
}

// Exec provides a way of performing subcommand on the prepared Elastic Agent binary.
Expand Down
21 changes: 19 additions & 2 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,31 @@ func TestOtelFileProcessing(t *testing.T) {
fixtureWg.Add(1)
go func() {
defer fixtureWg.Done()
err = fixture.RunWithClient(ctx, false)
err = fixture.RunWithClient(ctx, false, false)
}()

var content []byte
watchLines := linesTrackMap([]string{
`"stringValue":"syslog"`, // syslog is being processed
`"stringValue":"system.log"`, // system.log is being processed
})

// check `elastic-agent status` returns successfully
require.Eventuallyf(t, func() bool {
// This will return errors until it connects to the agent,
// they're mostly noise because until the agent starts running
// we will get connection errors. If the test fails
// the agent logs will be present in the error message
// which should help to explain why the agent was not
// healthy.
err := fixture.IsHealthy(ctx)
return err == nil
},
2*time.Minute, time.Second,
"Elastic-Agent did not report healthy. Agent status error: \"%v\"",
err,
)

require.Eventually(t,
func() bool {
// verify file exists
Expand Down Expand Up @@ -240,7 +257,7 @@ func TestOtelAPMIngestion(t *testing.T) {
var fixtureWg sync.WaitGroup
fixtureWg.Add(1)
go func() {
fixture.RunWithClient(ctx, false)
fixture.RunWithClient(ctx, false, false)
fixtureWg.Done()
}()

Expand Down

0 comments on commit c588a73

Please sign in to comment.