Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add implementation of new AvailableComponents message #340

Merged
merged 13 commits into from
Jan 27, 2025
Merged
18 changes: 18 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,22 @@ type OpAMPClient interface {
// If no error is returned, the channel returned will be closed after the specified
// message is sent.
SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)

// SetAvailableComponents modifies the set of components that are available for configuration
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// on the agent.
// If called before Start(), initializes the client state that will be sent to the server upon Start().
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// Must be called before Start() if the ReportsAvailableComponents capability is set.
//
// May be called any time after Start(), including from the OnMessage handler.
// The new components will be sent with the next message to the server.
//
// When called after Start():
// If components is nil, types.ErrAvailableComponentsMissing will be returned.
// If components.Hash is nil or an empty []byte, types.ErrNoAvailableComponentHash will be returned.
// If the ReportsAvailableComponents capability is not set in StartSettings.Capabilities during Start(),
// types.ErrReportsAvailableComponentsNotSet will be returned.
//
// This method is subject to agent status compression - if components is not
// different from the cached agent state, this method is a no-op.
SetAvailableComponents(components *protobufs.AvailableComponents) error
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
}
123 changes: 123 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2305,3 +2305,126 @@ func TestSetFlagsBeforeStart(t *testing.T) {
assert.NoError(t, err)
})
}

func TestSetAvailableComponents(t *testing.T) {
testCases := []struct {
desc string
capabilities protobufs.AgentCapabilities
testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer)
}{
{
desc: "apply nil AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.ErrorIs(t, client.SetAvailableComponents(nil), types.ErrAvailableComponentsMissing)
},
},
{
desc: "apply AvailableComponents with empty hash",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.ErrorIs(t, client.SetAvailableComponents(&protobufs.AvailableComponents{}), types.ErrNoAvailableComponentHash)
},
},
{
desc: "apply AvailableComponents without required capability",
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.ErrorIs(t, client.SetAvailableComponents(generateTestAvailableComponents()), types.ErrReportsAvailableComponentsNotSet)
},
},
{
desc: "apply AvailableComponents with cached AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.NoError(t, client.SetAvailableComponents(generateTestAvailableComponents()))
},
},
{
desc: "apply AvailableComponents with new AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) {
availableComponents := generateTestAvailableComponents()
availableComponents.Hash = []byte("different")
require.NoError(t, client.SetAvailableComponents(availableComponents))
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 1, msg.SequenceNum)
msgAvailableComponents := msg.GetAvailableComponents()
require.NotNil(t, msgAvailableComponents)
require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash())
require.Nil(t, msgAvailableComponents.GetComponents())
return nil
})
},
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

availableComponents := generateTestAvailableComponents()
client.SetAvailableComponents(availableComponents)

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.Callbacks{
OnMessage: func(ctx context.Context, msg *types.MessageData) {},
},
Capabilities: tc.capabilities,
}
prepareClient(t, &settings, client)

// Client --->
assert.NoError(t, client.Start(context.Background(), settings))
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
msgAvailableComponents := msg.GetAvailableComponents()
if tc.capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
require.NotNil(t, msgAvailableComponents)
require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash())
require.Nil(t, msgAvailableComponents.GetComponents())
} else {
require.Nil(t, msgAvailableComponents)
}
return nil
})

tc.testFunc(t, client, srv)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
})
}
}

func generateTestAvailableComponents() *protobufs.AvailableComponents {
return &protobufs.AvailableComponents{
Hash: []byte("fake-hash"),
Components: map[string]*protobufs.ComponentDetails{
"receivers": {
Metadata: []*protobufs.KeyValue{
{
Key: "component",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: "filereceiver",
},
},
},
},
},
},
}
}
5 changes: 5 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag
return c.common.SendCustomMessage(message)
}

// SetAvailableComponents implements OpAMPClient.SetAvailableComponents
func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error {
return c.common.SetAvailableComponents(components)
}

func (c *httpClient) runUntilStopped(ctx context.Context) {
// Start the HTTP sender. This will make request/responses with retries for
// failures and will wait with configured polling interval if there is nothing
Expand Down
122 changes: 122 additions & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/client/internal"
Expand Down Expand Up @@ -311,3 +312,124 @@ func TestRedirectHTTP(t *testing.T) {
})
}
}

