Skip to content

Commit

Permalink
Fix bootstrapping a Fleet Server with v2. (#1010)
Browse files Browse the repository at this point in the history
* Fix bootstrapping a Fleet Server with v2.

* Fix lint.

* Fix tests.
  • Loading branch information
blakerouse authored Aug 29, 2022
1 parent 9bba975 commit 43ad01d
Show file tree
Hide file tree
Showing 14 changed files with 458 additions and 126 deletions.
28 changes: 14 additions & 14 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func New(

upgrader := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo)

runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), tracer)
runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), agentInfo, tracer)
if err != nil {
return nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
}
Expand All @@ -85,28 +85,28 @@ func New(
log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period)
configMgr = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader)
}
} else if configuration.IsFleetServerBootstrap(cfg.Fleet) {
log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode")
compModifiers = append(compModifiers, FleetServerComponentModifier)
configMgr, err = newFleetServerBootstrapManager(log)
if err != nil {
return nil, err
}
} else {
var store storage.Store
store, cfg, err = mergeFleetConfig(rawConfig)
if err != nil {
return nil, err
}

log.Info("Parsed configuration and determined agent is managed by Fleet")
if configuration.IsFleetServerBootstrap(cfg.Fleet) {
log.Info("Parsed configuration and determined agent is in Fleet Server bootstrap mode")

compModifiers = append(compModifiers, FleetServerComponentModifier)
managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime)
if err != nil {
return nil, err
compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server))
configMgr = newFleetServerBootstrapManager(log)
} else {
log.Info("Parsed configuration and determined agent is managed by Fleet")

compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server))
managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime)
if err != nil {
return nil, err
}
configMgr = managed
}
configMgr = managed
}

composable, err := composable.New(log, rawConfig)
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type VarsManager interface {

// ComponentsModifier is a function that takes the computed components model and modifies it before
// passing it into the components runtime manager.
type ComponentsModifier func(comps []component.Component, policy map[string]interface{}) ([]component.Component, error)
type ComponentsModifier func(comps []component.Component) ([]component.Component, error)

// State provides the current state of the coordinator along with all the current states of components and units.
type State struct {
Expand Down Expand Up @@ -492,7 +492,7 @@ func (c *Coordinator) process(ctx context.Context) (err error) {
}

for _, modifier := range c.modifiers {
comps, err = modifier(comps, cfg)
comps, err = modifier(comps)
if err != nil {
return fmt.Errorf("failed to modify components: %w", err)
}
Expand Down
168 changes: 157 additions & 11 deletions internal/pkg/agent/application/fleet_server_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,80 @@ package application

import (
"context"
"time"
"fmt"

"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

const (
elasticsearch = "elasticsearch"
fleetServer = "fleet-server"
)

// injectFleetServerInput is the base configuration that is used plus the FleetServerComponentModifier that adjusts
// the components before sending them to the runtime manager.
var injectFleetServerInput = config.MustNewConfigFrom(map[string]interface{}{
"outputs": map[string]interface{}{
"default": map[string]interface{}{
"type": "elasticsearch",
"type": elasticsearch,
"hosts": []string{"localhost:9200"},
},
},
"inputs": []interface{}{
map[string]interface{}{
"type": "fleet-server",
"id": fleetServer,
"type": fleetServer,
},
},
})

// FleetServerComponentModifier modifies the comps to inject extra information from the policy into
// the Fleet Server component and units needed to run Fleet Server correctly.
func FleetServerComponentModifier(comps []component.Component, policy map[string]interface{}) ([]component.Component, error) {
// TODO(blakerouse): Need to add logic to update the Fleet Server component with extra information from the policy.
return comps, nil
func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) coordinator.ComponentsModifier {
return func(comps []component.Component) ([]component.Component, error) {
for i, comp := range comps {
if comp.Spec.InputType == fleetServer {
for j, unit := range comp.Units {
if unit.Type == client.UnitTypeOutput && unit.Config.Type == elasticsearch {
unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), &serverCfg.Output.Elasticsearch)
if err != nil {
return nil, err
}
fixOutputMap(unitCfgMap)
unitCfg, err := component.ExpectedConfig(unitCfgMap)
if err != nil {
return nil, err
}
unit.Config = unitCfg
} else if unit.Type == client.UnitTypeInput && unit.Config.Type == fleetServer {
unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), &inputFleetServer{
Policy: serverCfg.Policy,
Server: serverCfg,
})
if err != nil {
return nil, err
}
fixInputMap(unitCfgMap)
unitCfg, err := component.ExpectedConfig(unitCfgMap)
if err != nil {
return nil, err
}
unit.Config = unitCfg
}
comp.Units[j] = unit
}
}
comps[i] = comp
}
return comps, nil
}
}

