Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Fix Docker container to allow state to properly be handled #24817

Merged
merged 12 commits into from
Mar 31, 2021
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,4 @@
- Add TLS support for Fleet Server {pull}24142[24142]
- Add support for Fleet Server running under Elastic Agent {pull}24220[24220]
- Add CA support to Elastic Agent docker image {pull}24486[24486]
- Add STATE_PATH, CONFIG_PATH, LOGS_PATH to Elastic Agent docker image {pull}24817[24817]
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestStreamCheck(t *testing.T) {
},
}

log, err := logger.New("")
log, err := logger.New("", false)
assert.NoError(t, err)

for _, tc := range testCases {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func newFleetServerBootstrap(
}

if log == nil {
log, err = logger.NewFromConfig("", cfg.Settings.LoggingConfig)
log, err = logger.NewFromConfig("", cfg.Settings.LoggingConfig, false)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat
client := newTestingClient()
dispatcher := newTestingDispatcher()

log, _ := logger.New("fleet_gateway")
log, _ := logger.New("fleet_gateway", false)
rep := getReporter(agentInfo, log, t)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestFleetGateway(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log, _ := logger.New("tst")
log, _ := logger.New("tst", false)
stateStore, err := store.NewStateStore(log, storage.NewDiskStore(paths.AgentStateStoreFile()))
require.NoError(t, err)

Expand Down Expand Up @@ -342,7 +342,7 @@ func TestFleetGateway(t *testing.T) {
dispatcher := newTestingDispatcher()

ctx, cancel := context.WithCancel(context.Background())
log, _ := logger.New("tst")
log, _ := logger.New("tst", false)

stateStore, err := store.NewStateStore(log, storage.NewDiskStore(paths.AgentStateStoreFile()))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newLocal(
}

if log == nil {
log, err = logger.NewFromConfig("", cfg.Settings.LoggingConfig)
log, err = logger.NewFromConfig("", cfg.Settings.LoggingConfig, true)
if err != nil {
return nil, err
}
Expand Down
15 changes: 7 additions & 8 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,6 @@ func newManaged(
storeSaver,
)

if cfg.Fleet.Server == nil {
// setters only set when not running a local Fleet Server
policyChanger.AddSetter(acker)
}

actionDispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
policyChanger,
Expand Down Expand Up @@ -271,9 +266,13 @@ func newManaged(
if err != nil {
return nil, err
}
// add the gateway to setters, so the gateway can be updated
// when the hosts for Kibana are updated by the policy.
policyChanger.AddSetter(gateway)
// add the acker and gateway to setters, so the they can be updated
// when the hosts for Fleet Server are updated by the policy.
if cfg.Fleet.Server == nil {
// setters only set when not running a local Fleet Server
policyChanger.AddSetter(gateway)
policyChanger.AddSetter(acker)
}

managedApplication.gateway = gateway
return managedApplication, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestManagedModeRouting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log, _ := logger.New("")
log, _ := logger.New("", false)
router, _ := router.New(log, streamFn)
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}
Expand Down
71 changes: 66 additions & 5 deletions x-pack/elastic-agent/pkg/agent/application/paths/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,33 @@ import (
)

const (
tempSubdir = "tmp"
// DefaultConfigName is the default name of the configuration file.
DefaultConfigName = "elastic-agent.yml"
// AgentLockFileName is the name of the overall Elastic Agent file lock.
AgentLockFileName = "agent.lock"
tempSubdir = "tmp"
)

var (
topPath string
configPath string
logsPath string
tmpCreator sync.Once
topPath string
configPath string
configFilePath string
logsPath string
unversionedHome bool
tmpCreator sync.Once
)

func init() {
topPath = initialTop()
configPath = topPath
logsPath = topPath
unversionedHome = false // only versioned by container subcommand

fs := flag.CommandLine
fs.StringVar(&topPath, "path.home", topPath, "Agent root path")
fs.BoolVar(&unversionedHome, "path.home.unversioned", unversionedHome, "Agent root path is not versioned based on build")
fs.StringVar(&configPath, "path.config", configPath, "Config path is the directory Agent looks for its config file")
fs.StringVar(&configFilePath, "c", DefaultConfigName, "Configuration file, relative to path.config")
fs.StringVar(&logsPath, "path.logs", logsPath, "Logs path contains Agent log output")
}

Expand All @@ -43,6 +52,14 @@ func Top() string {
return topPath
}

// SetTop overrides the Top path.
//
// Used by the container subcommand to adjust the overall top path allowing state can be maintained between container
// restarts.
func SetTop(path string) {
topPath = path
}

// TempDir returns agent temp dir located within data dir.
func TempDir() string {
tmpDir := filepath.Join(Data(), tempSubdir)
Expand All @@ -55,16 +72,55 @@ func TempDir() string {

// Home returns a directory where binary lives
func Home() string {
if unversionedHome {
return topPath
}
return versionedHome(topPath)
}

// IsVersionHome returns true if the Home path is versioned based on build.
func IsVersionHome() bool {
return !unversionedHome
}

// SetVersionHome sets if the Home path is versioned based on build.
//
// Used by the container subcommand to adjust the home path allowing state can be maintained between container
// restarts.
func SetVersionHome(version bool) {
unversionedHome = !version
}

// Config returns a directory where configuration file lives
func Config() string {
return configPath
}

// SetConfig overrides the Config path.
//
// Used by the container subcommand to adjust the overall config path allowing state can be maintained between container
// restarts.
func SetConfig(path string) {
configPath = path
}

// ConfigFile returns the path to the configuration file.
func ConfigFile() string {
if configFilePath == "" || configFilePath == DefaultConfigName {
return filepath.Join(Config(), DefaultConfigName)
}
if filepath.IsAbs(configFilePath) {
return configFilePath
}
return filepath.Join(Config(), configFilePath)
}

// Data returns the data directory for Agent
func Data() string {
if unversionedHome {
// unversioned means the topPath is the data path
return topPath
}
return filepath.Join(Top(), "data")
}

Expand All @@ -73,6 +129,11 @@ func Logs() string {
return logsPath
}

// SetLogs updates the path for the logs.
func SetLogs(path string) {
logsPath = path
}

// initialTop returns the initial top-level path for the binary
//
// When nested in top-level/data/elastic-agent-${hash}/ the result is top-level/.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (m *mockEmitter) Emitter(policy *config.Config) error {
}

func TestPolicyChange(t *testing.T) {
log, _ := logger.New("")
log, _ := logger.New("", false)
ack := noopacker.NewAcker()
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestPolicyChange(t *testing.T) {
}

func TestPolicyAcked(t *testing.T) {
log, _ := logger.New("")
log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type ActionDispatcher struct {
func New(ctx context.Context, log *logger.Logger, def actions.Handler) (*ActionDispatcher, error) {
var err error
if log == nil {
log, err = logger.New("action_dispatcher")
log, err = logger.New("action_dispatcher", false)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ConfigHandler interface {
}

// DefaultRK default routing keys until we implement the routing key / config matrix.
var DefaultRK = "DEFAULT"
var DefaultRK = "default"

// RoutingKey is used for routing as pipeline id.
type RoutingKey = string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type router struct {
func New(log *logger.Logger, factory pipeline.StreamFunc) (pipeline.Router, error) {
var err error
if log == nil {
log, err = logger.New("router")
log, err = logger.New("router", false)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ func TestRouter(t *testing.T) {
})

assertOps(t, []event{
e(pipeline.DefaultRK, createOp),
e(pipeline.DefaultRK, executeOp),

e(k1, createOp),
e(k1, executeOp),

e(k2, createOp),
e(k2, executeOp),

e(pipeline.DefaultRK, createOp),
e(pipeline.DefaultRK, executeOp),
}, recorder.events)

recorder.reset()
Expand All @@ -108,9 +108,9 @@ func TestRouter(t *testing.T) {
e(nk, createOp),
e(nk, executeOp),

e(pipeline.DefaultRK, closeOp),
e(k1, closeOp),
e(k2, closeOp),
e(pipeline.DefaultRK, closeOp),
}, recorder.events)
})

Expand Down Expand Up @@ -152,22 +152,22 @@ func TestRouter(t *testing.T) {
})

assertOps(t, []event{
e(pipeline.DefaultRK, createOp),
e(pipeline.DefaultRK, executeOp),
e(k1, createOp),
e(k1, executeOp),
e(k2, createOp),
e(k2, executeOp),
e(pipeline.DefaultRK, createOp),
e(pipeline.DefaultRK, executeOp),
}, recorder.events)

recorder.reset()

r.Route("hello-2", map[pipeline.RoutingKey][]program.Program{})

assertOps(t, []event{
e(pipeline.DefaultRK, closeOp),
e(k1, closeOp),
e(k2, closeOp),
e(pipeline.DefaultRK, closeOp),
}, recorder.events)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestChecker(t *testing.T) {

func testableChecker(t *testing.T, pider *testPider) (*CrashChecker, chan error) {
errChan := make(chan error, 1)
l, _ := logger.New("")
l, _ := logger.New("", false)
ch, err := NewCrashChecker(context.Background(), errChan, l)
require.NoError(t, err)

Expand Down
Loading