func TestHTTPReportsAvailableComponents(t *testing.T) {
testCases := []struct {
desc string
capabilities protobufs.AgentCapabilities
availableComponents *protobufs.AvailableComponents
startErr error
}{
{
desc: "Does not report AvailableComponents",
availableComponents: nil,
},
{
desc: "Reports AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
availableComponents: generateTestAvailableComponents(),
},
{
desc: "No AvailableComponents on Start() despite capability",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
startErr: internal.ErrAvailableComponentsMissing,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
var rcvCounter atomic.Uint64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, rcvCounter.Load(), msg.SequenceNum)
rcvCounter.Add(1)
time.Sleep(50 * time.Millisecond)
if rcvCounter.Load() == 1 {
resp := &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
}

if tc.availableComponents != nil {
// the first message received should contain just the available component hash
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Nil(t, availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())

// add the flag asking for the full available component state to the response
resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return resp
}

if rcvCounter.Load() == 2 {
if tc.availableComponents != nil {
// the second message received should contain the full component state
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return nil
}

// all subsequent messages should not have any available components
require.Nil(t, msg.GetAvailableComponents())
return nil
}

// Start a client.
settings := types.StartSettings{}
settings.OpAMPServerURL = "http://" + srv.Endpoint
settings.Capabilities = tc.capabilities

client := NewHTTP(nil)
client.SetAvailableComponents(tc.availableComponents)
prepareClient(t, &settings, client)

startErr := client.Start(context.Background(), settings)
if tc.startErr == nil {
assert.NoError(t, startErr)
} else {
assert.ErrorIs(t, startErr, tc.startErr)
return
}

// Verify that status report is delivered.
eventually(t, func() bool {
return rcvCounter.Load() == 1
})

if tc.availableComponents != nil {
// Verify that status report is delivered again. Polling should ensure this.
eventually(t, func() bool {
return rcvCounter.Load() == 2
})
} else {
// Verify that no second status report is delivered (polling is too infrequent for this to happen in 50ms)
assert.Never(t, func() bool {
return rcvCounter.Load() == 2
}, 50*time.Millisecond, 10*time.Millisecond)
}

// Verify that no third status report is delivered (polling is too infrequent for this to happen in 50ms)
assert.Never(t, func() bool {
return rcvCounter.Load() == 3
}, 50*time.Millisecond, 10*time.Millisecond)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}
59 changes: 59 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
Expand Down Expand Up @@ -88,6 +89,10 @@
return ErrHealthMissing
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil {
return ErrAvailableComponentsMissing
}

// Prepare remote config status.
if settings.RemoteConfigStatus == nil {
// RemoteConfigStatus is not provided. Start with empty.
Expand Down Expand Up @@ -212,6 +217,15 @@
return err
}

// initially, do not send the full component state - just send the hash.
// full state is available on request from the server using the corresponding ServerToAgent flag
var availableComponents *protobufs.AvailableComponents
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
availableComponents = &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
}
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
Expand All @@ -221,6 +235,7 @@
msg.Capabilities = uint64(c.Capabilities)
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
msg.Flags = c.ClientSyncedState.Flags()
msg.AvailableComponents = availableComponents
},
)
return nil
Expand Down Expand Up @@ -433,3 +448,47 @@

return sendingChan, nil
}

// SetAvailableComponents sends a message to the server with the available components for the agent
func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error {
if !c.isStarted {
return c.ClientSyncedState.SetAvailableComponents(components)
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 {
return types.ErrReportsAvailableComponentsNotSet
}

if components == nil {
return types.ErrAvailableComponentsMissing
}

if len(components.Hash) == 0 {
return types.ErrNoAvailableComponentHash
}

// implement agent status compression, don't send the message if it hasn't changed from the previous message
availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components)

if availableComponentsChanged {
if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil {
return err
}

Check warning on line 476 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L475-L476

Added lines #L475 - L476 were not covered by tests

// initially, do not send the full component state - just send the hash.
// full state is available on request from the server using the corresponding ServerToAgent flag
availableComponents := &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AvailableComponents = availableComponents
},
)

c.sender.ScheduleSend()
}

return nil
}
Loading
Loading