Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add global processor component config #117

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ message APMConfig {
message Component {
ComponentLimits limits = 1;
optional APMConfig apm_config = 2;
optional GlobalProcessorsConfig processors = 3;
}

// Limits to configure for the currently running component.
Expand All @@ -274,6 +275,20 @@ message ComponentLimits {
uint64 go_max_procs = 2;
}

// A processor config. Enabled flag is a separate field to check easily activation/deactivation. All the rest of the config
// is in the generic Struct config field
message ProcessorConfig {
google.protobuf.Struct source = 1;
bool enabled = 2;
google.protobuf.Struct config = 3;
}

// Configure some global processors for event data at component level
message GlobalProcessorsConfig {
google.protobuf.Struct source = 1;
map<string, ProcessorConfig> configs = 2;
}

// A set of units and their expected states and configuration.
message CheckinExpected {
// Units is the expected units the component should be running. Note that units can be added or
Expand Down
50 changes: 44 additions & 6 deletions pkg/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ type V2 interface {
//
// User of this client must read from this channel, or it will block the client.
UnitChanges() <-chan UnitChanged
// ComponentChanges returns the channel where the client will publish Component configuration changes
//
// Support for this changes must be opted in (in order to maintain backward compatibility, refer to the actual implementation for details)
// Component changes (if present) will be published *before* any unit change coming from the same message.
ComponentChanges() <-chan Component
// Errors returns channel of errors that occurred during communication.
//
// User of this client must read from this channel, or it will block the client.
Expand All @@ -192,10 +197,11 @@ type V2 interface {

// v2options hold the client options.
type v2options struct {
maxMessageSize int
chunkingAllowed bool
dialOptions []grpc.DialOption
agentInfo *AgentInfo
maxMessageSize int
chunkingAllowed bool
dialOptions []grpc.DialOption
agentInfo *AgentInfo
emitComponentChanges bool
}

// DialOptions returns the dial options for the GRPC connection.
Expand Down Expand Up @@ -238,6 +244,12 @@ func WithAgentInfo(agentInfo AgentInfo) V2ClientOption {
}
}

func WithEmitComponentChanges(emitComponentChanges bool) V2ClientOption {
return func(o *v2options) {
o.emitComponentChanges = emitComponentChanges
}
}

// clientV2 manages the state and communication to the Elastic Agent over the V2 control protocol.
type clientV2 struct {
target string
Expand All @@ -255,8 +267,9 @@ type clientV2 struct {
wg sync.WaitGroup
client proto.ElasticAgentClient

errCh chan error
changesCh chan UnitChanged
errCh chan error
changesCh chan UnitChanged
componentChangesCh chan Component

// stateChangeObservedCh is an internal channel that notifies checkinWriter
// that a unit state has changed. To trigger it, call unitsStateChanged.
Expand Down Expand Up @@ -311,6 +324,7 @@ func NewV2(target string, token string, versionInfo VersionInfo, opts ...V2Clien
versionInfo: versionInfo,
stateChangeObservedCh: make(chan struct{}, 1),
errCh: make(chan error),
componentChangesCh: make(chan Component),
changesCh: make(chan UnitChanged),
diagHooks: make(map[string]diagHook),
minCheckTimeout: CheckinMinimumTimeout,
Expand Down Expand Up @@ -345,6 +359,11 @@ func (c *clientV2) Stop() {
}
}

// ComponentChanges returns channel client will publish component change notifications to.
func (c *clientV2) ComponentChanges() <-chan Component {
return c.componentChangesCh
}

// UnitChanges returns channel client send unit change notifications to.
func (c *clientV2) UnitChanges() <-chan UnitChanged {
return c.changesCh
Expand Down Expand Up @@ -644,6 +663,25 @@ func (c *clientV2) syncComponent(expected *proto.CheckinExpected) {
}
}

if c.opts.emitComponentChanges {
// we have to publish the new component config
component := MapComponent(expected.Component)

// Set ComponentIdx
component.ConfigIdx = expected.ComponentIdx

if component != nil && expected.ComponentIdx != c.componentIdx {
const publishTimeout = 500 * time.Millisecond
select {
case c.componentChangesCh <- *component:
// all good we managed to process the component config
case <-time.After(publishTimeout):
c.errCh <- fmt.Errorf("timed out after %s writing component config to publish channel, dropping component config index %d", publishTimeout, component.ConfigIdx)
return
}
}
}

// Technically we should wait until the APM config is also applied, but the syncUnits is called after this and
// we have a single index for the whole component
c.componentConfig = expected.Component
Expand Down
133 changes: 128 additions & 5 deletions pkg/client/client_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"testing"
"time"

"github.com/elastic/elastic-agent-libs/atomic"
"github.com/google/pprof/profile"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
gproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -1138,12 +1140,12 @@ func TestClientV2_Checkin_APMConfig(t *testing.T) {
}
}

type componentConfigServerValidation struct {
componentIdx uint64
goMaxProcs int
}

func TestClientV2_Checkin_Component(t *testing.T) {
type componentConfigServerValidation struct {
componentIdx uint64
goMaxProcs int
}

var m sync.Mutex
token := mock.NewID()
connected := false
Expand Down Expand Up @@ -1332,6 +1334,121 @@ func TestClientV2_Checkin_Component(t *testing.T) {
}
}

func TestClientV2_Checkin_OptInComponent(t *testing.T) {

type args struct {
checkinExpectedGenerator mock.StubServerCheckinV2
}

type expected struct {
componentExpected *proto.Component
expectedComponentIdx uint64
unitsExpected []Unit
numberOfComponentConfigs uint
}

testcases := []struct {
name string
args args
expected expected
}{
{
name: "Simple component config",
args: args{
checkinExpectedGenerator: func(observed *proto.CheckinObserved) *proto.CheckinExpected {

expectedComponentIdx := uint64(1)

if observed.ComponentIdx != expectedComponentIdx {

return &proto.CheckinExpected{
ComponentIdx: expectedComponentIdx,
Component: &proto.Component{
Processors: &proto.GlobalProcessorsConfig{
Configs: map[string]*proto.ProcessorConfig{
"provider1": {Enabled: true, Config: mustStructFromMap(t, nil)},
},
},
},
Units: []*proto.UnitExpected{},
}
}
// disconnect otherwise
return nil
},
},
expected: expected{
componentExpected: &proto.Component{
Processors: &proto.GlobalProcessorsConfig{
Configs: map[string]*proto.ProcessorConfig{
"provider1": {Enabled: true, Config: mustStructFromMap(t, nil)},
},
},
},
expectedComponentIdx: 1,
unitsExpected: nil,
numberOfComponentConfigs: 1,
},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
srv := mock.StubServerV2{
CheckinV2Impl: tt.args.checkinExpectedGenerator,
ActionImpl: func(response *proto.ActionResponse) error {
// actions not tested here
t.Logf("handling action response %v", response)
return nil
},
ActionsChan: make(chan *mock.PerformAction, 100),
}
require.NoError(t, srv.Start())
defer srv.Stop()

var errsMu sync.Mutex
var errs []error
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

serverAddr := fmt.Sprintf(":%d", srv.Port)
token := mock.NewID()
v2Client := NewV2(serverAddr, token, VersionInfo{}, WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), WithEmitComponentChanges(true)).(*clientV2)
v2Client.minCheckTimeout = 100 * time.Millisecond // otherwise the test will run for too long
storeErrors(ctx, v2Client, &errs, &errsMu)
require.NoError(t, v2Client.Start(ctx))
defer v2Client.Stop()

componentConfigsReceived := atomic.MakeUint(0)

go func() {
t.Log("consumer goroutine started...")
for {
select {
case <-ctx.Done():
t.Log("consumer goroutine exiting...")
return
case cc := <-v2Client.ComponentChanges():
t.Logf("component received: %+v", cc)
componentConfigsReceived.Inc()
case <-v2Client.UnitChanges(): // otherwise the v2Client can block forever
// we don't need to react to units since we test the component-level change
t.Logf("unit received")
}
}
}()

// wait until we processed the target componentIdx
assert.Eventually(t, func() bool {
return v2Client.componentIdx == tt.expected.expectedComponentIdx
}, 10*time.Second, 500*time.Millisecond, "didn't process up to the expected componentIdx")

assert.Equal(t, tt.expected.numberOfComponentConfigs, componentConfigsReceived.Load(), "didn't receive the expected number of component configs")
assert.True(t, gproto.Equal(tt.expected.componentExpected, v2Client.componentConfig), "last component stored by client \n%s\n is different from expected\n%s", v2Client.componentConfig, tt.expected.componentExpected)
})
}
}

func setupClientForDiagnostics(ctx context.Context, t *testing.T) (*Unit, V2, mock.StubServerV2) {
var m sync.Mutex
token := mock.NewID()
Expand Down Expand Up @@ -1453,3 +1570,9 @@ func updateUnits(t *testing.T, observed *proto.CheckinObserved, units ...*Unit)
}
}
}

