Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bootstrapping a Fleet Server with v2. #1010

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func New(

upgrader := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo)

runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), tracer)
runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), agentInfo, tracer)
if err != nil {
return nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
}
Expand All @@ -85,28 +85,28 @@ func New(
log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period)
configMgr = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader)
}
} else if configuration.IsFleetServerBootstrap(cfg.Fleet) {
log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode")
compModifiers = append(compModifiers, FleetServerComponentModifier)
configMgr, err = newFleetServerBootstrapManager(log)
if err != nil {
return nil, err
}
} else {
var store storage.Store
store, cfg, err = mergeFleetConfig(rawConfig)
if err != nil {
return nil, err
}

log.Info("Parsed configuration and determined agent is managed by Fleet")
if configuration.IsFleetServerBootstrap(cfg.Fleet) {
log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode")

compModifiers = append(compModifiers, FleetServerComponentModifier)
managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime)
if err != nil {
return nil, err
compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server))
configMgr = newFleetServerBootstrapManager(log)
} else {
log.Info("Parsed configuration and determined agent is managed by Fleet")

compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server))
managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime)
if err != nil {
return nil, err
}
configMgr = managed
}
configMgr = managed
}

composable, err := composable.New(log, rawConfig)
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type VarsManager interface {

// ComponentsModifier is a function that takes the computed components model and modifies it before
// passing it into the components runtime manager.
type ComponentsModifier func(comps []component.Component, policy map[string]interface{}) ([]component.Component, error)
type ComponentsModifier func(comps []component.Component) ([]component.Component, error)

// State provides the current state of the coordinator along with all the current states of components and units.
type State struct {
Expand Down Expand Up @@ -492,7 +492,7 @@ func (c *Coordinator) process(ctx context.Context) (err error) {
}

for _, modifier := range c.modifiers {
comps, err = modifier(comps, cfg)
comps, err = modifier(comps)
if err != nil {
return fmt.Errorf("failed to modify components: %w", err)
}
Expand Down
168 changes: 157 additions & 11 deletions internal/pkg/agent/application/fleet_server_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,80 @@ package application

import (
"context"
"time"
"fmt"

"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

const (
elasticsearch = "elasticsearch"
fleetServer = "fleet-server"
)

// injectFleetServerInput is the base configuration that is used plus the FleetServerComponentModifier that adjusts
// the components before sending them to the runtime manager.
var injectFleetServerInput = config.MustNewConfigFrom(map[string]interface{}{
"outputs": map[string]interface{}{
"default": map[string]interface{}{
"type": "elasticsearch",
"type": elasticsearch,
"hosts": []string{"localhost:9200"},
},
},
"inputs": []interface{}{
map[string]interface{}{
"type": "fleet-server",
"id": fleetServer,
"type": fleetServer,
},
},
})

// FleetServerComponentModifier modifies the comps to inject extra information from the policy into
// the Fleet Server component and units needed to run Fleet Server correctly.
func FleetServerComponentModifier(comps []component.Component, policy map[string]interface{}) ([]component.Component, error) {
// TODO(blakerouse): Need to add logic to update the Fleet Server component with extra information from the policy.
return comps, nil
func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) coordinator.ComponentsModifier {
return func(comps []component.Component) ([]component.Component, error) {
for i, comp := range comps {
if comp.Spec.InputType == fleetServer {
for j, unit := range comp.Units {
if unit.Type == client.UnitTypeOutput && unit.Config.Type == elasticsearch {
unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), &serverCfg.Output.Elasticsearch)
if err != nil {
return nil, err
}
fixOutputMap(unitCfgMap)
unitCfg, err := component.ExpectedConfig(unitCfgMap)
if err != nil {
return nil, err
}
unit.Config = unitCfg
} else if unit.Type == client.UnitTypeInput && unit.Config.Type == fleetServer {
unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), &inputFleetServer{
Policy: serverCfg.Policy,
Server: serverCfg,
})
if err != nil {
return nil, err
}
fixInputMap(unitCfgMap)
unitCfg, err := component.ExpectedConfig(unitCfgMap)
if err != nil {
return nil, err
}
unit.Config = unitCfg
}
comp.Units[j] = unit
}
}
comps[i] = comp
}
return comps, nil
}
}