type fleetServerBootstrapManager struct {
Expand All @@ -46,18 +91,15 @@ type fleetServerBootstrapManager struct {

func newFleetServerBootstrapManager(
log *logger.Logger,
) (*fleetServerBootstrapManager, error) {
) *fleetServerBootstrapManager {
return &fleetServerBootstrapManager{
log: log,
ch: make(chan coordinator.ConfigChange),
errCh: make(chan error),
}, nil
}
}

func (m *fleetServerBootstrapManager) Run(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

m.log.Debugf("injecting fleet-server for bootstrap")
select {
case <-ctx.Done():
Expand All @@ -76,3 +118,107 @@ func (m *fleetServerBootstrapManager) Errors() <-chan error {
func (m *fleetServerBootstrapManager) Watch() <-chan coordinator.ConfigChange {
return m.ch
}

func fixOutputMap(m map[string]interface{}) {
// api_key cannot be present or Fleet Server will complain
delete(m, "api_key")
}

type inputFleetServer struct {
Policy *configuration.FleetServerPolicyConfig `yaml:"policy,omitempty"`
Server *configuration.FleetServerConfig `yaml:"server"`
}

func fixInputMap(m map[string]interface{}) {
if srv, ok := m["server"]; ok {
if srvMap, ok := srv.(map[string]interface{}); ok {
// bootstrap is internal to Elastic Agent
delete(srvMap, "bootstrap")
// policy is present one level input when sent to Fleet Server
delete(srvMap, "policy")
// output is present in the output unit
delete(srvMap, "output")
}
}
}

// toMapStr converts the input into a map[string]interface{}.
//
// This is done by using YAMl to marshal and then unmarshal it into the map[string]interface{}. YAML tags on the struct
// match the loading and unloading of the configuration so this ensures that it will match what Fleet Server is
// expecting.
func toMapStr(input ...interface{}) (map[string]interface{}, error) {
m := map[interface{}]interface{}{}
for _, i := range input {
im, err := toMapInterface(i)
if err != nil {
return nil, err
}
m = mergeNestedMaps(m, im)
}
// toMapInterface will set nested maps to a map[interface{}]interface{} which `component.ExpectedConfig` cannot
// handle they must be a map[string]interface{}.
fm := fixYamlMap(m)
r, ok := fm.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("expected map[string]interface{}, got %T", fm)
}
return r, nil
}

// toMapInterface converts the input into a map[interface{}]interface{} using YAML marshall and unmarshall.
func toMapInterface(input interface{}) (map[interface{}]interface{}, error) {
var res map[interface{}]interface{}
raw, err := yaml.Marshal(input)
if err != nil {
return nil, err
}
err = yaml.Unmarshal(raw, &res)
if err != nil {
return nil, err
}
return res, nil
}

// mergeNestedMaps merges two map[interface{}]interface{} together deeply.
func mergeNestedMaps(a, b map[interface{}]interface{}) map[interface{}]interface{} {
res := make(map[interface{}]interface{}, len(a))
for k, v := range a {
res[k] = v
}
for k, v := range b {
if v, ok := v.(map[interface{}]interface{}); ok {
if bv, ok := res[k]; ok {
if bv, ok := bv.(map[interface{}]interface{}); ok {
res[k] = mergeNestedMaps(bv, v)
continue
}
}
}
res[k] = v
}
return res
}

// fixYamlMap converts map[interface{}]interface{} into map[string]interface{} through out the entire map.
func fixYamlMap(input interface{}) interface{} {
switch i := input.(type) {
case map[string]interface{}:
for k, v := range i {
i[k] = fixYamlMap(v)
}
case map[interface{}]interface{}:
m := map[string]interface{}{}
for k, v := range i {
if ks, ok := k.(string); ok {
m[ks] = fixYamlMap(v)
}
}
return m
case []interface{}:
for j, v := range i {
i[j] = fixYamlMap(v)
}
}
return input
}
56 changes: 56 additions & 0 deletions internal/pkg/agent/application/fleet_server_bootstrap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/testutils"
)

func TestFleetServerBootstrapManager(t *testing.T) {
l := testutils.NewErrorLogger(t)
mgr := newFleetServerBootstrapManager(l)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

g, _ := errgroup.WithContext(ctx)

var change coordinator.ConfigChange
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-mgr.Errors():
cancel()
return err
case change = <-mgr.Watch():
cancel()
}
}
})

g.Go(func() error {
return mgr.Run(ctx)
})

err := g.Wait()
if err != nil && !errors.Is(err, context.Canceled) {
require.NoError(t, err)
}

require.NotNil(t, change)
assert.NotNil(t, change.Config())
}
Loading

0 comments on commit 43ad01d

Please sign in to comment.