func mustStructFromMap(t *testing.T, m map[string]any) *structpb.Struct {
newStruct, err := structpb.NewStruct(m)
require.NoErrorf(t, err, "unable to create struct from %s", m)
return newStruct
}
55 changes: 55 additions & 0 deletions pkg/client/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package client

import (
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
)

// Component limits
type Limits struct {
maxProcs uint64
}

// (Elastic) APM configuration
type ElasticAPMTLS struct {
// Elastic APM TLS config
SkipVerify bool
ServerCA string
ServerCert string
}

type ElasticAPM struct {
TLS *ElasticAPMTLS
Environment string
APIKey string
SecretToken string
Hosts []string
GlobalLabels string
}

// APM configuration
type APMConfig struct {
Elastic *ElasticAPM
}

// Global processors config

// A processor config. Enabled flag is a separate field to check easily activation/deactivation. All the rest of the config
// is in the generic Struct config field
type ProcessorConfig struct {
Enabled bool
Config map[string]any
}

// Configure some global processors for event data at component level
type GlobalProcessorsConfig struct {
Configs map[string]*ProcessorConfig
}

type Component struct {
Config *proto.Component
ConfigIdx uint64

Limits *Limits
APM *APMConfig
GlobalProcessors *GlobalProcessorsConfig
}
Loading