type fleetServerBootstrapManager struct {
Expand All @@ -46,18 +91,15 @@ type fleetServerBootstrapManager struct {

func newFleetServerBootstrapManager(
log *logger.Logger,
) (*fleetServerBootstrapManager, error) {
) *fleetServerBootstrapManager {
return &fleetServerBootstrapManager{
log: log,
ch: make(chan coordinator.ConfigChange),
errCh: make(chan error),
}, nil
}
}

func (m *fleetServerBootstrapManager) Run(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

m.log.Debugf("injecting fleet-server for bootstrap")
select {
case <-ctx.Done():
Expand All @@ -76,3 +118,107 @@ func (m *fleetServerBootstrapManager) Errors() <-chan error {
func (m *fleetServerBootstrapManager) Watch() <-chan coordinator.ConfigChange {
return m.ch
}

func fixOutputMap(m map[string]interface{}) {
// api_key cannot be present or Fleet Server will complain
delete(m, "api_key")
}

type inputFleetServer struct {
Policy *configuration.FleetServerPolicyConfig `yaml:"policy,omitempty"`
Server *configuration.FleetServerConfig `yaml:"server"`
}

func fixInputMap(m map[string]interface{}) {
if srv, ok := m["server"]; ok {
if srvMap, ok := srv.(map[string]interface{}); ok {
// bootstrap is internal to Elastic Agent
delete(srvMap, "bootstrap")
// policy is present one level input when sent to Fleet Server
delete(srvMap, "policy")
// output is present in the output unit
delete(srvMap, "output")
}
}
}

// toMapStr converts the input into a map[string]interface{}.
//
// This is done by using YAMl to marshal and then unmarshal it into the map[string]interface{}. YAML tags on the struct
// match the loading and unloading of the configuration so this ensures that it will match what Fleet Server is
// expecting.
func toMapStr(input ...interface{}) (map[string]interface{}, error) {
m := map[interface{}]interface{}{}
for _, i := range input {
im, err := toMapInterface(i)
if err != nil {
return nil, err
}
m = mergeNestedMaps(m, im)
}
// toMapInterface will set nested maps to a map[interface{}]interface{} which `component.ExpectedConfig` cannot
// handle they must be a map[string]interface{}.
fm := fixYamlMap(m)
r, ok := fm.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("expected map[string]interface{}, got %T", fm)
}
return r, nil
}

// toMapInterface converts the input into a map[interface{}]interface{} using YAML marshall and unmarshall.
func toMapInterface(input interface{}) (map[interface{}]interface{}, error) {
var res map[interface{}]interface{}
raw, err := yaml.Marshal(input)
if err != nil {
return nil, err
}
err = yaml.Unmarshal(raw, &res)
if err != nil {
return nil, err
}
return res, nil
}

// mergeNestedMaps merges two map[interface{}]interface{} together deeply.
func mergeNestedMaps(a, b map[interface{}]interface{}) map[interface{}]interface{} {
res := make(map[interface{}]interface{}, len(a))
for k, v := range a {
res[k] = v
}
for k, v := range b {
if v, ok := v.(map[interface{}]interface{}); ok {
if bv, ok := res[k]; ok {
if bv, ok := bv.(map[interface{}]interface{}); ok {
res[k] = mergeNestedMaps(bv, v)
continue
}
}
}
res[k] = v
}
return res
}

// fixYamlMap converts map[interface{}]interface{} into map[string]interface{} through out the entire map.
func fixYamlMap(input interface{}) interface{} {
switch i := input.(type) {
case map[string]interface{}:
for k, v := range i {
i[k] = fixYamlMap(v)
}
case map[interface{}]interface{}:
m := map[string]interface{}{}
for k, v := range i {
if ks, ok := k.(string); ok {
m[ks] = fixYamlMap(v)
}
}
return m
case []interface{}:
for j, v := range i {
i[j] = fixYamlMap(v)
}
}
return input
}
56 changes: 56 additions & 0 deletions internal/pkg/agent/application/fleet_server_bootstrap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/testutils"
)

func TestFleetServerBootstrapManager(t *testing.T) {
l := testutils.NewErrorLogger(t)
mgr := newFleetServerBootstrapManager(l)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

g, _ := errgroup.WithContext(ctx)

var change coordinator.ConfigChange
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-mgr.Errors():
cancel()
return err
case change = <-mgr.Watch():
cancel()
}
}
})

g.Go(func() error {
return mgr.Run(ctx)
})

err := g.Wait()
if err != nil && !errors.Is(err, context.Canceled) {
require.NoError(t, err)
}

require.NotNil(t, change)
assert.NotNil(t, change.Config())
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
}
Loading